
# **Practical Exercise: Automating an ETL Workflow Using Apache Airflow**


**Step-by-Step Instructions**

**Step 1: Set Up Apache Airflow**


*   Follow the Apache Airflow Installation Guide to install Airflow.
*   Once installed, start the Airflow scheduler and web server, each in its own terminal:
    *   airflow scheduler
    *   airflow webserver --port 8080
*   Access the Airflow UI at http://localhost:8080.



**Step 2: Set Up MongoDB**

*   Install MongoDB by following the MongoDB installation guide.
*   Start the MongoDB service locally, or use MongoDB Atlas, a cloud-based service.
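
Before wiring MongoDB into a DAG, it can help to confirm that the server is reachable. The following is a minimal sketch, assuming pymongo is installed and that the connection string is supplied through a hypothetical MONGODB_URI environment variable (falling back to a default local instance on port 27017). The helper names are illustrative and not part of the exercise.

```python
import os


def resolve_mongo_uri(env=None):
    """Return the MongoDB URI to use.

    MONGODB_URI is an assumed variable name, not something MongoDB or
    Airflow defines; the fallback targets a default local installation.
    """
    env = os.environ if env is None else env
    return env.get("MONGODB_URI", "mongodb://localhost:27017")


def ping_mongo(uri, timeout_ms=3000):
    """Return True if the server answers a ping within the timeout."""
    # Imported lazily so resolve_mongo_uri works without pymongo installed.
    from pymongo import MongoClient
    from pymongo.errors import PyMongoError

    client = MongoClient(uri, serverSelectionTimeoutMS=timeout_ms)
    try:
        client.admin.command("ping")
        return True
    except PyMongoError:
        return False
    finally:
        client.close()


# Example usage (requires pymongo and a reachable server):
#     print("MongoDB reachable:", ping_mongo(resolve_mongo_uri()))
```

Keeping the URI in the environment rather than in the DAG file also avoids committing Atlas credentials to version control.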

**Step 3: Create the DAG File**

*   In the dags/ directory of your Airflow installation, create a DAG file, for example etl_pipeline_mongodb.py.
*   Define the DAG with three tasks: extract_data, transform_data, and load_data. You can use the following code to define the DAG:

    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime
    import pandas as pd
    from pymongo import MongoClient

    # Define default arguments for the DAG
    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2023, 1, 1),
        'retries': 1,
    }

    # Define the DAG
    dag = DAG(
        'etl_pipeline_mongodb',
        default_args=default_args,
        schedule_interval='0 6 * * *',  # Run every day at 6:00 AM
    )

**Step 4: Create the Sales CSV File**

Create a file named sales.csv in a directory accessible by your Airflow DAG. It should contain sales data like the example below:

    transaction_id,customer_id,product_id,quantity,price
    T001,C001,P001,2,100
    T002,C002,P002,1,200
    T003,C003,P003,3,50
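
It can be useful to check the file's header and value types before pointing the DAG at it. The sketch below uses only the standard library and embeds the sample rows as a string so it runs on its own; the names EXPECTED_COLUMNS, SAMPLE_CSV, and load_sales_rows are illustrative assumptions, and the exercise itself reads the file with pandas.

```python
import csv
import io

# Column order taken from the sample file in this step.
EXPECTED_COLUMNS = ["transaction_id", "customer_id", "product_id", "quantity", "price"]

SAMPLE_CSV = """transaction_id,customer_id,product_id,quantity,price
T001,C001,P001,2,100
T002,C002,P002,1,200
T003,C003,P003,3,50
"""


def load_sales_rows(csv_text):
    """Parse sales CSV text into a list of dicts with numeric quantity and price."""
    reader = csv.DictReader(io.StringIO(csv_text))
    if reader.fieldnames != EXPECTED_COLUMNS:
        raise ValueError(f"unexpected header: {reader.fieldnames}")
    rows = []
    for row in reader:
        # int() and float() raise ValueError on malformed values.
        row["quantity"] = int(row["quantity"])
        row["price"] = float(row["price"])
        rows.append(row)
    return rows


rows = load_sales_rows(SAMPLE_CSV)
for row in rows:
    # Per-transaction total, the same quantity * price product the DAG computes.
    print(row["transaction_id"], row["quantity"] * row["price"])
```

To check a real file, pass its contents instead of the embedded sample, for example load_sales_rows(open("sales.csv").read()).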

**Step 5: Run the DAG**

*   Start the Airflow web server and navigate to http://localhost:8080.
*   Activate your DAG (etl_pipeline_mongodb) in the Airflow UI.
*   Manually trigger the DAG to test it. Verify the tasks in the DAG and check the logs to ensure successful execution.

**Step 6: Schedule the DAG**
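With schedule_interval='0 6 * * *', the DAG is scheduled to run at 6:00 AM every day. Verify the schedule by checking the DAG's configuration in the Airflow UI. The full DAG code at the end of this exercise passes the same cron expression through the schedule parameter, which replaces schedule_interval in Airflow 2.4 and later.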
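
To make the cron expression concrete, the sketch below labels its five fields and computes the next wall-clock time that matches a simple daily schedule. It is a standard-library illustration only: the function names are hypothetical, it is not a general cron parser, and it does not reproduce how Airflow's scheduler decides when to start a run.

```python
from datetime import datetime, timedelta

CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression):
    """Label the five whitespace-separated fields of a cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


def next_daily_run(expression, after):
    """Return the first matching time strictly after `after`.

    Handles only the simple daily case (fixed minute and hour, '*' for the
    other three fields).
    """
    fields = split_cron(expression)
    if [fields[name] for name in CRON_FIELDS[2:]] != ["*", "*", "*"]:
        raise ValueError("only daily schedules are supported in this sketch")
    candidate = after.replace(
        hour=int(fields["hour"]),
        minute=int(fields["minute"]),
        second=0,
        microsecond=0,
    )
    if candidate <= after:
        # Today's slot has already passed, so move to tomorrow.
        candidate += timedelta(days=1)
    return candidate


print(split_cron("0 6 * * *"))
print(next_daily_run("0 6 * * *", datetime(2025, 1, 1, 7, 30)))
```

For '0 6 * * *' the minute field is 0 and the hour field is 6, with the remaining fields left as wildcards, which is why the schedule reads as "every day at 6:00 AM".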

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pandas as pd
from pymongo import MongoClient


# Define default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2025, 1, 1),
    'retries': 2,
}

# Define the DAG
dag = DAG(
    'etl_pipeline_mongodb',
    default_args=default_args,
    schedule='0 6 * * *',  # Run every day at 6:00 AM
    catchup=False,  # Do not backfill a run for every day since start_date
)

# Define the extract task
def extract_data(**kwargs):
    # Read the CSV file containing sales data
    url = 'https://raw.githubusercontent.com/falawar7/AAI_634O/refs/heads/main/Week3/sales.csv'
    df = pd.read_csv(url)
    # Share the DataFrame with downstream tasks via XCom. Whether a DataFrame
    # can be stored in XCom depends on the Airflow version and configuration.
    kwargs['ti'].xcom_push(key='extracted_df', value=df)
    # Print the extracted data (this will be logged in the Airflow UI)
    print("Extracted Data:")
    print(df)

# Define the transform task
def transform_data(**kwargs):
    # Pull the DataFrame pushed by the extract task
    df1 = kwargs['ti'].xcom_pull(key='extracted_df', task_ids='extract_data')
    # Example transformation: calculate the total sale amount for each transaction
    df1['total_sales'] = df1['quantity'] * df1['price']
    # Share the transformed DataFrame with the load task
    kwargs['ti'].xcom_push(key='extracted_df_1', value=df1)
    # Print the transformed data (this will be logged in the Airflow UI)
    print("Transformed Data:")
    print(df1)

# Define the load task
def load_data(**kwargs):
    import os  # Needed only to read the connection string below
    # Load the transformed data into MongoDB. The connection string is read
    # from the MONGODB_URI environment variable (an assumed name; set it to
    # your Atlas or local URI) instead of hard-coding credentials in the DAG.
    client = MongoClient(os.environ['MONGODB_URI'])
    db = client['sales_db_1']
    collection = db['sales']
    df = kwargs['ti'].xcom_pull(key = 'extracted_df_1', task_ids = 'transform_data')
    # Convert the DataFrame to a dictionary of records
    sales_data = df.to_dict(orient='records')
    # Insert the data into MongoDB
    collection.insert_many(sales_data)
    # Print the loaded data (this will be logged in the Airflow UI)
    print("Loaded Data into MongoDB:")
    for record in sales_data:
        print(record)  # Print each record inserted into MongoDB

# Define the tasks in the DAG
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load_data',
    python_callable=load_data,
    dag=dag,
)

# Set the task dependencies
extract_task >> transform_task >> load_task
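
The tasks above exchange pandas DataFrames through XCom, which relies on the Airflow version and configuration being able to serialize them. One alternative is to exchange plain lists of dictionaries, which are JSON-serializable. The sketch below illustrates the idea with a hypothetical FakeTaskInstance stand-in so that it runs without Airflow; the key names extracted_records and transformed_records and the hard-coded rows are assumptions for illustration, not part of the exercise.

```python
import json


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance, for local runs only.

    It mimics just the keyword arguments used here and rejects values that
    cannot be encoded as JSON.
    """

    def __init__(self):
        self._store = {}

    def xcom_push(self, key, value):
        json.dumps(value)  # Raises TypeError for non-JSON-serializable values
        self._store[key] = value

    def xcom_pull(self, key, task_ids=None):
        return self._store[key]


def extract_data(**kwargs):
    # Hard-coded rows stand in for the CSV read in the real task.
    records = [
        {"transaction_id": "T001", "quantity": 2, "price": 100},
        {"transaction_id": "T002", "quantity": 1, "price": 200},
    ]
    kwargs["ti"].xcom_push(key="extracted_records", value=records)


def transform_data(**kwargs):
    records = kwargs["ti"].xcom_pull(key="extracted_records", task_ids="extract_data")
    # Build new dicts rather than mutating the pulled ones.
    transformed = [
        {**record, "total_sales": record["quantity"] * record["price"]}
        for record in records
    ]
    kwargs["ti"].xcom_push(key="transformed_records", value=transformed)


ti = FakeTaskInstance()
extract_data(ti=ti)
transform_data(ti=ti)
result = ti.xcom_pull(key="transformed_records", task_ids="transform_data")
print(result)
```

With pandas, df.to_dict(orient='records') and pd.DataFrame(records) convert between a DataFrame and this list-of-dictionaries form.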