# ETL Process (Airflow DAG)

Create an Airflow DAG that runs once a day and loads the previous day's data into the analytical database.

To track changes in dimension attributes, we use Slowly Changing Dimension (SCD) Type 2.

In [None]:
from datetime import datetime, timedelta
from airflow.sdk import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook
import pandas as pd
import numpy as np
import logging
from typing import Union
from sqlalchemy import text
    
# Warehouse schema and the dimension tables maintained by this DAG.
SCHEMA_NAME = 'analytics'
DIM_CUSTOMERS_TABLE = 'dim_customers'
DIM_PRODUCTS_TABLE = 'dim_products'

# Per-entity SCD Type 2 settings:
#   id_column      - business key present in both the source query and the dimension
#   change_columns - tracked attributes; a difference in any of them creates a new version
#   target_table   - dimension table (inside SCHEMA_NAME) holding the versions
SCD_CONFIG = dict(
    customers=dict(
        id_column='customer_id',
        change_columns=[
            'customer_name',
            'customer_category_name',
            'city_name',
            'state_province_name',
        ],
        target_table=DIM_CUSTOMERS_TABLE,
    ),
    products=dict(
        id_column='stock_item_id',
        change_columns=[
            'stock_item_name',
            'stock_group_names',
            'color_name',
            'unit_package_type_name',
            'outer_package_type_name',
            'brand',
            'size',
        ],
        target_table=DIM_PRODUCTS_TABLE,
    ),
)

# Database connections: Airflow connection ids for the WWI source and the
# analytics warehouse, plus one module-wide hook for each of them.
SRC_DB_ID = 'neon_wwi'
DST_DB_ID = 'neon_analytics'
src_hook, dst_hook = (
    PostgresHook(postgres_conn_id=conn_id) for conn_id in (SRC_DB_ID, DST_DB_ID)
)

logger = logging.getLogger(name=__name__)


# Defaults applied to every task of the DAG (individual tasks may override
# them, e.g. with their own retry count).
# NOTE(review): no 'email' recipient is configured here, so email_on_failure
# presumably has nobody to notify - confirm the intended alert target.
default_args = {
    'owner': 'analytics_team',
    'depends_on_past': False,
    'start_date': datetime(2025, 9, 25),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email_on_retry': False,
}

# Keyword arguments for the @dag decorator.
# doc_md is written flush-left on purpose: in Markdown, lines indented by four
# or more spaces are rendered as a preformatted code block rather than as
# headings and lists.
dag_config = {
    'default_args': default_args,
    'description': 'Daily ETL pipeline for loading data from operational DB to analytics warehouse',
    'schedule': '0 3 * * *', # Runs at 3 AM daily
    'catchup': False,
    'tags': ['analytics', 'etl'],
    'max_active_runs': 1,
    'doc_md': """\
# Analytics Daily ETL Pipeline

This DAG performs daily incremental load from operational database (WWI) to analytics data warehouse.

## Main Tasks:
- Extract daily orders, invoices, and order lines
- Process SCD Type 2 for customer and product dimensions
- Load fact and dimension tables

## Dependencies:
- Requires Postgres connections: `neon_wwi` and `neon_analytics`
""",
}

# Daily fact extracts. Each query is a SQLAlchemy text() clause with a
# :yesterday_ds bind parameter ('YYYY-MM-DD') supplied by the extract tasks.
# NOTE(review): ':name' binds on a TextClause require an SQLAlchemy
# connection; confirm the installed PostgresHook.get_pandas_df hands pandas
# one rather than a raw DB-API connection.

# Orders placed on the processing day.
QUERY_EXTRACT_ORDERS = text("""
    SELECT
        order_id
        , customer_id
        , order_date
        , expected_delivery_date
        , picking_completed_when
    FROM
        sales.orders
    WHERE
        order_date = :yesterday_ds
""")

# One row per invoice dated on the processing day, enriched with line-level
# totals (profit, amount, line count) and the sum of finalized customer
# transactions linked to it. Missing aggregates are reported as 0, and
# is_delivered is TRUE when confirmed_delivery_time is set.
QUERY_EXTRACT_INVOICES = text("""
    WITH invoice_totals AS (
        SELECT
            i.invoice_id
            , SUM(il.line_profit) AS invoice_profit
            , SUM(il.extended_price) AS invoice_amount
            , COUNT(il.invoice_line_id) AS invoice_lines_count
        FROM
            sales.invoices i
        LEFT JOIN sales.invoice_lines il ON i.invoice_id = il.invoice_id
        WHERE
            i.invoice_date = :yesterday_ds
        GROUP BY
            i.invoice_id
    ),
    payment_totals AS (
        SELECT
            invoice_id
            , SUM(transaction_amount) AS paid_amount
        FROM
            sales.customer_transactions
        WHERE
            is_finalized = TRUE
            AND invoice_id IN (
                SELECT invoice_id
                FROM sales.invoices
                WHERE invoice_date = :yesterday_ds
        )
        GROUP BY
            invoice_id
    )
    SELECT
        i.invoice_id
        , i.order_id
        , COALESCE(pt.paid_amount, 0) AS paid_amount
        , i.invoice_date
        , i.delivery_method_id
        , i.confirmed_delivery_time
        , i.returned_delivery_data
        , CASE WHEN i.confirmed_delivery_time IS NOT NULL THEN TRUE ELSE FALSE END AS is_delivered
        , COALESCE(it.invoice_profit, 0) AS invoice_profit
        , COALESCE(it.invoice_amount, 0) AS invoice_amount
        , COALESCE(it.invoice_lines_count, 0) AS invoice_lines_count
    FROM
        sales.invoices i
    LEFT JOIN invoice_totals it ON i.invoice_id = it.invoice_id
    LEFT JOIN payment_totals pt ON i.invoice_id = pt.invoice_id
    WHERE
        i.invoice_date = :yesterday_ds
""")

# Order lines of orders placed on the processing day. Where an invoice line
# exists for the same order and stock item, its values are attached as il_*;
# otherwise the il_* columns are NULL.
# NOTE(review): an order with several invoices, or several invoice lines for the
# same stock item, would fan out into duplicate rows; an order with no lines
# yields a row with NULL order_line_id - confirm neither can occur.
QUERY_EXTRACT_ORDER_LINES = text("""
    SELECT
        ol.order_line_id
        , o.order_id
        , ol.stock_item_id
        , ol.quantity
        , ol.unit_price
        , il.quantity AS il_quantity
        , il.line_profit AS il_line_profit
        , il.extended_price AS il_extended_price
    FROM
        sales.orders o
        LEFT JOIN sales.order_lines ol ON o.order_id = ol.order_id
        LEFT JOIN sales.invoices i ON i.order_id = o.order_id
        LEFT JOIN sales.invoice_lines il ON i.invoice_id = il.invoice_id AND ol.stock_item_id = il.stock_item_id
    WHERE
        o.order_date = :yesterday_ds
""")

# Full current customer master data from the operational DB, with category,
# delivery city and state/province names. The selected column names must
# match SCD_CONFIG['customers'] for change detection.
QUERY_EXTRACT_CUSTOMERS_SRC = """
    SELECT
        c.customer_id
        , c.customer_name
        , cc.customer_category_name
        , ci.city_name
        , sp.state_province_name
    FROM
        sales.customers c
        LEFT JOIN sales.customer_categories cc
            ON c.customer_category_id = cc.customer_category_id
        LEFT JOIN application.cities ci
            ON c.delivery_city_id = ci.city_id
        LEFT JOIN application.state_provinces sp
            ON ci.state_province_id = sp.state_province_id         
"""

# Active (current_record = TRUE) versions of the customer dimension in the
# analytics warehouse; compared against QUERY_EXTRACT_CUSTOMERS_SRC.
QUERY_EXTRACT_CUSTOMERS_DST = """
    SELECT
        *
    FROM
        analytics.dim_customers
    WHERE
        current_record = TRUE
"""

# Full current product master data: one row per stock item, with its stock
# groups collapsed into a single comma-separated string, plus colour and
# unit/outer package type names. The selected column names must match
# SCD_CONFIG['products'] for change detection.
# Group names are aggregated in a fixed (alphabetical) order. Without ORDER BY
# inside STRING_AGG, PostgreSQL may concatenate them in any order. A different
# order on a later run would then look like a stock_group_names change and
# create a spurious new dimension version.
# NOTE(review): rows stored before the ordering was enforced may be
# re-versioned once.
QUERY_EXTRACT_PRODUCTS_SRC = """
    WITH stock_groups_agg AS (
        SELECT
            sisg.stock_item_id
            , STRING_AGG(sg.stock_group_name, ', ' ORDER BY sg.stock_group_name) AS stock_group_names
        FROM
            warehouse.stock_item_stock_groups sisg
            LEFT JOIN warehouse.stock_groups sg
                ON sisg.stock_group_id = sg.stock_group_id
        GROUP
            BY sisg.stock_item_id
    )
    SELECT
        si.stock_item_id
        , si.stock_item_name
        , sg.stock_group_names
        , c.color_name
        , ptu.package_type_name AS unit_package_type_name
        , pto.package_type_name AS outer_package_type_name
        , si.brand
        , si.size
    FROM
        warehouse.stock_items si
        LEFT JOIN stock_groups_agg sg
            ON si.stock_item_id = sg.stock_item_id
        LEFT JOIN warehouse.package_types ptu
            ON si.unit_package_id = ptu.package_type_id
        LEFT JOIN warehouse.package_types pto
            ON si.outer_package_id = pto.package_type_id
        LEFT JOIN warehouse.colors c
            ON c.color_id = si.color_id
"""

# Active (current_record = TRUE) versions of the product dimension in the
# analytics warehouse; compared against QUERY_EXTRACT_PRODUCTS_SRC.
QUERY_EXTRACT_PRODUCTS_DST = """
    SELECT
        *
    FROM
        analytics.dim_products
    WHERE
        current_record = TRUE
"""

def handle_etl_failure(context):
    """Log diagnostic details for a failed ETL task.

    Registered as ``on_failure_callback`` on the DAG's tasks.

    Args:
        context: Airflow task context passed to failure callbacks. Reads
            ``task_instance``, the optional ``exception`` and the run's
            logical date.

    No e-mail is sent from here. Notification relies on Airflow's
    ``email_on_failure`` task setting (see ``default_args``).
    """
    task_instance = context['task_instance']
    exception = context.get('exception')
    # Airflow 3 (this DAG imports from airflow.sdk) removed 'execution_date'
    # from the context, so indexing it raised KeyError inside the callback.
    # 'logical_date' replaces it; the old key is only a fallback.
    run_date = context.get('logical_date') or context.get('execution_date')

    logger.error(f"ETL Task {task_instance.task_id} failed on {run_date}")
    logger.error(f"Exception: {str(exception)}")
    logger.error(f"Task try number: {task_instance.try_number}")
    # TaskInstance has no email_on_failure() method; that name is a boolean
    # task setting, so calling it only ever produced an error. No e-mail
    # attempt is made here.

def safe_column_comparison(col1: pd.Series, col2: pd.Series) -> pd.Series:
    """
    Return a boolean Series that is True where ``col1`` and ``col2`` differ.

    Missing values are treated as ordinary values:
    - two missing entries compare equal;
    - a missing entry compared with a present one counts as a difference.

    This holds both for numpy-backed columns (None / NaN) and for pandas
    nullable dtypes (``pd.NA``). For nullable dtypes a plain ``!=`` yields
    ``<NA>`` instead of True/False.

    Both Series must share the same index (e.g. two columns of one DataFrame).
    The result always has plain numpy ``bool`` dtype.
    """
    missing1 = col1.isna()
    missing2 = col2.isna()
    # Exactly one side missing -> the values differ.
    one_side_missing = missing1 != missing2
    # Both sides present -> compare the values. Any entry with a missing side
    # is forced to False here (under Kleene logic, <NA> & False is False).
    both_present_and_unequal = (col1 != col2) & ~missing1 & ~missing2
    return (one_side_missing | both_present_and_unequal).astype(bool)

def detect_scd_type2_changes(source_df: pd.DataFrame, target_df: pd.DataFrame, entity_type: str) -> Union[pd.DataFrame, None]:
    """Detect source rows that need a new SCD Type 2 version.

    A source row is selected when either:
    - its business key has no current record in ``target_df`` (a new entity), or
    - any of the entity's configured change columns differs from that record.
    Two NULLs count as equal (see ``safe_column_comparison``). Neither input
    frame is modified.

    Args:
        source_df: Current master data read from the operational database.
        target_df: Current (``current_record = TRUE``) rows of the dimension.
        entity_type: Key into ``SCD_CONFIG`` (``'customers'`` or ``'products'``).

    Returns:
        A DataFrame with the id column, the change columns and the SCD metadata
        columns ``valid_from`` (today's local date), ``valid_to`` (None) and
        ``current_record`` (True). Returns None when ``source_df`` is empty or
        no changes were found.
    """
    if source_df.empty:
        logger.info("Source DataFrame is empty - no changes to process")
        return None

    config = SCD_CONFIG[entity_type]
    id_col = config['id_column']
    change_cols = config['change_columns']
    src_id = f'{id_col}_src'
    dst_id = f'{id_col}_dst'

    # Check duplicates
    if source_df.duplicated(subset=[id_col]).any():
        logger.warning(f"Duplicate IDs found in source data for {entity_type}")

    # Give the keys distinct names so that an unmatched (new) row shows up as a
    # NULL dst key. rename() without inplace works on copies; renaming the
    # caller's frames in place made a second call on them fail with KeyError.
    merged = source_df.rename(columns={id_col: src_id}).merge(
        target_df.rename(columns={id_col: dst_id}),
        left_on=src_id,
        right_on=dst_id,
        suffixes=('_src', '_dst'),
        how='left',
    )

    # A row changed if it has no current dimension record or any tracked
    # attribute differs from that record.
    change_conditions = [merged[dst_id].isna()]
    change_conditions.extend(
        safe_column_comparison(merged[f'{col}_src'], merged[f'{col}_dst'])
        for col in change_cols
    )
    changes = merged[np.any(change_conditions, axis=0)]

    if changes.empty:
        logger.info(f"No dimension changes detected for {id_col}")
        return None

    # Keep only the source-side values under their original column names.
    result_columns = {src_id: id_col, **{f'{col}_src': col for col in change_cols}}
    # Several current target rows for one key (a data-quality problem) would
    # otherwise yield identical duplicate new versions; drop exact duplicates.
    result = (
        changes[list(result_columns)]
        .rename(columns=result_columns)
        .drop_duplicates()
        .assign(valid_from=datetime.now().date(), valid_to=None, current_record=True)
    )
    logger.info(f"Detected {len(result)} changes for {id_col}")
    return result

def validate_scd_data(df: pd.DataFrame, entity_type: str) -> bool:
    """Sanity-check a set of dimension changes before it is written.

    Args:
        df: Change set produced by ``detect_scd_type2_changes``, or None.
        entity_type: Key into ``SCD_CONFIG`` used to find the id column.

    Returns:
        False when required columns are missing or the id column contains
        NULLs. True otherwise, including when ``df`` is None (nothing to load).
        Duplicate ids only produce a warning.
    """
    # A missing change set is trivially valid.
    if df is None:
        return True

    id_col = SCD_CONFIG[entity_type]['id_column']

    # The id plus the SCD metadata columns must all be present.
    needed = [id_col, 'valid_from', 'valid_to', 'current_record']
    missing_columns = set(needed) - set(df.columns)
    if missing_columns:
        logger.error(f"Missing required columns: {missing_columns}")
        return False

    # The business key must never be NULL.
    if df[id_col].isna().any():
        logger.error(f"NULL values found in primary key column: {id_col}")
        return False

    # Duplicate keys are tolerated but reported.
    dup_count = int(df.duplicated(subset=[id_col]).sum())
    if dup_count:
        logger.warning(f"Found {dup_count} duplicate IDs in source data")

    return True

def load_scd_type_2(changed_data: pd.DataFrame, entity_type: str) -> None:
    """Apply SCD Type 2 changes for one dimension in a single transaction.

    For every business key in ``changed_data``:
    - the currently active dimension row is closed: ``valid_to`` is set to the
      database's CURRENT_DATE minus one day and ``current_record`` to FALSE;
    - then all rows of ``changed_data`` are appended as the new versions.

    Both statements run inside one ``engine.begin()`` block, so they commit or
    roll back together.

    Args:
        changed_data: Output of ``detect_scd_type2_changes``; may be None or
            empty, in which case nothing is written.
        entity_type: Key into ``SCD_CONFIG``; its ``target_table`` is written.

    Raises:
        Exception: any database error is logged and re-raised.
    """
    if changed_data is None or changed_data.empty:
        logger.info(f"No changes to load for {entity_type}")
        return

    config = SCD_CONFIG[entity_type]
    id_col = config['id_column']
    # Use the configured table instead of re-deriving it from the entity
    # name, so SCD_CONFIG stays the single source of truth.
    table_name = config['target_table']
    # .tolist() converts numpy scalars to plain Python values the DB driver can bind.
    ids = changed_data[id_col].tolist()

    update_sql = text(f"""
        UPDATE {SCHEMA_NAME}.{table_name}
        SET
            valid_to = CURRENT_DATE - INTERVAL '1 day'
            , current_record = FALSE
        WHERE
            {id_col} = ANY(:ids)
            AND current_record = TRUE
    """)

    engine = dst_hook.get_sqlalchemy_engine()
    try:
        with engine.begin() as connection:
            # Close the currently active versions...
            connection.execute(update_sql, {'ids': ids})
            # ...and insert the new versions in the same transaction.
            changed_data.to_sql(
                table_name,
                connection,
                schema=SCHEMA_NAME,
                if_exists='append',
                index=False,
                method='multi'
            )
        # Logged only after the transaction has committed.
        logger.info(f"Successfully loaded {len(ids)} new records for {entity_type}")

    except Exception as e:
        logger.error(f"Failed to load {entity_type} dimension: {str(e)}")
        raise

# Retry policy and failure callback shared by every task of the DAG. The
# task-level retries (3) override the DAG-level default_args value.
_TASK_DEFAULTS = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': handle_etl_failure,
}


def _previous_day_ds(context) -> str:
    """Return the day before the run's logical date as a 'YYYY-MM-DD' string."""
    return (context['logical_date'] - timedelta(days=1)).strftime('%Y-%m-%d')


def _append_to_fact_table(df: pd.DataFrame, table: str, label: str) -> None:
    """Append ``df`` to ``SCHEMA_NAME.<table>`` in the analytics warehouse.

    Args:
        df: Rows to append; an empty frame is logged and skipped.
        table: Target fact table inside ``SCHEMA_NAME``.
        label: Human-readable entity name used in log messages.

    Raises:
        Exception: any database error is logged and re-raised so the task fails.
    """
    if df.empty:
        logger.info(f"No {label} data to load")
        return
    # NOTE(review): rows are only appended, so re-running a DAG run for the
    # same day inserts them again - consider delete-then-insert per day.
    try:
        engine = dst_hook.get_sqlalchemy_engine()
        df.to_sql(
            table,
            engine,
            schema=SCHEMA_NAME,
            if_exists='append',
            index=False,
            method='multi'
        )
        logger.info(f"Loaded {len(df)} {label}")
    except Exception as e:
        logger.error(f"Failed to load {label}: {str(e)}")
        raise


@dag(**dag_config)
def wwi_to_analytics_daily():
    """Daily incremental ETL from the WWI operational DB to the analytics warehouse.

    Facts:
    - Orders, invoices and order lines dated the day before the run's logical
      date are extracted, validated and appended to the fact tables.

    Dimensions:
    - Customer and product master data is compared with the current
      dimension rows.
    - Changed or new entities get a new SCD Type 2 version.
    """

    @task(**_TASK_DEFAULTS)
    def extract_orders(**context) -> pd.DataFrame:
        """Extracts daily orders data from source system for incremental processing."""
        return src_hook.get_pandas_df(
            QUERY_EXTRACT_ORDERS, parameters={'yesterday_ds': _previous_day_ds(context)}
        )

    @task(**_TASK_DEFAULTS)
    def extract_invoices(**context) -> pd.DataFrame:
        """Extracts invoice information with calculated totals and payment status for the processing date."""
        return src_hook.get_pandas_df(
            QUERY_EXTRACT_INVOICES, parameters={'yesterday_ds': _previous_day_ds(context)}
        )

    @task(**_TASK_DEFAULTS)
    def extract_order_lines(**context) -> pd.DataFrame:
        """Extracts order line details with associated invoice information for daily sales analysis."""
        return src_hook.get_pandas_df(
            QUERY_EXTRACT_ORDER_LINES, parameters={'yesterday_ds': _previous_day_ds(context)}
        )

    @task(**_TASK_DEFAULTS)
    def extract_customers_src() -> pd.DataFrame:
        """Extracts current customer master data including categories and geographical information from source."""
        return src_hook.get_pandas_df(QUERY_EXTRACT_CUSTOMERS_SRC)

    @task(**_TASK_DEFAULTS)
    def extract_customers_dst() -> pd.DataFrame:
        """Extracts existing customer dimension records from analytics warehouse for change detection."""
        return dst_hook.get_pandas_df(QUERY_EXTRACT_CUSTOMERS_DST)

    @task(**_TASK_DEFAULTS)
    def extract_products_src() -> pd.DataFrame:
        """Extracts product master data with attributes and classifications from operational database."""
        return src_hook.get_pandas_df(QUERY_EXTRACT_PRODUCTS_SRC)

    @task(**_TASK_DEFAULTS)
    def extract_products_dst() -> pd.DataFrame:
        """Extracts current product dimension records from data warehouse for SCD comparison."""
        return dst_hook.get_pandas_df(QUERY_EXTRACT_PRODUCTS_DST)

    @task(**_TASK_DEFAULTS)
    def validate_source_data(df: pd.DataFrame, table_name: str) -> pd.DataFrame:
        """Log basic data-quality warnings for an extracted frame and pass it through.

        An empty frame is a legitimate result for a day without activity. It is
        logged and passed on, and the downstream load task skips it. Raising
        here would fail the whole run on such a day and leave the loaders'
        empty-data branch unreachable.
        """
        if df.empty:
            logger.warning(f"Empty DataFrame for {table_name}")
            return df

        null_mask = df.isnull().any()
        if null_mask.any():
            logger.warning(f"NULL values found in columns: {df.columns[null_mask].tolist()}")

        duplicate_count = int(df.duplicated().sum())
        if duplicate_count:
            logger.warning(f"Found {duplicate_count} duplicates in {table_name}")

        return df

    @task(**_TASK_DEFAULTS)
    def transform_check_changed_customers(customers_src: pd.DataFrame, customers_dst: pd.DataFrame) -> Union[pd.DataFrame, None]:
        """Identifies customer dimension changes using SCD Type 2 logic for incremental updates."""
        return detect_scd_type2_changes(customers_src, customers_dst, 'customers')

    @task(**_TASK_DEFAULTS)
    def transform_check_changed_products(products_src: pd.DataFrame, products_dst: pd.DataFrame) -> Union[pd.DataFrame, None]:
        """Identifies product dimension modifications requiring new version creation in the warehouse."""
        return detect_scd_type2_changes(products_src, products_dst, 'products')

    @task(**_TASK_DEFAULTS)
    def load_fact_orders(df_orders: pd.DataFrame) -> None:
        """Load orders fact table"""
        _append_to_fact_table(df_orders, 'fact_orders', 'orders')

    @task(**_TASK_DEFAULTS)
    def load_fact_invoices(df_invoices: pd.DataFrame) -> None:
        """Load invoices fact table"""
        _append_to_fact_table(df_invoices, 'fact_invoices', 'invoices')

    @task(**_TASK_DEFAULTS)
    def load_fact_order_lines(df_order_lines: pd.DataFrame) -> None:
        """Load order lines fact table"""
        _append_to_fact_table(df_order_lines, 'fact_order_lines', 'order lines')

    @task(**_TASK_DEFAULTS)
    def load_scd_type_2_customers(changed_customers: Union[pd.DataFrame, None]) -> None:
        """Loads customer dimension changes by closing old records and inserting new versions with validation.

        Raises:
            ValueError: if the change set fails validation. The task fails
                instead of silently succeeding without loading anything.
        """
        if not validate_scd_data(changed_customers, 'customers'):
            raise ValueError("Customer dimension changes failed validation; see task log")
        load_scd_type_2(changed_customers, 'customers')

    @task(**_TASK_DEFAULTS)
    def load_scd_type_2_products(changed_products: Union[pd.DataFrame, None]) -> None:
        """Applies product dimension updates using SCD Type 2 pattern with data quality checks.

        Raises:
            ValueError: if the change set fails validation. The task fails
                instead of silently succeeding without loading anything.
        """
        if not validate_scd_data(changed_products, 'products'):
            raise ValueError("Product dimension changes failed validation; see task log")
        load_scd_type_2(changed_products, 'products')

    # ==========================================================================
    # WORKFLOW
    # ==========================================================================

    # Extract
    df_orders = extract_orders()
    df_invoices = extract_invoices()
    df_order_lines = extract_order_lines()

    df_customers_src = extract_customers_src()
    df_customers_dst = extract_customers_dst()
    df_products_src = extract_products_src()
    df_products_dst = extract_products_dst()

    # Validate
    df_orders_valid = validate_source_data(df_orders, "orders")
    df_invoices_valid = validate_source_data(df_invoices, "invoices")
    df_order_lines_valid = validate_source_data(df_order_lines, "order_lines")

    # Transform
    df_customers_changes = transform_check_changed_customers(df_customers_src, df_customers_dst)
    df_products_changes = transform_check_changed_products(df_products_src, df_products_dst)

    # Load facts
    load_fact_orders(df_orders_valid)
    load_fact_invoices(df_invoices_valid)
    load_fact_order_lines(df_order_lines_valid)

    # Load dimensions
    load_scd_type_2_customers(df_customers_changes)
    load_scd_type_2_products(df_products_changes)

# ==========================================================================
# Instantiate the DAG
# ==========================================================================

# Calling the @dag-decorated function builds the DAG object when this module
# is imported, which is how the Airflow DAG processor finds it in this file.
wwi_to_analytics_daily()