# In [None]:
from airflow import DAG
from airflow.decorators import task
from contextlib import closing
import pendulum

from airflow.hooks.snowflake_hook import SnowflakeHook


# snowflake connect
def get_snowflake_connection(autocommit=False):
    """Open a Snowflake connection through the Airflow connection ``snowflake``.

    Args:
        autocommit: Autocommit mode applied to the session. With the default
            ``False`` callers are expected to issue COMMIT/ROLLBACK themselves.

    Returns:
        The connection object produced by ``SnowflakeHook.get_conn()``.
        Callers create their own cursors with ``conn.cursor()``.
    """
    hook = SnowflakeHook(snowflake_conn_id='snowflake')
    conn = hook.get_conn()
    # Apply the requested mode; the argument was previously accepted but ignored.
    hook.set_autocommit(conn, autocommit)
    # Return the connection, not a cursor: the tasks in this module call
    # ``.cursor()`` on the result, which a cursor object does not provide.
    return conn

# ETL
@task
def extract_congestion_raw_data():
    """Read every row of ``raw_data_table`` from Snowflake.

    Returns:
        The rows returned by ``cursor.fetchall()``. As the return value of a
        ``@task`` function it is handed to downstream tasks by Airflow.

    Raises:
        Exception: Any error raised while querying is printed and re-raised,
            so the task is reported as failed instead of silently returning
            ``None``.
    """
    conn = get_snowflake_connection()
    # Release the cursor and the underlying connection handle on every exit path.
    with closing(conn), closing(conn.cursor()) as cur:
        try:
            cur.execute("SELECT * FROM raw_data_table")
            return cur.fetchall()
        except Exception as error:
            print("Failed to extract data", error)
            # Propagate so the scheduler sees the failure and can retry/alert.
            raise

@task
def load_congestion_analytics_table(schema,table,data):
    """Rebuild ``{schema}.{table}`` and load the congestion rows into it.

    The table is dropped, recreated with the seven analytics columns, and
    filled from ``data``.

    Args:
        schema: Target schema name. Interpolated into the SQL as an
            identifier, so it must come from trusted DAG code.
        table: Target table name. Same trust requirement as ``schema``.
        data: Iterable of indexable rows. Positions 0-5 and 19 of each row
            are loaded (position 19 goes into ``FCST_PPLTN``).

    Raises:
        Exception: Any error during the load is printed, a ROLLBACK is
            issued, and the error is re-raised so the task is marked failed.
    """
    conn = get_snowflake_connection()
    # Release the cursor and the underlying connection handle on every exit path.
    with closing(conn), closing(conn.cursor()) as cur:
        try:
            cur.execute("BEGIN;")
            # NOTE(review): Snowflake is documented to auto-commit DDL, so the
            # DROP/CREATE below are presumably not undone by ROLLBACK - confirm
            # whether a failed load leaving an empty table is acceptable.
            cur.execute(f"DROP TABLE IF EXISTS {schema}.{table}")
            cur.execute(f"""CREATE TABLE {schema}.{table} (
                    AREA_NM STRING,
                    AREA_CD STRING,
                    AREA_CONGEST_LVL STRING,
                    AREA_CONGEST_MSG STRING,
                    AREA_PPLTN_MIN INT,
                    AREA_PPLTN_MAX INT,
                    FCST_PPLTN TIMESTAMP);""")
            # Pick the loaded columns once; tuples keep each row a flat
            # parameter sequence for the driver.
            rows = [
                (d[0], d[1], d[2], d[3], d[4], d[5], d[19])
                for d in data
            ]
            if rows:
                # Bind values as parameters instead of formatting them into
                # the statement: string values get quoted/escaped by the
                # driver, which fixes invalid SQL and prevents injection.
                cur.executemany(
                    f"INSERT INTO {schema}.{table} "
                    "VALUES (%s, %s, %s, %s, %s, %s, %s)",
                    rows,
                )
            cur.execute("COMMIT;")
        except Exception as error:
            print("Failed to load data", error)
            cur.execute("ROLLBACK;")
            # Propagate so the scheduler sees the failure and can retry/alert.
            raise

# DAG configuration
# The tasks are instantiated inside the ``with`` block so that they are
# attached to this DAG; created outside a DAG context they belong to no DAG.
# NOTE(review): ``pendulum.yesterday()`` is re-evaluated on every parse of this
# file; Airflow's documentation recommends a fixed start_date - consider
# pinning one.
with DAG(
    dag_id='congestion_analytics_dag',
    start_date=pendulum.yesterday(),
    schedule_interval='@hourly',
) as dag:
    extract_data = extract_congestion_raw_data()
    load_data = load_congestion_analytics_table(schema='스키마이름', table='테이블이름', data=extract_data)

    # Execution order (also implied by passing extract_data as ``data``)
    extract_data >> load_data