# airflow 🤦‍♀️

In [None]:
from airflow import DAG
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.python import PythonOperator
from datetime import datetime

# MySQL에서 데이터를 가져오는 함수 정의
def fetch_mysql_data(**kwargs):
    # Airflow의 MySqlHook을 사용해 연결 설정 (Conn Id를 사용)
    mysql_hook = MySqlHook(mysql_conn_id='mysql_default')  # Conn Id는 Airflow UI에서 설정한 값
    
    # SQL 쿼리 실행 (예시: 특정 테이블 데이터를 가져옴)
    sql_query = "SELECT * FROM your_table_name;"  # 실제 테이블 이름으로 변경
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(sql_query)
    
    # 결과 가져오기
    result = cursor.fetchall()
    print("Fetched Data:", result)
    
    # 결과를 XCom에 저장하여 다른 태스크와 공유 가능
    kwargs['ti'].xcom_push(key='mysql_data', value=result)

# DAG 정의
with DAG(
    dag_id='mysql_to_python_dag',  # DAG 이름 (Airflow UI에서 표시됨)
    start_date=datetime(2023, 1, 1),  # DAG 시작 날짜
    schedule_interval=None,  # 수동 실행 (스케줄 없음)
    catchup=False,  # 과거 날짜의 실행 방지
) as dag:

    # PythonOperator를 사용해 fetch_mysql_data 함수 실행
    fetch_data_task = PythonOperator(
        task_id='fetch_mysql_data',  # 태스크 이름 (Airflow UI에서 표시됨)
        python_callable=fetch_mysql_data,  # 실행할 함수
        provide_context=True,  # XCom 및 컨텍스트 사용 가능하게 설정
    )

fetch_data_task  # 태스크 등록

# 원재료 👀

In [None]:
import mysql.connector
from mysql.connector import Error
import pandas as pd

def fetch_data_from_mysql():
    try:
        # MySQL 데이터베이스 연결 설정
        connection = mysql.connector.connect(
            host='192.168.0.163',
            user='root',  # MySQL 사용자 이름
            password='andong1234',  # MySQL 비밀번호
            database='analysis'  # 사용할 데이터베이스 이름
        )

        if connection.is_connected():
            print("MySQL에 성공적으로 연결되었습니다.")

            # 커서 생성
            cursor = connection.cursor()

            # 실행할 SQL 쿼리 정의
            sql_query = "SELECT * FROM ods_one;"  # 테이블 이름을 실제로 변경하세요.

            # 쿼리 실행 및 결과 가져오기
            cursor.execute(sql_query)
            records = cursor.fetchall()  # 데이터 가져오기
            columns = [i[0] for i in cursor.description]  # 컬럼 이름 가져오기

            # 결과를 Pandas DataFrame으로 변환
            df = pd.DataFrame(records, columns=columns)

            print(f"총 {cursor.rowcount}개의 행을 가져왔습니다.")
            print(df)  # DataFrame 출력

            return df  # DataFrame 반환

    except Error as e:
        print(f"MySQL에서 데이터를 가져오는 중 오류가 발생했습니다: {e}")
    finally:
        if connection.is_connected():
            cursor.close()
            connection.close()
            print("MySQL 연결이 닫혔습니다.")

# 함수 실행
one_df = fetch_data_from_mysql()


MySQL에 성공적으로 연결되었습니다.
총 977개의 행을 가져왔습니다.
     one_id one_name one_price    one_date
0       647        철    117.05  2023-01-02
1       757      유연탄    151.00  2023-01-02
2       866       규소   2640.00  2023-01-02
3       977       크롬      2.85  2023-01-02
4       536     알루미늄   2337.50  2023-01-03
..      ...      ...       ...         ...
972     758       규소   1545.00  2025-02-10
973     867       크롬      1.53  2025-02-10
974       3     알루미늄   2647.00  2025-02-11
975       2     알루미늄   2628.50  2025-02-12
976       1     알루미늄   2627.00  2025-02-13

[977 rows x 4 columns]
MySQL 연결이 닫혔습니다.


In [13]:
one_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 977 entries, 0 to 976
Data columns (total 4 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   one_id     977 non-null    int64 
 1   one_name   977 non-null    object
 2   one_price  977 non-null    object
 3   one_date   977 non-null    object
dtypes: int64(1), object(3)
memory usage: 30.7+ KB


In [15]:
one_df['one_date'] = pd.to_datetime(one_df['one_date'])

one_df['MoM'] = one_df.groupby('one_name')['one_price'].pct_change() * 100

one_df

Unnamed: 0,one_id,one_name,one_price,one_date,MoM
0,647,철,117.05,2023-01-02,
1,757,유연탄,151.00,2023-01-02,
2,866,규소,2640.00,2023-01-02,
3,977,크롬,2.85,2023-01-02,
4,536,알루미늄,2337.50,2023-01-03,
...,...,...,...,...,...
972,758,규소,1545.00,2025-02-10,0
973,867,크롬,1.53,2025-02-10,2.00
974,3,알루미늄,2647.00,2025-02-11,0.265151515151515151515151500
975,2,알루미늄,2628.50,2025-02-12,-0.6989044200982244049867774800


# 자동차 🚗

In [None]:
import mysql.connector
from mysql.connector import Error
import pandas as pd

def fetch_data_from_mysql():
    try:
        # MySQL 데이터베이스 연결 설정
        connection = mysql.connector.connect(
            host='192.168.0.163',
            user='root',  # MySQL 사용자 이름
            password='andong1234',  # MySQL 비밀번호
            database='analysis'  # 사용할 데이터베이스 이름
        )

        if connection.is_connected():
            print("MySQL에 성공적으로 연결되었습니다.")

            # 커서 생성
            cursor = connection.cursor()

            # 실행할 SQL 쿼리 정의
            sql_query = "SELECT * FROM ods_car;"

            # 쿼리 실행 및 결과 가져오기
            cursor.execute(sql_query)
            records = cursor.fetchall()  # 데이터 가져오기
            columns = [i[0] for i in cursor.description]  # 컬럼 이름 가져오기

            # 결과를 Pandas DataFrame으로 변환
            df = pd.DataFrame(records, columns=columns)

            print(f"총 {cursor.rowcount}개의 행을 가져왔습니다.")
            print(df)  # DataFrame 출력

            return df  # DataFrame 반환

    except Error as e:
        print(f"MySQL에서 데이터를 가져오는 중 오류가 발생했습니다: {e}")
    finally:
        if connection.is_connected():
            cursor.close()
            connection.close()
            print("MySQL 연결이 닫혔습니다.")

# 함수 실행
car_df = fetch_data_from_mysql()
