In [1]:
from datetime import datetime, timedelta

import pandas as pd
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Arguments handed to the DAG as `default_args`.
default_args = {
    'owner': 'you',
    'start_date': datetime(2023, 10, 2),  # Adjust the date accordingly
}

# DAG object for the e-commerce ETL workflow, scheduled once per day.
dag = DAG(
    'ecommerce_etl',
    default_args=default_args,
    description='ETL workflow for E-commerce data',
    schedule_interval='@daily',
    # start_date is a fixed date in the past; without this the scheduler would
    # backfill one run for every day since then, and each run would process
    # the same static input file.
    catchup=False,
)


In [None]:
# Extract
def extract(data_path="path_to_your_downloaded_file.csv", **kwargs):
    """Read the raw e-commerce CSV into a DataFrame.

    Args:
        data_path: Location of the CSV file to read. Defaults to the
            placeholder path used by this notebook; override it per task
            (e.g. through the operator's ``op_kwargs``) instead of editing
            the function body.
        **kwargs: Extra keyword arguments supplied by the caller (such as
            the Airflow task context). They are accepted and ignored.

    Returns:
        pandas.DataFrame: The parsed contents of ``data_path``.
    """
    return pd.read_csv(data_path)

# Transform
def transform(**kwargs):
    """Clean the extracted data by discarding incomplete rows.

    Pulls the value that 'extract_task' pushed to XCom via the task
    instance found in ``kwargs['ti']`` and returns a copy of it with every
    row containing a missing value removed (kept deliberately simple).
    """
    task_instance = kwargs['ti']
    return task_instance.xcom_pull(task_ids='extract_task').dropna()

# Load (For this example, we'll just save the cleaned data to a new CSV)
def load(output_path="path_where_you_want_to_save_cleaned_data.csv", **kwargs):
    """Write the cleaned data to a CSV file.

    Args:
        output_path: Destination CSV file. Defaults to the placeholder path
            used by this notebook; override it per task (e.g. through the
            operator's ``op_kwargs``) instead of editing the function body.
        **kwargs: Must contain ``'ti'``, the task instance whose
            ``xcom_pull`` is used to fetch the value pushed by
            'transform_task'.

    Returns:
        None. The only effect is the file written at ``output_path``.
    """
    ti = kwargs['ti']
    df_cleaned = ti.xcom_pull(task_ids='transform_task')
    # index=False keeps the DataFrame index out of the saved file.
    df_cleaned.to_csv(output_path, index=False)


In [None]:
# Register the three ETL callables on the DAG as PythonOperator tasks.
# provide_context=True is set on each one; transform and load rely on it to
# receive the task instance as kwargs['ti'].

# Extract step
extract_task = PythonOperator(
    dag=dag,
    task_id='extract_task',
    python_callable=extract,
    provide_context=True,
)

# Transform step
transform_task = PythonOperator(
    dag=dag,
    task_id='transform_task',
    python_callable=transform,
    provide_context=True,
)

# Load step
load_task = PythonOperator(
    dag=dag,
    task_id='load_task',
    python_callable=load,
    provide_context=True,
)

# Linear pipeline: extract -> transform -> load
extract_task.set_downstream(transform_task)
transform_task.set_downstream(load_task)


In [None]:
# Enable retries on the extract task defined earlier in this notebook.
# The existing operator is updated in place instead of being re-created:
# a second PythonOperator with task_id='extract_task' in the same DAG is
# rejected by Airflow 2 (DuplicateTaskIdFound), and the replacement object
# would not carry the extract -> transform dependency set above.
extract_task.retries = 3  # Number of retries if the task fails
extract_task.retry_delay = timedelta(minutes=5)  # Delay between retries