# Data Processing Notebook

This notebook performs data transformation, dimensional modeling, and analytics processing.

## Steps:
1. Load validated raw data
2. Create dimensional model (facts and dimensions)
3. Customer segmentation analysis
4. Product performance analytics
5. Generate business metrics
6. Store processed data for analytics


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import logging
from datetime import datetime

# Build the Spark Session using AWS Keys from Databricks Secrets
spark = SparkSession.builder \
    .appName("PsychoBunny-DataIngestion") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.access.key", dbutils.secrets.get(scope="aws-keys", key="aws-access-key")) \
    .config("spark.hadoop.fs.s3a.secret.key", dbutils.secrets.get(scope="aws-keys", key="aws-secret-key")) \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .getOrCreate()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

logger.info("Spark session initialized with Delta Lake and AWS S3 support")


INFO:__main__:Spark session initialized with Delta Lake and AWS S3 support


In [0]:
# Configuration
S3_BUCKET = "psycho-bunny-data-lake"
RAW_DATA_PATH = "s3://psycho-bunny-data-lake/raw-data/"
PROCESSED_DATA_PATH = f"s3://{S3_BUCKET}/processed-data/"
config_path = "/dbfs/FileStore/tables/pipeline_config.json"

# Load configuration
with open(config_path, 'r') as f:
    config = json.load(f)

business_rules = config['business_rules']


INFO:py4j.clientserver:Received command c on object id p0


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-7564814444599035>, line 9[0m
[1;32m      7[0m [38;5;66;03m# Load configuration[39;00m
[1;32m      8[0m [38;5;28;01mwith[39;00m [38;5;28mopen[39m(config_path, [38;5;124m'[39m[38;5;124mr[39m[38;5;124m'[39m) [38;5;28;01mas[39;00m f:
[0;32m----> 9[0m     config [38;5;241m=[39m json[38;5;241m.[39mload(f)
[1;32m     11[0m business_rules [38;5;241m=[39m config[[38;5;124m'[39m[38;5;124mbusiness_rules[39m[38;5;124m'[39m]

[0;31mNameError[0m: name 'json' is not defined

In [0]:
access = dbutils.secrets.get("aws-keys", key="aws-access-key")
secret = dbutils.secrets.get("aws-keys", key="aws-secret-key")

spark.conf.set("fs.s3a.access.key", access)
spark.conf.set("fs.s3a.secret.key", secret)
spark.conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")    


In [0]:
dbutils.fs.ls("s3://psycho-bunny-data-lake/raw-data/")

INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0


[FileInfo(path='s3://psycho-bunny-data-lake/raw-data/calendar/', name='calendar/', size=0, modificationTime=1750378984376),
 FileInfo(path='s3://psycho-bunny-data-lake/raw-data/customers/', name='customers/', size=0, modificationTime=1750378984376),
 FileInfo(path='s3://psycho-bunny-data-lake/raw-data/transactions/', name='transactions/', size=0, modificationTime=1750378984376)]

In [0]:
# Load raw data from Delta Lake
try:
    customers_df = spark.read.format("delta").load(f"{RAW_DATA_PATH}customers")
    transactions_df = spark.read.format("delta").load(f"{RAW_DATA_PATH}transactions")
    calendar_df = spark.read.format("delta").load(f"{RAW_DATA_PATH}calendar")
    
    logger.info("Raw data loaded successfully")
    logger.info(f"Customers: {customers_df.count()}, Transactions: {transactions_df.count()}, Calendar: {calendar_df.count()}")
except Exception as e:
    logger.error(f"Error loading raw data: {str(e)}")
    raise


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:__main__:Raw data loaded successfully
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Receiv

In [0]:
# Create customer dimension table
dim_customer = customers_df.select(
    col("customer_id").alias("customer_key"),
    col("first_name"),
    col("last_name"),
    concat_ws(" ", col("first_name"), col("last_name")).alias("full_name"),
    col("email"),
    col("company_name"),
    col("address"),  
    col("city"),
    col("state"),
    col("postal"),  
    col("province"),
    col("phone1").alias("phone"),  
    col("ingestion_date").alias("created_date"),
    current_timestamp().alias("updated_date")
).dropDuplicates(["customer_key"])

logger.info(f"Created customer dimension with {dim_customer.count()} records")
dim_customer.show(5)


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:__main__:✅ Created customer dimension with 2000 records
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0


+--------------------+----------+---------+----------------+--------------------+--------------------+--------------------+---------------+-----+-------+--------+------------+--------------------+--------------------+
|        customer_key|first_name|last_name|       full_name|               email|        company_name|             address|           city|state| postal|province|       phone|        created_date|        updated_date|
+--------------------+----------+---------+----------------+--------------------+--------------------+--------------------+---------------+-----+-------+--------+------------+--------------------+--------------------+
|aaron_kloska_aaro...|     Aaron|   Kloska|    Aaron Kloska|aaron_kloska@klos...|Radecker, H Phili...| 423 S Navajo St #56|      Brookhill|  QLD|   NULL|    NULL|07-9896-4827|2025-06-19 04:03:...|2025-06-20 01:18:...|
|abel_maclead_amac...|      Abel|  Maclead|    Abel Maclead|  amaclead@gmail.com| Rangoni Of Florence|  37275 St  Rt 17m M|  Mid

In [0]:
# extract product family from product code
def extract_product_family(product_code):
    if product_code and "_" in product_code:
        return product_code.split("_")[0]
    return "UNKNOWN"

extract_product_family_udf = udf(extract_product_family, StringType())

# Create simple product dimension table
dim_product = transactions_df.select("PRODUCTCODE").distinct() \
    .withColumn("product_key", col("PRODUCTCODE")) \
    .withColumn("product_family", extract_product_family_udf(col("PRODUCTCODE"))) \
    .withColumn("product_line", lit("General")) \
    .withColumn("created_date", current_timestamp())

logger.info(f"Created product dimension with {dim_product.count()} records")
dim_product.show(5)


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:__main__:✅ Created product dimension with 109 records
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0


+-----------+-----------+--------------+------------+--------------------+
|PRODUCTCODE|product_key|product_family|product_line|        created_date|
+-----------+-----------+--------------+------------+--------------------+
|   S18_4600|   S18_4600|           S18|     General|2025-06-20 03:35:...|
|   S18_1749|   S18_1749|           S18|     General|2025-06-20 03:35:...|
|   S12_3891|   S12_3891|           S12|     General|2025-06-20 03:35:...|
|   S18_2248|   S18_2248|           S18|     General|2025-06-20 03:35:...|
|  S700_1138|  S700_1138|          S700|     General|2025-06-20 03:35:...|
+-----------+-----------+--------------+------------+--------------------+
only showing top 5 rows


In [0]:
from pyspark.sql.types import DoubleType

BUSINESS_RULES = {
    "high_value_customer_threshold": 10000,  # Customers who spent more than $10K
    "refund_restocking_fee_rate": 0.10,      # 10% restocking fee on refunds
    "large_order_threshold": 5000            # Orders above $5K are considered large
}
# Simple transaction fact table with business rules using ACTUAL DataFrame columns
def calculate_restocking_fee(amount, transaction_type):
    """Calculate 10% restocking fee for refunds"""
    if transaction_type == "REFUND":
        return float(amount) * BUSINESS_RULES["refund_restocking_fee_rate"]
    return 0.0

calculate_restocking_fee_udf = udf(calculate_restocking_fee, DoubleType())

# Create fact table with business logic using actual DataFrame columns
fact_transactions = transactions_df.select(
    col("ORDERNUMBER").alias("order_number"),
    col("CUSTOMERNAME").alias("customer_name"),
    col("PRODUCTCODE").alias("product_code"),
    col("QUANTITYORDERED").alias("quantity"),
    # Calculate unit price from total amount and quantity
    (abs(col("TOTAL_AMOUNT")) / col("QUANTITYORDERED")).alias("unit_price"),
    abs(col("TOTAL_AMOUNT")).alias("total_amount"),  # Use absolute value since amounts are negative
    col("DEALSIZE").alias("deal_size"),
    col("TERRITORY").alias("territory"),
    to_date(col("ORDERDATE"), "M/d/yyyy H:mm").alias("order_date"),  # Convert date format
    # Set default status and product line
    lit("COMPLETED").alias("status"),
    lit("General").alias("product_line"),
    current_timestamp().alias("created_date")
).withColumn(
    # Business rule: Classify transaction type based on negative amounts (refunds)
    "transaction_type",
    when(col("TOTAL_AMOUNT") < 0, "REFUND").otherwise("SALE")
).withColumn(
    # Business rule: Calculate restocking fee for refunds (10% of amount)
    "restocking_fee", 
    when(col("transaction_type") == "REFUND", 
         abs(col("TOTAL_AMOUNT")) * BUSINESS_RULES["refund_restocking_fee_rate"])
    .otherwise(0.0)
).withColumn(
    # Business rule: Calculate net amount after fees
    "net_amount",
    when(col("transaction_type") == "REFUND", 
         abs(col("TOTAL_AMOUNT")) - col("restocking_fee"))
    .otherwise(abs(col("TOTAL_AMOUNT")))
).withColumn(
    # Business rule: Flag large orders (>$5K)
    "is_large_order",
    when(abs(col("TOTAL_AMOUNT")) >= BUSINESS_RULES["large_order_threshold"], 'true')
    .otherwise('false')
)

logger.info(f"Created fact transactions with {fact_transactions.count()} records")
fact_transactions.show(5)

print("\n=== Sample Fact Transactions ===")
fact_transactions.select("order_number", "customer_name", "product_code", "quantity", 
                         "unit_price", "total_amount", "transaction_type", 
                         "restocking_fee", "net_amount", "is_large_order").show(10)



INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0
INFO:__main__:✅ Created fact transactions with 2823 records
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0


+------------+--------------------+------------+--------+------------------+------------+---------+---------+----------+---------+------------+--------------------+----------------+--------------+----------+--------------+
|order_number|       customer_name|product_code|quantity|        unit_price|total_amount|deal_size|territory|order_date|   status|product_line|        created_date|transaction_type|restocking_fee|net_amount|is_large_order|
+------------+--------------------+------------+--------+------------------+------------+---------+---------+----------+---------+------------+--------------------+----------------+--------------+----------+--------------+
|       10107|   Land of Toys Inc.|    S10_1678|      30| 66.43333333333334|        1993|    Small|       NA|2003-02-24|COMPLETED|     General|2025-06-22 04:30:...|            SALE|           0.0|    1993.0|         false|
|       10121|  Reims Collectables|    S10_1678|      34|2.7058823529411766|          92|    Small|     EMEA

INFO:py4j.clientserver:Received command c on object id p0


+------------+--------------------+------------+--------+------------------+------------+----------------+--------------+----------+--------------+
|order_number|       customer_name|product_code|quantity|        unit_price|total_amount|transaction_type|restocking_fee|net_amount|is_large_order|
+------------+--------------------+------------+--------+------------------+------------+----------------+--------------+----------+--------------+
|       10107|   Land of Toys Inc.|    S10_1678|      30| 66.43333333333334|        1993|            SALE|           0.0|    1993.0|         false|
|       10121|  Reims Collectables|    S10_1678|      34|2.7058823529411766|          92|            SALE|           0.0|      92.0|         false|
|       10134|     Lyon Souveniers|    S10_1678|      41|              14.0|         574|            SALE|           0.0|     574.0|         false|
|       10145|   Toys4GrownUps.com|    S10_1678|      45|13.844444444444445|         623|            SALE|      

INFO:py4j.clientserver:Received command c on object id p0


In [0]:
# Customer segmentation based on spending
customer_segments = fact_transactions.filter(col("transaction_type") == "SALE") \
    .groupBy("customer_name") \
    .agg(
        sum("net_amount").alias("total_spent"),
        count("order_number").alias("total_orders"),
        avg("net_amount").alias("avg_order_value"),
        max("order_date").alias("last_order_date")
    ).withColumn(
        # Business rule: Customer value segmentation
        "customer_segment",
        when(col("total_spent") >= BUSINESS_RULES["high_value_customer_threshold"], "High Value")
        .otherwise("Regular")
    ).withColumn("created_date", current_timestamp())

logger.info(f"Created customer segments with {customer_segments.count()} customers")
customer_segments.show(10)


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:__main__:✅ Created customer segments with 92 customers
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0


+--------------------+-----------+------------+------------------+---------------+----------------+--------------------+
|       customer_name|total_spent|total_orders|   avg_order_value|last_order_date|customer_segment|        created_date|
+--------------------+-----------+------------+------------------+---------------+----------------+--------------------+
| Suominen Souveniers|    33642.0|          30|            1121.4|     2005-01-06|      High Value|2025-06-22 04:30:...|
|  Amica Models & Co.|    25642.0|          26| 986.2307692307693|     2004-09-09|      High Value|2025-06-22 04:30:...|
|Collectables For ...|    24096.0|          24|            1004.0|     2005-01-20|      High Value|2025-06-22 04:30:...|
|         CAF Imports|    13319.0|          13|1024.5384615384614|     2004-03-19|      High Value|2025-06-22 04:30:...|
|   giftsbymail.co.uk|    29294.0|          26|1126.6923076923076|     2004-11-01|      High Value|2025-06-22 04:30:...|
|       Rovelli Gifts|    48156.

In [0]:
# Store processed data in Delta Lake
try:
    # Store customer dimension
    dim_customer.write \
        .format("delta") \
        .mode("overwrite") \
        .save(f"{PROCESSED_DATA_PATH}dim_customer")
    
    # Store product dimension
    dim_product.write \
        .format("delta") \
        .mode("overwrite") \
        .save(f"{PROCESSED_DATA_PATH}dim_product")
    
    # Store fact transactions
    fact_transactions.write \
        .format("delta") \
        .mode("overwrite") \
        .save(f"{PROCESSED_DATA_PATH}fact_transactions")
    
    # Store customer segments
    customer_segments.write \
        .format("delta") \
        .mode("overwrite") \
        .save(f"{PROCESSED_DATA_PATH}customer_segments")
    
    logger.info("All processed data stored in Delta Lake successfully")
    
except Exception as e:
    logger.error(f"Error storing processed data: {str(e)}")
    raise


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:__main__:✅ All processed data stored in Delta Lake successfully


In [0]:
# Load to Redshift
import psycopg2
from pyspark.sql import DataFrameWriter

def load_to_redshift(df, table_name, redshift_config):
    """Load DataFrame to Redshift"""
    try:
        # Convert Spark DataFrame to Pandas for easier Redshift loading
        pandas_df = df.toPandas()
        
        # Connect to Redshift
        conn = psycopg2.connect(
            host=redshift_config["host"],
            port=redshift_config["port"],
            database=redshift_config["database"],
            user=redshift_config["user"],
            password=redshift_config["password"]
        )
        
        cursor = conn.cursor()
        
        # Create table if not exists (simplified schema)
        if table_name == "dim_customer":
            cursor.execute(f"""
                CREATE TABLE IF NOT EXISTS {table_name} (
                    customer_key VARCHAR(500),
                    first_name VARCHAR(100),
                    last_name VARCHAR(100),
                    full_name VARCHAR(200),
                    email VARCHAR(200),
                    company_name VARCHAR(200),
                    address VARCHAR(500),
                    city VARCHAR(100),
                    state VARCHAR(50),
                    postal VARCHAR(20),
                    province VARCHAR(50),
                    phone VARCHAR(50),
                    created_date TIMESTAMP,
                    updated_date TIMESTAMP
                );
            """)
        elif table_name == "fact_transactions":
            cursor.execute(f"""
                CREATE TABLE IF NOT EXISTS {table_name} (
                    order_number VARCHAR(50),
                    customer_key VARCHAR(500),
                    customer_name VARCHAR(200),
                    product_code VARCHAR(50),
                    quantity INTEGER,
                    unit_price DECIMAL(10,2),
                    total_amount DECIMAL(12,2),
                    transaction_type VARCHAR(20),
                    restocking_fee DECIMAL(10,2),
                    net_amount DECIMAL(12,2),
                    is_large_order BOOLEAN,
                    order_date DATE,
                    territory VARCHAR(50),
                    product_line VARCHAR(100),
                    status VARCHAR(50),
                    created_date TIMESTAMP
                );
            """)
        elif table_name == "customer_segments":
            cursor.execute(f"""
                CREATE TABLE IF NOT EXISTS {table_name} (
                    customer_name VARCHAR(200),
                    total_spent DECIMAL(12,2),
                    total_orders INTEGER,
                    avg_order_value DECIMAL(10,2),
                    last_order_date DATE,
                    customer_segment VARCHAR(50),
                    created_date TIMESTAMP
                );
            """)
        
        # Clear existing data
        cursor.execute(f"DELETE FROM {table_name};")
        
        # Insert new data
        for _, row in pandas_df.iterrows():
            placeholders = ', '.join(['%s'] * len(row))
            columns = ', '.join(pandas_df.columns)
            sql = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"
            cursor.execute(sql, tuple(row))
        
        conn.commit()
        cursor.close()
        conn.close()
        
        logger.info(f"Successfully loaded {len(pandas_df)} records to Redshift table {table_name}")
        
    except Exception as e:
        logger.error(f"Error loading to Redshift table {table_name}: {str(e)}")
        raise

# Redshift configuration (using secrets)
try:
    redshift_config = {
        "host": "psycho-bunny-workgroup.499897076142.us-east-1.redshift-serverless.amazonaws.com",
        "port": "5439",
        "database": "dev",
        "user": "****",
        "password": "****"
    }
    
    # Load key tables to Redshift
    load_to_redshift(dim_customer, "dim_customer", redshift_config)
    load_to_redshift(fact_transactions, "fact_transactions", redshift_config)
    load_to_redshift(customer_segments, "customer_segments", redshift_config)
    
    logger.info("All data successfully loaded to Redshift")
    
except Exception as e:
    logger.warning(f"Redshift loading failed (this is optional): {str(e)}")
    logger.info("Continuing without Redshift - data is still available in Delta Lake")


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clie

In [0]:
# Load additional dimension tables to Redshift (dim_product and dim_calendar)
def load_additional_dimensions_to_redshift(redshift_config):
    """Load dim_product and dim_calendar to Redshift"""
    try:
        # Connect to Redshift
        conn = psycopg2.connect(
           host= "psycho-bunny-workgroup.499897076142.us-east-1.redshift-serverless.amazonaws.com",
        port="5439",
        database= "dev",
        user= "admin",
        password= "Tata!8237552399"
        )
        
        cursor = conn.cursor()
        
        # Create dim_product table schema matching the DataFrame
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS dim_product (
                PRODUCTCODE VARCHAR(50),
                product_key VARCHAR(50),
                product_family VARCHAR(100),
                product_line VARCHAR(100),
                created_date TIMESTAMP
            );
        """)
        
        # Create dim_calendar table schema (using calendar_df columns)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS dim_calendar (
                date_key DATE,
                year INTEGER,
                month INTEGER,
                day INTEGER,
                quarter INTEGER,
                day_of_week INTEGER,
                month_name VARCHAR(20),
                day_name VARCHAR(20),
                created_date TIMESTAMP
            );
        """)
        
        # Clear existing data
        cursor.execute("DELETE FROM dim_product;")
        cursor.execute("DELETE FROM dim_calendar;")
        
        # Load dim_product data
        dim_product_pandas = dim_product.toPandas()
        for _, row in dim_product_pandas.iterrows():
            cursor.execute("""
                INSERT INTO dim_product (PRODUCTCODE, product_key, product_family, product_line, created_date) 
                VALUES (%s, %s, %s, %s, %s)
            """, (row['PRODUCTCODE'], row['product_key'], row['product_family'], row['product_line'], row['created_date']))
        
        # Create simple calendar dimension from calendar_df
        dim_calendar = calendar_df.select(
            col("CALENDAR_DATE").alias("date_key"),
            year(col("CALENDAR_DATE")).alias("year"),
            month(col("CALENDAR_DATE")).alias("month"),
            dayofmonth(col("CALENDAR_DATE")).alias("day"),
            quarter(col("CALENDAR_DATE")).alias("quarter"),
            dayofweek(col("CALENDAR_DATE")).alias("day_of_week"),
            date_format(col("CALENDAR_DATE"), "MMMM").alias("month_name"),
            date_format(col("CALENDAR_DATE"), "EEEE").alias("day_name"),
            current_timestamp().alias("created_date")
        )
        
        # Load dim_calendar data
        dim_calendar_pandas = dim_calendar.toPandas()
        for _, row in dim_calendar_pandas.iterrows():
            cursor.execute("""
                INSERT INTO dim_calendar (date_key, year, month, day, quarter, day_of_week, month_name, day_name, created_date) 
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, (row['date_key'], row['year'], row['month'], row['day'], row['quarter'], row['day_of_week'], row['month_name'], row['day_name'], row['created_date']))
        
        conn.commit()
        cursor.close()
        conn.close()
        
        logger.info(f"Successfully loaded dim_product ({len(dim_product_pandas)} records) and dim_calendar ({len(dim_calendar_pandas)} records) to Redshift")
        
    except Exception as e:
        logger.error(f"Error loading additional dimensions to Redshift: {str(e)}")
        raise

# Load additional dimension tables
try:
    load_additional_dimensions_to_redshift(redshift_config)
    logger.info("All dimension tables loaded to Redshift successfully")
    
except Exception as e:
    logger.warning(f"Additional dimension loading failed (this is optional): {str(e)}")
    logger.info("Main data tables are still available in Redshift and Delta Lake")


INFO:py4j.clientserver:Received command c on object id p1
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clie

In [0]:
# Data Processing Summary
print("=" * 60)
print("DATA PROCESSING COMPLETED SUCCESSFULLY")
print("=" * 60)

print(f"Customer Dimension: {dim_customer.count()} records")
print(f"Product Dimension: {dim_product.count()} records") 
print(f"Fact Transactions: {fact_transactions.count()} records")
print(f"Customer Segments: {customer_segments.count()} customers")

print("\n🔧 Business Rules Applied:")
print(f"• High-value customer threshold: ${BUSINESS_RULES['high_value_customer_threshold']:,}")
print(f"• Refund restocking fee: {BUSINESS_RULES['refund_restocking_fee_rate']*100}%")
print(f"• Large order threshold: ${BUSINESS_RULES['large_order_threshold']:,}")

print("\nCustomer Segments:")
customer_segments.groupBy("customer_segment").count().show()

print("\nTransaction Types:")
fact_transactions.groupBy("transaction_type").count().show()

print("\nSales by Territory:")
fact_transactions.filter(col("transaction_type") == "SALE") \
    .groupBy("territory") \
    .agg(sum("net_amount").alias("total_revenue")) \
    .orderBy(desc("total_revenue")) \
    .show()

print("Data available in Delta Lake and Redshift (if configured)")
print(f"Delta Lake location: {PROCESSED_DATA_PATH}")



DATA PROCESSING COMPLETED SUCCESSFULLY


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0


📊 Customer Dimension: 2000 records


INFO:py4j.clientserver:Received command c on object id p0


📦 Product Dimension: 109 records


INFO:py4j.clientserver:Received command c on object id p0


💰 Fact Transactions: 2823 records


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0


👥 Customer Segments: 1 customers

🔧 Business Rules Applied:
• High-value customer threshold: $10,000
• Refund restocking fee: 10.0%
• Large order threshold: $5,000

📈 Customer Segments:


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0


+----------------+-----+
|customer_segment|count|
+----------------+-----+
|      High Value|    1|
+----------------+-----+


💳 Transaction Types:


INFO:py4j.clientserver:Received command c on object id p0


+----------------+-----+
|transaction_type|count|
+----------------+-----+
|            SALE| 2823|
+----------------+-----+


🌍 Sales by Territory:


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.java_gateway:Callback Server Shutting Down
INFO:py4j.clientserver:Closing down clientserver connection


+---------+-------------+
|territory|total_revenue|
+---------+-------------+
|   Medium|     138400.0|
|    Small|     128200.0|
|    Large|      15700.0|
+---------+-------------+

✅ Data available in Delta Lake and Redshift (if configured)
📁 Delta Lake location: s3://psycho-bunny-data-lake/processed-data/


INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clientserver:Received command c on object id p0
INFO:py4j.clie

In [0]:
# Load processed data from Delta Lake
try:
    fact_transactions = spark.read.format("delta").load(f"{PROCESSED_DATA_PATH}fact_transactions")
    dim_customer = spark.read.format("delta").load(f"{PROCESSED_DATA_PATH}dim_customer")
    dim_product = spark.read.format("delta").load(f"{PROCESSED_DATA_PATH}dim_product")
    calendar_df = spark.read.format("delta").load(f"{RAW_DATA_PATH}calendar")
    
    logger.info(f"Loaded data: {fact_transactions.count()} transactions, {dim_customer.count()} customers")
    
except Exception as e:
    logger.error(f"❌ Error loading data: {str(e)}")
    raise
