## 트랜잭션이란? 동시에 Atomic하게 실행되어야 하는 SQL들을 묶어서 하나의 작업처럼 처리하는 방법

In [50]:
import psycopg2

# Redshift connection helper
# - replace user and password with your own credentials!
def get_Redshift_connection(autocommit):
    """Open a psycopg2 connection to the Redshift cluster.

    The connection parameters are passed as keyword arguments rather than
    being interpolated into a DSN string, so a password containing spaces,
    quotes or backslashes cannot corrupt the connection string.

    Args:
        autocommit: value forwarded to ``conn.set_session(autocommit=...)``.

    Returns:
        The open psycopg2 connection object.
    """
    conn = psycopg2.connect(
        host="learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com",
        user="wnsfuf0121",
        password="*******",
        port=5439,
        dbname="dev",
    )
    conn.set_session(autocommit=autocommit)
    return conn

# INSERT SQL을 autocommit=False로 실행

In [None]:
# Open a session with autocommit disabled and grab a cursor for the queries below.
conn = get_Redshift_connection(autocommit=False)
cur = conn.cursor()

In [None]:
# Fetch at most ten rows from the table and print each row tuple.
cur.execute("SELECT * FROM wnsfuf0121.name_gender LIMIT 10;")
for row in cur.fetchall():
    print(row)

('Adaleigh', 'F')
('Amryn', 'Unisex')
('Apurva', 'Unisex')
('Aryion', 'M')
('Alixia', 'F')
('Alyssarose', 'F')
('Arvell', 'M')
('Aibel', 'M')
('Atiyyah', 'F')
('Adlie', 'F')


In [None]:
# Delete every row of the table (no WHERE clause).
cur.execute("DELETE FROM wnsfuf0121.name_gender;")

In [None]:
# Insert one row through the same cursor.
cur.execute("INSERT INTO wnsfuf0121.name_gender VALUES ('Keeyong', 'Male');")

In [None]:
 # Autocommit is False, so the DELETE has not been committed to the database yet;
 # the change only appears to be applied from within this session.
cur.execute("SELECT * FROM wnsfuf0121.name_gender LIMIT 10;")
for row in cur.fetchall():
    print(row)

('Keeyong', 'Male')


In [None]:
 # Same as conn.commit(); likewise cur.execute("ROLLBACK;") is the same as conn.rollback().
cur.execute("COMMIT;")  

In [None]:
# Close the connection used by the cells above.
conn.close()

# INSERT SQL을 autocommit=False로 실행하고 try/except로 컨트롤하기

In [None]:
# Fresh session with autocommit disabled; commit/rollback are issued explicitly below.
conn = get_Redshift_connection(autocommit=False)
cur = conn.cursor()

In [None]:
# If an error occurs before the changes are committed, roll all of them back.

try:
    for statement in (
        "DELETE FROM wnsfuf0121.name_gender;",
        "INSERT INTO wnsfuf0121.name_gender VALUES ('Claire', 'Female');",
    ):
        cur.execute(statement)
    conn.commit()  # same as cur.execute("COMMIT;")
except Exception as error:  # the original tuple also listed Exception, so the caught set is unchanged
    print(error)
    conn.rollback()  # same as cur.execute("ROLLBACK;")
finally:
    conn.close()

# INSERT SQL을 autocommit=True로 실행하고 try/except로 컨트롤하기

In [None]:
# Session with autocommit enabled; transactions must be opened explicitly with BEGIN.
conn = get_Redshift_connection(autocommit=True)
cur = conn.cursor()

In [None]:
# Delete every row of the table; this statement runs outside any explicit BEGIN/END.
cur.execute("DELETE FROM wnsfuf0121.name_gender;") 

In [None]:
try:
    # BEGIN opens the transaction: nothing below is applied until END is reached.
    for statement in (
        "BEGIN;",
        "DELETE FROM wnsfuf0121.name_gender;",
        "INSERT INTO wnsfuf0121.name_gender VALUES ('Claire', 'Female');",
        "END;",
    ):
        cur.execute(statement)
except Exception as error:  # the original tuple also listed Exception, so the caught set is unchanged
    print(error)
    cur.execute("ROLLBACK;")
finally:
    conn.close()

# INSERT SQL을 autocommit=True로 실행하고 SQL로 컨트롤하기

In [None]:
# Autocommit session again; this time the transaction is driven purely by SQL statements.
conn = get_Redshift_connection(autocommit=True)
cur = conn.cursor()

In [None]:
# Print every row currently stored in the table.
cur.execute("SELECT * FROM wnsfuf0121.name_gender;")
for row in cur.fetchall():
    print(row)

('Claire', 'Female')


In [None]:
# Open an explicit transaction and replace the table contents; END is sent in the next cell.
for statement in (
    "BEGIN;",
    "DELETE FROM wnsfuf0121.name_gender;",
    "INSERT INTO wnsfuf0121.name_gender VALUES ('Benjamin', 'Male');",
):
    cur.execute(statement)

In [None]:
# END closes the transaction opened with BEGIN (it is used here in place of COMMIT).
cur.execute("END;")

In [None]:
# Print the table contents after the transaction has ended.
cur.execute("SELECT * FROM wnsfuf0121.name_gender;")
for row in cur.fetchall():
    print(row)

('Benjamin', 'Male')


# 잘못된 SQL을 중간에 실행해보기

In [None]:
# Demonstration of a statement failing in the middle of an explicit transaction.
cur.execute("BEGIN;")
cur.execute("DELETE FROM wnsfuf0121.name_gender;")
# The table name name_gender3 differs from the name_gender table used everywhere else;
# the recorded output of this cell is an UndefinedTable error.
cur.execute("INSERT INTO wnsfuf0121.name_gender3 VALUES ('Andrew', 'Male');")
cur.execute("END;")

UndefinedTable: ignored

In [None]:
# The transaction ended abnormally, so this query raises an error
# (the recorded output is InFailedSqlTransaction).
cur.execute("SELECT * FROM wnsfuf0121.name_gender;")
for row in cur.fetchall():
    print(row)

InFailedSqlTransaction: ignored

In [None]:
# Roll back the failed transaction first, then query the table again and print its rows.
cur.execute("ROLLBACK;")
cur.execute("SELECT * FROM wnsfuf0121.name_gender;")
for row in cur.fetchall():
    print(row)

('Benjamin', 'Male')


# Open Weather forecast DAG 작성하기

In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook

from datetime import datetime
from datetime import timedelta
# from plugins import slack

import requests
import logging
import psycopg2
import requests

def get_Redshift_connection(autocommit=False):
    """Return a cursor on the Airflow connection ``redshift_dev_db``.

    Args:
        autocommit: value assigned to the connection's ``autocommit`` attribute
            before the cursor is created (defaults to False).

    Returns:
        A cursor created from the hook's connection. Only the cursor is
        returned, not the connection object itself.
    """
    connection = PostgresHook(postgres_conn_id='redshift_dev_db').get_conn()
    connection.autocommit = autocommit
    return connection.cursor()

def extract(**context):
    """Download the One Call forecast for a fixed coordinate from OpenWeatherMap.

    Args:
        **context: Airflow task context; ``context["params"]["api_key"]`` must
            hold the API key appended to the request URL.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: if the API answers with a 4xx/5xx status.
        requests.exceptions.Timeout: if no response arrives within the timeout.
    """
    api_key = context["params"]["api_key"]
    lat = 37.5683
    lon = 126.9778
    part = 'minutely,hourly'
    link = f'http://api.openweathermap.org/data/3.0/onecall?lat={lat}&lon={lon}&exclude={part}&appid={api_key}'

    # Without a timeout a stalled connection would block the task indefinitely.
    response = requests.get(link, timeout=30)
    # Fail here on an error status instead of handing an error payload to the
    # downstream task, where it would only surface as a confusing KeyError.
    response.raise_for_status()
    return response.json()

def transform(**context):
    """Flatten the daily forecast pulled from the ``extract`` task into rows.

    Args:
        **context: Airflow task context; ``context["task_instance"]`` is used
            to pull the XCom ``return_value`` of the task ``extract``.

    Returns:
        A list with one ``[date, day_temp, min_temp, max_temp]`` list per entry
        of the payload's ``daily`` list. ``date`` is the entry's ``dt``
        timestamp formatted as ``YYYY-MM-DD`` via ``datetime.fromtimestamp``
        (i.e. in the local timezone of the process).
    """
    forecast = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
    return [
        [
            datetime.fromtimestamp(day['dt']).strftime('%Y-%m-%d'),
            day['temp']['day'],
            day['temp']['min'],
            day['temp']['max'],
        ]
        for day in forecast['daily']
    ]

def full_refresh_load(**context):
    """Drop and rebuild ``{schema}.{table}`` from the rows produced by ``transform``.

    The DROP, the CREATE and every INSERT are sent as a single SQL script
    wrapped in BEGIN ... END.

    Args:
        **context: Airflow task context. ``context["params"]`` must provide
            ``schema`` and ``table``; the rows are pulled from the XCom
            ``return_value`` of the task ``transform``, each row being
            ``[date, temp, min_temp, max_temp]``.

    Raises:
        Exception: whatever building or executing the SQL raised, re-raised
            after a ROLLBACK so the failure is not silently reported as success.
    """
    schema = context["params"]["schema"]
    table = context["params"]["table"]

    result = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
    cur = get_Redshift_connection()

    try:
        sql = f'''
        BEGIN;
        DROP TABLE IF EXISTS {schema}.{table};
        CREATE TABLE {schema}.{table} (
            date date primary key,
            temp float,
            min_temp float,
            max_temp float,
            created_date timestamp default GETDATE()
        );
        '''
        # NOTE(review): values are interpolated directly into the SQL text; they
        # are expected to be dates/numbers coming from the transform task.
        sql += "".join(
            f"INSERT INTO {schema}.{table} VALUES ('{row[0]}', {row[1]}, {row[2]}, {row[3]});"
            for row in result
        )
        sql += 'END;'
        cur.execute(sql)
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;")
        # Re-raise so the task fails (and is retried) instead of finishing
        # successfully after the load was rolled back.
        raise

# DAG definition for the full-refresh variant of the weather forecast pipeline.
weather_forecast_dag = DAG(
    dag_id = 'weather_forecast_dag_full_refresh',
    start_date = datetime(2022,10,14), # the DAG does not run if this date is in the future
    schedule_interval = '0 2 * * *',  # cron expression; adjust as needed
    catchup = False,
    max_active_runs = 1,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

# NOTE: the assignments below rebind the names `extract` and `transform` from the
# functions defined above to operator objects. The function is passed as
# python_callable before the name is rebound, so the statement order matters.
extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        # Variable.get is called here at module level, when this file is executed.
        'api_key':  Variable.get("weather_api")
    },
    dag = weather_forecast_dag)

transform = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    dag = weather_forecast_dag)

load = PythonOperator(
    task_id = 'load',
    python_callable = full_refresh_load,
    params = {
        'schema': 'wnsfuf0121',
        'table': 'weather_forecast'
    },
    dag = weather_forecast_dag)

# Task order: extract, then transform, then load.
extract >> transform >> load

In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.hooks.postgres_hook import PostgresHook

from datetime import datetime
from datetime import timedelta
# from plugins import slack

import requests
import logging
import psycopg2
import requests

def get_Redshift_connection(autocommit=False):
    """Create a cursor from the Airflow connection ``redshift_dev_db``.

    Args:
        autocommit: value assigned to the connection's ``autocommit`` attribute
            before the cursor is created (defaults to False).

    Returns:
        A cursor created from the hook's connection. Only the cursor is
        returned, not the connection object itself.
    """
    redshift_hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    connection = redshift_hook.get_conn()
    connection.autocommit = autocommit
    return connection.cursor()

def extract(**context):
    """Download the One Call forecast for a fixed coordinate from OpenWeatherMap.

    Args:
        **context: Airflow task context; ``context["params"]["api_key"]`` must
            hold the API key appended to the request URL.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: if the API answers with a 4xx/5xx status.
        requests.exceptions.Timeout: if no response arrives within the timeout.
    """
    api_key = context["params"]["api_key"]
    lat = 37.5683
    lon = 126.9778
    part = 'minutely,hourly'
    link = f'http://api.openweathermap.org/data/3.0/onecall?lat={lat}&lon={lon}&exclude={part}&appid={api_key}'

    # Without a timeout a stalled connection would block the task indefinitely.
    response = requests.get(link, timeout=30)
    # Fail here on an error status instead of handing an error payload to the
    # downstream task, where it would only surface as a confusing KeyError.
    response.raise_for_status()
    return response.json()

def transform(**context):
    """Turn the forecast payload of the ``extract`` task into flat rows.

    Args:
        **context: Airflow task context; ``context["task_instance"]`` is used
            to pull the XCom ``return_value`` of the task ``extract``.

    Returns:
        A list with one ``[date, day_temp, min_temp, max_temp]`` list per entry
        of the payload's ``daily`` list. ``date`` is the entry's ``dt``
        timestamp formatted as ``YYYY-MM-DD`` via ``datetime.fromtimestamp``
        (i.e. in the local timezone of the process).
    """
    payload = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")

    def to_row(entry):
        # One output row per daily entry: formatted date followed by three temperatures.
        temps = entry['temp']
        day_label = datetime.fromtimestamp(entry['dt']).strftime('%Y-%m-%d')
        return [day_label, temps['day'], temps['min'], temps['max']]

    return [to_row(entry) for entry in payload['daily']]

def incremental_update_load(**context):
    """Merge the rows produced by ``transform`` into ``{schema}.{table}``.

    Everything is sent as a single SQL script wrapped in BEGIN ... END:
    the target table is created if missing, copied into ``temp_table``,
    emptied, the new rows are appended to ``temp_table``, and finally only the
    most recent row per ``date`` (by ``created_date``) is copied back before
    ``temp_table`` is dropped.

    Args:
        **context: Airflow task context. ``context["params"]`` must provide
            ``schema`` and ``table``; the rows are pulled from the XCom
            ``return_value`` of the task ``transform``, each row being
            ``[date, temp, min_temp, max_temp]``.

    Raises:
        Exception: whatever building or executing the SQL raised, re-raised
            after a ROLLBACK so the failure is not silently reported as success.
    """
    schema = context["params"]["schema"]
    table = context["params"]["table"]

    result = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
    cur = get_Redshift_connection()

    try:
        sql = f'''
        BEGIN;
        CREATE TABLE IF NOT EXISTS {schema}.{table} (
            date date primary key,
            temp float,
            min_temp float,
            max_temp float,
            created_date timestamp default GETDATE()
        );
        
        CREATE TABLE {schema}.temp_table AS SELECT * FROM {schema}.{table};
        DELETE FROM {schema}.{table};
        '''
        # All rows of this run share one created_date so the dedup step below
        # ranks them as newer than anything copied from the existing table.
        now = datetime.now()
        # NOTE(review): values are interpolated directly into the SQL text; they
        # are expected to be dates/numbers coming from the transform task.
        sql += "".join(
            f"INSERT INTO {schema}.temp_table VALUES ('{row[0]}', {row[1]}, {row[2]}, {row[3]}, '{now}');"
            for row in result
        )

        sql += f"""
        INSERT INTO {schema}.{table}
        SELECT date, temp, min_temp, max_temp, created_date
        FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
        FROM {schema}.temp_table
        )
        WHERE seq = 1;
        DROP TABLE IF EXISTS {schema}.temp_table;
        END;
        """
        cur.execute(sql)
    except Exception as error:
        print(error)
        cur.execute("ROLLBACK;")
        # Re-raise so the task fails (and is retried) instead of finishing
        # successfully after the load was rolled back.
        raise

# DAG definition for the incremental-update variant of the weather forecast pipeline.
weather_forecast_dag = DAG(
    dag_id = 'weather_forecast_dag_incremental_update',
    start_date = datetime(2022,10,14), # the DAG does not run if this date is in the future
    schedule_interval = '0 2 * * *',  # cron expression; adjust as needed
    catchup = False,
    max_active_runs = 1,
    default_args = {
        'retries': 1,
        'retry_delay': timedelta(minutes=3),
    }
)

# NOTE: the assignments below rebind the names `extract` and `transform` from the
# functions defined above to operator objects. The function is passed as
# python_callable before the name is rebound, so the statement order matters.
extract = PythonOperator(
    task_id = 'extract',
    python_callable = extract,
    params = {
        # Variable.get is called here at module level, when this file is executed.
        'api_key':  Variable.get("weather_api")
    },
    dag = weather_forecast_dag)

transform = PythonOperator(
    task_id = 'transform',
    python_callable = transform,
    dag = weather_forecast_dag)

load = PythonOperator(
    task_id = 'load',
    python_callable = incremental_update_load,
    params = {
        'schema': 'wnsfuf0121',
        'table': 'weather_forecast'
    },
    dag = weather_forecast_dag)

# Task order: extract, then transform, then load.
extract >> transform >> load