etl_pipeline/
├── dags/
│   └── etl_dag.py
├── plugins/
│   └── custom_transform_operator.py
├── utils/
│   └── transformations.py


In [None]:
# Dependencies (the tasks below also import requests, boto3, pandas and SQLAlchemy)
!pip install apache-airflow

In [7]:
# 1. etl_dag.py: Define the Airflow DAG

# dags/etl_dag.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from plugins.custom_transform_operator import CustomTransformOperator
from utils.transformations import standardize_and_clean_data


def extract_from_api(**kwargs):
    """Fetch JSON data from the upstream HTTP API and publish it to XCom.

    The decoded response body is pushed under the XCom key ``'api_data'``.

    Args:
        **kwargs: Airflow task context; only ``kwargs['ti']`` (the running
            TaskInstance) is used.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
        requests.Timeout: if the API does not respond within the timeout.
    """
    import requests

    # Without a timeout, requests can block forever and tie up the worker.
    response = requests.get('https://api.example.com/data', timeout=30)
    # Fail the task (so the retry policy applies) instead of pushing an
    # error payload downstream as if it were data.
    response.raise_for_status()
    data = response.json()
    kwargs['ti'].xcom_push(key='api_data', value=data)


def extract_from_s3(**kwargs):
    """Download ``data/file.json`` from the ``my-bucket`` bucket and publish it.

    The object body is parsed as JSON and pushed to XCom under the key
    ``'s3_data'``.

    Args:
        **kwargs: Airflow task context; only ``kwargs['ti']`` is used.
    """
    import boto3
    import json

    client = boto3.client('s3')
    s3_object = client.get_object(Bucket='my-bucket', Key='data/file.json')
    # json.load() reads the whole streaming body and decodes it in one step.
    records = json.load(s3_object['Body'])
    task_instance = kwargs['ti']
    task_instance.xcom_push(key='s3_data', value=records)


def load_to_warehouse(**kwargs):
    """Write the cleaned records to ``target_table`` in the warehouse database.

    Pulls the records pushed under the XCom key ``'final_data'``, builds a
    DataFrame from them and writes it with ``if_exists='replace'``. That option
    drops ``target_table`` before inserting, so each run recreates the table.

    Args:
        **kwargs: Airflow task context; only ``kwargs['ti']`` is used.
    """
    import pandas as pd
    from sqlalchemy import create_engine

    transformed_data = kwargs['ti'].xcom_pull(key='final_data')
    df = pd.DataFrame(transformed_data)

    # NOTE(review): credentials are hard-coded in the URL; consider sourcing
    # them from an Airflow Connection or a secrets backend instead.
    engine = create_engine('postgresql://user:pass@host:5432/warehouse')
    try:
        df.to_sql('target_table', con=engine, if_exists='replace', index=False)
    finally:
        # Release the engine's pooled connections even if the write fails.
        engine.dispose()


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Daily ETL: the API and S3 extracts run independently, fan in to the
# transform, and the result is then cleaned and loaded.
with DAG(
    dag_id='modular_etl_pipeline',
    default_args=default_args,
    # `schedule` replaces `schedule_interval`, which is deprecated since
    # Airflow 2.4 and removed in Airflow 3.
    schedule='@daily',
    catchup=False,
    tags=['etl', 'airflow', 'data-pipeline'],
) as dag:

    # Since Airflow 2, PythonOperator passes the task context to callables that
    # accept **kwargs automatically; `provide_context` is deprecated and ignored.
    extract_api = PythonOperator(
        task_id='extract_from_api',
        python_callable=extract_from_api,
    )

    extract_s3 = PythonOperator(
        task_id='extract_from_s3',
        python_callable=extract_from_s3,
    )

    transform = CustomTransformOperator(
        task_id='transform_data',
    )

    clean_data = PythonOperator(
        task_id='standardize_and_clean',
        python_callable=standardize_and_clean_data,
    )

    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_warehouse,
    )

    [extract_api, extract_s3] >> transform >> clean_data >> load


ModuleNotFoundError: No module named 'plugins'

In [6]:
## Custom Transformation Operator

# plugins/custom_transform_operator.py

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class CustomTransformOperator(BaseOperator):
    """Merge the API and S3 extracts and count active records per category.

    Pulls the XCom values stored under ``'api_data'`` and ``'s3_data'`` and
    concatenates them. It keeps records whose ``'status'`` equals ``'active'``
    and pushes a ``{category: count}`` dict under ``'transformed_data'``.
    """

    # Since Airflow 2, BaseOperator applies ``default_args`` itself, so the
    # deprecated ``@apply_defaults`` decorator is no longer needed here.
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def execute(self, context):
        """Combine both extracts, filter active records and push per-category totals.

        Args:
            context: Airflow task context; only ``context['ti']`` is used.

        Raises:
            ValueError: if either extract has no XCom value.
            KeyError: if an active record has no ``'category'`` field.
        """
        from collections import Counter

        ti = context['ti']
        api_data = ti.xcom_pull(key='api_data')
        s3_data = ti.xcom_pull(key='s3_data')

        # xcom_pull returns None when nothing was pushed under the key. Fail
        # with a clear message rather than a bare TypeError from ``None + list``.
        # Treating the missing extract as empty would silently propagate an
        # incomplete result downstream.
        missing = [
            key
            for key, value in (('api_data', api_data), ('s3_data', s3_data))
            if value is None
        ]
        if missing:
            raise ValueError(
                f"No XCom value found for key(s): {', '.join(missing)}"
            )

        combined = api_data + s3_data
        active = (record for record in combined if record.get('status') == 'active')
        # Counter keeps first-seen insertion order; dict() keeps the pushed
        # value a plain dict for XCom serialization.
        totals = dict(Counter(record['category'] for record in active))

        ti.xcom_push(key='transformed_data', value=totals)


In [None]:
## Standardize and Clean Function

# utils/transformations.py

def standardize_and_clean_data(**kwargs):
    """Normalise per-category totals into records ready for loading.

    Pulls the ``{category: count}`` mapping stored under the XCom key
    ``'transformed_data'``. It lower-cases and strips each category name and
    coerces counts to numbers, with non-numeric values becoming 0. The result
    is pushed under ``'final_data'`` as a list of
    ``{'category': ..., 'count': ...}`` dicts.

    Args:
        **kwargs: Airflow task context; only ``kwargs['ti']`` is used.

    Raises:
        ValueError: if no value was pushed under ``'transformed_data'``.
    """
    import pandas as pd

    ti = kwargs['ti']
    transformed_data = ti.xcom_pull(key='transformed_data')

    if transformed_data is None:
        raise ValueError("No XCom value found for key 'transformed_data'")

    # An empty mapping (no active records) would build a column-less DataFrame,
    # and df['category'] would then raise KeyError; publish no records instead.
    if not transformed_data:
        ti.xcom_push(key='final_data', value=[])
        return

    df = pd.DataFrame(
        list(transformed_data.items()), columns=['category', 'count']
    )
    df['category'] = df['category'].str.lower().str.strip()
    df['count'] = pd.to_numeric(df['count'], errors='coerce').fillna(0)

    # NOTE(review): categories differing only by case/whitespace end up as
    # duplicate rows here; aggregate them if that is not intended.
    ti.xcom_push(key='final_data', value=df.to_dict(orient='records'))
