In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml.clustering import KMeans
from pyspark import StorageLevel
import urllib.parse
import numpy as np
import pandas as pd
import time
import os
import pyodbc


In [2]:
jar_path = "/media/softsuave/DATA-HDD/DataEngineering/Apache_Airflow/Jars/mssql-jdbc-12.2.0.jre8.jar"

try:
    spark.stop()  # pyright: ignore[reportUndefinedVariable]
except:
    pass

spark = SparkSession.builder \
    .appName("ECommerce_ETL_Database_Storage") \
    .config("spark.jars", jar_path) \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.sql.adaptive.localShuffleReader.enabled", "true") \
    .config("spark.sql.adaptive.dynamicPartitionPruning.enabled", "true") \
    .config("spark.sql.cbo.enabled", "true") \
    .config("spark.sql.cbo.joinReorder.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()


# Verify
print("✅ Spark JAR config:", spark.sparkContext.getConf().get("spark.jars"))
print(f"Spark Version: {spark.version}")
print("✅ JDBC driver loaded")



25/09/06 18:56:44 WARN Utils: Your hostname, softsuave-ASUS-EXPERTCENTER-D700ME-D500ME resolves to a loopback address: 127.0.1.1; using 192.168.6.3 instead (on interface eno2)
25/09/06 18:56:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/09/06 18:56:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/06 18:56:45 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


✅ Spark JAR config: /media/softsuave/DATA-HDD/DataEngineering/Apache_Airflow/Jars/mssql-jdbc-12.2.0.jre8.jar
Spark Version: 3.4.2
✅ JDBC driver loaded


In [3]:
ODBC_CONFIG = {
    "source_db": {
        "server": "localhost",
        "database": "ecom_db",
        "user": "sa",
        "password": "Gova#ss123",
        "driver": "ODBC Driver 17 for SQL Server"
    },
    "target_db": {
        "server": "localhost", 
        "database": "ecom_analytics",
        "user": "sa",
        "password": "Gova#ss123",
        "driver": "ODBC Driver 17 for SQL Server"
    }
}

In [4]:
def build_odbc_url(config, database_override=None):
    """Build ODBC connection string for pyodbc"""
    db_name = database_override if database_override else config["database"]
    
    odbc_string = (
        f"DRIVER={{{config['driver']}}};"
        f"SERVER={config['server']};"
        f"DATABASE={db_name};"
        f"UID={config['user']};"
        f"PWD={config['password']};"
        f"Encrypt=yes;"
        f"TrustServerCertificate=yes;"
        f"Connection Timeout=30;"
    )
    
    return odbc_string


In [5]:
SOURCE_ODBC_CONN = {
    "server": ODBC_CONFIG["source_db"]["server"],
    "database": ODBC_CONFIG["source_db"]["database"],
    "user": ODBC_CONFIG["source_db"]["user"],
    "password": ODBC_CONFIG["source_db"]["password"],
    "driver": "ODBC Driver 17 for SQL Server",  # <-- ODBC driver
    "encrypt": "yes",
    "trustServerCertificate": "yes",
    "timeout": 30
}

TARGET_ODBC_CONN = {
    "server": ODBC_CONFIG["target_db"]["server"],
    "database": ODBC_CONFIG["target_db"]["database"],
    "user": ODBC_CONFIG["target_db"]["user"],
    "password": ODBC_CONFIG["target_db"]["password"],
    "driver": "ODBC Driver 17 for SQL Server",  # <-- ODBC driver
    "encrypt": "yes",
    "trustServerCertificate": "yes",
    "timeout": 30
}


In [6]:
def get_odbc_connection(config, database_override=None, autocommit=False):
    """Get direct ODBC connection for DDL operations"""
    db_name = database_override if database_override else config["database"]
    
    connection_string = (
        f"DRIVER={{{config['driver']}}};"
        f"SERVER={config['server']};"
        f"DATABASE={db_name};"
        f"UID={config['user']};"
        f"PWD={config['password']};"
        f"Encrypt=yes;"
        f"TrustServerCertificate=yes;"
    )
    
    return pyodbc.connect(connection_string, autocommit=autocommit)

# URLs for Spark JDBC operations
SOURCE_URL = build_odbc_url(ODBC_CONFIG["source_db"])
TARGET_URL = build_odbc_url(ODBC_CONFIG["target_db"])

In [7]:
# PySpark Data Loading Functions
def load_spark_tables():
    """Load all tables as Spark DataFrames using JDBC"""
    
    # JDBC connection properties
    jdbc_url = "jdbc:sqlserver://localhost:1433;databaseName=ecom_db;encrypt=true;trustServerCertificate=true"
    connection_properties = {
        "user": "sa",
        "password": "Gova#ss123",
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    }
    
    tables = {}
    table_names = ["customers", "orders", "order_items", "products", "inventory", "clickstream"]
    
    for table_name in table_names:
        try:
            print(f"Loading {table_name}...")
            df = spark.read.jdbc(jdbc_url, table_name, properties=connection_properties)
            tables[table_name] = df
            print(f"✅ Loaded {table_name}: {df.count():,} records")
        except Exception as e:
            print(f"❌ Error loading {table_name}: {e}")
    
    return tables

# Load all data as Spark DataFrames
raw_tables = load_spark_tables()
print("RAW TABLES", raw_tables)

# Extract individual DataFrames
customers = raw_tables["customers"]
orders = raw_tables["orders"] 
order_items = raw_tables["order_items"]
products = raw_tables["products"]
inventory = raw_tables["inventory"]
clickstream = raw_tables["clickstream"]

print("✅ All tables loaded as Spark DataFrames")


Loading customers...


                                                                                

✅ Loaded customers: 1,000 records
Loading orders...
✅ Loaded orders: 5,000 records
Loading order_items...
✅ Loaded order_items: 8,716 records
Loading products...
✅ Loaded products: 500 records
Loading inventory...
✅ Loaded inventory: 500 records
Loading clickstream...
✅ Loaded clickstream: 47,851 records
RAW TABLES {'customers': DataFrame[customer_id: int, first_name: string, last_name: string, email: string, phone: string, country: string, state: string, city: string, postal_code: string, street: string, signup_date: timestamp, last_login: timestamp, segment: string, lifetime_value: decimal(14,2)], 'orders': DataFrame[order_id: int, order_uuid: string, customer_id: int, order_date: timestamp, status: string, payment_method: string, subtotal: decimal(12,2), shipping: decimal(10,2), tax: decimal(10,2), discount: decimal(10,2), total_amount: decimal(12,2), placed_via: string], 'order_items': DataFrame[order_item_id: int, order_id: int, product_id: int, sku: string, quantity: int, unit_pr

In [8]:
# PySpark RFM Analysis
def calculate_rfm_analysis():
    """Calculate RFM scores with advanced segmentation using PySpark"""
    
    print("Calculating RFM Analysis...")
    
    # Get analysis date
    analysis_date = orders.select(max("order_date")).collect()[0][0]
    
    # Calculate RFM base metrics
    rfm_base = orders.filter(col("status") != "Cancelled") \
        .groupBy("customer_id") \
        .agg(
            max("order_date").alias("last_order_date"),
            sum("total_amount").alias("monetary"),
            count("order_id").alias("frequency")
        )
    
    # Calculate recency
    rfm_metrics = rfm_base.withColumn(
        "recency", 
        datediff(lit(analysis_date), col("last_order_date"))
    )
    
    # Calculate RFM scores (1-5 scale)
    rfm_scores = rfm_metrics.withColumn(
        "r_score", 
        when(col("recency") <= 30, 5)
        .when(col("recency") <= 60, 4)
        .when(col("recency") <= 90, 3)
        .when(col("recency") <= 180, 2)
        .otherwise(1)
    ).withColumn(
        "f_score",
        when(col("frequency") >= 20, 5)
        .when(col("frequency") >= 10, 4)
        .when(col("frequency") >= 5, 3)
        .when(col("frequency") >= 2, 2)
        .otherwise(1)
    ).withColumn(
        "m_score",
        when(col("monetary") >= 2000, 5)
        .when(col("monetary") >= 1000, 4)
        .when(col("monetary") >= 500, 3)
        .when(col("monetary") >= 100, 2)
        .otherwise(1)
    )
    
    # Create RFM segments
    rfm_segments = rfm_scores.withColumn(
        "rfm_segment",
        when((col("r_score") >= 4) & (col("f_score") >= 4) & (col("m_score") >= 4), "Champions")
        .when((col("r_score") >= 3) & (col("f_score") >= 3) & (col("m_score") >= 3), "Loyal Customers")
        .when((col("r_score") >= 4) & (col("f_score") <= 2), "New Customers")
        .when((col("r_score") >= 3) & (col("f_score") >= 2) & (col("m_score") <= 2), "Potential Loyalists")
        .when((col("r_score") <= 2) & (col("f_score") >= 3) & (col("m_score") >= 3), "At Risk")
        .when((col("r_score") <= 2) & (col("f_score") >= 2) & (col("m_score") >= 2), "Cannot Lose Them")
        .when((col("r_score") <= 2) & (col("f_score") <= 2) & (col("m_score") <= 2), "Lost")
        .otherwise("Others")
    )
    
    return rfm_segments

# Execute RFM Analysis
rfm_analysis = calculate_rfm_analysis()
rfm_analysis.show(10)


Calculating RFM Analysis...
+-----------+--------------------+--------+---------+-------+-------+-------+-------+---------------+
|customer_id|     last_order_date|monetary|frequency|recency|r_score|f_score|m_score|    rfm_segment|
+-----------+--------------------+--------+---------+-------+-------+-------+-------+---------------+
|        881|2025-05-13 18:18:...| 8328.90|        5|    116|      2|      3|      5|        At Risk|
|         14|2025-08-02 22:00:...| 6281.13|       11|     35|      4|      4|      5|      Champions|
|         18|2025-08-12 18:18:...| 2885.38|        4|     25|      5|      2|      5|  New Customers|
|        300|2024-10-28 03:00:...|   49.62|        1|    313|      1|      1|      1|           Lost|
|         38|2025-09-03 20:43:...| 2478.07|        1|      3|      5|      1|      5|  New Customers|
|        440|2025-09-01 09:35:...| 5905.81|       11|      5|      5|      4|      5|      Champions|
|        677|2025-08-27 20:43:...| 7197.70|       16| 

In [9]:
# PySpark Cohort Analysis
def cohort_analysis():
    """Perform cohort analysis using PySpark"""
    
    print("Performing Cohort Analysis...")
    
    # Get first order date for each customer
    customer_cohorts = orders.filter(col("status") != "Cancelled") \
        .groupBy("customer_id") \
        .agg(min("order_date").alias("first_order_date"))
    
    # Create cohort orders with first order date
    cohort_orders = orders.filter(col("status") != "Cancelled") \
        .join(customer_cohorts, "customer_id") \
        .select(
            "customer_id",
            "order_date",
            "first_order_date",
            "total_amount"
        )
    
    # Calculate cohort period and order period
    cohort_data = cohort_orders.withColumn(
        "cohort_period", 
        date_format(col("first_order_date"), "yyyy-MM")
    ).withColumn(
        "order_period",
        date_format(col("order_date"), "yyyy-MM")
    ).withColumn(
        "period_number",
        months_between(
            to_date(col("order_period"), "yyyy-MM"),
            to_date(col("cohort_period"), "yyyy-MM")
        )
    )
    
    # Calculate cohort metrics
    cohort_metrics = cohort_data.groupBy("cohort_period", "period_number") \
        .agg(
            countDistinct("customer_id").alias("customers"),
            sum("total_amount").alias("revenue")
        ) \
        .orderBy("cohort_period", "period_number")
    
    return cohort_metrics

# Execute Cohort Analysis
cohort_data = cohort_analysis()
cohort_data.show(20)


Performing Cohort Analysis...
+-------------+-------------+---------+-------+
|cohort_period|period_number|customers|revenue|
+-------------+-------------+---------+-------+
|      2022-09|          0.0|        2| 443.34|
|      2022-09|          1.0|        2|2902.00|
|      2022-09|          6.0|        1| 197.35|
|      2022-09|          7.0|        1|  80.26|
|      2022-09|          8.0|        1|2152.49|
|      2022-09|         11.0|        1| 132.31|
|      2022-09|         16.0|        1| 281.05|
|      2022-09|         17.0|        1|1343.54|
|      2022-09|         18.0|        1| 352.92|
|      2022-09|         21.0|        1| 789.91|
|      2022-09|         22.0|        1| 239.80|
|      2022-09|         29.0|        1|5154.49|
|      2022-09|         30.0|        1| 447.98|
|      2022-09|         32.0|        2|3405.37|
|      2022-09|         35.0|        1|1937.24|
|      2022-09|         36.0|        1|2398.61|
|      2022-10|          0.0|        3|1302.80|
|      202

In [10]:
# PySpark Customer Features and ML Clustering
def create_customer_features():
    """Create comprehensive customer features using PySpark"""
    
    print("Creating Customer Features...")
    
    # Customer order patterns
    customer_patterns = orders.filter(col("status") != "Cancelled") \
        .groupBy("customer_id") \
        .agg(
            count("order_id").alias("total_orders"),
            sum("total_amount").alias("total_spent"),
            avg("total_amount").alias("avg_order_value"),
            max("order_date").alias("last_order_date"),
            min("order_date").alias("first_order_date"),
            countDistinct("order_date").alias("unique_order_days"),
            stddev("total_amount").alias("order_value_std")
        )
    
    # Calculate additional features
    customer_features = customer_patterns.withColumn(
        "days_since_last_order",
        datediff(current_date(), col("last_order_date"))
    ).withColumn(
        "customer_lifespan_days",
        datediff(col("last_order_date"), col("first_order_date"))
    ).withColumn(
        "order_frequency",
        when(col("customer_lifespan_days") > 0, 
             col("total_orders") / col("customer_lifespan_days") * 30)
        .otherwise(0)
    )
    
    # Join with customer demographics
    customer_demographics = customers.select(
        "customer_id", "country", "state", "city", "segment"
    )
    
    final_features = customer_features.join(customer_demographics, "customer_id", "left")
    
    return final_features

def ml_customer_clustering(customer_features, rfm_analysis):
    """Perform ML clustering using PySpark MLlib"""
    
    print("Performing ML Customer Clustering...")
    
    # Prepare features for clustering
    feature_columns = [
        "total_orders", "total_spent", "avg_order_value", 
        "days_since_last_order", "order_frequency", "order_value_std"
    ]
    
    # Handle null values
    clustering_data = customer_features.fillna(0, subset=feature_columns)
    
    # Assemble features
    assembler = VectorAssembler(
        inputCols=feature_columns,
        outputCol="features"
    )
    
    feature_vector = assembler.transform(clustering_data)
    
    # Standardize features
    scaler = StandardScaler(
        inputCol="features",
        outputCol="scaled_features",
        withStd=True,
        withMean=True
    )
    
    scaler_model = scaler.fit(feature_vector)
    scaled_data = scaler_model.transform(feature_vector)
    
    # K-means clustering
    kmeans = KMeans(
        featuresCol="scaled_features",
        predictionCol="cluster",
        k=5,
        seed=42
    )
    
    model = kmeans.fit(scaled_data)
    clustered_data = model.transform(scaled_data)
    
    # Add cluster centers
    cluster_centers = model.clusterCenters()
    print(f"Cluster Centers: {len(cluster_centers)}")
    
    return clustered_data

# Execute Customer Features and Clustering
customer_features = create_customer_features()
customer_clusters = ml_customer_clustering(customer_features, rfm_analysis)

print("✅ Customer Analytics Complete")
customer_clusters.select("customer_id", "cluster", "total_orders", "total_spent").show(10)


Creating Customer Features...
Performing ML Customer Clustering...


25/09/06 18:57:18 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


Cluster Centers: 5
✅ Customer Analytics Complete
+-----------+-------+------------+-----------+
|customer_id|cluster|total_orders|total_spent|
+-----------+-------+------------+-----------+
|        881|      4|           5|    8328.90|
|         14|      2|          11|    6281.13|
|         18|      1|           4|    2885.38|
|        300|      0|           1|      49.62|
|         38|      4|           1|    2478.07|
|        440|      2|          11|    5905.81|
|        677|      2|          16|    7197.70|
|        644|      4|           5|    6979.64|
|        857|      2|          11|    7968.51|
|        802|      4|           6|    9939.54|
+-----------+-------+------------+-----------+
only showing top 10 rows



In [None]:
# PySpark Data Saving Functions
def save_to_database_spark(df, table_name):
    """Save Spark DataFrame to SQL Server using JDBC"""
    
    jdbc_url = jdbc_url = "jdbc:sqlserver://localhost:1433;databaseName=ecom_db;encrypt=true;trustServerCertificate=true"
    connection_properties = {
        "user": "sa",
        "password": "Gova#ss123",
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    }
    
    try:
        # Write to database
        df.write \
            .mode("overwrite") \
            .jdbc(jdbc_url, table_name, properties=connection_properties)
        
        print(f"✅ Saved {table_name}: {df.count():,} records")
        
    except Exception as e:
        print(f"❌ Error saving {table_name}: {e}")

# Save all analytics results
print("Saving Analytics Results...")
save_to_database_spark(rfm_analysis, "rfm_analysis")
save_to_database_spark(cohort_data, "cohort_analysis") 
save_to_database_spark(customer_features, "customer_features")  
save_to_database_spark(customer_clusters, "customer_clusters")

print("✅ All analytics saved to database")


Saving Analytics Results...
✅ Saved rfm_analysis: 922 records
✅ Saved cohort_analysis: 651 records
✅ Saved customer_features: 922 records
❌ Error saving customer_clusters: Can't get JDBC type for struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
✅ All analytics saved to database


In [None]:
# Complete PySpark ETL Pipeline - Execute All Analytics
def advanced_customer_analytics():
    """Complete customer analytics pipeline using PySpark"""
    
    print("="*80)
    print("🚀 Starting Advanced Customer Analytics with PySpark")
    print("="*80)
    
    # 1. RFM Analysis
    print("\n📊 Calculating RFM Analysis...")
    analysis_date = orders.select(max("order_date")).collect()[0][0]
    
    rfm_base = orders.filter(col("status") != "Cancelled") \
        .groupBy("customer_id") \
        .agg(
            max("order_date").alias("last_order_date"),
            sum("total_amount").alias("monetary"),
            count("order_id").alias("frequency")
        )
    
    rfm_metrics = rfm_base.withColumn(
        "recency", 
        datediff(lit(analysis_date), col("last_order_date"))
    )
    
    rfm_scores = rfm_metrics.withColumn(
        "r_score", 
        when(col("recency") <= 30, 5)
        .when(col("recency") <= 60, 4)
        .when(col("recency") <= 90, 3)
        .when(col("recency") <= 180, 2)
        .otherwise(1)
    ).withColumn(
        "f_score",
        when(col("frequency") >= 20, 5)
        .when(col("frequency") >= 10, 4)
        .when(col("frequency") >= 5, 3)
        .when(col("frequency") >= 2, 2)
        .otherwise(1)
    ).withColumn(
        "m_score",
        when(col("monetary") >= 2000, 5)
        .when(col("monetary") >= 1000, 4)
        .when(col("monetary") >= 500, 3)
        .when(col("monetary") >= 100, 2)
        .otherwise(1)
    )
    
    rfm_analysis = rfm_scores.withColumn(
        "rfm_segment",
        when((col("r_score") >= 4) & (col("f_score") >= 4) & (col("m_score") >= 4), "Champions")
        .when((col("r_score") >= 3) & (col("f_score") >= 3) & (col("m_score") >= 3), "Loyal Customers")
        .when((col("r_score") >= 4) & (col("f_score") <= 2), "New Customers")
        .when((col("r_score") >= 3) & (col("f_score") >= 2) & (col("m_score") <= 2), "Potential Loyalists")
        .when((col("r_score") <= 2) & (col("f_score") >= 3) & (col("m_score") >= 3), "At Risk")
        .when((col("r_score") <= 2) & (col("f_score") >= 2) & (col("m_score") >= 2), "Cannot Lose Them")
        .when((col("r_score") <= 2) & (col("f_score") <= 2) & (col("m_score") <= 2), "Lost")
        .otherwise("Others")
    )
    
    print("✅ RFM Analysis Complete")
    rfm_analysis.show(5)
    
    # 2. Cohort Analysis
    print("\n📈 Performing Cohort Analysis...")
    customer_cohorts = orders.filter(col("status") != "Cancelled") \
        .groupBy("customer_id") \
        .agg(min("order_date").alias("first_order_date"))
    
    cohort_orders = orders.filter(col("status") != "Cancelled") \
        .join(customer_cohorts, "customer_id") \
        .select("customer_id", "order_date", "first_order_date", "total_amount")
    
    cohort_data = cohort_orders.withColumn(
        "cohort_period", 
        date_format(col("first_order_date"), "yyyy-MM")
    ).withColumn(
        "order_period",
        date_format(col("order_date"), "yyyy-MM")
    ).withColumn(
        "period_number",
        months_between(
            to_date(col("order_period"), "yyyy-MM"),
            to_date(col("cohort_period"), "yyyy-MM")
        )
    )
    
    cohort_analysis = cohort_data.groupBy("cohort_period", "period_number") \
        .agg(
            countDistinct("customer_id").alias("customers"),
            sum("total_amount").alias("revenue")
        ) \
        .orderBy("cohort_period", "period_number")
    
    print("✅ Cohort Analysis Complete")
    cohort_analysis.show(5)
    
    # 3. Customer Features
    print("\n🔍 Creating Customer Features...")
    customer_patterns = orders.filter(col("status") != "Cancelled") \
        .groupBy("customer_id") \
        .agg(
            count("order_id").alias("total_orders"),
            sum("total_amount").alias("total_spent"),
            avg("total_amount").alias("avg_order_value"),
            max("order_date").alias("last_order_date"),
            min("order_date").alias("first_order_date"),
            countDistinct("order_date").alias("unique_order_days"),
            stddev("total_amount").alias("order_value_std")
        )
    
    customer_features = customer_patterns.withColumn(
        "days_since_last_order",
        datediff(current_date(), col("last_order_date"))
    ).withColumn(
        "customer_lifespan_days",
        datediff(col("last_order_date"), col("first_order_date"))
    ).withColumn(
        "order_frequency",
        when(col("customer_lifespan_days") > 0, 
             col("total_orders") / col("customer_lifespan_days") * 30)
        .otherwise(0)
    )
    
    customer_demographics = customers.select(
        "customer_id", "country", "state", "city", "segment"
    )
    
    customer_features_final = customer_features.join(customer_demographics, "customer_id", "left")
    
    print("✅ Customer Features Complete")
    customer_features_final.show(5)
    
    # 4. ML Clustering
    print("\n🤖 Performing ML Clustering...")
    feature_columns = [
        "total_orders", "total_spent", "avg_order_value", 
        "days_since_last_order", "order_frequency", "order_value_std"
    ]
    
    clustering_data = customer_features_final.fillna(0, subset=feature_columns)
    
    assembler = VectorAssembler(
        inputCols=feature_columns,
        outputCol="features"
    )
    
    feature_vector = assembler.transform(clustering_data)
    
    scaler = StandardScaler(
        inputCol="features",
        outputCol="scaled_features",
        withStd=True,
        withMean=True
    )
    
    scaler_model = scaler.fit(feature_vector)
    scaled_data = scaler_model.transform(feature_vector)
    
    kmeans = KMeans(
        featuresCol="scaled_features",
        predictionCol="cluster",
        k=5,
        seed=42
    )
    
    model = kmeans.fit(scaled_data)
    customer_clusters = model.transform(scaled_data)
    
    print("✅ ML Clustering Complete")
    customer_clusters.select("customer_id", "cluster", "total_orders", "total_spent").show(5)
    
    # 5. Save to Database
    print("\n💾 Saving Results to Database...")
    
    def save_to_database_spark(df, table_name):
        jdbc_url = "jdbc:sqlserver://localhost:1433;databaseName=ecom_analytics"
        connection_properties = {
            "user": "sa",
            "password": "Gova#ss123",
            "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        }
        
        try:
            df.write.mode("overwrite").jdbc(jdbc_url, table_name, properties=connection_properties)
            print(f"✅ Saved {table_name}: {df.count():,} records")
        except Exception as e:
            print(f"❌ Error saving {table_name}: {e}")
    
    save_to_database_spark(rfm_analysis, "rfm_analysis")
    save_to_database_spark(cohort_analysis, "cohort_analysis") 
    save_to_database_spark(customer_features_final, "customer_features")
    save_to_database_spark(customer_clusters, "customer_clusters")
    
    print("\n🎉 Customer Analytics Pipeline Complete!")
    print("="*80)
    
    return {
        "rfm_analysis": rfm_analysis,
        "cohort_analysis": cohort_analysis,
        "customer_features": customer_features_final,
        "customer_clusters": customer_clusters
    }

# Execute the complete pipeline
results = advanced_customer_analytics()


🚀 Starting Advanced Customer Analytics with PySpark

📊 Calculating RFM Analysis...


NameError: name 'orders' is not defined

# SECTION 1: DATABASE SCHEMA AND TABLE CREATION

In [None]:
# SIMPLIFIED ETL: Enhance existing tables instead of creating many new ones
print("="*60)
print("🚀 SIMPLIFIED ETL: Enhancing existing tables with analytics")
print("="*60)

# 1. Enhance customers table with RFM and clustering data
print("\n📊 Adding RFM and clustering to customers table...")
customers_enhanced = tables["customers"].join(
    rfm_analysis.select("customer_id", "recency_score", "frequency_score", "monetary_score", "rfm_segment"),
    "customer_id", "left"
).join(
    customer_clusters.select("customer_id", "cluster", "cluster_name"),
    "customer_id", "left"
)

# 2. Enhance orders table with customer features
print("📊 Adding customer features to orders table...")
orders_enhanced = tables["orders"].join(
    customer_features.select("customer_id", "total_orders", "total_spent", "avg_order_value", "days_since_last_order"),
    "customer_id", "left"
)

# 3. Function to save enhanced tables
def save_enhanced_table(df, table_name):
    try:
        df.write \
            .format("jdbc") \
            .option("url", jdbc_url) \
            .option("dbtable", table_name) \
            .option("user", connection_properties["user"]) \
            .option("password", connection_properties["password"]) \
            .option("driver", connection_properties["driver"]) \
            .mode("overwrite") \
            .save()
        print(f"✅ Enhanced {table_name}: {df.count():,} records")
    except Exception as e:
        print(f"❌ Error enhancing {table_name}: {e}")

# 4. Save enhanced tables back to original table names
print("\n💾 Saving enhanced tables...")
save_enhanced_table(customers_enhanced, "customers")
save_enhanced_table(orders_enhanced, "orders")

print("\n🎉 ETL Complete! Tables enhanced with analytics data")
print("="*60)


In [None]:
def create_analytics_database_and_schema():
    """Create analytics database and schema using ODBC"""
    
    print("Creating analytics database and schema...")
    
    try:
        # --------------------------
        # Step 1: Create database
        # --------------------------
        master_conn = get_odbc_connection(
            ODBC_CONFIG["target_db"], 
            database_override="master", 
            autocommit=True  # <--- important for CREATE DATABASE
        )
        cursor = master_conn.cursor()
        create_db_sql = """
        IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = 'ecom_analytics')
        BEGIN
            CREATE DATABASE ecom_analytics;
        END
        """
        cursor.execute(create_db_sql)
        cursor.close()
        master_conn.close()
        print("✅ Analytics database created/verified")
        
        # --------------------------
        # Step 2: Create schema
        # --------------------------
        analytics_conn = get_odbc_connection(
            ODBC_CONFIG["target_db"], 
            autocommit=False  # schema can run in a transaction
        )
        cursor = analytics_conn.cursor()
        create_schema_sql = """
        IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = 'analytics')
        BEGIN
            EXEC('CREATE SCHEMA analytics');
        END
        """
        cursor.execute(create_schema_sql)
        analytics_conn.commit()
        cursor.close()
        analytics_conn.close()
        print("✅ Analytics schema created/verified")
        
    except Exception as e:
        print(f"❌ Error creating database/schema: {e}")


In [None]:
def create_table_if_not_exists(table_name, create_sql):
    """Helper function to create table if it doesn't exist using ODBC"""
    
    try:
        # DDL safe with autocommit
        conn = get_odbc_connection(ODBC_CONFIG["target_db"], autocommit=True)
        cursor = conn.cursor()
        
        # Check if table exists
        check_sql = """
        SELECT COUNT(*) FROM sys.tables t 
        JOIN sys.schemas s ON t.schema_id = s.schema_id 
        WHERE s.name = 'analytics' AND t.name = ?
        """
        cursor.execute(check_sql, table_name)
        exists = cursor.fetchone()[0] > 0
        
        if not exists:
            cursor.execute(create_sql)  # Must be fully formatted string
            print(f"✅ Created table analytics.{table_name}")
        else:
            print(f"✅ Table analytics.{table_name} already exists")
            
        cursor.close()
        conn.close()
        
    except Exception as e:
        print(f"❌ Error creating table {table_name}: {e}")


In [None]:
def save_to_database_spark(df, table_name, mode="overwrite"):
    """Save DataFrame to database table using Spark JDBC"""
    
    try:
        df_optimized = df.coalesce(10) if df.rdd.getNumPartitions() > 10 else df
        
        df_optimized.write \
            .format("jdbc") \
            .option("url", TARGET_URL) \
            .option("dbtable", f"analytics.{table_name}") \
            .option("user", TARGET_ODBC_CONN["user"]) \
            .option("password", TARGET_ODBC_CONN["password"]) \
            .option("driver", TARGET_ODBC_CONN["driver"]) \
            .option("batchsize", "5000") \
            .option("isolationLevel", "READ_UNCOMMITTED") \
            .mode(mode) \
            .save()
        
        print(f"✅ Saved {df.count():,} records to analytics.{table_name}")
        return True
        
    except Exception as e:
        print(f"❌ Error saving to {table_name}: {e}")
        return False


In [None]:
def read_from_database_spark_jdbc(table_name, columns="*", predicates=None):
    """Read DataFrame from database table using Spark JDBC"""
    
    try:
        dbtable = f"(SELECT {columns} FROM analytics.{table_name}) AS temp" \
                  if columns != "*" else f"analytics.{table_name}"
        
        df_reader = spark.read.format("jdbc") \
            .option("url", TARGET_URL) \
            .option("dbtable", dbtable) \
            .option("user", TARGET_ODBC_CONN["user"]) \
            .option("password", TARGET_ODBC_CONN["password"]) \
            .option("driver", TARGET_ODBC_CONN["driver"])
        
        if predicates:
            df_reader = df_reader.option("predicate", predicates)
        
        df = df_reader.load()
        print(f"✅ Read {df.count():,} records from analytics.{table_name}")
        return df
        
    except Exception as e:
        print(f"❌ Error reading from {table_name}: {e}")
        return None


In [None]:
create_analytics_database_and_schema()

Creating analytics database and schema...
✅ Analytics database created/verified
✅ Analytics schema created/verified


# SECTION 2: CREATE ALL ANALYTICS TABLES WITH ODBC

In [None]:
def create_all_analytics_tables():
    """Create all analytics tables with proper schemas using ODBC"""
    
    print("Creating analytics tables...")
    
    # Customer Analytics Tables
    customer_features_sql = """
    CREATE TABLE analytics.customer_features (
        customer_id INT PRIMARY KEY,
        first_name NVARCHAR(60),
        last_name NVARCHAR(60),
        email NVARCHAR(255),
        country NVARCHAR(80),
        state NVARCHAR(80),
        city NVARCHAR(80),
        segment NVARCHAR(30),
        customer_tier NVARCHAR(20),
        total_orders INT,
        total_spent DECIMAL(14,2),
        avg_order_value DECIMAL(10,2),
        customer_age_days INT,
        days_since_last_order INT,
        order_frequency DECIMAL(10,6),
        payment_method_diversity INT,
        signup_to_first_order_days INT,
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    rfm_analysis_sql = """
    CREATE TABLE analytics.rfm_analysis (
        customer_id INT PRIMARY KEY,
        last_order_date DATETIME2,
        frequency INT,
        monetary_value DECIMAL(14,2),
        first_order_date DATETIME2,
        avg_order_value DECIMAL(10,2),
        recency INT,
        customer_lifespan INT,
        r_score INT,
        f_score INT,
        m_score INT,
        rfm_segment NVARCHAR(30),
        rfm_score INT,
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    cohort_analysis_sql = """
    CREATE TABLE analytics.cohort_analysis (
        cohort_month DATE,
        months_since_first_purchase INT,
        customers INT,
        orders INT,
        revenue DECIMAL(14,2),
        cohort_size INT,
        retention_rate DECIMAL(5,4),
        revenue_per_customer DECIMAL(10,2),
        created_date DATETIME2 DEFAULT GETDATE(),
        PRIMARY KEY (cohort_month, months_since_first_purchase)
    )
    """
    
    customer_clusters_sql = """
    CREATE TABLE analytics.customer_clusters (
        customer_id INT PRIMARY KEY,
        cluster INT,
        cluster_name NVARCHAR(50),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    geographic_analysis_sql = """
    CREATE TABLE analytics.geographic_analysis (
        id INT IDENTITY(1,1) PRIMARY KEY,
        country NVARCHAR(80),
        state NVARCHAR(80),
        city NVARCHAR(80),
        customer_count INT,
        avg_customer_value DECIMAL(10,2),
        total_market_value DECIMAL(14,2),
        avg_order_value DECIMAL(10,2),
        avg_orders_per_customer DECIMAL(8,2),
        market_penetration_score DECIMAL(14,2),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    # Product Analytics Tables
    product_performance_sql = """
    CREATE TABLE analytics.product_performance (
        product_id INT PRIMARY KEY,
        sku NVARCHAR(64),
        product_name NVARCHAR(255),
        category NVARCHAR(80),
        brand NVARCHAR(80),
        price DECIMAL(10,2),
        total_quantity_sold INT,
        total_revenue DECIMAL(14,2),
        total_line_items INT,
        unique_orders INT,
        unique_customers INT,
        avg_selling_price DECIMAL(10,2),
        min_selling_price DECIMAL(10,2),
        max_selling_price DECIMAL(10,2),
        price_volatility DECIMAL(10,2),
        return_rate DECIMAL(5,4),
        cancellation_rate DECIMAL(5,4),
        stock_level INT,
        reorder_threshold INT,
        revenue_per_unit DECIMAL(10,2),
        inventory_turnover DECIMAL(8,4),
        stockout_risk NVARCHAR(10),
        profit_margin DECIMAL(5,4),
        lifecycle_stage NVARCHAR(20),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    market_basket_analysis_sql = """
    CREATE TABLE analytics.market_basket_analysis (
        id INT IDENTITY(1,1) PRIMARY KEY,
        product_a INT,
        product_b INT,
        co_occurrence_count INT,
        support DECIMAL(8,6),
        confidence_a_to_b DECIMAL(5,4),
        confidence_b_to_a DECIMAL(5,4),
        lift DECIMAL(8,4),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    category_performance_sql = """
    CREATE TABLE analytics.category_performance (
        category NVARCHAR(80) PRIMARY KEY,
        total_products INT,
        category_quantity_sold BIGINT,
        category_revenue DECIMAL(14,2),
        avg_category_price DECIMAL(10,2),
        avg_return_rate DECIMAL(5,4),
        avg_inventory_turnover DECIMAL(8,4),
        brand_diversity INT,
        market_share DECIMAL(5,4),
        revenue_per_product DECIMAL(12,2),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    # Sales Analytics Tables
    sales_timeseries_sql = """
    CREATE TABLE analytics.sales_timeseries (
        order_date DATE PRIMARY KEY,
        daily_orders INT,
        daily_revenue DECIMAL(14,2),
        avg_order_value DECIMAL(10,2),
        unique_customers INT,
        total_shipping DECIMAL(10,2),
        total_tax DECIMAL(10,2),
        total_discounts DECIMAL(10,2),
        day_of_week INT,
        day_name NVARCHAR(10),
        month INT,
        quarter INT,
        year INT,
        is_weekend BIT,
        revenue_7d_ma DECIMAL(14,2),
        revenue_30d_ma DECIMAL(14,2),
        revenue_growth DECIMAL(8,4),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    channel_analysis_sql = """
    CREATE TABLE analytics.channel_analysis (
        placed_via NVARCHAR(64) PRIMARY KEY,
        total_orders INT,
        total_revenue DECIMAL(14,2),
        avg_order_value DECIMAL(10,2),
        unique_customers INT,
        total_discounts DECIMAL(10,2),
        revenue_per_customer DECIMAL(10,2),
        discount_rate DECIMAL(5,4),
        market_share DECIMAL(5,4),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    conversion_funnel_sql = """
    CREATE TABLE analytics.conversion_funnel (
        signup_month DATE PRIMARY KEY,
        signups INT,
        conversions INT,
        avg_days_to_convert DECIMAL(8,2),
        conversion_rate DECIMAL(5,4),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    customer_acquisition_sql = """
    CREATE TABLE analytics.customer_acquisition (
        acquisition_month DATE PRIMARY KEY,
        new_customers INT,
        new_customer_revenue DECIMAL(14,2),
        avg_first_order_value DECIMAL(10,2),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    # Clickstream Analytics Tables
    session_metrics_sql = """
    CREATE TABLE analytics.session_metrics (
        session_id NVARCHAR(64) PRIMARY KEY,
        customer_id INT,
        device NVARCHAR(40),
        page_views INT,
        unique_pages INT,
        products_viewed INT,
        session_start DATETIME2,
        session_end DATETIME2,
        session_duration_minutes DECIMAL(8,2),
        bounce BIT,
        traffic_source NVARCHAR(80),
        referrer NVARCHAR(255),
        completed_purchase BIT,
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    user_journey_flow_sql = """
    CREATE TABLE analytics.user_journey_flow (
        id INT IDENTITY(1,1) PRIMARY KEY,
        page NVARCHAR(80),
        next_page NVARCHAR(80),
        transition_count INT,
        total_from_page INT,
        transition_probability DECIMAL(5,4),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    user_behavior_segments_sql = """
    CREATE TABLE analytics.user_behavior_segments (
        customer_id INT PRIMARY KEY,
        total_clicks INT,
        total_sessions INT,
        unique_pages_visited INT,
        unique_products_viewed INT,
        devices_used INT,
        clicks_per_session DECIMAL(8,2),
        product_focus DECIMAL(5,4),
        source_diversity INT,
        engagement_level NVARCHAR(20),
        session_intensity NVARCHAR(20),
        shopping_focus NVARCHAR(30),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    # ML Features Tables
    clv_features_sql = """
    CREATE TABLE analytics.clv_features (
        customer_id INT PRIMARY KEY,
        total_orders INT,
        total_spent DECIMAL(14,2),
        avg_order_value DECIMAL(10,2),
        customer_age_days INT,
        days_since_last_order INT,
        order_frequency DECIMAL(10,6),
        r_score INT,
        f_score INT,
        m_score INT,
        high_value_customer BIT,
        orders_per_day DECIMAL(10,6),
        revenue_per_day DECIMAL(10,2),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    churn_features_sql = """
    CREATE TABLE analytics.churn_features (
        customer_id INT PRIMARY KEY,
        last_order_date DATETIME2,
        recent_orders INT,
        avg_recent_order_value DECIMAL(10,2),
        days_since_last_order INT,
        churn_risk BIT,
        total_orders INT,
        total_spent DECIMAL(14,2),
        avg_order_value DECIMAL(10,2),
        r_score INT,
        f_score INT,
        m_score INT,
        churn_risk_score DECIMAL(5,4),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    # Dashboard Tables
    executive_summary_sql = """
    CREATE TABLE analytics.executive_summary (
        metric_name NVARCHAR(50) PRIMARY KEY,
        metric_value DECIMAL(18,2),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    top_customers_sql = """
    CREATE TABLE analytics.top_customers (
        id INT IDENTITY(1,1) PRIMARY KEY,
        rank_position INT,
        customer_id INT,
        first_name NVARCHAR(60),
        last_name NVARCHAR(60),
        total_spent DECIMAL(14,2),
        total_orders INT,
        customer_tier NVARCHAR(20),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    top_products_sql = """
    CREATE TABLE analytics.top_products (
        id INT IDENTITY(1,1) PRIMARY KEY,
        rank_position INT,
        product_id INT,
        product_name NVARCHAR(255),
        category NVARCHAR(80),
        total_revenue DECIMAL(14,2),
        total_quantity_sold INT,
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    # Data Warehouse Tables
    dim_customer_sql = """
    CREATE TABLE analytics.dim_customer (
        customer_id INT PRIMARY KEY,
        first_name NVARCHAR(60),
        last_name NVARCHAR(60),
        email NVARCHAR(255),
        country NVARCHAR(80),
        state NVARCHAR(80),
        city NVARCHAR(80),
        segment NVARCHAR(30),
        customer_tier NVARCHAR(20),
        rfm_segment NVARCHAR(30),
        cluster_id INT,
        cluster_name NVARCHAR(50),
        total_orders INT,
        total_spent DECIMAL(14,2),
        effective_date DATE,
        is_current BIT,
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    dim_product_sql = """
    CREATE TABLE analytics.dim_product (
        product_id INT PRIMARY KEY,
        sku NVARCHAR(64),
        product_name NVARCHAR(255),
        category NVARCHAR(80),
        brand NVARCHAR(80),
        price DECIMAL(10,2),
        lifecycle_stage NVARCHAR(20),
        stockout_risk NVARCHAR(10),
        profit_margin DECIMAL(5,4),
        total_quantity_sold INT,
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    dim_date_sql = """
    CREATE TABLE analytics.dim_date (
        date DATE PRIMARY KEY,
        year INT,
        month INT,
        day INT,
        quarter INT,
        day_of_week INT,
        day_name NVARCHAR(10),
        month_name NVARCHAR(10),
        is_weekend BIT,
        is_month_end BIT,
        fiscal_year INT,
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    fact_sales_sql = """
    CREATE TABLE analytics.fact_sales (
        date DATE PRIMARY KEY,
        daily_orders INT,
        daily_revenue DECIMAL(14,2),
        avg_order_value DECIMAL(10,2),
        unique_customers INT,
        total_shipping DECIMAL(10,2),
        total_tax DECIMAL(10,2),
        total_discounts DECIMAL(10,2),
        revenue_7d_ma DECIMAL(14,2),
        revenue_30d_ma DECIMAL(14,2),
        revenue_growth DECIMAL(8,4),
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    fact_customer_behavior_sql = """
    CREATE TABLE analytics.fact_customer_behavior (
        id INT IDENTITY(1,1) PRIMARY KEY,
        customer_id INT,
        date DATE,
        total_page_views INT,
        total_products_viewed INT,
        avg_session_duration DECIMAL(8,2),
        total_bounces INT,
        total_sessions INT,
        created_date DATETIME2 DEFAULT GETDATE()
    )
    """
    
    # Create all tables
    tables_to_create = [
        ("customer_features", customer_features_sql),
        ("rfm_analysis", rfm_analysis_sql),
        ("cohort_analysis", cohort_analysis_sql),
        ("customer_clusters", customer_clusters_sql),
        ("geographic_analysis", geographic_analysis_sql),
        ("product_performance", product_performance_sql),
        ("market_basket_analysis", market_basket_analysis_sql),
        ("category_performance", category_performance_sql),
        ("sales_timeseries", sales_timeseries_sql),
        ("channel_analysis", channel_analysis_sql),
        ("conversion_funnel", conversion_funnel_sql),
        ("customer_acquisition", customer_acquisition_sql),
        ("session_metrics", session_metrics_sql),
        ("user_journey_flow", user_journey_flow_sql),
        ("user_behavior_segments", user_behavior_segments_sql),
        ("clv_features", clv_features_sql),
        ("churn_features", churn_features_sql),
        ("executive_summary", executive_summary_sql),
        ("top_customers", top_customers_sql),
        ("top_products", top_products_sql),
        ("dim_customer", dim_customer_sql),
        ("dim_product", dim_product_sql),
        ("dim_date", dim_date_sql),
        ("fact_sales", fact_sales_sql),
        ("fact_customer_behavior", fact_customer_behavior_sql)
    ]
    
    for table_name, create_sql in tables_to_create:
        create_table_if_not_exists(table_name, create_sql)
    
    print(f"✅ Created/verified {len(tables_to_create)} analytics tables")


In [None]:
create_all_analytics_tables()

Creating analytics tables...
✅ Table analytics.customer_features already exists
✅ Table analytics.rfm_analysis already exists
✅ Table analytics.cohort_analysis already exists
✅ Table analytics.customer_clusters already exists
✅ Table analytics.geographic_analysis already exists
✅ Table analytics.product_performance already exists
✅ Table analytics.market_basket_analysis already exists
✅ Table analytics.category_performance already exists
✅ Table analytics.sales_timeseries already exists
✅ Table analytics.channel_analysis already exists
✅ Table analytics.conversion_funnel already exists
✅ Table analytics.customer_acquisition already exists
✅ Table analytics.session_metrics already exists
✅ Table analytics.user_journey_flow already exists
✅ Table analytics.user_behavior_segments already exists
✅ Table analytics.clv_features already exists
✅ Table analytics.churn_features already exists
✅ Table analytics.executive_summary already exists
✅ Table analytics.top_customers already exists
✅ Tab

# SECTION 3: DATA EXTRACTION

In [None]:
def extract_data_from_source_pyodbc(conn_str):
    """Extract tables from SQL Server using pyodbc"""

    print("="*80)
    print("EXTRACTING DATA FROM SOURCE DATABASE (pyodbc)")
    print("="*80)
    
    extraction_config = {
        "customers": {"partition_column": "customer_id"},
        "products": {"partition_column": "product_id"},
        "inventory": {"partition_column": None},
        "orders": {"partition_column": "order_id"},
        "order_items": {"partition_column": "order_id"},
        "clickstream": {"partition_column": "click_id"}
    }
    
    raw_tables = {}
    
    # Connect once using the passed connection string
    with pyodbc.connect(conn_str) as conn:
        for table_name, config in extraction_config.items():
            try:
                sql = f"SELECT * FROM {table_name}"
                
                # Optional: you could implement batching/partitioning here
                df = pd.read_sql(sql, conn)
                
                print(f"✅ Extracted {table_name}: {len(df):,} records")
                raw_tables[table_name] = df
            except Exception as e:
                print(f"❌ Error extracting {table_name}: {e}")
    
    return raw_tables


In [None]:
source_conn_str = build_odbc_url(ODBC_CONFIG["source_db"])
raw_tables = extract_data_from_source_pyodbc(source_conn_str)




EXTRACTING DATA FROM SOURCE DATABASE (pyodbc)
✅ Extracted customers: 1,000 records
✅ Extracted products: 500 records
✅ Extracted inventory: 500 records
✅ Extracted orders: 5,000 records
✅ Extracted order_items: 8,716 records


  df = pd.read_sql(sql, conn)


✅ Extracted clickstream: 47,851 records


# SECTION 4: CUSTOMER ANALYTICS

In [None]:
def advanced_customer_analytics():
    """Comprehensive customer analytics with direct ODBC database storage"""
    
    print("\n" + "="*80)
    print("ADVANCED CUSTOMER ANALYTICS")
    print("="*80)
    
    customers = raw_tables["customers"]
    orders = raw_tables["orders"]
    order_items = raw_tables["order_items"]
    
    # RFM Analysis
    def calculate_rfm_analysis():
        """Calculate RFM scores with advanced segmentation"""
        
        print("Calculating RFM Analysis...")
        
        analysis_date = orders.select(max("order_date")).collect()[0][0]
        
        # Calculate RFM base metrics
        rfm_base = orders.filter(col("status") != "Cancelled") \
            .groupBy("customer_id") \
            .agg(
                max("order_date").alias("last_order_date"),
                count("order_id").alias("frequency"),
                sum("total_amount").alias("monetary_value"),
                min("order_date").alias("first_order_date"),
                avg("total_amount").alias("avg_order_value")
            ) \
            .withColumn("recency", datediff(lit(analysis_date), col("last_order_date"))) \
            .withColumn("customer_lifespan", datediff(col("last_order_date"), col("first_order_date")))
        
        # Calculate percentiles for scoring
        percentiles_query = rfm_base.select(
            expr("percentile_approx(recency, array(0.2, 0.4, 0.6, 0.8))").alias("r_percentiles"),
            expr("percentile_approx(frequency, array(0.2, 0.4, 0.6, 0.8))").alias("f_percentiles"),
            expr("percentile_approx(monetary_value, array(0.2, 0.4, 0.6, 0.8))").alias("m_percentiles")
        ).collect()[0]
        
        r_p = percentiles_query["r_percentiles"]
        f_p = percentiles_query["f_percentiles"]
        m_p = percentiles_query["m_percentiles"]
        
        # Apply RFM scoring
        rfm_scored = rfm_base.select(
            "*",
            when(col("recency") <= r_p[0], 5)
            .when(col("recency") <= r_p[1], 4)
            .when(col("recency") <= r_p[2], 3)
            .when(col("recency") <= r_p[3], 2)
            .otherwise(1).alias("r_score"),
            
            when(col("frequency") >= f_p[3], 5)
            .when(col("frequency") >= f_p[2], 4)
            .when(col("frequency") >= f_p[1], 3)
            .when(col("frequency") >= f_p[0], 2)
            .otherwise(1).alias("f_score"),
            
            when(col("monetary_value") >= m_p[3], 5)
            .when(col("monetary_value") >= m_p[2], 4)
            .when(col("monetary_value") >= m_p[1], 3)
            .when(col("monetary_value") >= m_p[0], 2)
            .otherwise(1).alias("m_score")
        )
        
        # Create RFM segments
        rfm_segments = rfm_scored.withColumn(
            "rfm_segment",
            when((col("r_score") >= 4) & (col("f_score") >= 4) & (col("m_score") >= 4), "Champions")
            .when((col("r_score") >= 3) & (col("f_score") >= 3) & (col("m_score") >= 3), "Loyal Customers")
            .when((col("r_score") >= 4) & (col("f_score") <= 2), "New Customers")
            .when((col("r_score") >= 3) & (col("f_score") <= 2) & (col("m_score") >= 3), "Potential Loyalists")
            .when((col("r_score") <= 2) & (col("f_score") >= 3) & (col("m_score") >= 3), "At Risk")
            .when((col("r_score") <= 2) & (col("f_score") <= 2) & (col("m_score") >= 3), "Cannot Lose Them")
            .when((col("r_score") <= 2) & (col("f_score") <= 2) & (col("m_score") <= 2), "Lost Customers")
            .otherwise("Others")
        ).withColumn("rfm_score", col("r_score") * 100 + col("f_score") * 10 + col("m_score"))
        
        # Save to database using ODBC
        save_to_database_spark(rfm_segments, "rfm_analysis")
        return rfm_segments

        # Cohort Analysis
    def cohort_analysis():
        """Advanced cohort analysis with retention metrics"""
        
        print("Performing Cohort Analysis...")
        
        # Get first purchase date for each customer
        customer_cohorts = orders.filter(col("status") != "Cancelled") \
            .groupBy("customer_id") \
            .agg(min("order_date").alias("first_purchase_date")) \
            .withColumn("cohort_month", date_trunc("month", col("first_purchase_date")))
        
        # Calculate cohort metrics
        cohort_orders = orders.filter(col("status") != "Cancelled") \
            .join(customer_cohorts, "customer_id") \
            .withColumn("order_month", date_trunc("month", col("order_date"))) \
            .withColumn(
                "months_since_first_purchase",
                months_between(col("order_month"), col("cohort_month"))
            )
        
        cohort_table = cohort_orders.groupBy("cohort_month", "months_since_first_purchase") \
            .agg(
                countDistinct("customer_id").alias("customers"),
                count("order_id").alias("orders"),
                sum("total_amount").alias("revenue")
            )
        
        # Calculate retention rates
        cohort_sizes = cohort_table.filter(col("months_since_first_purchase") == 0) \
            .select("cohort_month", col("customers").alias("cohort_size"))
        
        cohort_retention = cohort_table.join(cohort_sizes, "cohort_month") \
            .withColumn("retention_rate", col("customers") / col("cohort_size")) \
            .withColumn("revenue_per_customer", col("revenue") / col("customers"))
        
        # Save to database using ODBC
        save_to_database_spark(cohort_retention, "cohort_analysis")
        return cohort_retention

    def create_customer_features():
        """Create comprehensive customer features"""
        
        print("Creating Customer Features...")
        
        # Customer transaction patterns
        customer_patterns = orders.filter(col("status") != "Cancelled") \
            .groupBy("customer_id") \
            .agg(
                count("order_id").alias("total_orders"),
                sum("total_amount").alias("total_spent"),
                avg("total_amount").alias("avg_order_value"),
                min("order_date").alias("first_order_date"),
                max("order_date").alias("last_order_date"),
                stddev("total_amount").alias("order_value_std"),
                collect_list("payment_method").alias("payment_methods_used")
            )
        
        # Calculate features
        current_date = orders.select(max("order_date")).collect()[0][0]
        
        customer_features = customer_patterns \
            .withColumn("customer_age_days", datediff(col("last_order_date"), col("first_order_date"))) \
            .withColumn("days_since_last_order", datediff(lit(current_date), col("last_order_date"))) \
            .withColumn("order_frequency", col("total_orders") / (col("customer_age_days") + 1)) \
            .withColumn("payment_method_diversity", size(array_distinct(col("payment_methods_used")))) \
            .withColumn(
                "customer_tier",
                when(col("total_spent") >= 1000, "Premium")
                .when(col("total_spent") >= 500, "Gold")
                .when(col("total_spent") >= 200, "Silver")
                .otherwise("Bronze")
            )
        
        # Join with customer demographic data
        customer_enriched = customer_features.join(customers, "customer_id") \
            .withColumn("signup_to_first_order_days", 
                       datediff(col("first_order_date"), col("signup_date"))) \
            .select(
                "customer_id", "first_name", "last_name", "email", "country", "state", "city",
                "segment", "customer_tier", "total_orders", "total_spent", "avg_order_value",
                "customer_age_days", "days_since_last_order", "order_frequency", 
                "payment_method_diversity", "signup_to_first_order_days"
            )
        
        # Save to database using ODBC
        save_to_database_spark(customer_enriched, "customer_features")
        return customer_enriched
    
    # ML Customer Clustering
    def ml_customer_clustering(customer_features, rfm_data):
        """Advanced customer clustering using ML"""
        
        print("Performing ML Customer Clustering...")
        
        # Combine features for clustering
        clustering_data = customer_features.join(rfm_data.select("customer_id", "r_score", "f_score", "m_score"), "customer_id")
        
        # Select numerical features
        feature_cols = ["total_orders", "total_spent", "avg_order_value", "customer_age_days", 
                       "days_since_last_order", "order_frequency", "r_score", "f_score", "m_score"]
        
        clustering_features = clustering_data.select("customer_id", *feature_cols).fillna(0)
        
        # Prepare features for ML
        assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
        feature_vector = assembler.transform(clustering_features)
        
        # Standardize features
        scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
        scaler_model = scaler.fit(feature_vector)
        scaled_data = scaler_model.transform(feature_vector)
        
        # Apply K-means clustering
        kmeans = KMeans(featuresCol="scaled_features", predictionCol="cluster", k=6, seed=42)
        kmeans_model = kmeans.fit(scaled_data)
        clustered_data = kmeans_model.transform(scaled_data)
        
        # Add cluster names
        cluster_names = clustered_data \
            .withColumn("cluster_name",
                       when(col("cluster") == 0, "High_Value_Loyal")
                       .when(col("cluster") == 1, "New_Customers")
                       .when(col("cluster") == 2, "At_Risk_Customers")
                       .when(col("cluster") == 3, "Low_Value_Occasional")
                       .when(col("cluster") == 4, "Average_Customers")
                       .otherwise("Premium_Customers")) \
            .select("customer_id", "cluster", "cluster_name")
        
        # Save to database using ODBC
        save_to_database_spark(cluster_names, "customer_clusters")
        return cluster_names
    
    # Geographic Analysis
    def geographic_analysis(customer_features):
        """Analyze customer distribution by geography"""
        
        print("Performing Geographic Analysis...")
        
        geo_analysis = customer_features.groupBy("country", "state", "city") \
            .agg(
                count("customer_id").alias("customer_count"),
                avg("total_spent").alias("avg_customer_value"),
                sum("total_spent").alias("total_market_value"),
                avg("avg_order_value").alias("avg_order_value"),
                avg("total_orders").alias("avg_orders_per_customer")
            ) \
            .withColumn("market_penetration_score", 
                       col("customer_count") * col("avg_customer_value")) \
            .orderBy(col("total_market_value").desc())
        
        # Save to database using ODBC
        save_to_database_spark(geo_analysis, "geographic_analysis")
        return geo_analysis
    
    # Execute customer analytics
    rfm_analysis = calculate_rfm_analysis()
    cohort_data = cohort_analysis()
    customer_features = create_customer_features()
    customer_clusters = ml_customer_clustering(customer_features, rfm_analysis)
    geo_analysis = geographic_analysis(customer_features)
    
    return {
        "rfm_analysis": rfm_analysis,
        "cohort_data": cohort_data,
        "customer_features": customer_features,
        "customer_clusters": customer_clusters,
        "geo_analysis": geo_analysis
    }

In [None]:
customer_analytics = advanced_customer_analytics()


ADVANCED CUSTOMER ANALYTICS
Calculating RFM Analysis...


AttributeError: 'DataFrame' object has no attribute 'select'

# SECTION 5: PRODUCT ANALYTICS

In [None]:
def advanced_product_analytics():
    """Comprehensive product analytics with ODBC database storage"""
    
    print("\n" + "="*80)
    print("ADVANCED PRODUCT ANALYTICS")
    print("="*80)
    
    products = raw_tables["products"]
    inventory = raw_tables["inventory"]
    orders = raw_tables["orders"]
    order_items = raw_tables["order_items"]
    
    # Product Performance
    def calculate_product_performance():
        """Calculate comprehensive product performance metrics"""
        
        print("Calculating Product Performance...")
        
        # Basic sales metrics
        product_sales = order_items.join(orders.filter(col("status") != "Cancelled"), "order_id") \
            .groupBy("product_id") \
            .agg(
                sum("quantity").alias("total_quantity_sold"),
                sum("line_total").alias("total_revenue"),
                count("order_item_id").alias("total_line_items"),
                countDistinct("order_id").alias("unique_orders"),
                countDistinct("customer_id").alias("unique_customers"),
                avg("unit_price").alias("avg_selling_price"),
                min("unit_price").alias("min_selling_price"),
                max("unit_price").alias("max_selling_price"),
                stddev("unit_price").alias("price_volatility")
            )
        
        # Return metrics
        return_metrics = order_items.join(orders, "order_id") \
            .groupBy("product_id") \
            .agg(
                sum(when(col("status") == "Returned", col("quantity")).otherwise(0)).alias("returned_quantity"),
                sum(when(col("status") == "Cancelled", col("quantity")).otherwise(0)).alias("cancelled_quantity"),
                count("*").alias("total_order_lines")
            ) \
            .withColumn("return_rate", col("returned_quantity") / col("total_order_lines")) \
            .withColumn("cancellation_rate", col("cancelled_quantity") / col("total_order_lines"))
        
        # Join all data
        product_performance = products.join(product_sales, "product_id", "left") \
            .join(return_metrics.select("product_id", "return_rate", "cancellation_rate"), "product_id", "left") \
            .join(inventory, "product_id", "left") \
            .fillna(0, ["total_quantity_sold", "total_revenue", "return_rate", "cancellation_rate"])
        
        # Calculate advanced metrics
        product_performance = product_performance \
            .withColumn("revenue_per_unit", 
                       when(col("total_quantity_sold") > 0, col("total_revenue") / col("total_quantity_sold")).otherwise(0)) \
            .withColumn("inventory_turnover", 
                       when(col("stock_level") > 0, col("total_quantity_sold") / col("stock_level")).otherwise(0)) \
            .withColumn("stockout_risk", 
                       when(col("stock_level") <= col("reorder_threshold"), "High")
                       .when(col("stock_level") <= col("reorder_threshold") * 2, "Medium")
                       .otherwise("Low")) \
            .withColumn("profit_margin", 
                       when(col("price") > 0, (col("avg_selling_price") - col("price")) / col("price")).otherwise(0)) \
            .withColumn("lifecycle_stage", lit("Active"))  # Simplified for this example
        
        # Save to database using ODBC
        save_to_database_spark(product_performance, "product_performance")
        return product_performance
    
    # Market Basket Analysis
    def market_basket_analysis():
        """Advanced market basket analysis"""
        
        print("Performing Market Basket Analysis...")
        
        # Products that appear together in orders
        order_products = order_items.join(orders.filter(col("status") != "Cancelled"), "order_id") \
            .select("order_id", "product_id") \
            .distinct()
        
        # Calculate co-occurrence
        product_pairs = order_products.alias("a") \
            .join(order_products.alias("b"), col("a.order_id") == col("b.order_id")) \
            .filter(col("a.product_id") < col("b.product_id")) \
            .groupBy(col("a.product_id").alias("product_a"), col("b.product_id").alias("product_b")) \
            .agg(count("a.order_id").alias("co_occurrence_count"))
        
        # Calculate metrics
        total_orders = orders.filter(col("status") != "Cancelled").count()
        
        product_support = order_products.groupBy("product_id") \
            .agg(count("order_id").alias("product_count")) \
            .withColumn("product_support", col("product_count") / total_orders)
        
        basket_metrics = product_pairs \
            .join(product_support.alias("pa"), col("product_a") == col("pa.product_id")) \
            .join(product_support.alias("pb"), col("product_b") == col("pb.product_id")) \
            .withColumn("support", col("co_occurrence_count") / total_orders) \
            .withColumn("confidence_a_to_b", col("co_occurrence_count") / col("pa.product_count")) \
            .withColumn("confidence_b_to_a", col("co_occurrence_count") / col("pb.product_count")) \
            .withColumn("lift", col("support") / (col("pa.product_support") * col("pb.product_support"))) \
            .filter(col("lift") > 1.0) \
            .select("product_a", "product_b", "co_occurrence_count", "support", 
                   "confidence_a_to_b", "confidence_b_to_a", "lift") \
            .orderBy(col("lift").desc()) \
            .limit(1000)  # Limit for database storage
        
        # Save to database using ODBC
        save_to_database_spark(basket_metrics, "market_basket_analysis")
        return basket_metrics
    
    # Category Performance
    def category_performance_analysis():
        """Analyze performance by product categories"""
        
        print("Analyzing Category Performance...")
        
        # Get product performance data
        product_perf = read_from_database_spark_jdbc("product_performance")
        
        category_metrics = product_perf.groupBy("category") \
            .agg(
                count("product_id").alias("total_products"),
                sum("total_quantity_sold").alias("category_quantity_sold"),
                sum("total_revenue").alias("category_revenue"),
                avg("avg_selling_price").alias("avg_category_price"),
                avg("return_rate").alias("avg_return_rate"),
                avg("inventory_turnover").alias("avg_inventory_turnover"),
                countDistinct("brand").alias("brand_diversity")
            )
        
        # Calculate market share
        total_revenue = category_metrics.select(sum("category_revenue")).collect()[0][0]
        
        category_share = category_metrics \
            .withColumn("market_share", col("category_revenue") / total_revenue) \
            .withColumn("revenue_per_product", col("category_revenue") / col("total_products")) \
            .orderBy(col("category_revenue").desc())
        
        # Save to database using ODBC
        save_to_database_spark(category_share, "category_performance")
        return category_share
    
    # Execute product analytics
    product_performance = calculate_product_performance()
    basket_analysis = market_basket_analysis()
    category_analysis = category_performance_analysis()
    
    return {
        "product_performance": product_performance,
        "basket_analysis": basket_analysis,
        "category_analysis": category_analysis
    }

In [None]:
product_analytics = advanced_product_analytics()

In [None]:
# CELL 7: Advanced Sales Analytics
def advanced_sales_analytics():
    """Comprehensive sales analytics with ODBC database storage"""
    
    print("\n" + "="*80)
    print("ADVANCED SALES ANALYTICS")
    print("="*80)
    
    orders = raw_tables["orders"]
    customers = raw_tables["customers"]
    
    # Time Series Sales Analysis
    def sales_time_series_analysis():
        """Advanced time series analysis"""
        
        print("Performing Sales Time Series Analysis...")
        
        # Daily sales metrics
        daily_sales = orders.filter(col("status") != "Cancelled") \
            .withColumn("order_date_only", to_date(col("order_date"))) \
            .groupBy("order_date_only") \
            .agg(
                count("order_id").alias("daily_orders"),
                sum("total_amount").alias("daily_revenue"),
                avg("total_amount").alias("avg_order_value"),
                countDistinct("customer_id").alias("unique_customers"),
                sum("shipping").alias("total_shipping"),
                sum("tax").alias("total_tax"),
                sum("discount").alias("total_discounts")
            )
        
        # Add time features
        daily_sales_enriched = daily_sales \
            .withColumn("day_of_week", dayofweek("order_date_only")) \
            .withColumn("day_name", date_format("order_date_only", "EEEE")) \
            .withColumn("month", month("order_date_only")) \
            .withColumn("quarter", quarter("order_date_only")) \
            .withColumn("year", year("order_date_only")) \
            .withColumn("is_weekend", when(col("day_of_week").isin([1, 7]), True).otherwise(False))
        
        # Calculate moving averages
        window_7d = Window.orderBy("order_date_only").rowsBetween(-6, 0)
        window_30d = Window.orderBy("order_date_only").rowsBetween(-29, 0)
        window_lag = Window.orderBy("order_date_only")
        
        sales_with_trends = daily_sales_enriched \
            .withColumn("revenue_7d_ma", avg("daily_revenue").over(window_7d)) \
            .withColumn("revenue_30d_ma", avg("daily_revenue").over(window_30d)) \
            .withColumn("prev_day_revenue", lag("daily_revenue").over(window_lag)) \
            .withColumn("revenue_growth", 
                       when(col("prev_day_revenue") > 0,
                            (col("daily_revenue") - col("prev_day_revenue")) / col("prev_day_revenue"))
                       .otherwise(0)) \
            .select("order_date_only", "daily_orders", "daily_revenue", "avg_order_value",
                   "unique_customers", "total_shipping", "total_tax", "total_discounts",
                   "day_of_week", "day_name", "month", "quarter", "year", "is_weekend",
                   "revenue_7d_ma", "revenue_30d_ma", "revenue_growth")
        
        # Save to database using ODBC
        save_to_database_odbc(sales_with_trends, "sales_timeseries")
        return sales_with_trends
    
    # Channel Analysis
    def channel_performance_analysis():
        """Analyze sales channel performance"""
        
        print("Analyzing Channel Performance...")
        
        channel_performance = orders.filter(col("status") != "Cancelled") \
            .groupBy("placed_via") \
            .agg(
                count("order_id").alias("total_orders"),
                sum("total_amount").alias("total_revenue"),
                avg("total_amount").alias("avg_order_value"),
                countDistinct("customer_id").alias("unique_customers"),
                sum("discount").alias("total_discounts")
            ) \
            .withColumn("revenue_per_customer", col("total_revenue") / col("unique_customers")) \
            .withColumn("discount_rate", col("total_discounts") / col("total_revenue"))
        
        # Calculate market share
        total_revenue = channel_performance.select(sum("total_revenue")).collect()[0][0]
        
        channel_share = channel_performance \
            .withColumn("market_share", col("total_revenue") / total_revenue) \
            .orderBy(col("total_revenue").desc())
        
        # Save to database using ODBC
        save_to_database_odbc(channel_share, "channel_analysis")
        return channel_share
    
    # Customer Acquisition Analysis
    def customer_acquisition_analysis():
        """Analyze customer acquisition patterns"""
        
        print("Analyzing Customer Acquisition...")
        
        # First-time customers
        customer_order_sequence = orders.filter(col("status") != "Cancelled") \
            .withColumn("order_rank", 
                       row_number().over(Window.partitionBy("customer_id").orderBy("order_date")))
        
        new_customers = customer_order_sequence.filter(col("order_rank") == 1) \
            .withColumn("acquisition_month", date_trunc("month", col("order_date"))) \
            .groupBy("acquisition_month") \
            .agg(
                count("customer_id").alias("new_customers"),
                sum("total_amount").alias("new_customer_revenue"),
                avg("total_amount").alias("avg_first_order_value")
            )
        
        # Conversion funnel
        signup_to_purchase = customers \
            .join(customer_order_sequence.filter(col("order_rank") == 1), "customer_id", "left") \
            .withColumn("days_to_first_purchase", 
                       datediff(col("order_date"), col("signup_date"))) \
            .withColumn("converted", when(col("order_date").isNotNull(), 1).otherwise(0))
        
        conversion_metrics = signup_to_purchase \
            .withColumn("signup_month", date_trunc("month", col("signup_date"))) \
            .groupBy("signup_month") \
            .agg(
                count("customer_id").alias("signups"),
                sum("converted").alias("conversions"),
                avg("days_to_first_purchase").alias("avg_days_to_convert")
            ) \
            .withColumn("conversion_rate", col("conversions") / col("signups"))
        
        # Save to database using ODBC
        save_to_database_odbc(new_customers, "customer_acquisition")
        save_to_database_odbc(conversion_metrics, "conversion_funnel")
        
        return new_customers, conversion_metrics
    
    # Execute sales analytics
    sales_timeseries = sales_time_series_analysis()
    channel_analysis = channel_performance_analysis()
    new_customers, conversion_funnel = customer_acquisition_analysis()
    
    return {
        "sales_timeseries": sales_timeseries,
        "channel_analysis": channel_analysis,
        "new_customers": new_customers,
        "conversion_funnel": conversion_funnel
    }

# Execute sales analytics


In [None]:
sales_analytics = advanced_sales_analytics()

In [None]:

# CELL 8: Advanced Clickstream Analytics
def advanced_clickstream_analytics():
    """Comprehensive clickstream analytics with ODBC database storage"""
    
    print("\n" + "="*80)
    print("ADVANCED CLICKSTREAM ANALYTICS")
    print("="*80)
    
    clickstream = raw_tables["clickstream"]
    
    # Session Analysis
    def session_analysis():
        """Advanced session analysis"""
        
        print("Performing Session Analysis...")
        
        # Session-level metrics
        session_metrics = clickstream \
            .groupBy("session_id", "customer_id", "device") \
            .agg(
                count("click_id").alias("page_views"),
                countDistinct("page").alias("unique_pages"),
                countDistinct("product_id").alias("products_viewed"),
                min("timestamp").alias("session_start"),
                max("timestamp").alias("session_end"),
                first("utm_source").alias("traffic_source"),
                first("referrer").alias("referrer")
            ) \
            .withColumn("session_duration_minutes", 
                       (unix_timestamp("session_end") - unix_timestamp("session_start")) / 60) \
            .withColumn("bounce", when(col("page_views") == 1, True).otherwise(False)) \
            .withColumn("completed_purchase", 
                       when(col("page_views") >= 5, True).otherwise(False))  # Simplified logic
        
        # Save to database using ODBC
        save_to_database_odbc(session_metrics, "session_metrics")
        
        # User journey flow analysis
        journey_patterns = clickstream \
            .withColumn("page_order", 
                       row_number().over(Window.partitionBy("session_id").orderBy("timestamp"))) \
            .withColumn("next_page", 
                       lead("page").over(Window.partitionBy("session_id").orderBy("timestamp"))) \
            .filter(col("next_page").isNotNull()) \
            .groupBy("page", "next_page") \
            .agg(count("*").alias("transition_count")) \
            .withColumn("total_from_page", 
                       sum("transition_count").over(Window.partitionBy("page"))) \
            .withColumn("transition_probability", 
                       col("transition_count") / col("total_from_page")) \
            .orderBy(col("transition_count").desc()) \
            .limit(500)  # Limit for database storage
        
        # Save to database using ODBC
        save_to_database_odbc(journey_patterns, "user_journey_flow")
        
        return session_metrics, journey_patterns
    
    # User Behavior Segmentation
    def user_behavior_segmentation():
        """Segment users based on browsing behavior"""
        
        print("Performing User Behavior Segmentation...")
        
        # User-level behavioral metrics
        user_behavior = clickstream.filter(col("customer_id").isNotNull()) \
            .groupBy("customer_id") \
            .agg(
                count("click_id").alias("total_clicks"),
                countDistinct("session_id").alias("total_sessions"),
                countDistinct("page").alias("unique_pages_visited"),
                countDistinct("product_id").alias("unique_products_viewed"),
                countDistinct("device").alias("devices_used"),
                collect_set("utm_source").alias("traffic_sources")
            ) \
            .withColumn("clicks_per_session", col("total_clicks") / col("total_sessions")) \
            .withColumn("product_focus", col("unique_products_viewed") / col("total_clicks")) \
            .withColumn("source_diversity", size(col("traffic_sources")))
        
        # Calculate percentiles for segmentation
        behavior_stats = user_behavior.select(
            expr("percentile_approx(total_clicks, array(0.33, 0.67))").alias("clicks_p"),
            expr("percentile_approx(clicks_per_session, array(0.33, 0.67))").alias("cps_p"),
            expr("percentile_approx(product_focus, array(0.33, 0.67))").alias("pf_p")
        ).collect()[0]
        
        # Apply behavioral segmentation
        user_segments = user_behavior \
            .withColumn(
                "engagement_level",
                when(col("total_clicks") >= behavior_stats["clicks_p"][1], "High")
                .when(col("total_clicks") >= behavior_stats["clicks_p"][0], "Medium")
                .otherwise("Low")
            ) \
            .withColumn(
                "session_intensity",
                when(col("clicks_per_session") >= behavior_stats["cps_p"][1], "Intensive")
                .when(col("clicks_per_session") >= behavior_stats["cps_p"][0], "Moderate")
                .otherwise("Light")
            ) \
            .withColumn(
                "shopping_focus",
                when(col("product_focus") >= behavior_stats["pf_p"][1], "Product Focused")
                .when(col("product_focus") >= behavior_stats["pf_p"][0], "Mixed Browsing")
                .otherwise("General Browsing")
            ) \
            .select("customer_id", "total_clicks", "total_sessions", "unique_pages_visited",
                   "unique_products_viewed", "devices_used", "clicks_per_session", "product_focus",
                   "source_diversity", "engagement_level", "session_intensity", "shopping_focus")
        
        # Save to database using ODBC
        save_to_database_odbc(user_segments, "user_behavior_segments")
        return user_segments
    
    # Execute clickstream analytics
    session_metrics, journey_patterns = session_analysis()
    behavior_segments = user_behavior_segmentation()
    
    return {
        "session_metrics": session_metrics,
        "journey_patterns": journey_patterns,
        "behavior_segments": behavior_segments
    }

# Execute clickstream analytics


In [None]:
clickstream_analytics = advanced_clickstream_analytics()


In [None]:
def create_ml_features():
    """Create ML features and store in ODBC database"""
    
    print("\n" + "="*80)
    print("ADVANCED ML FEATURE ENGINEERING")
    print("="*80)
    
    # Load data from database for ML features
    customer_features = read_from_database_odbc("customer_features")
    rfm_analysis = read_from_database_odbc("rfm_analysis")
    
    # CLV Features
    def create_clv_features():
        """Create CLV prediction features"""
        
        print("Creating CLV Features...")
        
        clv_features = customer_features.join(rfm_analysis.select("customer_id", "r_score", "f_score", "m_score"), "customer_id") \
            .withColumn("orders_per_day", col("total_orders") / col("customer_age_days")) \
            .withColumn("revenue_per_day", col("total_spent") / col("customer_age_days")) \
            .withColumn("high_value_customer", when(col("total_spent") > 1000, True).otherwise(False)) \
            .select("customer_id", "total_orders", "total_spent", "avg_order_value", 
                   "customer_age_days", "days_since_last_order", "order_frequency",
                   "r_score", "f_score", "m_score", "high_value_customer", 
                   "orders_per_day", "revenue_per_day")
        
        # Save to database using ODBC
        save_to_database_odbc(clv_features, "clv_features")
        return clv_features
    
    # Churn Features
    def create_churn_features():
        """Create churn prediction features"""
        
        print("Creating Churn Features...")
        
        orders = raw_tables["orders"]
        
        # Calculate recent activity
        latest_orders = orders.filter(col("status") != "Cancelled") \
            .groupBy("customer_id") \
            .agg(
                max("order_date").alias("last_order_date"),
                count("order_id").alias("recent_orders"),
                avg("total_amount").alias("avg_recent_order_value")
            )
        
        current_date = orders.select(max("order_date")).collect()[0][0]
        
        churn_features = latest_orders \
            .withColumn("days_since_last_order", datediff(lit(current_date), col("last_order_date"))) \
            .withColumn("churn_risk", when(col("days_since_last_order") > 90, True).otherwise(False)) \
            .join(customer_features.select("customer_id", "total_orders", "total_spent", "avg_order_value"), "customer_id") \
            .join(rfm_analysis.select("customer_id", "r_score", "f_score", "m_score"), "customer_id") \
            .withColumn("churn_risk_score", 
                       (col("days_since_last_order") / 365.0 * 0.4) + 
                       ((5 - col("r_score")) / 5.0 * 0.3) + 
                       ((5 - col("f_score")) / 5.0 * 0.3))
        
        # Save to database using ODBC
        save_to_database_odbc(churn_features, "churn_features")
        return churn_features
    
    # Execute ML feature creation
    clv_features = create_clv_features()
    churn_features = create_churn_features()
    
    return {
        "clv_features": clv_features,
        "churn_features": churn_features
    }

In [None]:
ml_features = create_ml_features()

In [None]:
def create_data_warehouse():
    """Create optimized data warehouse with ODBC database storage"""
    
    print("\n" + "="*80)
    print("CREATING DATA WAREHOUSE")
    print("="*80)
    
    # Load analytics data from database using ODBC
    customer_features = read_from_database_odbc("customer_features")
    customer_clusters = read_from_database_odbc("customer_clusters")
    rfm_analysis = read_from_database_odbc("rfm_analysis")
    product_performance = read_from_database_odbc("product_performance")
    sales_timeseries = read_from_database_odbc("sales_timeseries")
    session_metrics = read_from_database_odbc("session_metrics")
    
    # Customer Dimension
    def create_dim_customer():
        """Create customer dimension table"""
        
        print("Creating Customer Dimension...")
        
        dim_customer = customer_features \
            .join(customer_clusters, "customer_id") \
            .join(rfm_analysis.select("customer_id", "rfm_segment"), "customer_id") \
            .select(
                "customer_id", "first_name", "last_name", "email", "country", "state", "city",
                "segment", "customer_tier", "rfm_segment", "cluster", "cluster_name", 
                "total_orders", "total_spent"
            ) \
            .withColumn("effective_date", current_date()) \
            .withColumn("is_current", lit(True))
        
        # Save to database using ODBC
        save_to_database_odbc(dim_customer, "dim_customer")
        return dim_customer
    
    # Product Dimension
    def create_dim_product():
        """Create product dimension table"""
        
        print("Creating Product Dimension...")
        
        dim_product = product_performance.select(
            "product_id", "sku", "product_name", "category", "brand", "price",
            "lifecycle_stage", "stockout_risk", "profit_margin", "total_quantity_sold"
        )
        
        # Save to database using ODBC
        save_to_database_odbc(dim_product, "dim_product")
        return dim_product
    
    # Date Dimension
    def create_dim_date():
        """Create date dimension table"""
        
        print("Creating Date Dimension...")
        
        date_range = sales_timeseries.select("order_date").distinct() \
            .withColumnRenamed("order_date", "date")
        
        dim_date = date_range \
            .withColumn("year", year("date")) \
            .withColumn("month", month("date")) \
            .withColumn("day", dayofmonth("date")) \
            .withColumn("quarter", quarter("date")) \
            .withColumn("day_of_week", dayofweek("date")) \
            .withColumn("day_name", date_format("date", "EEEE")) \
            .withColumn("month_name", date_format("date", "MMMM")) \
            .withColumn("is_weekend", when(col("day_of_week").isin([1, 7]), True).otherwise(False)) \
            .withColumn("is_month_end", when(col("day") >= 28, True).otherwise(False)) \
            .withColumn("fiscal_year", when(col("month") >= 4, col("year")).otherwise(col("year") - 1))
        
        # Save to database using ODBC
        save_to_database_odbc(dim_date, "dim_date")
        return dim_date
    
    # Sales Fact
    def create_fact_sales():
        """Create sales fact table"""
        
        print("Creating Sales Fact...")
        
        fact_sales = sales_timeseries.select(
            col("order_date").alias("date"), "daily_orders", "daily_revenue", "avg_order_value",
            "unique_customers", "total_shipping", "total_tax", "total_discounts",
            "revenue_7d_ma", "revenue_30d_ma", "revenue_growth"
        )
        
        # Save to database using ODBC
        save_to_database_odbc(fact_sales, "fact_sales")
        return fact_sales
    
    # Customer Behavior Fact
    def create_fact_customer_behavior():
        """Create customer behavior fact table"""
        
        print("Creating Customer Behavior Fact...")
        
        fact_customer_behavior = session_metrics \
            .filter(col("customer_id").isNotNull()) \
            .withColumn("date", to_date("session_start")) \
            .groupBy("customer_id", "date") \
            .agg(
                sum("page_views").alias("total_page_views"),
                sum("products_viewed").alias("total_products_viewed"),
                avg("session_duration_minutes").alias("avg_session_duration"),
                sum(when(col("bounce") == True, 1).otherwise(0)).alias("total_bounces"),
                countDistinct("session_id").alias("total_sessions")
            )
        
        # Save to database using ODBC
        save_to_database_odbc(fact_customer_behavior, "fact_customer_behavior")
        return fact_customer_behavior
    
    # Execute warehouse creation
    dim_customer = create_dim_customer()
    dim_product = create_dim_product()
    dim_date = create_dim_date()
    fact_sales = create_fact_sales()
    fact_customer_behavior = create_fact_customer_behavior()
    
    return {
        "dim_customer": dim_customer,
        "dim_product": dim_product,
        "dim_date": dim_date,
        "fact_sales": fact_sales,
        "fact_customer_behavior": fact_customer_behavior
    }


In [None]:
warehouse_tables = create_data_warehouse()

In [None]:
def create_dashboard_and_summary_tables():
    """Create dashboard and summary tables using ODBC"""
    
    print("\n" + "="*80)
    print("CREATING DASHBOARD AND SUMMARY TABLES")
    print("="*80)
    
    # Executive Summary
    def create_executive_summary():
        """Create executive summary metrics"""
        
        print("Creating Executive Summary...")
        
        # Calculate key metrics
        customer_count = read_from_database_odbc("customer_features").count()
        
        sales_metrics = read_from_database_odbc("sales_timeseries").agg(
            sum("daily_revenue").alias("total_revenue"),
            avg("avg_order_value").alias("avg_order_value"),
            sum("daily_orders").alias("total_orders")
        ).collect()[0]
        
        # Create summary records
        summary_data = [
            ("total_customers", float(customer_count)),
            ("total_revenue", float(sales_metrics["total_revenue"] or 0)),
            ("avg_order_value", float(sales_metrics["avg_order_value"] or 0)),
            ("total_orders", float(sales_metrics["total_orders"] or 0))
        ]
        
        exec_summary_df = spark.createDataFrame(summary_data, ["metric_name", "metric_value"])
        
        # Save to database using ODBC
        save_to_database_odbc(exec_summary_df, "executive_summary")
        return exec_summary_df
    
    # Top Customers
    def create_top_customers():
        """Create top customers table"""
        
        print("Creating Top Customers...")
        
        customer_features = read_from_database_odbc("customer_features")
        
        top_customers = customer_features \
            .orderBy(col("total_spent").desc()) \
            .limit(100) \
            .withColumn("rank_position", row_number().over(Window.orderBy(col("total_spent").desc()))) \
            .select("rank_position", "customer_id", "first_name", "last_name", 
                   "total_spent", "total_orders", "customer_tier")
        
        # Save to database using ODBC
        save_to_database_odbc(top_customers, "top_customers")
        return top_customers
    
    # Top Products
    def create_top_products():
        """Create top products table"""
        
        print("Creating Top Products...")
        
        product_performance = read_from_database_odbc("product_performance")
        
        top_products = product_performance \
            .orderBy(col("total_revenue").desc()) \
            .limit(100) \
            .withColumn("rank_position", row_number().over(Window.orderBy(col("total_revenue").desc()))) \
            .select("rank_position", "product_id", "product_name", "category", 
                   "total_revenue", "total_quantity_sold")
        
        # Save to database using ODBC
        save_to_database_odbc(top_products, "top_products")
        return top_products
    
    # Execute dashboard creation
    exec_summary = create_executive_summary()
    top_customers = create_top_customers()
    top_products = create_top_products()
    
    return {
        "executive_summary": exec_summary,
        "top_customers": top_customers,
        "top_products": top_products
    }

# Execute dashboard creation


In [None]:
dashboard_data = create_dashboard_and_summary_tables()

In [None]:
def final_validation_and_monitoring():
    """Perform final validation and generate monitoring report using ODBC"""
    
    print("\n" + "="*80)
    print("FINAL VALIDATION AND MONITORING")
    print("="*80)
    
    # Count records in all analytics tables
    tables_to_check = [
        "customer_features", "rfm_analysis", "cohort_analysis", "customer_clusters",
        "geographic_analysis", "product_performance", "market_basket_analysis", 
        "category_performance", "sales_timeseries", "channel_analysis",
        "conversion_funnel", "customer_acquisition", "session_metrics",
        "user_journey_flow", "user_behavior_segments", "clv_features",
        "churn_features", "executive_summary", "top_customers", "top_products",
        "dim_customer", "dim_product", "dim_date", "fact_sales", "fact_customer_behavior"
    ]
    
    validation_results = {}
    
    for table_name in tables_to_check:
        try:
            df = read_from_database_odbc(table_name)
            if df is not None:
                count = df.count()
                validation_results[table_name] = {
                    "status": "SUCCESS",
                    "record_count": count
                }
                print(f"✅ {table_name}: {count:,} records")
            else:
                validation_results[table_name] = {
                    "status": "ERROR",
                    "error": "Unable to read table"
                }
                print(f"❌ {table_name}: Unable to read table")
            
        except Exception as e:
            validation_results[table_name] = {
                "status": "ERROR",
                "error": str(e)
            }
            print(f"❌ {table_name}: Error - {e}")
    
    # Calculate summary statistics
    total_tables = len(tables_to_check)
    successful_tables = len([v for v in validation_results.values() if v["status"] == "SUCCESS"])
    total_records = sum([v["record_count"] for v in validation_results.values() if v["status"] == "SUCCESS"])
    
    print(f"\n" + "="*60)
    print(f"VALIDATION SUMMARY")
    print(f"="*60)
    print(f"Total Tables: {total_tables}")
    print(f"Successful Tables: {successful_tables}")
    print(f"Success Rate: {(successful_tables/total_tables)*100:.1f}%")
    print(f"Total Records Processed: {total_records:,}")
    
    return validation_results

# Execute final validation


In [None]:
validation_results = final_validation_and_monitoring()


In [None]:
def final_summary_and_cleanup():
    """Provide final summary and cleanup resources"""
    
    print("\n" + "="*80)
    print("ETL PIPELINE EXECUTION SUMMARY")
    print("="*80)
    
    # Success metrics
    successful_tables = len([v for v in validation_results.values() if v["status"] == "SUCCESS"])
    total_records = sum([v["record_count"] for v in validation_results.values() if v["status"] == "SUCCESS"])
    
    print(f"✅ PIPELINE COMPLETED SUCCESSFULLY!")
    print(f"📊 Analytics Tables Created: {successful_tables}")
    print(f"📈 Total Records Processed: {total_records:,}")
    print(f"🗄️ Database: {ODBC_CONFIG['target_db']['database']}")
    print(f"📋 Schema: analytics")
    print(f"🔌 Connection: ODBC ({ODBC_CONFIG['target_db']['driver']})")
    
    print(f"\n🔧 FEATURES IMPLEMENTED:")
    print(f"  • Advanced Customer Analytics (RFM, Cohort, Clustering)")
    print(f"  • Product Performance & Market Basket Analysis")
    print(f"  • Sales Time Series & Channel Analysis")
    print(f"  • Clickstream & User Behavior Analytics")
    print(f"  • Machine Learning Feature Engineering")
    print(f"  • Data Warehouse (Star Schema)")
    print(f"  • Executive Dashboard Tables")
    print(f"  • Real-time Performance Monitoring")
    
    print(f"\n📋 ANALYTICS TABLES AVAILABLE:")
    for table_name, result in validation_results.items():
        if result["status"] == "SUCCESS":
            print(f"  • analytics.{table_name} ({result['record_count']:,} records)")
    
    print(f"\n🔌 ODBC CONNECTION BENEFITS:")
    print(f"  • Native database connectivity")
    print(f"  • Optimized for Windows environments")
    print(f"  • Better performance for SQL Server")
    print(f"  • Enterprise security compliance")
    print(f"  • Direct integration with BI tools")
    
    print(f"\n🚀 READY FOR:")
    print(f"  • Power BI / Tableau Dashboards")
    print(f"  • Excel Analytics & Reporting")
    print(f"  • SSRS Report Services")
    print(f"  • Machine Learning Model Training")
    print(f"  • Real-time Analytics Applications")
    print(f"  • Executive Reporting")
    print(f"  • Customer Segmentation")
    print(f"  • Product Recommendations")
    print(f"  • Churn Prediction")
    print(f"  • Revenue Forecasting")
    
    # Cleanup cached tables
    try:
        for table_name in raw_tables.keys():
            if hasattr(raw_tables[table_name], 'unpersist'):
                raw_tables[table_name].unpersist()
        
        spark.catalog.clearCache()
        print(f"\n🧹 Cleanup completed - cache cleared")
        
    except Exception as e:
        print(f"Cleanup warning: {e}")
    
    print("\n" + "="*80)
    print("🎉 E-COMMERCE ANALYTICS PLATFORM READY FOR PRODUCTION!")
    print("🔌 FULLY OPTIMIZED FOR ODBC CONNECTIVITY!")
    print("="*80)

In [None]:
final_summary_and_cleanup()

In [None]:
spark.stop()
print("✅ Spark session stopped. ODBC ETL pipeline execution complete.")