In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
import logging

# Logging: INFO-level root configuration plus a named logger for this pipeline
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ETLPipeline")

# Spark session (reuses an existing session when one is already active)
spark = (
    SparkSession.builder
    .appName("DailyETLPipeline")
    .getOrCreate()
)

# Pipeline configuration: storage locations and the date stamp for this run
CONFIG = dict(
    source_path="Files/raw/",
    staging_path="Files/staging/",
    target_path="Tables/",
    # Local wall-clock date at the moment this cell executes, as YYYY-MM-DD
    run_date=f"{datetime.now():%Y-%m-%d}",
)

logger.info(f"ðŸš€ ETL Pipeline started - Run Date: {CONFIG['run_date']}")

## 📥 Step 1: Extract - Load Raw Data

In [None]:
# Declared schema for the raw sales CSV. It is applied at read time (below) so
# column names and types come from this contract rather than being guessed
# from the data by inferSchema, which also costs an extra pass over the file.
# NOTE(review): Spark file sources may relax the nullable=False flags on read;
# null handling should still be verified downstream.
sales_schema = StructType([
    StructField("transaction_id", StringType(), False),
    StructField("customer_id", StringType(), False),
    StructField("customer_name", StringType(), True),
    StructField("customer_email", StringType(), True),
    StructField("product_name", StringType(), False),
    StructField("category", StringType(), True),
    StructField("quantity", IntegerType(), False),
    StructField("unit_price", DoubleType(), False),
    StructField("total_amount", DoubleType(), False),
    StructField("transaction_date", DateType(), False),
    StructField("region", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("is_member", StringType(), True)
])

# Extract raw sales data
logger.info("ðŸ“‚ Loading raw sales data...")
# enforceSchema=false makes Spark check the CSV header against sales_schema
# instead of silently applying the schema by column position.
df_raw_sales = spark.read.format("csv") \
    .option("header", "true") \
    .option("enforceSchema", "false") \
    .schema(sales_schema) \
    .load(f"{CONFIG['source_path']}sales_data.csv")

logger.info(f"âœ… Loaded {df_raw_sales.count()} raw sales records")

## 🧹 Step 2: Transform - Data Cleaning & Validation

In [None]:
# Data quality checks
def validate_data(df, name):
    """Log the row count and show per-column null tallies for a DataFrame.

    Args:
        df: Spark DataFrame to inspect.
        name: Label prefixed to the log lines.

    Returns:
        The total number of rows in ``df``.
    """
    row_total = df.count()

    null_tally_exprs = []
    for column_name in df.columns:
        # when() without otherwise() yields null for non-matching rows, and
        # count() skips nulls, so only rows where the column is null are counted.
        null_marker = when(col(column_name).isNull(), column_name)
        null_tally_exprs.append(count(null_marker).alias(column_name))
    per_column_nulls = df.select(null_tally_exprs)

    logger.info(f"ðŸ“Š {name} - Total rows: {row_total}")
    logger.info(f"ðŸ“Š {name} - Null counts per column:")
    per_column_nulls.show()
    return row_total

# Clean sales data
# Membership flag: True only for the exact string "Yes"; anything else
# (including null) becomes False.
member_flag = when(col("is_member") == "Yes", True).otherwise(False)

df_clean_sales = (
    df_raw_sales
    .dropDuplicates(["transaction_id"])        # one row per transaction id
    .where(col("total_amount") > 0)            # keep positive amounts only
    .where(col("quantity") > 0)                # keep positive quantities only
    .withColumn("transaction_date", to_date(col("transaction_date")))
    .withColumn("is_member", member_flag)
    .withColumn("processed_timestamp", current_timestamp())
    .withColumn("etl_batch_id", lit(CONFIG['run_date']))
)

# Validate cleaned data
clean_count = validate_data(df_clean_sales, "Cleaned Sales")
logger.info(f"ðŸ§¹ Data cleaning complete - {clean_count} valid records")

## 🔄 Step 3: Transform - Enrich Data

In [None]:
# Add derived columns
txn_date = col("transaction_date")
sale_amount = col("total_amount")

# Weekend = day_of_week value 1 or 7; null day_of_week falls through to False.
weekend_flag = when(col("day_of_week").isin(1, 7), True).otherwise(False)

# round() here is the Spark column function brought in by the star import.
margin_estimate = round(sale_amount * 0.35, 2)  # Assuming 35% margin

# Tier is derived from the amount of the individual transaction.
tier_label = (
    when(sale_amount >= 1000, "Premium")
    .when(sale_amount >= 500, "Standard")
    .otherwise("Basic")
)

df_enriched = (
    df_clean_sales
    .withColumn("year", year(txn_date))
    .withColumn("month", month(txn_date))
    .withColumn("quarter", quarter(txn_date))
    .withColumn("day_of_week", dayofweek(txn_date))
    .withColumn("is_weekend", weekend_flag)
    .withColumn("profit_margin", margin_estimate)
    .withColumn("customer_tier", tier_label)
)

logger.info("âœ¨ Data enrichment complete")
df_enriched.printSchema()

## 💾 Step 4: Load - Write to Delta Tables

In [None]:
# Write the enriched data to the Delta table.
# NOTE: this is a FULL OVERWRITE, not a merge/upsert: every run replaces the
# table contents and (because of overwriteSchema) its schema.
# TODO(review): if incremental loads are required, replace this with a Delta
# MERGE keyed on transaction_id.
target_table = "sales_transactions"
target_location = f"{CONFIG['target_path']}{target_table}"

logger.info(f"ðŸ’¾ Writing to Delta table: {target_table}")

df_enriched.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("year", "month") \
    .save(target_location)

# Count the rows that actually landed in the table. Calling
# df_enriched.count() here would re-execute the whole extract/transform
# lineage just to produce a log line, and would not reflect what was written
# if the source changed in between.
loaded_count = spark.read.format("delta").load(target_location).count()
logger.info(f"âœ… Successfully loaded {loaded_count} records to {target_table}")

## 📈 Step 5: Post-Load Validation

In [None]:
# Validate loaded data: read back what was written to the Delta location
df_loaded = (
    spark.read
    .format("delta")
    .load(f"{CONFIG['target_path']}{target_table}")
)

# Generate summary statistics.
# sum/avg/min/max are the Spark aggregate functions from the star import,
# not the Python builtins.
summary_metrics = [
    count("*").alias("total_records"),
    countDistinct("customer_id").alias("unique_customers"),
    sum("total_amount").alias("total_revenue"),
    avg("total_amount").alias("avg_transaction"),
    min("transaction_date").alias("earliest_date"),
    max("transaction_date").alias("latest_date"),
]
summary = df_loaded.agg(*summary_metrics)

print("\nðŸ“Š ETL Pipeline Summary:")
print("=" * 50)
summary.show(truncate=False)

logger.info("ðŸŽ‰ ETL Pipeline completed successfully!")