In [None]:



# Wikipedia article that is requested and parsed for the S&P 500 constituents.
url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"

# Request headers sent together with `url`; the User-Agent value is the three
# product tokens below joined by single spaces.
headers = {
    "User-Agent": " ".join(
        (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "AppleWebKit/537.36 (KHTML, like Gecko)",
            "Chrome/123.0 Safari/537.36",
        )
    )
}

def getSp500tickers():
    """Fetch the S&P 500 ticker symbols from the Wikipedia page at `url`.

    The page is requested with the module-level `headers`, its first HTML
    table is parsed, and every "." in the `Symbol` column is replaced by "-"
    (e.g. "BRK.B" -> "BRK-B").

    Returns:
        list[str]: the first five symbols of the table.

    Raises:
        requests.HTTPError: if the response carries an error status.
        requests.Timeout: if no response arrives within 30 seconds.
    """
    from io import StringIO

    import pandas as pd
    import requests

    # Bound the request so a stalled connection cannot hang the task.
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()

    # Passing literal HTML to read_html is deprecated in recent pandas;
    # hand it a file-like object instead.
    tables = pd.read_html(StringIO(response.text))
    sp500_df = tables[0]
    symbols = sp500_df['Symbol'].str.replace('.', '-', regex=False)
    return symbols.tolist()[:5]




In [None]:

def get_dataFromYf(ti):
    """Download one day of prices per symbol and return an XCom-safe payload.

    The symbol list is pulled from the `extract_task_first` XCom. Each
    downloaded frame is flattened (single-level columns, `Date` as a regular
    column of ISO date strings) and converted to a plain dict of lists so the
    return value contains only JSON-serializable Python objects.

    Args:
        ti: Airflow task instance used to pull the upstream XCom.

    Returns:
        dict: symbol -> one of
            * dict of column name -> list of values (successful download),
            * {} when the download returned no rows,
            * {"error": "<message>"} when the download raised.

    Raises:
        ValueError: if the upstream task provided no symbols.
    """
    import pandas as pd
    import yfinance as yf

    symbols = ti.xcom_pull(task_ids='extract_task_first')
    if not symbols:
        raise ValueError("No symbols received from extract_task_first")

    results = {}
    for symbol in symbols:
        try:
            data = yf.download(
                tickers=symbol,        # one ticker per call
                start='2025-10-10',    # start date (string or datetime)
                end='2025-10-11',      # end date
                interval='1d',         # data frequency: 1m, 2m, 5m, 1h, 1d, 1wk, 1mo, etc.
                auto_adjust=True,      # adjust for splits/dividends
                progress=False,        # hide the download progress bar
                ignore_tz=True,
            )
            if data.empty:
                results[symbol] = {}
                continue

            # Collapse (field, ticker) column tuples down to the field name.
            if isinstance(data.columns, pd.MultiIndex):
                data.columns = list(data.columns.get_level_values(0))

            # Move the index into a column so it survives the dict conversion.
            data = data.reset_index()
            if "Date" in data.columns:
                # Timestamps are not JSON-serializable; the interval is daily,
                # so a date-only string loses nothing.
                data["Date"] = pd.to_datetime(data["Date"]).dt.strftime("%Y-%m-%d")

            results[symbol] = data.to_dict(orient='list')
        except Exception as e:
            # Best effort per symbol: record the failure and keep going.
            results[symbol] = {"error": str(e)}
    return results
# symbols = getSp500tickers()
# print(get_dataFromYf(symbols[:5],'2025-10-10','2025-10-11','1d'))   


In [None]:
def transform(ti):
    """Combine the per-symbol price payloads into one CSV with close deltas.

    Pulls the symbol -> payload mapping from the `extract_task_second` XCom.
    Placeholder payloads (`{}` for "no rows" and `{"error": ...}` for failed
    downloads) are skipped instead of crashing the task. For every usable
    payload the absolute and percentage change of `Close` between consecutive
    rows is added (first row filled with 0).

    Args:
        ti: Airflow task instance used to pull the upstream XCom.

    Returns:
        dict: {"csv_path": path of the CSV written under /tmp,
               "data": the same rows as a list of record dicts}.

    Raises:
        ValueError: if the upstream task returned nothing.
        RuntimeError: if no symbol had usable price data.
    """
    import os
    from datetime import datetime, timezone

    import pandas as pd

    map_data = ti.xcom_pull(task_ids='extract_task_second')
    if not map_data:
        raise ValueError("No data received from extract_task_second")

    output_columns = ['Symbol', 'Date', 'Open', 'High', 'Low', 'Close', 'Volume',
                      'close_change', 'close_pct_change']
    results = []

    for key, val in map_data.items():
        # The extractor marks empty and failed downloads with small dicts.
        if isinstance(val, dict) and (not val or "error" in val):
            print(f"Skipping {key}: {val.get('error', 'no rows returned')}")
            continue

        frame = pd.DataFrame(val)
        if frame.empty:
            print(f"Skipping {key}: no rows returned")
            continue

        # Tolerate frames that still carry (field, ticker) columns or keep the
        # date in the index rather than in a column.
        if isinstance(frame.columns, pd.MultiIndex):
            frame.columns = list(frame.columns.get_level_values(0))
        if "Date" not in frame.columns:
            frame = frame.reset_index()

        frame["Symbol"] = key
        frame["close_change"] = frame['Close'].diff().fillna(0)
        frame["close_pct_change"] = frame['Close'].pct_change().fillna(0) * 100
        results.append(frame[output_columns])

    if not results:
        # pd.concat([]) would raise an opaque "No objects to concatenate".
        raise RuntimeError("No usable price data for any symbol; nothing to transform")

    final_df = pd.concat(results, ignore_index=True)
    # Timezone-aware replacement for the deprecated datetime.utcnow().
    stamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
    out_csv = os.path.join("/tmp", f"sp500_transformed_{stamp}.csv")
    final_df.to_csv(out_csv, index=False)
    print(f"Saved transformed data to {out_csv}")
    return {
        "csv_path": out_csv,
        "data": final_df.to_dict(orient="records")  # serializable object
    }

In [None]:
# Populate the process environment via python-dotenv before any of the
# lookups below run.
from dotenv import load_dotenv
load_dotenv()
import os

# Credentials-related values read from the environment. Neither name is
# referenced again in the cells visible here (the hooks below are created
# from Airflow connection ids) — NOTE(review): confirm they are still needed.
aws_key = os.getenv("AWS_ACCESS_KEY_ID")
snowflake_user = os.getenv("SNOWFLAKE_USER")

# Airflow provider hooks used by load_to_s3 / load_to_snowflake.
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

def load_to_s3(ti):
    """Upload the CSV produced by `transform_task` to S3.

    The CSV path is taken from the `csv_path` entry of the `transform_task`
    XCom and the object key reuses the file's (timestamped) base name.

    Args:
        ti: Airflow task instance used to pull the upstream XCom.

    Returns:
        dict: {"s3_path": full s3:// URI of the uploaded object,
               "csv_path": local path that was uploaded}.

    Raises:
        FileNotFoundError: if the XCom is missing or the file does not exist.
    """
    # Imported before first use: a function-local `import os` placed after the
    # existence check makes `os` a local name and turns that check into an
    # UnboundLocalError.
    import os

    file = ti.xcom_pull(task_ids='transform_task')
    csv_path = file.get('csv_path') if file else None
    if not csv_path or not os.path.exists(csv_path):
        raise FileNotFoundError(f"CSV not found at {csv_path}")

    bucket = 'hassan-snowflake-class-project'
    # reuse the timestamped filename written by transform()
    basename = os.path.basename(csv_path)  # e.g. sp500_transformed_20251019T120000Z.csv
    s3_key = f"airflow_dag_project/sp500_data/{basename}"

    s3_hook = S3Hook(aws_conn_id='aws_default')
    s3_hook.load_file(
        filename=csv_path,
        key=s3_key,
        bucket_name=bucket,
        # Overwrite so a retry or manual re-run of this task for the same
        # file does not fail on the already-uploaded key.
        replace=True,
    )
    return {
        "s3_path": f"s3://{bucket}/{s3_key}",
        "csv_path": csv_path
    }



def load_to_snowflake(ti):
    """Load the CSV produced by `transform_task` into Snowflake.

    The file is PUT into a session-scoped temporary stage and then copied
    into AIRFLOW_ETL_DATABASE.PUBLIC.SP500_TABLE. Rows that fail to parse are
    skipped by the COPY (ON_ERROR = 'CONTINUE').

    Args:
        ti: Airflow task instance used to pull the upstream XCom.

    Raises:
        FileNotFoundError: if the XCom is missing or the file does not exist.
        Exception: any error raised while staging or copying is logged and
            re-raised so the task is marked failed.
    """
    # pull csv_path from transform_task XCom (same file written in container)
    file = ti.xcom_pull(task_ids='transform_task')
    csv_path = file.get('csv_path') if file else None
    if not csv_path or not os.path.exists(csv_path):
        raise FileNotFoundError(f"CSV not found at {csv_path}")

    # Target identifiers are fixed constants (not read from the environment),
    # so no "is it set?" guard is needed.
    db = "AIRFLOW_ETL_DATABASE"
    schema = "PUBLIC"
    table = "SP500_TABLE"
    target_table_fqn = f"{db}.{schema}.{table}"

    snowflake_hook = SnowflakeHook(snowflake_conn_id='snowflake_default')
    conn = snowflake_hook.get_conn()
    cursor = None

    try:
        # Created inside the try so the connection is still closed if
        # obtaining the cursor fails.
        cursor = conn.cursor()

        # create or use a temporary stage (this will be session-scoped)
        cursor.execute("""
            CREATE OR REPLACE TEMPORARY STAGE airflow_stage
            FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"' SKIP_HEADER=1);
        """)

        # PUT the local file into the temporary stage (requires connector support for PUT)
        put_cmd = f"PUT file://{csv_path} @airflow_stage AUTO_COMPRESS=FALSE;"
        cursor.execute(put_cmd)

        # COPY INTO the target table from the temporary stage
        copy_cmd = f"""
            COPY INTO {target_table_fqn}
            FROM @airflow_stage
            FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"' SKIP_HEADER=1)
            ON_ERROR = 'CONTINUE';
        """
        cursor.execute(copy_cmd)

        conn.commit()
        print("✅ Data uploaded to Snowflake (PUT -> COPY successful).")

    except Exception as e:
        print(f"❌ Snowflake load failed: {e}")
        raise
    finally:
        if cursor is not None:
            cursor.close()
        conn.close()

In [None]:
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime, timedelta

# Daily ETL DAG: scrape tickers -> download prices -> transform -> load the
# result to S3 and Snowflake (the two loads fan out from the transform task).
with DAG(
    dag_id='SP500_Symbol_data_DAG',
    description='ETL for S&P 500 data to transform and prepare for load',
    start_date=datetime(2025, 1, 1),
    # `schedule` is the supported keyword (Airflow >= 2.4); the older
    # `schedule_interval` is deprecated and no longer accepted by Airflow 3.
    schedule='@daily',
    catchup=False,
    default_args={'retries': 1, 'retry_delay': timedelta(minutes=5)},
    tags=['sp500', 'etl', 'finance'],
) as dag:

    extract_task_first = PythonOperator(
        task_id='extract_task_first',
        python_callable=getSp500tickers
    )

    extract_task_second = PythonOperator(
        task_id='extract_task_second',
        python_callable=get_dataFromYf
    )

    transform_task = PythonOperator(
        task_id='transform_task',
        python_callable=transform
    )
    load_task_s3 = PythonOperator(
        task_id='load_task_s3',
        python_callable=load_to_s3
    )
    load_task_Snowflake = PythonOperator(
        task_id='load_task_Snowflake',
        python_callable=load_to_snowflake
    )

    extract_task_first >> extract_task_second >> transform_task >> [load_task_s3, load_task_Snowflake]



# 🧠 ETL Debug History — S&P 500 DAG to Local Script

## 📅 Context
This document captures all major issues, bugs, and debugging steps we encountered while building and stabilizing the **S&P 500 Data ETL Pipeline** (Airflow → S3 → Snowflake). Each error is documented with cause, diagnosis, and resolution.

---

## 🧩 1. XCom Payload Overflow

**Error:**

```text
airflow.exceptions.XComSizeExceeded: XCom payload too large (>48KB)
```

**Cause:** We were passing the entire Yahoo Finance DataFrame object between tasks via XCom. Airflow's metadata DB has a 48 KB limit for XCom payloads.

**Fix:** Instead of pushing full DataFrames, we saved the CSV to `/tmp` and pushed only the file path through XCom:

```python
output_csv = "/tmp/sp500_yf_data/sp500_combined.csv"
ti.xcom_push(key="data_path", value=output_csv)
```

---

## 🧱 Tickers Concatenated Horizontally

**Symptom:** The combined CSV repeated the price columns once per ticker instead of adding rows:

```text
Date,Close,High,Low,Open,Volume,Symbol,Close,High,Low,Open,Volume,...
,MMM,MMM,MMM,MMM,MMM,,AOS,AOS,...
```

**Cause:** You concatenated multiple tickers horizontally instead of vertically.

**Fix:**

```python
all_data.append(data)
combined_df = pd.concat(all_data, ignore_index=True)
```

Now each symbol adds rows, not extra columns.

---

## 📉 4. Snowflake Load Error: Date 'MMM' is not recognized

**Error:**

```text
Status: Failed
Error: Date 'MMM' is not recognized
Column: "SP500_TABLE"["DATE":2]
```

**Diagnosis:** The first CSV line had column headers followed by "MMM" under the Date column — meaning the column order in the CSV didn't match the Snowflake table definition.

**Cause:** Snowflake assumed your second column (Symbol) was actually the Date field.

**Fix:** We fixed both CSV generation and table definition alignment.

Old schema:

```sql
CREATE TABLE SP500_TABLE (
    SYMBOL VARCHAR,
    DATE DATE,
    OPEN FLOAT,
    HIGH FLOAT,
    LOW FLOAT,
    CLOSE FLOAT,
    VOLUME FLOAT,
    CLOSE_CHANGE FLOAT,
    CLOSE_PCT_CHANGE FLOAT
);
```

**Problem:** The CSV had the order: Date, Symbol, Open, High, Low, Close, Volume, close_change, close_pct_change.

**Fix:**

```sql
CREATE OR REPLACE TABLE AIRFLOW_ETL_DATABASE.PUBLIC.SP500_TABLE (
    DATE DATE,
    SYMBOL VARCHAR(16777216),
    OPEN FLOAT,
    HIGH FLOAT,
    LOW FLOAT,
    CLOSE FLOAT,
    VOLUME FLOAT,
    CLOSE_CHANGE FLOAT,
    CLOSE_PCT_CHANGE FLOAT
);
```

✅ **Result:** Snowflake started parsing rows correctly without misinterpreting 'MMM' as a date.

---

## 📦 6. Stage Confusion in Snowflake

**Issue:** You didn't see the stage (`airflow_stage`) in the Snowflake UI.

**Cause:** We used a temporary stage:

```sql
CREATE OR REPLACE TEMPORARY STAGE airflow_stage
FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"' SKIP_HEADER=1);
```

Temporary stages are session-scoped and disappear once the connection closes — they won't appear in the UI.

**Fix:** For a persistent stage:

```sql
CREATE OR REPLACE STAGE airflow_stage
FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"' SKIP_HEADER=1);
```

---

## 🧮 7. Field Delimiter Missing

**Error:** Snowflake read the entire line as one string column.

**Cause:** The COPY INTO command lacked an explicit FIELD_DELIMITER.

**Fix:**

```sql
FILE_FORMAT = (
  TYPE = 'CSV'
  FIELD_DELIMITER = ','
  FIELD_OPTIONALLY_ENCLOSED_BY='"'
  SKIP_HEADER=1
);
```

---

## 🕒 8. Output File Overwrites

**Issue:** Each DAG run overwrote the same transformed file.

**Fix:** Add a timestamp to the output file name:

```python
output_file = f"/tmp/sp500_yf_data/sp500_transformed_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
```

In [None]:
import pandas as pd
import requests
import os
import yfinance as yf
from datetime import datetime
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook


# -------------------------------------------------------------------
# CONFIG
# -------------------------------------------------------------------
# Page requested by getSp500tickers().
url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"

# Headers sent with that request; only a User-Agent is set.
headers = {}
headers["User-Agent"] = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
    " AppleWebKit/537.36 (KHTML, like Gecko)"
    " Chrome/123.0 Safari/537.36"
)


# -------------------------------------------------------------------
# TASK 1 — Get S&P 500 Symbols
# -------------------------------------------------------------------
def getSp500tickers():
    """Fetch S&P 500 ticker symbols from the Wikipedia page at `url`.

    Parses the first HTML table of the page and replaces every "." in the
    `Symbol` column with "-".

    Returns:
        list[str]: the first five symbols (limited for local testing).

    Raises:
        requests.HTTPError: if the response carries an error status.
        requests.Timeout: if no response arrives within 30 seconds.
    """
    from io import StringIO

    print("🔹 Fetching S&P 500 tickers...")
    # Bound the request so a stalled connection cannot hang the script.
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()

    # Passing literal HTML to read_html is deprecated in recent pandas;
    # wrap it in a file-like object.
    tables = pd.read_html(StringIO(response.text))
    sp500_df = tables[0]
    symbols = sp500_df["Symbol"].str.replace(".", "-", regex=False).tolist()

    selected = symbols[:5]  # for local testing
    # Report both numbers so the log matches what is actually returned.
    print(f"✅ Got {len(symbols)} tickers; using the first {len(selected)}.")
    return selected


# -------------------------------------------------------------------
# TASK 2 — Extract Data via yfinance
# -------------------------------------------------------------------
def get_dataFromYf(symbols):
    """Download one day of prices for each symbol and write a combined CSV.

    Each symbol's frame is flattened to single-level columns, given a
    `Symbol` column and reduced to a fixed column order before all frames are
    stacked vertically. Symbols whose download fails or returns no rows are
    logged and skipped.

    Args:
        symbols: iterable of ticker strings.

    Returns:
        str: path of the combined CSV inside the `sp500_yf_data` directory
        (created relative to the current working directory).

    Raises:
        RuntimeError: if no symbol produced any rows.
    """
    print("🔹 Downloading Yahoo Finance data...")
    output_dir = "sp500_yf_data"
    os.makedirs(output_dir, exist_ok=True)

    wanted_columns = ["Date", "Symbol", "Open", "High", "Low", "Close", "Volume"]
    all_data = []
    for symbol in symbols:
        try:
            data = yf.download(
                tickers=symbol,
                start="2025-10-10",
                end="2025-10-11",
                interval="1d",
                auto_adjust=True,
                progress=False,
                ignore_tz=True,
            )
            # An empty frame must not count as a successful download,
            # otherwise the "no data" guard below can never trigger and a
            # header-only CSV is passed downstream.
            if data.empty:
                print(f"⚠️ No rows returned for {symbol}; skipping")
                continue

            # Collapse (field, ticker) column tuples down to the field name.
            if isinstance(data.columns, pd.MultiIndex):
                data.columns = list(data.columns.get_level_values(0))

            data = data.reset_index()
            data["Symbol"] = symbol
            all_data.append(data[wanted_columns])
            print(f"✅ {symbol} done")
        except Exception as e:
            # Best effort per symbol: log and continue with the rest.
            print(f"❌ Error fetching {symbol}: {e}")

    if not all_data:
        raise RuntimeError("No data downloaded from yfinance.")

    combined_df = pd.concat(all_data, ignore_index=True)
    output_csv = os.path.join(output_dir, "sp500_combined.csv")
    combined_df.to_csv(output_csv, index=False)
    print(f"✅ Combined CSV saved at: {output_csv}")
    return output_csv


# -------------------------------------------------------------------
# TASK 3 — Transform
# -------------------------------------------------------------------
def transform(csv_path):
    """Add per-symbol close-price deltas to the combined CSV.

    Reads `csv_path`, coerces the price/volume columns to numbers (invalid
    values become NaN), and adds `close_change` and `close_pct_change`,
    computed within each `Symbol` group between consecutive rows. The first
    row of every symbol therefore has NaN in both columns.

    Args:
        csv_path: path of the combined CSV to read.

    Returns:
        str: path of the timestamped CSV written into `sp500_yf_data/`.
    """
    print(f"🔹 Transforming data: {csv_path}")
    df = pd.read_csv(csv_path)

    for col in ["Open", "High", "Low", "Close", "Volume"]:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    close_by_symbol = df.groupby("Symbol")["Close"]
    df["close_change"] = close_by_symbol.diff()
    df["close_pct_change"] = close_by_symbol.pct_change() * 100

    # Do not rely on an earlier step having created the output directory.
    output_dir = "sp500_yf_data"
    os.makedirs(output_dir, exist_ok=True)

    output_file = f"{output_dir}/sp500_transformed_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
    df.to_csv(output_file, index=False)
    print(f"✅ Transformation complete → {output_file}")
    return output_file


# -------------------------------------------------------------------
# TASK 4 — Load to S3
# -------------------------------------------------------------------
def load_to_s3(csv_path):
    """Upload `csv_path` to the project bucket and return the object key.

    The key is `airflow_dag_project/sp500_data/<file name>`; an existing
    object under that key is replaced.
    """
    print(f"🔹 Uploading to S3: {csv_path}")

    bucket = "hassan-snowflake-class-project"
    hook = S3Hook(aws_conn_id="aws_default")
    file_name = os.path.basename(csv_path)
    object_key = "airflow_dag_project/sp500_data/" + file_name

    hook.load_file(
        filename=csv_path,
        key=object_key,
        bucket_name=bucket,
        replace=True,
    )

    print(f"✅ Uploaded to s3://{bucket}/{object_key}")
    return object_key


# -------------------------------------------------------------------
# TASK 5 — Load to Snowflake
# -------------------------------------------------------------------
def load_to_snowflake(csv_path):
    """Load `csv_path` into AIRFLOW_ETL_DATABASE.PUBLIC.SP500_TABLE.

    The file is PUT into a session-scoped temporary stage and then copied
    into the target table; rows that fail to parse are skipped by the COPY
    (ON_ERROR = 'CONTINUE').

    Args:
        csv_path: local path of the CSV to load.

    Raises:
        Exception: any error raised while staging or copying is logged and
            re-raised, so the caller does not report success after a failed
            load.
    """
    print(f"🔹 Loading {csv_path} into Snowflake...")

    db = "AIRFLOW_ETL_DATABASE"
    schema = "PUBLIC"
    table = "SP500_TABLE"
    target_table_fqn = f"{db}.{schema}.{table}"

    snowflake_hook = SnowflakeHook(snowflake_conn_id="snowflake_default")
    conn = snowflake_hook.get_conn()
    cursor = None

    try:
        # Created inside the try so the connection is still closed if
        # obtaining the cursor fails.
        cursor = conn.cursor()
        cursor.execute("""
            CREATE OR REPLACE TEMPORARY STAGE airflow_stage
            FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"' SKIP_HEADER=1);
        """)
        cursor.execute(f"PUT file://{csv_path} @airflow_stage AUTO_COMPRESS=FALSE;")
        cursor.execute(f"""
            COPY INTO {target_table_fqn}
            FROM @airflow_stage
            FILE_FORMAT = (TYPE = 'CSV' 
                FIELD_DELIMITER = ',' 
                FIELD_OPTIONALLY_ENCLOSED_BY='"' 
                SKIP_HEADER=1)
            ON_ERROR = 'CONTINUE';
        """)
        conn.commit()
        print("✅ Data uploaded to Snowflake successfully.")
    except Exception as e:
        print(f"❌ Snowflake load failed: {e}")
        # Propagate: swallowing here let the script print "ETL Process
        # Complete" even though nothing was loaded.
        raise
    finally:
        if cursor is not None:
            cursor.close()
        conn.close()


# -------------------------------------------------------------------
# MAIN EXECUTION FLOW
# -------------------------------------------------------------------
# Entry point: when run as the main module, execute the pipeline stages in
# order, handing each stage's return value to the next. Both load steps
# receive the transformed CSV path.
if __name__ == "__main__":
    symbols = getSp500tickers()
    csv_path = get_dataFromYf(symbols)
    transformed_csv = transform(csv_path)
    load_to_s3(transformed_csv)
    load_to_snowflake(transformed_csv)
    print("\n🚀 ETL Process Complete.")



🚀 Starting Local ETL Pipeline (No Airflow)...

🔍 Fetching S&P 500 tickers from Wikipedia...


  tables = pd.read_html(response.text)


✅ Retrieved 5 tickers: ['MMM', 'AOS', 'ABT', 'ABBV', 'ACN']
⬇️ Downloading data from Yahoo Finance...
✅ Clean combined CSV saved at: ./tmp/sp500_yf_data\sp500_combined.csv
⚙️ Transforming data...
✅ Transformation complete → ./tmp/sp500_yf_data/sp500_transformed.csv
📤 (Mock) Uploading ./tmp/sp500_yf_data/sp500_transformed.csv to S3...
✅ Mock upload complete: s3://hassan-snowflake-class-project/airflow_dag_project/sp500_data/sp500_transformed.csv
📦 (Mock) Loading ./tmp/sp500_yf_data/sp500_transformed.csv to Snowflake table...
✅ Mock load to Snowflake complete.

✅ Local ETL Pipeline Completed Successfully.



In [None]:
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from datetime import datetime, timedelta

import pandas as pd
import requests
import os
import yfinance as yf



# -------------------------------------------------------------------
# CONFIG
# -------------------------------------------------------------------
# Page requested by getSp500tickers().
url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"

# Headers sent with that request; the User-Agent value is the three product
# tokens below joined by single spaces.
headers = {
    "User-Agent": " ".join(
        [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "AppleWebKit/537.36 (KHTML, like Gecko)",
            "Chrome/123.0 Safari/537.36",
        ]
    )
}

# -------------------------------------------------------------------
# TASK 1 — Get S&P 500 Symbols
# -------------------------------------------------------------------
def getSp500tickers():
    """Fetch all S&P 500 ticker symbols from the Wikipedia page at `url`.

    Parses the first HTML table of the page and replaces every "." in the
    `Symbol` column with "-".

    Returns:
        list[str]: every symbol in the table (no testing limit applied).

    Raises:
        requests.HTTPError: if the response carries an error status.
        requests.Timeout: if no response arrives within 30 seconds.
    """
    from io import StringIO

    # Bound the request so a stalled connection cannot hang the task.
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()

    # Passing literal HTML to read_html is deprecated in recent pandas;
    # wrap it in a file-like object.
    tables = pd.read_html(StringIO(response.text))
    sp500_df = tables[0]
    symbols = sp500_df["Symbol"].str.replace(".", "-", regex=False)
    return symbols.tolist()


# -------------------------------------------------------------------
# TASK 2 — Extract Data via yfinance
# -------------------------------------------------------------------
def get_dataFromYf(ti):
    """Download one day of prices per symbol and write a combined CSV.

    The symbol list is pulled from the `extract_task_first` XCom. Each
    symbol's frame is flattened to single-level columns, given a `Symbol`
    column and reduced to a fixed column order before all frames are stacked
    vertically. Only the CSV path is returned, so the XCom stays small.

    Args:
        ti: Airflow task instance used to pull the upstream XCom.

    Returns:
        str: path of `/tmp/sp500_yf_data/sp500_combined.csv`.

    Raises:
        ValueError: if the upstream task provided no symbols.
        RuntimeError: if no symbol produced any rows.
    """
    import os
    import pandas as pd
    import yfinance as yf

    symbols = ti.xcom_pull(task_ids="extract_task_first")
    if not symbols:
        raise ValueError("❌ No symbols received from previous task")

    output_dir = "/tmp/sp500_yf_data"
    os.makedirs(output_dir, exist_ok=True)

    # ✅ Force correct order and only keep the essentials
    wanted_columns = ["Date", "Symbol", "Open", "High", "Low", "Close", "Volume"]
    all_data = []

    for symbol in symbols:
        try:
            data = yf.download(
                tickers=symbol,
                start="2025-10-10",
                end="2025-10-11",
                interval="1d",
                auto_adjust=True,
                progress=False,
                ignore_tz=True,
            )
            # An empty frame must not count as a successful download,
            # otherwise the "no data" guard below can never trigger and a
            # header-only CSV is handed to the transform task.
            if data.empty:
                print(f"⚠️ No rows returned for {symbol}; skipping")
                continue

            # Collapse (field, ticker) column tuples down to the field name.
            if isinstance(data.columns, pd.MultiIndex):
                data.columns = list(data.columns.get_level_values(0))

            data = data.reset_index()
            data["Symbol"] = symbol
            all_data.append(data[wanted_columns])

        except Exception as e:
            # Best effort per symbol: log and continue with the rest.
            print(f"❌ Error fetching {symbol}: {e}")

    if not all_data:
        raise RuntimeError("No data downloaded from yfinance.")
    combined_df = pd.concat(all_data, ignore_index=True)
    output_csv = os.path.join(output_dir, "sp500_combined.csv")
    combined_df.to_csv(output_csv, index=False)
    print(f"✅ Combined CSV saved at: {output_csv}")
    return output_csv

# -------------------------------------------------------------------
# TASK 3 — Transform
# -------------------------------------------------------------------


def transform(ti):
    """Add per-symbol close-price deltas to the extractor's combined CSV.

    The CSV path comes from the `extract_task_second` XCom. Price and volume
    columns are coerced to numbers (invalid values become NaN), then
    `close_change` and `close_pct_change` are computed within each `Symbol`
    group. The result is written to a timestamped file under
    /tmp/sp500_yf_data and that path is returned.

    Raises:
        ValueError: if the pulled path is empty or does not exist.
    """
    import os
    import pandas as pd

    # Single CSV path handed over by the extractor task.
    csv_path = ti.xcom_pull(task_ids="extract_task_second")
    path_is_usable = bool(csv_path) and os.path.exists(csv_path)
    if not path_is_usable:
        raise ValueError(f"❌ Invalid path received from extractor: {csv_path}")

    print(f"✅ Reading combined CSV: {csv_path}")
    prices = pd.read_csv(csv_path)

    # Anything that does not parse as a number becomes NaN.
    numeric_columns = ("Open", "High", "Low", "Close", "Volume")
    for name in numeric_columns:
        prices[name] = pd.to_numeric(prices[name], errors="coerce")

    # Deltas are taken between consecutive rows of the same symbol.
    close_by_symbol = prices.groupby("Symbol")["Close"]
    prices["close_change"] = close_by_symbol.diff()
    prices["close_pct_change"] = close_by_symbol.pct_change() * 100

    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    output_file = f"/tmp/sp500_yf_data/sp500_transformed_{timestamp}.csv"
    prices.to_csv(output_file, index=False)

    print(f"✅ Transformation complete → {output_file}")
    return output_file



# -------------------------------------------------------------------
# TASK 4 — Load to S3
# -------------------------------------------------------------------
def load_to_s3(ti):
    """Upload the transformed CSV to the project bucket.

    The local path is the `transform_task` XCom value; the object key is
    `airflow_dag_project/sp500_data/<file name>` and an existing object under
    that key is replaced.

    Returns:
        dict: {"s3_path": s3:// URI of the object, "csv_path": local path}.

    Raises:
        FileNotFoundError: if the pulled path is empty or does not exist.
    """
    csv_path = ti.xcom_pull(task_ids="transform_task")
    if not (csv_path and os.path.exists(csv_path)):
        raise FileNotFoundError(f"CSV not found at {csv_path}")

    bucket = "hassan-snowflake-class-project"
    object_key = "airflow_dag_project/sp500_data/" + os.path.basename(csv_path)

    hook = S3Hook(aws_conn_id="aws_default")
    hook.load_file(
        filename=csv_path,
        key=object_key,
        bucket_name=bucket,
        replace=True,
    )

    result = {
        "s3_path": f"s3://{bucket}/{object_key}",
        "csv_path": csv_path,
    }
    return result


# -------------------------------------------------------------------
# TASK 5 — Load to Snowflake (from local)
# -------------------------------------------------------------------
def load_to_snowflake(ti):
    """Load the transformed CSV into AIRFLOW_ETL_DATABASE.PUBLIC.SP500_TABLE.

    The local path is the `transform_task` XCom value. Three statements run
    in order on one cursor: create a temporary stage, PUT the file into it,
    and COPY from the stage into the target table (ON_ERROR = 'CONTINUE').
    The work is committed on success; the cursor and connection are always
    closed.

    Raises:
        FileNotFoundError: if the pulled path is empty or does not exist.
        Exception: any error from the statements or commit is logged and
            re-raised.
    """
    csv_path = ti.xcom_pull(task_ids="transform_task")
    if not (csv_path and os.path.exists(csv_path)):
        raise FileNotFoundError(f"CSV not found at {csv_path}")

    hook = SnowflakeHook(snowflake_conn_id="snowflake_default")
    conn = hook.get_conn()
    cursor = conn.cursor()

    target_table_fqn = ".".join(("AIRFLOW_ETL_DATABASE", "PUBLIC", "SP500_TABLE"))

    create_stage_sql = """
            CREATE OR REPLACE TEMPORARY STAGE airflow_stage
            FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"' SKIP_HEADER=1);
        """
    put_sql = f"PUT file://{csv_path} @airflow_stage AUTO_COMPRESS=FALSE;"
    copy_sql = f"""
            COPY INTO {target_table_fqn}
            FROM @airflow_stage
            FILE_FORMAT = (TYPE = 'CSV' 
            FIELD_DELIMITER = ','
            FIELD_OPTIONALLY_ENCLOSED_BY='"' 
            SKIP_HEADER=1)
            ON_ERROR = 'CONTINUE';
        """

    try:
        # Stage -> PUT -> COPY, in that order, on the same session.
        for statement in (create_stage_sql, put_sql, copy_sql):
            cursor.execute(statement)
        conn.commit()
        print("✅ Data uploaded to Snowflake successfully.")
    except Exception as e:
        print(f"❌ Snowflake load failed: {e}")
        raise
    finally:
        cursor.close()
        conn.close()


# -------------------------------------------------------------------
# DAG DEFINITION
# -------------------------------------------------------------------
# Daily DAG wiring the five callables above into a pipeline:
# tickers -> prices -> transform -> (S3 upload, Snowflake load).
with DAG(
    dag_id="SP500_Symbol_data_DAG",
    description="ETL for S&P 500 data to S3 + Snowflake",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
    default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
    tags=["sp500", "etl", "finance"],
) as dag:

    # --- extract -----------------------------------------------------------
    extract_task_first = PythonOperator(task_id="extract_task_first", python_callable=getSp500tickers)
    extract_task_second = PythonOperator(task_id="extract_task_second", python_callable=get_dataFromYf)

    # --- transform ---------------------------------------------------------
    transform_task = PythonOperator(task_id="transform_task", python_callable=transform)

    # --- load --------------------------------------------------------------
    load_task_s3 = PythonOperator(task_id="load_task_s3", python_callable=load_to_s3)
    load_task_snowflake = PythonOperator(task_id="load_task_snowflake", python_callable=load_to_snowflake)

    # Linear extract/transform chain, then both loads run off the transform.
    extract_task_first >> extract_task_second
    extract_task_second >> transform_task
    transform_task >> load_task_s3
    transform_task >> load_task_snowflake
