In [None]:
# CARTS Aggregates Workflow - PySpark Implementation

This notebook converts the Informatica PowerCenter workflow `wf_CARTS_AGGREGATES_02.XML` to PySpark implementation.

## Original Workflow Overview
- **Workflow Name**: wf_CARTS_AGGREGATES_02
- **Description**: CARTS (Commissary Automated Report Transaction System) data aggregation workflow
- **Schedule**: Runs every 6 hours starting at 15:00 on 3/10/2025
- **Server**: INT_SVC_PROD (Domain_PROD)

## Workflow Tasks Sequence
The original workflow executes the following tasks in dependency order:

1. **Data Preparation**: `cmd_aggts_prep_carts` - Prepares CARTS data for aggregation
2. **Dashboard Processing**: 
   - `cmd_storeops_dshbrd_mnthly` - Monthly store operations dashboard
   - `cmd_storeops_dshbrd_fytd` - Fiscal year-to-date store operations dashboard
3. **Department Sales Aggregations**:
   - `cmd_carts_dept_daily_sales_STR` - Store level department daily sales
   - `cmd_carts_dept_daily_sales_MNR` - Minor level department daily sales  
   - `cmd_carts_dept_daily_sales_MJR` - Major level department daily sales
   - `cmd_carts_dept_daily_sales_MBU` - Major business unit level department daily sales
4. **Customer Count Aggregations**:
   - `cmd_carts_customer_count_STR` - Store level customer counts
   - `cmd_carts_customer_count_MNR` - Minor level customer counts
   - `cmd_carts_customer_count_MJR` - Major level customer counts
   - `cmd_carts_customer_count_MBU` - Major business unit level customer counts
5. **Activity Aggregations**:
   - `cmd_carts_daily_terminal_activity_aggt` - Daily terminal activity aggregates
   - `cmd_carts_daily_item_sales_aggt` - Daily item sales aggregates
6. **Store Statistics**: `cmd_carts_store_daily_stats_incremental` - Incremental store daily statistics
7. **Validation**: `cmd_sales_validation_carts` - Sales data validation

**Error Handling**: All tasks link to `email_failure` task on failure, which sends notifications to edw_infa_etl@deca.mil and service.desk@deca.mil

In [None]:
# Import required libraries
import os
import sys
from datetime import datetime, timedelta
from typing import List, Dict, Any
import logging
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

# PySpark imports
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf

# Initialize logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('carts_aggregates_workflow.log'),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

print("Libraries imported successfully")

In [None]:
# Spark Configuration
def create_spark_session(app_name: str = "CARTS_Aggregates_Workflow") -> SparkSession:
    """
    Create and configure Spark session for CARTS aggregates processing
    """
    conf = SparkConf().setAppName(app_name)
    
    # Configure Spark for EDW environment
    conf.set("spark.sql.adaptive.enabled", "true")
    conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
    conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
    conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    
    # Memory and executor configuration for EDW workloads
    conf.set("spark.executor.memory", "4g")
    conf.set("spark.executor.cores", "4")
    conf.set("spark.sql.shuffle.partitions", "200")
    
    # Oracle database configuration (assuming Oracle backend)
    conf.set("spark.jars", "/opt/spark/jars/ojdbc8.jar")  # Update path as needed
    
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    
    # Set log level
    spark.sparkContext.setLogLevel("WARN")
    
    logger.info(f"Spark session created: {app_name}")
    logger.info(f"Spark version: {spark.version}")
    
    return spark

# Initialize Spark session
spark = create_spark_session()
print(f"Spark session initialized: {spark.version}")

In [None]:
# Configuration and Constants
class CARTSConfig:
    """Configuration class for CARTS aggregates workflow"""
    
    # Database connection parameters
    DB_URL = "jdbc:oracle:thin:@//your-oracle-host:1521/service_name"  # Update with actual connection
    DB_USER = "edw_user"  # Update with actual username
    DB_PASSWORD = "your_password"  # Update with actual password or use secret management
    
    # Table names
    CARTS_SALES_FACT = "edw.carts_sales_fact"
    CARTS_CUSTOMER_DIM = "edw.carts_customer_dim"
    CARTS_STORE_DIM = "edw.carts_store_dim"
    CARTS_PRODUCT_DIM = "edw.carts_product_dim"
    CARTS_TIME_DIM = "edw.carts_time_dim"
    
    # Output tables
    CARTS_DEPT_DAILY_SALES = "edw.carts_dept_daily_sales_aggt"
    CARTS_CUSTOMER_COUNT = "edw.carts_customer_count_aggt"
    CARTS_ITEM_SALES = "edw.carts_daily_item_sales_aggt"
    CARTS_TERMINAL_ACTIVITY = "edw.carts_daily_terminal_activity_aggt"
    CARTS_STORE_STATS = "edw.carts_store_daily_stats"
    
    # Email configuration
    SMTP_SERVER = "smtp.deca.mil"  # Update with actual SMTP server
    SMTP_PORT = 587
    EMAIL_FROM = "edw_infa_etl@deca.mil"
    EMAIL_TO = ["edw_infa_etl@deca.mil", "service.desk@deca.mil"]
    
    # Processing date (default to yesterday)
    PROCESS_DATE = datetime.now() - timedelta(days=1)

# Utility Functions
def get_db_connection_properties() -> Dict[str, str]:
    """Get database connection properties"""
    return {
        "user": CARTSConfig.DB_USER,
        "password": CARTSConfig.DB_PASSWORD,
        "driver": "oracle.jdbc.driver.OracleDriver"
    }

def read_table(spark: SparkSession, table_name: str, condition: str = None) -> DataFrame:
    """
    Read table from database with optional condition
    """
    try:
        df = spark.read.jdbc(
            url=CARTSConfig.DB_URL,
            table=table_name,
            properties=get_db_connection_properties()
        )
        
        if condition:
            df = df.filter(condition)
        
        logger.info(f"Successfully read table: {table_name}")
        return df
    except Exception as e:
        logger.error(f"Error reading table {table_name}: {str(e)}")
        raise

def write_table(df: DataFrame, table_name: str, mode: str = "overwrite") -> None:
    """
    Write DataFrame to database table
    """
    try:
        df.write.jdbc(
            url=CARTSConfig.DB_URL,
            table=table_name,
            mode=mode,
            properties=get_db_connection_properties()
        )
        logger.info(f"Successfully wrote to table: {table_name}")
    except Exception as e:
        logger.error(f"Error writing to table {table_name}: {str(e)}")
        raise

print("Configuration and utility functions defined")

In [None]:
# Error Handling and Notification Functions
class TaskStatus:
    """Task status constants"""
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"
    RUNNING = "RUNNING"
    
class WorkflowError(Exception):
    """Custom exception for workflow errors"""
    pass

def send_failure_notification(task_name: str, error_message: str) -> None:
    """
    Send email notification on task failure
    Replicates the email_failure task from original workflow
    """
    try:
        subject = f"CARTS - {task_name} failed"
        body = f"""EDW has experienced a PRD load failure of {task_name}.

Refer to EDW On-Call Schedule

Call EDW primary, secondary, and alternate on-call personnel until contact is made

---------------------------------------------------------------------------------------------------

Assign priority 2 Remedy ticket

Error Details:
{error_message}

Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
        
        msg = MIMEMultipart()
        msg['From'] = CARTSConfig.EMAIL_FROM
        msg['To'] = ", ".join(CARTSConfig.EMAIL_TO)
        msg['Subject'] = subject
        
        msg.attach(MIMEText(body, 'plain'))
        
        # Note: In production, you would configure SMTP server properly
        # For now, we'll just log the notification
        logger.error(f"FAILURE NOTIFICATION - {subject}")
        logger.error(f"Recipients: {CARTSConfig.EMAIL_TO}")
        logger.error(f"Message: {body}")
        
        print(f"FAILURE NOTIFICATION SENT: {subject}")
        
    except Exception as e:
        logger.error(f"Failed to send notification: {str(e)}")

def execute_with_error_handling(func, task_name: str, *args, **kwargs) -> str:
    """
    Execute a function with error handling and notification
    """
    try:
        logger.info(f"Starting task: {task_name}")
        start_time = datetime.now()
        
        result = func(*args, **kwargs)
        
        end_time = datetime.now()
        duration = end_time - start_time
        logger.info(f"Task {task_name} completed successfully in {duration}")
        
        return TaskStatus.SUCCEEDED
        
    except Exception as e:
        error_msg = f"Task {task_name} failed: {str(e)}"
        logger.error(error_msg)
        send_failure_notification(task_name, str(e))
        return TaskStatus.FAILED

print("Error handling and notification functions defined")

In [None]:
# Task 1: Data Preparation (cmd_aggts_prep_carts)
def aggts_prep_carts(spark: SparkSession, process_date: datetime = None) -> None:
    """
    Prepare CARTS data for aggregation processing
    Equivalent to: ksh /opt/edw/prod/prep/bin/aggts_prep_carts.sh date-1
    """
    if process_date is None:
        process_date = CARTSConfig.PROCESS_DATE
    
    logger.info(f"Preparing CARTS data for date: {process_date.strftime('%Y-%m-%d')}")
    
    # Read raw CARTS sales data
    sales_raw = read_table(
        spark, 
        "edw.carts_sales_raw",
        f"sales_date = '{process_date.strftime('%Y-%m-%d')}'"
    )
    
    # Data quality checks and cleansing
    sales_cleaned = sales_raw.filter(
        (col("sales_amount").isNotNull()) &
        (col("sales_amount") >= 0) &
        (col("store_id").isNotNull()) &
        (col("item_id").isNotNull())
    )
    
    # Add derived columns
    sales_prepared = sales_cleaned.withColumn(
        "fiscal_year", 
        when(month(col("sales_date")) >= 10, year(col("sales_date")) + 1)
        .otherwise(year(col("sales_date")))
    ).withColumn(
        "fiscal_quarter",
        when(month(col("sales_date")).isin([10, 11, 12]), 1)
        .when(month(col("sales_date")).isin([1, 2, 3]), 2)
        .when(month(col("sales_date")).isin([4, 5, 6]), 3)
        .otherwise(4)
    ).withColumn(
        "process_timestamp",
        current_timestamp()
    )
    
    # Write prepared data to staging table
    write_table(sales_prepared, "edw.carts_sales_staging", "overwrite")
    
    # Log statistics
    total_records = sales_prepared.count()
    logger.info(f"Prepared {total_records} CARTS sales records for {process_date.strftime('%Y-%m-%d')}")
    
    print(f"Data preparation completed: {total_records} records processed")

# Execute data preparation task
task_status = execute_with_error_handling(
    aggts_prep_carts, 
    "cmd_aggts_prep_carts", 
    spark
)
print(f"Data preparation task status: {task_status}")

In [None]:
# Task 2: Store Operations Dashboard - Monthly (cmd_storeops_dshbrd_mnthly)
def storeops_dashboard_monthly(spark: SparkSession) -> None:
    """
    Generate monthly store operations dashboard metrics
    Equivalent to: ksh /opt/edw/powercenter/server/nm/Scripts/storeops_dshbrd_mnthly.sh
    """
    logger.info("Generating monthly store operations dashboard")
    
    # Read sales data for current month
    current_month = datetime.now().replace(day=1)
    previous_month = (current_month - timedelta(days=1)).replace(day=1)
    
    sales_data = read_table(
        spark,
        "edw.carts_sales_staging",
        f"sales_date >= '{previous_month.strftime('%Y-%m-%d')}' AND sales_date < '{current_month.strftime('%Y-%m-%d')}'"
    )
    
    # Join with store dimension
    store_dim = read_table(spark, CARTSConfig.CARTS_STORE_DIM)
    
    # Calculate monthly metrics by store
    monthly_metrics = sales_data.join(store_dim, "store_id").groupBy(
        "store_id", "store_name", "region", "district"
    ).agg(
        sum("sales_amount").alias("total_sales"),
        sum("sales_quantity").alias("total_quantity"),
        countDistinct("transaction_id").alias("transaction_count"),
        countDistinct("customer_id").alias("unique_customers"),
        avg("sales_amount").alias("avg_transaction_value")
    ).withColumn("report_month", lit(previous_month.strftime('%Y-%m')))
    
    # Write to dashboard table
    write_table(monthly_metrics, "edw.storeops_dashboard_monthly", "append")
    
    record_count = monthly_metrics.count()
    logger.info(f"Generated monthly dashboard for {record_count} stores")
    print(f"Monthly dashboard completed: {record_count} store records")

# Task 3: Store Operations Dashboard - Fiscal Year to Date (cmd_storeops_dshbrd_fytd)
def storeops_dashboard_fytd(spark: SparkSession) -> None:
    """
    Generate fiscal year to date store operations dashboard metrics
    Equivalent to: ksh /opt/edw/powercenter/server/nm/Scripts/storeops_dshbrd_fytd.sh
    """
    logger.info("Generating fiscal year to date store operations dashboard")
    
    # Calculate fiscal year start date
    current_date = datetime.now()
    if current_date.month >= 10:
        fy_start = datetime(current_date.year, 10, 1)
    else:
        fy_start = datetime(current_date.year - 1, 10, 1)
    
    # Read sales data for fiscal year to date
    sales_data = read_table(
        spark,
        "edw.carts_sales_staging",
        f"sales_date >= '{fy_start.strftime('%Y-%m-%d')}'"
    )
    
    # Join with store dimension
    store_dim = read_table(spark, CARTSConfig.CARTS_STORE_DIM)
    
    # Calculate FYTD metrics by store
    fytd_metrics = sales_data.join(store_dim, "store_id").groupBy(
        "store_id", "store_name", "region", "district", "fiscal_year"
    ).agg(
        sum("sales_amount").alias("fytd_sales"),
        sum("sales_quantity").alias("fytd_quantity"),
        countDistinct("transaction_id").alias("fytd_transactions"),
        countDistinct("customer_id").alias("fytd_unique_customers"),
        avg("sales_amount").alias("fytd_avg_transaction")
    )
    
    # Write to dashboard table
    write_table(fytd_metrics, "edw.storeops_dashboard_fytd", "overwrite")
    
    record_count = fytd_metrics.count()
    logger.info(f"Generated FYTD dashboard for {record_count} stores")
    print(f"FYTD dashboard completed: {record_count} store records")

# Execute dashboard tasks in sequence
if task_status == TaskStatus.SUCCEEDED:
    monthly_status = execute_with_error_handling(
        storeops_dashboard_monthly,
        "cmd_storeops_dshbrd_mnthly",
        spark
    )
    print(f"Monthly dashboard task status: {monthly_status}")
    
    if monthly_status == TaskStatus.SUCCEEDED:
        fytd_status = execute_with_error_handling(
            storeops_dashboard_fytd,
            "cmd_storeops_dshbrd_fytd",
            spark
        )
        print(f"FYTD dashboard task status: {fytd_status}")
else:
    print("Skipping dashboard tasks due to data preparation failure")

In [None]:
# Task 4: Department Daily Sales Aggregations
def carts_dept_daily_sales(spark: SparkSession, aggregation_level: str) -> None:
    """
    Generate daily department sales aggregates at different organizational levels
    
    Args:
        aggregation_level: STR (Store), MNR (Minor), MJR (Major), MBU (Major Business Unit)
    """
    logger.info(f"Generating department daily sales aggregates - {aggregation_level} level")
    
    # Read sales staging data
    sales_data = read_table(spark, "edw.carts_sales_staging")
    
    # Join with product dimension for department information
    product_dim = read_table(spark, CARTSConfig.CARTS_PRODUCT_DIM)
    store_dim = read_table(spark, CARTSConfig.CARTS_STORE_DIM)
    
    # Base aggregation with product and store dimensions
    base_agg = sales_data.join(product_dim, "item_id") \
                        .join(store_dim, "store_id")
    
    # Define grouping columns based on aggregation level
    if aggregation_level == "STR":  # Store level
        group_cols = ["store_id", "store_name", "dept_id", "dept_name", "sales_date"]
    elif aggregation_level == "MNR":  # Minor level (Department grouping)
        group_cols = ["minor_dept_id", "minor_dept_name", "sales_date"]
    elif aggregation_level == "MJR":  # Major level (Major department grouping)
        group_cols = ["major_dept_id", "major_dept_name", "sales_date"]
    elif aggregation_level == "MBU":  # Major Business Unit level
        group_cols = ["business_unit_id", "business_unit_name", "sales_date"]
    else:
        raise ValueError(f"Invalid aggregation level: {aggregation_level}")
    
    # Perform aggregation
    dept_sales_agg = base_agg.groupBy(*group_cols).agg(
        sum("sales_amount").alias("total_sales"),
        sum("sales_quantity").alias("total_quantity"),
        sum("cost_amount").alias("total_cost"),
        countDistinct("transaction_id").alias("transaction_count"),
        countDistinct("customer_id").alias("unique_customers"),
        avg("sales_amount").alias("avg_sales_per_transaction"),
        (sum("sales_amount") - sum("cost_amount")).alias("gross_margin")
    ).withColumn(
        "aggregation_level", lit(aggregation_level)
    ).withColumn(
        "process_timestamp", current_timestamp()
    )
    
    # Write to department sales aggregate table
    table_name = f"edw.carts_dept_daily_sales_{aggregation_level.lower()}"
    write_table(dept_sales_agg, table_name, "overwrite")
    
    record_count = dept_sales_agg.count()
    logger.info(f"Generated {record_count} department sales records for {aggregation_level} level")
    print(f"Department sales {aggregation_level} completed: {record_count} records")

# Execute department sales tasks in dependency order
if 'fytd_status' in locals() and fytd_status == TaskStatus.SUCCEEDED:
    # STR -> MNR -> MJR -> MBU (based on workflow dependencies)
    
    str_status = execute_with_error_handling(
        carts_dept_daily_sales,
        "cmd_carts_dept_daily_sales_STR",
        spark, "STR"
    )
    print(f"Department sales STR status: {str_status}")
    
    if str_status == TaskStatus.SUCCEEDED:
        mnr_status = execute_with_error_handling(
            carts_dept_daily_sales,
            "cmd_carts_dept_daily_sales_MNR",
            spark, "MNR"
        )
        print(f"Department sales MNR status: {mnr_status}")
        
        if mnr_status == TaskStatus.SUCCEEDED:
            mjr_status = execute_with_error_handling(
                carts_dept_daily_sales,
                "cmd_carts_dept_daily_sales_MJR",
                spark, "MJR"
            )
            print(f"Department sales MJR status: {mjr_status}")
            
            if mjr_status == TaskStatus.SUCCEEDED:
                mbu_status = execute_with_error_handling(
                    carts_dept_daily_sales,
                    "cmd_carts_dept_daily_sales_MBU",
                    spark, "MBU"
                )
                print(f"Department sales MBU status: {mbu_status}")
else:
    print("Skipping department sales tasks due to dashboard task failures")

In [None]:
# Task 5: Customer Count Aggregations
def carts_customer_count(spark: SparkSession, aggregation_level: str) -> None:
    """
    Generate customer count aggregates at different organizational levels
    
    Args:
        aggregation_level: STR (Store), MNR (Minor), MJR (Major), MBU (Major Business Unit)
    """
    logger.info(f"Generating customer count aggregates - {aggregation_level} level")
    
    # Read sales staging data
    sales_data = read_table(spark, "edw.carts_sales_staging")
    
    # Join with dimensions
    customer_dim = read_table(spark, CARTSConfig.CARTS_CUSTOMER_DIM)
    store_dim = read_table(spark, CARTSConfig.CARTS_STORE_DIM)
    
    # Base data with dimensions
    base_data = sales_data.join(customer_dim, "customer_id") \
                         .join(store_dim, "store_id")
    
    # Define grouping columns based on aggregation level
    if aggregation_level == "STR":  # Store level
        group_cols = ["store_id", "store_name", "region", "district", "sales_date"]
    elif aggregation_level == "MNR":  # Minor level
        group_cols = ["region", "sales_date"]
    elif aggregation_level == "MJR":  # Major level
        group_cols = ["district", "sales_date"]
    elif aggregation_level == "MBU":  # Major Business Unit level
        group_cols = ["business_unit_id", "business_unit_name", "sales_date"]
    else:
        raise ValueError(f"Invalid aggregation level: {aggregation_level}")
    
    # Customer count aggregations
    customer_counts = base_data.groupBy(*group_cols).agg(
        countDistinct("customer_id").alias("unique_customers"),
        countDistinct("transaction_id").alias("total_transactions"),
        count("customer_id").alias("total_customer_visits"),
        sum("sales_amount").alias("total_sales_amount"),
        avg("sales_amount").alias("avg_sales_per_visit")
    ).withColumn(
        "customers_per_transaction", 
        col("total_customer_visits") / col("total_transactions")
    ).withColumn(
        "sales_per_customer",
        col("total_sales_amount") / col("unique_customers")
    ).withColumn(
        "aggregation_level", lit(aggregation_level)
    ).withColumn(
        "process_timestamp", current_timestamp()
    )
    
    # Add customer segmentation metrics
    customer_segments = base_data.withColumn(
        "customer_segment",
        when(col("sales_amount") >= 100, "High Value")
        .when(col("sales_amount") >= 50, "Medium Value")
        .otherwise("Low Value")
    ).groupBy(*group_cols, "customer_segment").agg(
        countDistinct("customer_id").alias("segment_customer_count")
    )
    
    # Pivot customer segments
    customer_segments_pivot = customer_segments.groupBy(*group_cols).pivot("customer_segment").sum("segment_customer_count")
    
    # Join main aggregation with customer segments
    final_customer_counts = customer_counts.join(
        customer_segments_pivot,
        group_cols,
        "left"
    ).fillna(0)
    
    # Write to customer count aggregate table
    table_name = f"edw.carts_customer_count_{aggregation_level.lower()}"
    write_table(final_customer_counts, table_name, "overwrite")
    
    record_count = final_customer_counts.count()
    logger.info(f"Generated {record_count} customer count records for {aggregation_level} level")
    print(f"Customer count {aggregation_level} completed: {record_count} records")

# Execute customer count tasks in dependency order
if 'mbu_status' in locals() and mbu_status == TaskStatus.SUCCEEDED:
    # STR -> MNR -> MJR -> MBU (based on workflow dependencies)
    
    str_customer_status = execute_with_error_handling(
        carts_customer_count,
        "cmd_carts_customer_count_STR",
        spark, "STR"
    )
    print(f"Customer count STR status: {str_customer_status}")
    
    if str_customer_status == TaskStatus.SUCCEEDED:
        mnr_customer_status = execute_with_error_handling(
            carts_customer_count,
            "cmd_carts_customer_count_MNR",
            spark, "MNR"
        )
        print(f"Customer count MNR status: {mnr_customer_status}")
        
        if mnr_customer_status == TaskStatus.SUCCEEDED:
            mjr_customer_status = execute_with_error_handling(
                carts_customer_count,
                "cmd_carts_customer_count_MJR",
                spark, "MJR"
            )
            print(f"Customer count MJR status: {mjr_customer_status}")
            
            if mjr_customer_status == TaskStatus.SUCCEEDED:
                mbu_customer_status = execute_with_error_handling(
                    carts_customer_count,
                    "cmd_carts_customer_count_MBU",
                    spark, "MBU"
                )
                print(f"Customer count MBU status: {mbu_customer_status}")
else:
    print("Skipping customer count tasks due to department sales task failures")

In [None]:
# Task 6: Daily Terminal Activity Aggregates
def carts_daily_terminal_activity_aggt(spark: SparkSession) -> None:
    """
    Generate daily terminal activity aggregates
    Equivalent to: ksh /opt/edw/prod/sales_aggts/bin/carts_daily_terminal_activity_aggt.sh
    """
    logger.info("Generating daily terminal activity aggregates")
    
    # Read sales staging data with terminal information
    sales_data = read_table(spark, "edw.carts_sales_staging")
    store_dim = read_table(spark, CARTSConfig.CARTS_STORE_DIM)
    
    # Terminal activity aggregation
    terminal_activity = sales_data.join(store_dim, "store_id").groupBy(
        "store_id", "store_name", "terminal_id", "sales_date"
    ).agg(
        sum("sales_amount").alias("terminal_sales"),
        sum("sales_quantity").alias("terminal_quantity"),
        countDistinct("transaction_id").alias("terminal_transactions"),
        countDistinct("customer_id").alias("terminal_customers"),
        min("transaction_timestamp").alias("first_transaction_time"),
        max("transaction_timestamp").alias("last_transaction_time"),
        avg("transaction_duration_seconds").alias("avg_transaction_duration")
    ).withColumn(
        "operating_hours",
        (unix_timestamp("last_transaction_time") - unix_timestamp("first_transaction_time")) / 3600
    ).withColumn(
        "transactions_per_hour",
        col("terminal_transactions") / col("operating_hours")
    ).withColumn(
        "sales_per_hour",
        col("terminal_sales") / col("operating_hours")
    ).withColumn(
        "process_timestamp", current_timestamp()
    )
    
    # Write to terminal activity table
    write_table(terminal_activity, CARTSConfig.CARTS_TERMINAL_ACTIVITY, "overwrite")
    
    record_count = terminal_activity.count()
    logger.info(f"Generated {record_count} terminal activity records")
    print(f"Terminal activity aggregation completed: {record_count} records")

# Task 7: Daily Item Sales Aggregates
def carts_daily_item_sales_aggt(spark: SparkSession) -> None:
    """
    Generate daily item sales aggregates
    Equivalent to: ksh /opt/edw/prod/sales_aggts/bin/carts_daily_item_sales_aggt.sh
    """
    logger.info("Generating daily item sales aggregates")
    
    # Read sales staging data
    sales_data = read_table(spark, "edw.carts_sales_staging")
    product_dim = read_table(spark, CARTSConfig.CARTS_PRODUCT_DIM)
    store_dim = read_table(spark, CARTSConfig.CARTS_STORE_DIM)
    
    # Item sales aggregation
    item_sales = sales_data.join(product_dim, "item_id") \
                          .join(store_dim, "store_id") \
                          .groupBy(
        "item_id", "item_name", "item_category", "item_subcategory",
        "store_id", "store_name", "sales_date"
    ).agg(
        sum("sales_amount").alias("item_sales"),
        sum("sales_quantity").alias("item_quantity_sold"),
        sum("cost_amount").alias("item_cost"),
        countDistinct("transaction_id").alias("transactions_with_item"),
        countDistinct("customer_id").alias("customers_bought_item"),
        avg("sales_amount").alias("avg_item_price"),
        (sum("sales_amount") - sum("cost_amount")).alias("item_gross_margin")
    ).withColumn(
        "margin_percentage",
        (col("item_gross_margin") / col("item_sales")) * 100
    ).withColumn(
        "sales_velocity",
        col("item_quantity_sold") / col("transactions_with_item")
    ).withColumn(
        "process_timestamp", current_timestamp()
    )
    
    # Add ranking by sales performance
    item_sales_ranked = item_sales.withColumn(
        "sales_rank",
        row_number().over(
            Window.partitionBy("store_id", "sales_date")
                  .orderBy(desc("item_sales"))
        )
    ).withColumn(
        "quantity_rank",
        row_number().over(
            Window.partitionBy("store_id", "sales_date")
                  .orderBy(desc("item_quantity_sold"))
        )
    )
    
    # Write to item sales aggregate table
    write_table(item_sales_ranked, CARTSConfig.CARTS_ITEM_SALES, "overwrite")
    
    record_count = item_sales_ranked.count()
    logger.info(f"Generated {record_count} item sales records")
    print(f"Item sales aggregation completed: {record_count} records")

# Execute terminal activity and item sales tasks
if 'mbu_customer_status' in locals() and mbu_customer_status == TaskStatus.SUCCEEDED:
    
    terminal_status = execute_with_error_handling(
        carts_daily_terminal_activity_aggt,
        "cmd_carts_daily_terminal_activity_aggt",
        spark
    )
    print(f"Terminal activity task status: {terminal_status}")
    
    if terminal_status == TaskStatus.SUCCEEDED:
        item_sales_status = execute_with_error_handling(
            carts_daily_item_sales_aggt,
            "cmd_carts_daily_item_sales_aggt",
            spark
        )
        print(f"Item sales task status: {item_sales_status}")
else:
    print("Skipping terminal activity and item sales tasks due to customer count task failures")

In [None]:
# Task 8: Store Daily Statistics Incremental
def carts_store_daily_stats_incremental(spark: SparkSession) -> None:
    """
    Generate incremental store daily statistics
    Equivalent to: ksh /opt/edw/prod/sales_aggts/bin/carts_store_daily_stats_incremental.sh
    """
    logger.info("Generating incremental store daily statistics")
    
    # Read required data
    sales_data = read_table(spark, "edw.carts_sales_staging")
    store_dim = read_table(spark, CARTSConfig.CARTS_STORE_DIM)
    
    # Calculate comprehensive store statistics
    store_stats = sales_data.join(store_dim, "store_id").groupBy(
        "store_id", "store_name", "region", "district", "store_type", "sales_date"
    ).agg(
        # Sales metrics
        sum("sales_amount").alias("daily_sales"),
        sum("sales_quantity").alias("daily_quantity"),
        sum("cost_amount").alias("daily_cost"),
        
        # Transaction metrics
        countDistinct("transaction_id").alias("daily_transactions"),
        countDistinct("customer_id").alias("daily_customers"),
        countDistinct("item_id").alias("items_sold"),
        
        # Performance metrics
        avg("sales_amount").alias("avg_transaction_value"),
        min("sales_amount").alias("min_transaction_value"),
        max("sales_amount").alias("max_transaction_value"),
        
        # Operational metrics
        countDistinct("terminal_id").alias("active_terminals"),
        min("transaction_timestamp").alias("first_sale_time"),
        max("transaction_timestamp").alias("last_sale_time")
    ).withColumn(
        # Calculated metrics
        "gross_margin", col("daily_sales") - col("daily_cost")
    ).withColumn(
        "margin_percentage", (col("gross_margin") / col("daily_sales")) * 100
    ).withColumn(
        "sales_per_customer", col("daily_sales") / col("daily_customers")
    ).withColumn(
        "transactions_per_customer", col("daily_transactions") / col("daily_customers")
    ).withColumn(
        "items_per_transaction", col("items_sold") / col("daily_transactions")
    ).withColumn(
        "operating_hours",
        (unix_timestamp("last_sale_time") - unix_timestamp("first_sale_time")) / 3600
    ).withColumn(
        "sales_per_hour", col("daily_sales") / col("operating_hours")
    ).withColumn(
        "process_timestamp", current_timestamp()
    )
    
    # Add comparative metrics (vs previous day, week, month)
    previous_day_stats = read_table(
        spark, 
        CARTSConfig.CARTS_STORE_STATS,
        f"sales_date = '{(CARTSConfig.PROCESS_DATE - timedelta(days=1)).strftime('%Y-%m-%d')}'"
    ).select(
        col("store_id").alias("prev_store_id"),
        col("daily_sales").alias("prev_day_sales"),
        col("daily_transactions").alias("prev_day_transactions")
    )
    
    # Join with previous day data for comparison
    store_stats_with_comparison = store_stats.join(
        previous_day_stats,
        store_stats.store_id == previous_day_stats.prev_store_id,
        "left"
    ).withColumn(
        "sales_growth_pct",
        ((col("daily_sales") - col("prev_day_sales")) / col("prev_day_sales")) * 100
    ).withColumn(
        "transaction_growth_pct",
        ((col("daily_transactions") - col("prev_day_transactions")) / col("prev_day_transactions")) * 100
    ).drop("prev_store_id", "prev_day_sales", "prev_day_transactions")
    
    # Write to store statistics table
    write_table(store_stats_with_comparison, CARTSConfig.CARTS_STORE_STATS, "append")
    
    record_count = store_stats_with_comparison.count()
    logger.info(f"Generated {record_count} store statistics records")
    print(f"Store statistics completed: {record_count} records")

# Task 9: Sales Validation
def sales_validation_carts(spark: SparkSession) -> None:
    """
    Perform sales data validation checks
    Equivalent to: ksh /opt/edw/management/etlchecks/scripts/sales_validation_carts.sh
    """
    logger.info("Performing CARTS sales data validation")
    
    # Read staging and aggregate data for validation
    sales_staging = read_table(spark, "edw.carts_sales_staging")
    store_stats = read_table(spark, CARTSConfig.CARTS_STORE_STATS)
    
    # Validation checks
    validation_results = []
    
    # Check 1: Total sales amount consistency
    staging_total = sales_staging.agg(sum("sales_amount")).collect()[0][0]
    stats_total = store_stats.filter(
        col("sales_date") == CARTSConfig.PROCESS_DATE.strftime('%Y-%m-%d')
    ).agg(sum("daily_sales")).collect()[0][0]
    
    sales_variance = abs(staging_total - stats_total) if stats_total else float('inf')
    sales_variance_pct = (sales_variance / staging_total) * 100 if staging_total > 0 else 0
    
    validation_results.append({
        "check_name": "Sales Amount Consistency",
        "staging_value": staging_total,
        "aggregate_value": stats_total,
        "variance": sales_variance,
        "variance_pct": sales_variance_pct,
        "status": "PASS" if sales_variance_pct < 1.0 else "FAIL"
    })
    
    # Check 2: Record count validation
    staging_count = sales_staging.count()
    expected_min_records = 1000  # Configure based on business rules
    
    validation_results.append({
        "check_name": "Record Count Validation",
        "staging_value": staging_count,
        "expected_min": expected_min_records,
        "status": "PASS" if staging_count >= expected_min_records else "FAIL"
    })
    
    # Check 3: Data quality checks
    null_sales = sales_staging.filter(col("sales_amount").isNull()).count()
    null_customers = sales_staging.filter(col("customer_id").isNull()).count()
    negative_sales = sales_staging.filter(col("sales_amount") < 0).count()
    
    validation_results.append({
        "check_name": "Data Quality",
        "null_sales": null_sales,
        "null_customers": null_customers,
        "negative_sales": negative_sales,
        "status": "PASS" if (null_sales + null_customers + negative_sales) == 0 else "FAIL"
    })
    
    # Log validation results
    for result in validation_results:
        logger.info(f"Validation Check: {result}")
        if result["status"] == "FAIL":
            logger.error(f"VALIDATION FAILURE: {result['check_name']}")
    
    # Create validation summary DataFrame
    validation_df = spark.createDataFrame([
        {
            "validation_date": CARTSConfig.PROCESS_DATE.strftime('%Y-%m-%d'),
            "total_checks": len(validation_results),
            "passed_checks": len([r for r in validation_results if r["status"] == "PASS"]),
            "failed_checks": len([r for r in validation_results if r["status"] == "FAIL"]),
            "staging_records": staging_count,
            "staging_sales_total": staging_total,
            "process_timestamp": datetime.now()
        }
    ])
    
    # Write validation results
    write_table(validation_df, "edw.carts_validation_results", "append")
    
    # Check if any validations failed
    failed_checks = [r for r in validation_results if r["status"] == "FAIL"]
    if failed_checks:
        raise WorkflowError(f"Validation failed: {len(failed_checks)} checks failed")
    
    print(f"Sales validation completed: All {len(validation_results)} checks passed")

# Execute store statistics and validation tasks
if 'item_sales_status' in locals() and item_sales_status == TaskStatus.SUCCEEDED:
    
    store_stats_status = execute_with_error_handling(
        carts_store_daily_stats_incremental,
        "cmd_carts_store_daily_stats_incremental",
        spark
    )
    print(f"Store statistics task status: {store_stats_status}")
    
    if store_stats_status == TaskStatus.SUCCEEDED:
        validation_status = execute_with_error_handling(
            sales_validation_carts,
            "cmd_sales_validation_carts",
            spark
        )
        print(f"Sales validation task status: {validation_status}")
else:
    print("Skipping store statistics and validation tasks due to item sales task failures")

In [None]:
# Workflow Summary and Cleanup
def generate_workflow_summary(spark: SparkSession) -> None:
    """
    Generate a summary of the entire workflow execution
    """
    logger.info("Generating workflow execution summary")
    
    # Collect task statuses
    task_summary = {
        "workflow_name": "wf_CARTS_AGGREGATES_02_PySpark",
        "execution_date": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        "process_date": CARTSConfig.PROCESS_DATE.strftime('%Y-%m-%d'),
        "total_tasks": 9,
        "completed_tasks": 0,
        "failed_tasks": 0,
        "task_details": []
    }
    
    # Check each task status (if variables exist)
    task_checks = [
        ("Data Preparation", 'task_status', "cmd_aggts_prep_carts"),
        ("Monthly Dashboard", 'monthly_status', "cmd_storeops_dshbrd_mnthly"),
        ("FYTD Dashboard", 'fytd_status', "cmd_storeops_dshbrd_fytd"),
        ("Dept Sales STR", 'str_status', "cmd_carts_dept_daily_sales_STR"),
        ("Dept Sales MNR", 'mnr_status', "cmd_carts_dept_daily_sales_MNR"),
        ("Dept Sales MJR", 'mjr_status', "cmd_carts_dept_daily_sales_MJR"),
        ("Dept Sales MBU", 'mbu_status', "cmd_carts_dept_daily_sales_MBU"),
        ("Customer Count STR", 'str_customer_status', "cmd_carts_customer_count_STR"),
        ("Customer Count MNR", 'mnr_customer_status', "cmd_carts_customer_count_MNR"),
        ("Customer Count MJR", 'mjr_customer_status', "cmd_carts_customer_count_MJR"),
        ("Customer Count MBU", 'mbu_customer_status', "cmd_carts_customer_count_MBU"),
        ("Terminal Activity", 'terminal_status', "cmd_carts_daily_terminal_activity_aggt"),
        ("Item Sales", 'item_sales_status', "cmd_carts_daily_item_sales_aggt"),
        ("Store Statistics", 'store_stats_status', "cmd_carts_store_daily_stats_incremental"),
        ("Sales Validation", 'validation_status', "cmd_sales_validation_carts")
    ]
    
    for task_name, status_var, original_task in task_checks:
        if status_var in globals():
            status = globals()[status_var]
            task_summary["task_details"].append({
                "task_name": task_name,
                "original_task": original_task,
                "status": status
            })
            if status == TaskStatus.SUCCEEDED:
                task_summary["completed_tasks"] += 1
            else:
                task_summary["failed_tasks"] += 1
    
    # Calculate success rate
    if task_summary["total_tasks"] > 0:
        success_rate = (task_summary["completed_tasks"] / len(task_summary["task_details"])) * 100
        task_summary["success_rate"] = success_rate
    
    # Log summary
    logger.info("="*60)
    logger.info("CARTS AGGREGATES WORKFLOW SUMMARY")
    logger.info("="*60)
    logger.info(f"Workflow: {task_summary['workflow_name']}")
    logger.info(f"Execution Date: {task_summary['execution_date']}")
    logger.info(f"Process Date: {task_summary['process_date']}")
    logger.info(f"Tasks Completed: {task_summary['completed_tasks']}")
    logger.info(f"Tasks Failed: {task_summary['failed_tasks']}")
    if 'success_rate' in task_summary:
        logger.info(f"Success Rate: {task_summary['success_rate']:.1f}%")
    logger.info("="*60)
    
    for task in task_summary["task_details"]:
        status_symbol = "✓" if task["status"] == TaskStatus.SUCCEEDED else "✗"
        logger.info(f"{status_symbol} {task['task_name']}: {task['status']}")
    
    logger.info("="*60)
    
    # Create summary DataFrame for reporting
    summary_df = spark.createDataFrame([task_summary])
    write_table(summary_df, "edw.carts_workflow_execution_log", "append")
    
    print("\\n" + "="*60)
    print("CARTS AGGREGATES WORKFLOW COMPLETED")
    print("="*60)
    print(f"Process Date: {task_summary['process_date']}")
    print(f"Completed Tasks: {task_summary['completed_tasks']}")
    print(f"Failed Tasks: {task_summary['failed_tasks']}")
    if 'success_rate' in task_summary:
        print(f"Success Rate: {task_summary['success_rate']:.1f}%")
    print("="*60)

def cleanup_spark_session(spark: SparkSession) -> None:
    """
    Clean up Spark session and resources
    """
    try:
        # Clear cache
        spark.catalog.clearCache()
        
        # Stop Spark session
        spark.stop()
        
        logger.info("Spark session cleaned up successfully")
        print("Spark session cleaned up")
        
    except Exception as e:
        logger.warning(f"Error during cleanup: {str(e)}")

# Execute workflow summary
generate_workflow_summary(spark)

# Note: Uncomment the following line to cleanup Spark session
# cleanup_spark_session(spark)

## Implementation Notes and Configuration

### Prerequisites
1. **PySpark Environment**: Ensure PySpark is installed with appropriate version
2. **Database Connectivity**: Oracle JDBC driver (`ojdbc8.jar`) must be available in Spark classpath
3. **Database Access**: Configure connection parameters in `CARTSConfig` class
4. **Email Configuration**: Update SMTP settings for failure notifications

### Configuration Updates Required
1. **Database Connection**: Update `CARTSConfig.DB_URL`, `DB_USER`, `DB_PASSWORD`
2. **Table Names**: Verify and update table names in `CARTSConfig` to match your schema
3. **SMTP Settings**: Configure email server details for notifications
4. **Spark Configuration**: Adjust memory and executor settings based on your cluster

### Scheduling Equivalent
The original workflow runs every 6 hours. To replicate this in a production environment:

**Option 1: Cron Job**
```bash
0 */6 * * * /path/to/spark-submit --py-files /path/to/notebook.py
```

**Option 2: Airflow DAG**
```python
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

dag = DAG(
    'carts_aggregates_workflow',
    schedule_interval='0 */6 * * *',  # Every 6 hours
    catchup=False
)

spark_task = SparkSubmitOperator(
    task_id='carts_aggregates',
    application='/path/to/carts_aggregates.py',
    dag=dag
)
```

### Performance Optimizations
1. **Partitioning**: Consider partitioning large tables by date
2. **Caching**: Cache frequently accessed dimension tables
3. **Broadcast Joins**: Use broadcast hints for small dimension tables
4. **Parallel Execution**: Leverage Spark's parallel processing capabilities

### Monitoring and Alerting
1. **Logging**: All tasks log to both file and console
2. **Email Notifications**: Automatic failure notifications to operations team
3. **Validation Checks**: Built-in data quality and consistency checks
4. **Execution Summary**: Comprehensive workflow execution reporting

### Error Recovery
- Each task is wrapped with error handling
- Failed tasks trigger email notifications
- Workflow continues where possible (dependent tasks are skipped)
- Detailed error logging for troubleshooting

### Data Quality Features
- Null value checks
- Negative value validation
- Cross-aggregate consistency verification
- Record count validation
- Variance analysis between staging and aggregate data

This PySpark implementation provides equivalent functionality to the original Informatica PowerCenter workflow while leveraging modern big data processing capabilities.