In [5]:
# ETL pipeline: MySQL -> S3 -> Redshift

In [6]:
import sys

In [7]:
print(sys.executable)

/Users/eddy/workspace/ETL_tutorial/.venv/bin/python


In [26]:
from airflow.providers.standard.operators.python import PythonOperator

In [8]:
from airflow import DAG

In [9]:
# Import the PythonOperator class, which runs a Python function as an Airflow task
from airflow.operators.python import PythonOperator

In [10]:
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


In [11]:
from airflow.providers.postgres.hooks.postgres import PostgresHook


In [12]:
from airflow.operators.python import PythonOperator

In [13]:
from airflow.providers.mysql.hooks.mysql import MySqlHook

In [14]:
from datetime import datetime

In [15]:
import pandas as pd

In [20]:
import os
from dotenv import load_dotenv

In [None]:
# Local CSV path that the extract task writes the raw `users` query result to
# (and pushes to XCom so the transform task can read it back).
file_path = 'redshift_users.csv'

In [22]:
# Load settings from a local .env file into the process environment
load_dotenv()
# Value of the "IAM" environment variable; it is interpolated into the
# IAM_ROLE clause of the Redshift COPY statement in copy_to_redshift.
aws_iam = os.getenv("IAM")

# Sanity check: os.getenv returns None when the variable is unset, so anything
# other than `str` here means IAM was not found.
type(aws_iam)

str

In [None]:
# Extract from the operational (production) MySQL database
def extract_msyql(**context):
    """Dump the MySQL ``users`` table to a local CSV file and publish its path.

    Runs ``select * from users;`` through ``MySqlHook`` on the
    ``mysql_default`` connection, writes the result to the module-level
    ``file_path`` and pushes that path to XCom under the key ``file_path``.

    Args:
        **context: Airflow task context; only ``context['ti']`` is used.
    """
    # The hook's keyword is `mysql_conn_id`; it was previously misspelled as
    # `mysql_conn_ind`, so the connection id was never passed as intended.
    mysql = MySqlHook(mysql_conn_id='mysql_default')
    df = mysql.get_pandas_df('select * from users;')
    # Do not write the DataFrame index: it is not a column of `users`, and
    # pd.read_csv would read it back as an extra "Unnamed: 0" column that then
    # ends up in the file loaded into Redshift.
    df.to_csv(file_path, index=False)
    context['ti'].xcom_push(key='file_path', value=file_path)
    

In [2]:
def transform_data(**context):
    """Clean the extracted users CSV and publish the cleaned file's path.

    Pulls the raw CSV path from XCom (key ``file_path``, task ``extract``),
    upper-cases the ``name`` column, replaces missing ``country`` values with
    ``"UNKNOWN"``, writes the result to ``users_clean.csv`` (no index column)
    and pushes that path to XCom under the key ``transformed_path``.

    Args:
        **context: Airflow task context; only ``context['ti']`` is used.
    """
    ti = context['ti']
    raw_path = ti.xcom_pull(key='file_path', task_ids='extract')

    df = pd.read_csv(raw_path)
    df["name"] = df["name"].str.upper()
    df = df.fillna({"country": "UNKNOWN"})

    transformed_path = "users_clean.csv"
    # Write to the output *path*. This call previously received the function
    # object `transform_data` itself, which is not a valid path or buffer.
    df.to_csv(transformed_path, index=False)
    ti.xcom_push(key="transformed_path", value=transformed_path)

In [None]:
# Upload the cleaned CSV to S3
def load_to_s3(**context):
    """Upload the transformed CSV to S3 and publish its ``s3://`` URI.

    Pulls the local path of the cleaned file from XCom (key
    ``transformed_path``, task ``transform``), uploads it with ``S3Hook`` on
    the ``aws_default`` connection (overwriting any existing object), and
    pushes the resulting ``s3://bucket/key`` URI to XCom under ``s3_path``.

    Args:
        **context: Airflow task context; only ``context['ti']`` is used.
    """
    ti = context['ti']
    # The file to upload is the path published by the transform task. This was
    # previously `filename=transform_data`, i.e. the function object itself.
    transformed_path = ti.xcom_pull(key='transformed_path', task_ids='transform')

    s3 = S3Hook(aws_conn_id="aws_default")
    bucket_name = 'etl-tutorial-of-eddy'  # name of the actual object-storage bucket
    key = 'users/users_clean.csv'  # object key, i.e. the path inside the bucket
    s3.load_file(filename=transformed_path, bucket_name=bucket_name, key=key, replace=True)
    ti.xcom_push(key='s3_path', value=f's3://{bucket_name}/{key}')

In [None]:
# S3 -> Redshift
def copy_to_redshift(**context):
    """COPY the uploaded CSV from S3 into the Redshift ``users`` table.

    Pulls the ``s3://`` URI from XCom (key ``s3_path``, task ``load``),
    creates the ``users`` table if it does not exist, and runs a Redshift
    ``COPY`` using the IAM role held in the module-level ``aws_iam``.

    Args:
        **context: Airflow task context; only ``context['ti']`` is used.

    Raises:
        ValueError: if no S3 path was published by the ``load`` task, or if
            the ``IAM`` environment variable was not set.
    """
    s3_path = context['ti'].xcom_pull(key='s3_path', task_ids='load')
    # Fail fast rather than interpolating the literal text "None" into the SQL.
    if not s3_path:
        raise ValueError("No 's3_path' XCom value found from task 'load'")
    if not aws_iam:
        raise ValueError("Environment variable 'IAM' is not set; cannot build IAM_ROLE clause")

    redshift = PostgresHook(postgres_conn_id='redshift_default')
    conn = redshift.get_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS users(
                    user_id INT,
                    name VARCHAR(50),
                    age INT,
                    country VARCHAR(50)
                );
                """)
            # Both interpolated values come from this pipeline (XCom / .env),
            # not from user input.
            # NOTE(review): COPY appends rows, so each daily run presumably adds
            # the full extract again — confirm whether that is intended.
            copy_sql = f"""
    COPY users
    FROM '{s3_path}'
    IAM_ROLE '{aws_iam}'
    CSV IGNOREHEADER 1;
    """
            cur.execute(copy_sql)
            conn.commit()
        except Exception:
            # Leave no half-applied transaction behind before re-raising.
            conn.rollback()
            raise
        finally:
            cur.close()
    finally:
        # The connection was previously never closed.
        conn.close()

In [25]:
# DAG "etl_redshift": the four ETL steps wired into one linear chain,
# scheduled "@daily" from 2025-01-01 with catchup disabled.
with DAG(
    dag_id="etl_redshift",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    # MySQL -> local CSV
    extract = PythonOperator(
        task_id="extract",
        python_callable=extract_msyql,
    )
    # local CSV -> cleaned CSV
    transform = PythonOperator(
        task_id="transform",
        python_callable=transform_data,
    )
    # cleaned CSV -> S3
    load = PythonOperator(
        task_id="load",
        python_callable=load_to_s3,
    )
    # S3 -> Redshift
    copy = PythonOperator(
        task_id="copy",
        python_callable=copy_to_redshift,
    )

    # Each task runs only after the one before it.
    extract >> transform >> load >> copy