In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import sys, os

# Add scripts folder to path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../scripts")))
from weather_etl import fetch_weather_data, transform_data, load_data, get_cities

# Shared task-level settings; handed to the DAG below through `default_args=`.
default_args = {
    "owner": "moyo",
    "depends_on_past": False,
    # Placeholder recipient; both e-mail notification switches below are off.
    "email": ["your_email@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    # One retry, five minutes after the failed attempt.
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Define DAG: a three-step extract -> transform -> load pipeline.
with DAG(
    "weather_etl_dag",
    default_args=default_args,
    description="Weather ETL Pipeline DAG",
    # NOTE(review): Airflow 2.4+ prefers `schedule=` over `schedule_interval=`;
    # kept as-is so older 2.x deployments keep working — confirm target version.
    schedule_interval="@daily",   # Run once per day
    start_date=datetime(2025, 9, 18),
    catchup=False,
    tags=["etl", "weather"],
) as dag:

    def extract_task(**kwargs):
        """Fetch raw weather data and publish it to XCom.

        Calls ``get_cities()`` and passes its result to
        ``fetch_weather_data()``, then pushes the returned value under the
        XCom key ``"raw_data"`` for the ``transform`` task.

        Args:
            **kwargs: Airflow task context; ``kwargs["ti"]`` is used for XCom.
        """
        cities = get_cities()
        raw_data = fetch_weather_data(cities)
        kwargs["ti"].xcom_push(key="raw_data", value=raw_data)

    def transform_task(**kwargs):
        """Transform the extracted data and publish the result to XCom.

        Pulls ``"raw_data"`` from the ``extract`` task, runs
        ``transform_data()`` on it, and pushes ``df.to_dict()`` under the
        XCom key ``"transformed_data"``.

        Args:
            **kwargs: Airflow task context; ``kwargs["ti"]`` is used for XCom.
        """
        raw_data = kwargs["ti"].xcom_pull(key="raw_data", task_ids="extract")
        df = transform_data(raw_data)
        # NOTE(review): the frame is flattened to a dict so it can travel
        # through XCom; presumably all values are serializable by the
        # configured XCom backend — confirm for timestamp/NumPy columns.
        kwargs["ti"].xcom_push(key="transformed_data", value=df.to_dict())

    def load_task(**kwargs):
        """Rebuild the transformed DataFrame from XCom and load it.

        Pulls ``"transformed_data"`` from the ``transform`` task, rebuilds a
        DataFrame with ``pd.DataFrame.from_dict`` and hands it to
        ``load_data()``.

        Args:
            **kwargs: Airflow task context; ``kwargs["ti"]`` is used for XCom.
        """
        df_dict = kwargs["ti"].xcom_pull(key="transformed_data", task_ids="transform")
        # Imported here rather than at module top, so pandas is only loaded
        # when this task actually runs.
        import pandas as pd
        df = pd.DataFrame.from_dict(df_dict)
        load_data(df)

    # Airflow tasks.
    # `provide_context=True` is intentionally not passed: it has been
    # deprecated since Airflow 2.0, where the context is always supplied to
    # the callable, and it is rejected by releases that removed it.
    extract = PythonOperator(task_id="extract", python_callable=extract_task)
    transform = PythonOperator(task_id="transform", python_callable=transform_task)
    load = PythonOperator(task_id="load", python_callable=load_task)

    # Task dependencies (flow)
    extract >> transform >> load