In [1]:
import sys
import os
# Make the project's ../src directory importable (for utils.spark_utils).
notebook_dir = os.getcwd()
src_path = os.path.abspath(os.path.join(notebook_dir, '..', 'src'))
# Guard the append so re-running this setup cell does not stack duplicate
# entries onto sys.path.
if src_path not in sys.path:
    sys.path.append(src_path)

from utils.spark_utils import load_config, initialize_spark, insert_dataframe_to_sqlite, read_dataframe_from_sqlite
from pyspark.sql.functions import col, count, avg, sum, format_number,current_date

# Load configuration
config = load_config("../configs/data_config.json")

# Initialize Spark session: the app name comes from the config, and the
# SQLite JDBC jar path is handed to the project helper.
spark = initialize_spark(config["spark"]["app_name"], "../jars/sqlite-jdbc-3.46.0.1.jar")

# Every raw table is read from the same SQLite file through the same JDBC
# driver, so read them in one pass and bind each to its notebook-level name.
RAW_DB_PATH = "../data/raw/retail_data.db"
SQLITE_JDBC_DRIVER = "org.sqlite.JDBC"
_raw_frames = {
    table_name: read_dataframe_from_sqlite(spark, table_name, RAW_DB_PATH, SQLITE_JDBC_DRIVER)
    for table_name in (
        "product_sdf", "store_sdf", "customer_sdf", "time_sdf",
        "sales_sdf", "supplier_sdf", "feedback_sdf", "loyalty_sdf",
    )
}
product_df = _raw_frames["product_sdf"]
store_df = _raw_frames["store_sdf"]
customer_df = _raw_frames["customer_sdf"]
time_df = _raw_frames["time_sdf"]
sales_df = _raw_frames["sales_sdf"]
supplier_df = _raw_frames["supplier_sdf"]
feedback_df = _raw_frames["feedback_sdf"]
loyalty_df = _raw_frames["loyalty_sdf"]

24/08/15 16:25:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
# Print the schema of every raw table so column names and types can be
# checked before the analysis cells rely on them.
schema_sections = [
    ("=== Product DataFrame Schema ===", product_df),
    ("=== Store DataFrame Schema ===", store_df),
    ("=== Customer DataFrame Schema ===", customer_df),
    ("=== Time DataFrame Schema ===", time_df),
    ("=== Sales DataFrame Schema ===", sales_df),
    ("=== Supplier DataFrame Schema ===", supplier_df),
    ("=== Feedback DataFrame Schema ===", feedback_df),
    ("=== Loyalty DataFrame Schema ===", loyalty_df),
]
for section_heading, section_df in schema_sections:
    print(section_heading)
    section_df.printSchema()

In [None]:
### Import necessary functions
from pyspark.sql.functions import count, avg, sum, col, countDistinct, format_number, year, datediff, current_date, max, min

### 1. Product Analysis
print("=== Product Analysis ===")
product_df.printSchema()

# Basic stats: number of distinct values in each key product column
distinct_product_counts = {
    key_column: product_df.select(key_column).distinct().count()
    for key_column in ("Product_ID", "Brand_Name", "Category", "Supplier_ID")
}
total_products = distinct_product_counts["Product_ID"]
total_brands = distinct_product_counts["Brand_Name"]
total_categories = distinct_product_counts["Category"]
total_suppliers = distinct_product_counts["Supplier_ID"]

print(f"Total Products: {total_products}")
print(f"Total Brands: {total_brands}")
print(f"Total Categories: {total_categories}")
print(f"Total Suppliers: {total_suppliers}")


def _product_count_and_avg_price(group_column):
    """Group products by `group_column`; count them and format the mean Price to 2 decimals."""
    return product_df.groupBy(group_column).agg(
        count("Product_ID").alias("Product_Count"),
        format_number(avg("Price"), 2).alias("Average_Price"),
    )


# Products and Avg. Price Across Categories
print("Products and Average Price Across Categories")
category_statistics = _product_count_and_avg_price("Category")
category_statistics.show()

# Brands by Product Count (largest first, top 10 shown)
print("Brands by Product Count")
brand_statistics = _product_count_and_avg_price("Brand_Name")
brand_statistics.orderBy(col("Product_Count").desc()).show(10)

### 2. Store Analysis
print("=== Store Analysis ===")
store_df.printSchema()

# Basic stats: distinct store IDs and distinct store locations
total_stores, total_store_locations = (
    store_df.select(key_column).distinct().count()
    for key_column in ("Store_ID", "Store_Location")
)

print(f"Total Stores: {total_stores}")
print(f"Total Store Locations: {total_store_locations}")

# Stores and Avg. Size by Store Type
print("Stores and Average Size by Store Type")
store_statistics = store_df.groupBy("Store_Type").agg(
    count("Store_ID").alias("Store_Count"),
    format_number(avg("Store_Size"), 2).alias("Average_Store_Size"),
)
store_statistics.show()

# Store Size Distribution (largest size first)
print("Store Size Distribution")
store_size_distribution = (
    store_df
    .groupBy("Store_Size")
    .agg(count("Store_ID").alias("Store_Count"))
)
store_size_distribution.orderBy(col("Store_Size").desc()).show()

### 3. Customer Analysis
print("=== Customer Analysis ===")
customer_df.printSchema()

# Basic stats
total_customers = customer_df.select("Customer_ID").distinct().count()
print(f"Total Customers: {total_customers}")

# Customers by State
print("Customer Distribution by State")
customer_state_stats = customer_df.groupBy("State").agg(
    count("Customer_ID").alias("Customer_Count")
)
customer_state_stats.show()

# Gender Distribution
print("Gender Distribution")
customer_df.groupBy("Gender").count().show()

# Age Analysis
print("Customer Age Analysis")
from pyspark.sql.functions import floor as spark_floor, months_between

# Retained as a notebook-level name; no longer used to compute Age.
current_year = year(current_date())
# Age in completed years. A plain calendar-year difference over-counts by one
# for every customer whose birthday has not yet occurred this year;
# floor(months_between(today, DOB) / 12) counts only full years.
customer_age_df = customer_df.withColumn(
    "Age", spark_floor(months_between(current_date(), col("DOB")) / 12).cast("int")
)

# Age Distribution
print("Age Distribution")
age_distribution = customer_age_df.groupBy("Age").agg(
    count("Customer_ID").alias("Customer_Count")
)
age_distribution.orderBy(col("Age").desc()).show()

# Customer Join Date Distribution
print("Customer Join Date Distribution")
customer_df.groupBy(year("Customer_Join_Date").alias("Join_Year")).count().orderBy("Join_Year").show()

### 4. Sales Analysis
print("=== Sales Analysis ===")
sales_df.printSchema()

# Basic stats
total_transactions = sales_df.select("Transaction_ID").distinct().count()
print(f"Total Transactions: {total_transactions}")

# Avg Transaction Value by Store
print("Average Transaction Value by Store")
sales_statistics = sales_df.groupBy("Store_ID").agg(
    count("Transaction_ID").alias("Transaction_Count"),
    format_number(avg("Sales_Amount"), 2).alias("Average_Sales_Amount"),
    format_number(sum("Sales_Amount"), 2).alias("Total_Sales")
)
sales_statistics.show()

# Top 10 Products Sold by Sales
print("Top 10 Products Sold by Sales Amount")
# Rank on the numeric total. format_number() returns strings, and ordering
# strings is lexicographic ("9,000.00" sorts above "10,000.00"), which would
# select the wrong top 10.
product_sales = (
    sales_df.groupBy("Product_ID")
    .agg(sum(col("Sales_Amount")).alias("Total_Sales_Value"))
    .withColumn("Total_Sales", format_number(col("Total_Sales_Value"), 2))
)
top_selling_products = (
    product_sales
    .orderBy(col("Total_Sales_Value").desc())
    .limit(10)
    .select("Product_ID", "Total_Sales")
)
top_selling_products.show()

### 5. Feedback Analysis
print("=== Feedback Analysis ===")
feedback_df.printSchema()

# Basic stats
total_feedbacks = feedback_df.select("Feedback_ID").distinct().count()
print(f"Total Feedbacks: {total_feedbacks}")

# Top 10 Products with Highest Ratings
print("Top 10 Products with Highest Ratings")
# Rank on the numeric mean: format_number() yields strings, which sort
# lexicographically rather than numerically. Ties are broken by feedback volume.
product_feedback_stats = (
    feedback_df.groupBy("Product_ID")
    .agg(
        count("Feedback_ID").alias("Feedback_Count"),
        avg(col("Feedback_Rating")).alias("Average_Rating_Value"),
    )
    .withColumn("Average_Rating", format_number(col("Average_Rating_Value"), 2))
)
top_rated_products = (
    product_feedback_stats
    .orderBy(col("Average_Rating_Value").desc(), col("Feedback_Count").desc())
    .limit(10)
    .select("Product_ID", "Feedback_Count", "Average_Rating")
)
top_rated_products.show()

# Feedback Rating Distribution
print("Feedback Rating Distribution")
feedback_rating_distribution = feedback_df.groupBy("Feedback_Rating").agg(
    count("Feedback_ID").alias("Feedback_Count")
)
feedback_rating_distribution.orderBy(col("Feedback_Rating").desc()).show()

### 6. Loyalty Analysis
print("=== Loyalty Analysis ===")
loyalty_df.printSchema()

# Membership Tier Distribution: member count plus average points earned and
# redeemed per tier (formatted to 2 decimals).
print("Membership Tier Distribution")
tier_aggregations = [
    count("Customer_ID").alias("Customer_Count"),
    format_number(avg("Points_Earned"), 2).alias("Average_Points_Earned"),
    format_number(avg("Points_Redeemed"), 2).alias("Average_Points_Redeemed"),
]
membership_tier_distribution = loyalty_df.groupBy("Membership_Tier").agg(*tier_aggregations)
membership_tier_distribution.show()

### 7. Relationship Analysis
# Each relationship attaches dimension attributes to sales (fact) rows.
# De-duplicate the *dimension* side on its key so every sale joins at most one
# dimension row. Do NOT call dropDuplicates on the joined frame: that keeps one
# arbitrary sale per key, and every Total/Average below would silently be wrong.
# NOTE(review): assumes one row per key is the intended grain of each
# dimension table — confirm against the data.
product_dim = product_df.dropDuplicates(["Product_ID"])

# Relationship 1: Sales and Products
print("=== Relationship Analysis: Sales and Products ===")
sales_product_df = sales_df.join(product_dim, "Product_ID")

sales_product_analysis = sales_product_df.groupBy("Category", "Brand_Name").agg(
    format_number(sum("Sales_Amount"), 2).alias("Total_Sales"),
    format_number(avg("Sales_Amount"), 2).alias("Average_Sales_Amount")
)
sales_product_analysis.show()

# Relationship 2: Sales and Stores
print("=== Relationship Analysis: Sales and Stores ===")
sales_store_analysis = sales_df.groupBy("Store_ID").agg(
    format_number(sum("Sales_Amount"), 2).alias("Total_Sales"),
    format_number(avg("Sales_Amount"), 2).alias("Average_Sales_Amount")
)
sales_store_analysis.show()

# Relationship 3: Feedback and Products with Sales
print("=== Relationship Analysis: Feedback and Products with Sales ===")
# Aggregate feedback and sales per product *before* joining. Joining the raw
# rows on Product_ID pairs every review with every sale, which multiplies
# Total_Sales by the number of reviews.
product_rating_df = feedback_df.groupBy("Product_ID").agg(
    avg("Feedback_Rating").alias("Average_Rating_Value")
)
product_sales_total_df = sales_df.groupBy("Product_ID").agg(
    sum("Sales_Amount").alias("Total_Sales_Value")
)
feedback_sales_product_df = (
    product_rating_df
    .join(product_sales_total_df, "Product_ID")
    .join(product_dim.select("Product_ID", "Product_Name"), "Product_ID")
)

# Sort on the numeric rating (formatted strings would sort lexicographically),
# then project the display columns.
feedback_product_analysis = (
    feedback_sales_product_df
    .orderBy(col("Average_Rating_Value").desc())
    .select(
        "Product_ID",
        "Product_Name",
        format_number("Average_Rating_Value", 2).alias("Average_Rating"),
        format_number("Total_Sales_Value", 2).alias("Total_Sales"),
    )
)
feedback_product_analysis.show(10)

# Relationship 4: Loyalty and Sales
print("=== Relationship Analysis: Loyalty and Sales ===")
loyalty_dim = loyalty_df.dropDuplicates(["Customer_ID"])
loyalty_sales_df = loyalty_dim.join(sales_df, "Customer_ID")

loyalty_sales_analysis = loyalty_sales_df.groupBy("Membership_Tier").agg(
    format_number(sum("Sales_Amount"), 2).alias("Total_Sales"),
    format_number(avg("Sales_Amount"), 2).alias("Average_Sales_Amount")
)
loyalty_sales_analysis.show()

# Relationship 5: Age and Sales
print("=== Relationship Analysis: Age and Sales ===")
customer_age_dim = customer_age_df.dropDuplicates(["Customer_ID"])
age_sales_df = customer_age_dim.join(sales_df, "Customer_ID")

age_sales_analysis = age_sales_df.groupBy("Age").agg(
    format_number(sum("Sales_Amount"), 2).alias("Total_Sales"),
    format_number(avg("Sales_Amount"), 2).alias("Average_Sales_Amount")
)
age_sales_analysis.orderBy(col("Age").desc()).show()

### 8. Summary Statistics for Customer Data
print("=== Basic Statistics for Customer Data ===")
customer_df.describe().show()

# Top 10 locations by customer count, at city and at state granularity
for location_column, location_heading in (
    ("City", "=== Top 10 Cities by Customer Count ==="),
    ("State", "=== Top 10 States by Customer Count ==="),
):
    print(location_heading)
    customer_df.groupBy(location_column).count().orderBy(col("count").desc()).show(10)

### 9. Aggregated Sales Data for Customers
print("=== Aggregated Sales Data for Customers ===")
# Per-customer spend, distinct transaction count, first/last purchase dates,
# and Recency = days since the last purchase.
sales_agg_df = (
    sales_df
    .groupBy("Customer_ID")
    .agg(
        sum("Sales_Amount").alias("Total_Spend"),
        countDistinct("Transaction_ID").alias("Total_Transactions"),
        max("Date").alias("Last_Purchase_Date"),
        min("Date").alias("First_Purchase_Date"),
    )
    .withColumn("Recency", datediff(current_date(), col("Last_Purchase_Date")))
)

# Left join keeps customers without any sales (their sales columns are null)
customer_sales_df = customer_df.join(sales_agg_df, on="Customer_ID", how="left")

print("=== Summary Statistics of Customer Spend, Transactions, and Recency ===")
customer_sales_df.describe(["Total_Spend", "Total_Transactions", "Recency"]).show()

print("=== Sample Data from Customer Sales DataFrame ===")
customer_sales_df.show(5)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, countDistinct, max, datediff, current_date, mean
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans, BisectingKMeans, GaussianMixture
from pyspark.ml.evaluation import ClusteringEvaluator
import matplotlib.pyplot as plt
import seaborn as sns

# Initialize Spark session: getOrCreate() returns the already-active session
# if there is one, otherwise it builds a new one with this app name.
spark = (
    SparkSession.builder
    .appName("Customer Segmentation")
    .getOrCreate()
)

# Load and preprocess data
def load_data(spark):
    """Read the customer and sales tables from the raw SQLite database.

    Returns:
        tuple: ``(customer_df, sales_df)`` Spark DataFrames.
    """
    db_path = "../data/raw/retail_data.db"
    jdbc_driver = "org.sqlite.JDBC"
    customers = read_dataframe_from_sqlite(spark, "customer_sdf", db_path, jdbc_driver)
    sales = read_dataframe_from_sqlite(spark, "sales_sdf", db_path, jdbc_driver)
    return customers, sales

def calculate_rfm(sales_df):
    """Compute per-customer Recency / Frequency / Monetary metrics.

    Recency   - days between today and the customer's latest ``Date``.
    Frequency - number of distinct ``Transaction_ID`` values.
    Monetary  - total ``Sales_Amount``.

    Returns:
        Spark DataFrame with one row per ``Customer_ID``.
    """
    latest_purchase = max("Date")
    return sales_df.groupBy("Customer_ID").agg(
        datediff(current_date(), latest_purchase).alias("Recency"),
        countDistinct("Transaction_ID").alias("Frequency"),
        sum("Sales_Amount").alias("Monetary"),
    )

def preprocess_data(customer_rfm_df):
    """Impute, assemble and standardize the RFM features for clustering.

    The three RFM columns are cast to double. Their nulls (e.g. customers with
    no sales after the left join) are filled with the column mean. The columns
    are then assembled into ``features`` and standardized (zero mean, unit
    variance) into ``scaled_features``.

    Args:
        customer_rfm_df: Spark DataFrame containing ``Recency``, ``Frequency``
            and ``Monetary`` columns.

    Returns:
        Spark DataFrame with added ``features`` and ``scaled_features`` columns.
    """
    rfm_columns = ["Recency", "Frequency", "Monetary"]

    # Work in double precision. A DecimalType column (e.g. a summed decimal
    # Sales_Amount) makes mean() return a Python Decimal, which fillna()
    # rejects with a TypeError.
    for column_name in rfm_columns:
        customer_rfm_df = customer_rfm_df.withColumn(
            column_name, customer_rfm_df[column_name].cast("double")
        )

    # Compute all three means in a single aggregation job.
    means_row = customer_rfm_df.agg(*[mean(c).alias(c) for c in rfm_columns]).first()
    # mean() is null when a column has no non-null values; fall back to 0.0 so
    # fillna() only receives floats.
    fill_values = {
        c: (float(means_row[c]) if means_row[c] is not None else 0.0) for c in rfm_columns
    }
    customer_rfm_df = customer_rfm_df.fillna(fill_values)

    # Assemble features into a vector
    assembler = VectorAssembler(
        inputCols=rfm_columns,
        outputCol="features",
        handleInvalid="skip"  # Skip rows with null/NaN values
    )
    feature_vector = assembler.transform(customer_rfm_df)

    # Apply Standard Scaler
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
    scaled_data = scaler.fit(feature_vector).transform(feature_vector)

    return scaled_data

def train_and_evaluate_model(model, scaled_data, model_name):
    """Fit a clustering estimator, print its silhouette score and per-cluster RFM means.

    Args:
        model: an unfitted Spark ML clustering estimator reading ``scaled_features``.
        scaled_data: DataFrame with ``scaled_features``, ``Recency``,
            ``Frequency`` and ``Monetary`` columns.
        model_name: label used in the printed output.

    Returns:
        tuple: ``(silhouette, clusters)``, i.e. the silhouette score (squared
        Euclidean distance) and the DataFrame with the model's ``prediction`` column.
    """
    # This cell's imports do not include `avg`. Import it here so the function
    # does not depend on a name left in the namespace by an earlier cell.
    from pyspark.sql.functions import avg

    # Train the model
    trained_model = model.fit(scaled_data)

    # Get cluster assignments
    clusters = trained_model.transform(scaled_data)

    # Evaluate the model using Silhouette Score
    evaluator = ClusteringEvaluator(featuresCol="scaled_features", metricName="silhouette", distanceMeasure="squaredEuclidean")
    silhouette = evaluator.evaluate(clusters)

    print(f"{model_name} Silhouette Score: {silhouette}")

    # Analyze each cluster's characteristics
    cluster_summary = clusters.groupBy("prediction").agg(
        avg("Recency").alias("Avg_Recency"),
        avg("Frequency").alias("Avg_Frequency"),
        avg("Monetary").alias("Avg_Monetary")
    )
    cluster_summary.show()

    return silhouette, clusters

# Load data
customer_df, sales_df = load_data(spark)

# Calculate RFM metrics
rfm_df = calculate_rfm(sales_df)

# Join RFM metrics with customer data (left: keep customers without sales)
customer_rfm_df = customer_df.join(rfm_df, "Customer_ID", "left")

# Preprocess the data (assemble and scale features)
scaled_data = preprocess_data(customer_rfm_df)

# Candidate clustering models, all with k=4 and a fixed seed for reproducibility
models = {
    "KMeans": KMeans(k=4, seed=1, featuresCol="scaled_features"),
    "BisectingKMeans": BisectingKMeans(k=4, seed=1, featuresCol="scaled_features"),
    "GaussianMixture": GaussianMixture(k=4, seed=1, featuresCol="scaled_features")
}

# Dictionaries to store results
silhouette_scores = {}
cluster_results = {}

# Train and evaluate each model
for model_name, model in models.items():
    silhouette_score, clusters = train_and_evaluate_model(model, scaled_data, model_name)
    silhouette_scores[model_name] = silhouette_score
    cluster_results[model_name] = clusters

# Select the model with the highest silhouette score
best_model_name = None
best_score = float('-inf')

for model_name, score in silhouette_scores.items():
    if score > best_score:
        best_score = score
        best_model_name = model_name

print(f"Best Model: {best_model_name} with Silhouette Score = {best_score}")

# Visualize the clusters for the best-performing model
best_clusters = cluster_results[best_model_name]
pandas_df = best_clusters.select("Recency", "Frequency", "Monetary", "prediction").toPandas()

# Pairplot to visualize clusters
sns.pairplot(pandas_df, hue="prediction", palette="viridis")
plt.show()

# Inspect cluster membership for the same model that was selected and plotted
# above, instead of a hard-coded model name that may not be the best one.
cluster_1_customers = best_clusters.filter(col("prediction") == 1).select("Customer_ID")
cluster_0_customers = best_clusters.filter(col("prediction") == 0).select("Customer_ID")

# Show the first few rows of each cluster
print("Cluster 1 Customer IDs:")
cluster_1_customers.show()

print("Cluster 0 Customer IDs:")
cluster_0_customers.show()

# End Spark session.
# NOTE(review): getOrCreate() above returns the session that is already active,
# so this also stops it for DataFrames created in earlier cells.
spark.stop()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, countDistinct, max, datediff, current_date, mean, col, avg, year
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.clustering import KMeans, BisectingKMeans, GaussianMixture
from pyspark.ml.evaluation import ClusteringEvaluator
import matplotlib.pyplot as plt
import seaborn as sns

# Initialize Spark session: getOrCreate() returns the already-active session
# if there is one, otherwise it builds a new one with this app name.
spark = (
    SparkSession.builder
    .appName("Customer Segmentation")
    .getOrCreate()
)

# Function to load data
def read_dataframe_from_sqlite(spark, table_name, db_path, driver):
    """Load one SQLite table into a Spark DataFrame through the JDBC source.

    Args:
        spark: active SparkSession.
        table_name: table to read (passed as the ``dbtable`` option).
        db_path: path of the SQLite database file (becomes ``jdbc:sqlite:<path>``).
        driver: JDBC driver class name.

    Returns:
        The DataFrame produced by ``DataFrameReader.load()``.
    """
    jdbc_url = f"jdbc:sqlite:{db_path}"
    reader = spark.read.format("jdbc")
    reader = reader.option("url", jdbc_url)
    reader = reader.option("dbtable", table_name)
    reader = reader.option("driver", driver)
    return reader.load()

# Load and preprocess data
def load_data(spark):
    """Read the customer, sales, product, store and loyalty tables from the raw SQLite database.

    Returns:
        tuple: ``(customer_df, sales_df, product_df, store_df, loyalty_df)``.
    """
    db_path = "../data/raw/retail_data.db"
    jdbc_driver = "org.sqlite.JDBC"
    table_names = ("customer_sdf", "sales_sdf", "product_sdf", "store_sdf", "loyalty_sdf")
    return tuple(
        read_dataframe_from_sqlite(spark, table_name, db_path, jdbc_driver)
        for table_name in table_names
    )

# Calculate RFM metrics
def calculate_rfm(sales_df):
    """Return per-customer Recency (days since last ``Date``), Frequency
    (distinct ``Transaction_ID``) and Monetary (summed ``Sales_Amount``)."""
    rfm_expressions = [
        datediff(current_date(), max("Date")).alias("Recency"),
        countDistinct("Transaction_ID").alias("Frequency"),
        sum("Sales_Amount").alias("Monetary"),
    ]
    per_customer = sales_df.groupBy("Customer_ID")
    return per_customer.agg(*rfm_expressions)

# Preprocess data (assemble and scale features)
def preprocess_data(customer_rfm_df, feature_columns):
    """Encode/cast, impute, assemble and standardize feature columns for clustering.

    Args:
        customer_rfm_df: Spark DataFrame containing every column in ``feature_columns``.
        feature_columns: names of the columns to use as clustering features.
            String-typed columns are label-encoded in place (same column name)
            with ``StringIndexer``; all other columns are cast to double.

    Returns:
        The input rows with added ``features`` and ``scaled_features`` columns.

    Raises:
        ValueError: if ``feature_columns`` is empty, the DataFrame has no rows,
            or no rows survive vector assembly.
    """
    if not feature_columns:
        raise ValueError("No feature columns given. Cannot proceed with preprocessing.")

    column_types = dict(customer_rfm_df.dtypes)
    for feature in feature_columns:
        if column_types.get(feature) == "string":
            # Casting a label such as a tier name to double yields null, which the
            # mean imputation below would flatten into a constant column. Encode
            # the labels as numeric indices instead; null/unseen labels get their
            # own index rather than failing the transform.
            index_column = f"{feature}__index_tmp"
            indexer = StringIndexer(inputCol=feature, outputCol=index_column, handleInvalid="keep")
            customer_rfm_df = (
                indexer.fit(customer_rfm_df)
                .transform(customer_rfm_df)
                .drop(feature)
                .withColumnRenamed(index_column, feature)
            )
        else:
            # Cast BigDecimal (and other numeric) columns to DoubleType
            customer_rfm_df = customer_rfm_df.withColumn(feature, col(feature).cast(DoubleType()))

    # Check if DataFrame is empty
    if customer_rfm_df.count() == 0:
        raise ValueError("The DataFrame is empty. Cannot proceed with preprocessing.")

    # Compute all means in one aggregation job. mean() is null for an all-null
    # column, so default those to 0.0.
    means_row = customer_rfm_df.agg(*[mean(f).alias(f) for f in feature_columns]).first()
    mean_values = {
        f: (means_row[f] if means_row[f] is not None else 0.0) for f in feature_columns
    }

    # Fill missing values with calculated mean values
    customer_rfm_df = customer_rfm_df.fillna(mean_values)

    # Assemble features into a vector
    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features", handleInvalid="skip")
    feature_vector = assembler.transform(customer_rfm_df)

    # handleInvalid="skip" drops rows with null/NaN features; make sure rows remain.
    if feature_vector.head() is None:
        raise ValueError("Feature vector is empty. Cannot proceed with scaling.")

    # Apply Standard Scaler
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
    scaled_data = scaler.fit(feature_vector).transform(feature_vector)

    return scaled_data

# Train and evaluate clustering model
def train_and_evaluate_model(model, scaled_data, model_name, feature_columns):
    """Fit a clustering estimator and report its silhouette score and per-cluster means.

    Args:
        model: unfitted Spark ML clustering estimator reading ``scaled_features``.
        scaled_data: DataFrame with ``scaled_features`` and the ``feature_columns``.
        model_name: label used in printed output and error messages.
        feature_columns: columns whose per-cluster averages are shown.

    Returns:
        tuple: ``(silhouette, clusters)``. ``silhouette`` is ``None`` when fewer
        than two distinct clusters were predicted.

    Raises:
        ValueError: if ``scaled_data`` has no rows.
    """
    if scaled_data.count() == 0:
        raise ValueError(f"The DataFrame for {model_name} is empty. Cannot proceed with model training.")

    clusters = model.fit(scaled_data).transform(scaled_data)

    # The silhouette score is undefined for a single cluster.
    distinct_predictions = clusters.select("prediction").distinct().count()
    if distinct_predictions < 2:
        print(f"{model_name}: Number of clusters is {distinct_predictions}. Cannot compute silhouette score.")
        return None, clusters

    silhouette = ClusteringEvaluator(
        featuresCol="scaled_features",
        metricName="silhouette",
        distanceMeasure="squaredEuclidean",
    ).evaluate(clusters)
    print(f"{model_name} Silhouette Score: {silhouette}")

    # Per-cluster average of each feature, named Avg_<feature>
    clusters.groupBy("prediction").agg(
        *(avg(col(name)).alias(f"Avg_{name}") for name in feature_columns)
    ).show()

    return silhouette, clusters

# Product Purchase Segmentation
def product_purchase_segmentation(sales_df, product_df):
    """Per (customer, product category) purchase count and spend.

    Returns one row per ``(Customer_ID, Category)`` with ``Purchase_Count``
    (distinct ``Transaction_ID``) and ``Total_Spending`` (summed
    ``Sales_Amount``). Only sales matching a product row are counted (inner join).
    """
    with_category = sales_df.join(product_df, "Product_ID")
    grouped = with_category.groupBy("Customer_ID", "Category")
    return grouped.agg(
        countDistinct("Transaction_ID").alias("Purchase_Count"),
        sum("Sales_Amount").alias("Total_Spending"),
    )

# Store Visit Segmentation
def store_visit_segmentation(sales_df, store_df):
    """Per (customer, store type) visit frequency and spend.

    Returns one row per ``(Customer_ID, Store_Type)`` with ``Visit_Frequency``
    (distinct ``Transaction_ID``) and ``Total_Spending`` (summed ``Sales_Amount``).

    NOTE(review): the join matches sales ``Store_ID`` against store
    ``Store_Location``, not ``Store_ID``. Unless sales rows hold location text
    in ``Store_ID``, nothing matches and, because this is a left join, every
    row lands in a null ``Store_Type`` group. Confirm against the data.
    """
    join_condition = sales_df["Store_ID"] == store_df["Store_Location"]
    with_store = sales_df.join(store_df, join_condition, "left")
    return with_store.groupBy("Customer_ID", "Store_Type").agg(
        countDistinct("Transaction_ID").alias("Visit_Frequency"),
        sum("Sales_Amount").alias("Total_Spending"),
    )

# Demographic Segmentation with String Indexing
def demographic_segmentation(customer_df):
    """Derive numeric demographic features for clustering.

    Adds ``Age`` (completed years since ``DOB``) and the label encodings
    ``Gender_index``, ``City_index`` and ``State_index``.

    Args:
        customer_df: Spark DataFrame with ``DOB``, ``Gender``, ``City`` and ``State``.

    Returns:
        The input DataFrame with the four extra columns.
    """
    from pyspark.sql.functions import floor as spark_floor, months_between

    # Completed years. A plain year difference over-counts by one for anyone
    # whose birthday has not yet occurred this year.
    customer_df = customer_df.withColumn(
        "Age", spark_floor(months_between(current_date(), col("DOB")) / 12).cast("int")
    )

    # Convert categorical string columns to numerical indices. handleInvalid="keep"
    # gives null/unseen labels their own index instead of failing the transform.
    # (The loop variable is not named `col`, so the pyspark `col` stays usable.)
    for category_column in ["Gender", "City", "State"]:
        indexer_model = StringIndexer(
            inputCol=category_column,
            outputCol=category_column + "_index",
            handleInvalid="keep",
        ).fit(customer_df)
        customer_df = indexer_model.transform(customer_df)

    return customer_df

# Loyalty Program Segmentation
def loyalty_program_segmentation(loyalty_df):
    """Return the loyalty table unchanged.

    No transformation is applied here. The caller passes the raw
    ``Points_Earned``, ``Points_Redeemed`` and ``Membership_Tier`` columns
    straight to preprocessing.
    """
    segmented_df = loyalty_df
    return segmented_df

# Load data
customer_df, sales_df, product_df, store_df, loyalty_df = load_data(spark)

# RFM Segmentation
rfm_df = calculate_rfm(sales_df)
customer_rfm_df = customer_df.join(rfm_df, "Customer_ID", "left")
scaled_data_rfm = preprocess_data(customer_rfm_df, ["Recency", "Frequency", "Monetary"])

# Product Purchase Segmentation
product_metrics_df = product_purchase_segmentation(sales_df, product_df)
scaled_data_product = preprocess_data(product_metrics_df, ["Purchase_Count", "Total_Spending"])

# Store Visit Segmentation
store_metrics_df = store_visit_segmentation(sales_df, store_df)
print("Store Metrics DataFrame:")
store_metrics_df.show()  # Inspect the DataFrame

# Initialise explicitly. `'scaled_data_store' in locals()` at notebook top level
# consults the global namespace, so a value left over from an earlier run of
# this cell would be reused even when the store metrics are now empty.
scaled_data_store = None
if store_metrics_df.count() == 0:
    print("No data available for store visit segmentation.")
else:
    scaled_data_store = preprocess_data(store_metrics_df, ["Visit_Frequency", "Total_Spending"])

# Demographic Segmentation
demographic_df = demographic_segmentation(customer_df)
scaled_data_demographic = preprocess_data(demographic_df, ["Age", "Gender_index", "City_index", "State_index"])

# Loyalty Program Segmentation
loyalty_df = loyalty_program_segmentation(loyalty_df)
scaled_data_loyalty = preprocess_data(loyalty_df, ["Points_Earned", "Points_Redeemed", "Membership_Tier"])

# Candidate clustering models, all with k=4 and a fixed seed for reproducibility
models = {
    "KMeans": KMeans(k=4, seed=1, featuresCol="scaled_features"),
    "BisectingKMeans": BisectingKMeans(k=4, seed=1, featuresCol="scaled_features"),
    "GaussianMixture": GaussianMixture(k=4, seed=1, featuresCol="scaled_features")
}

# Train and evaluate each model for each segmentation
segmentation_data = {
    "RFM": (scaled_data_rfm, ["Recency", "Frequency", "Monetary"]),
    "Product Purchase": (scaled_data_product, ["Purchase_Count", "Total_Spending"]),
    "Store Visit": (scaled_data_store, ["Visit_Frequency", "Total_Spending"]),
    "Demographics": (scaled_data_demographic, ["Age", "Gender_index", "City_index", "State_index"]),
    "Loyalty Program": (scaled_data_loyalty, ["Points_Earned", "Points_Redeemed", "Membership_Tier"])
}

best_models = {}

for seg_name, (data, feature_columns) in segmentation_data.items():
    if data is not None and data.count() > 0:
        silhouette_scores = {}
        cluster_results = {}
        best_model_name = None
        best_score = float('-inf')

        for model_name, model in models.items():
            silhouette_score, clusters = train_and_evaluate_model(model, data, f"{seg_name} - {model_name}", feature_columns)
            if silhouette_score is not None:  # Only consider valid silhouette scores
                silhouette_scores[model_name] = silhouette_score
                cluster_results[model_name] = clusters

                if silhouette_score > best_score:
                    best_score = silhouette_score
                    best_model_name = model_name

        if best_model_name is not None:
            best_models[seg_name] = {
                "model_name": best_model_name,
                "silhouette_score": best_score,
                "clusters": cluster_results[best_model_name]
            }
            print(f"Best Model for {seg_name}: {best_model_name} with Silhouette Score = {best_score}")
        else:
            print(f"No valid clustering model found for {seg_name}.")
    else:
        print(f"No data available for segmentation: {seg_name}")

# Visualize clusters for the best RFM model
if "RFM" in best_models:
    rfm_clusters = best_models["RFM"]["clusters"]
    pandas_df = rfm_clusters.select("Recency", "Frequency", "Monetary", "prediction").toPandas()

    sns.pairplot(pandas_df, hue="prediction", palette="viridis")
    plt.show()

# End Spark session
spark.stop()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct, sum, col
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import matplotlib.pyplot as plt
import seaborn as sns

# Initialize Spark session: getOrCreate() returns the already-active session
# if there is one, otherwise it builds a new one with this app name.
spark = (
    SparkSession.builder
    .appName("Behavioral Segmentation")
    .getOrCreate()
)

# Load and preprocess data
def load_data(spark):
    """Read the customer, sales and product tables from the raw SQLite database.

    Returns:
        tuple: ``(customer_df, sales_df, product_df)``.
    """
    sqlite_source = ("../data/raw/retail_data.db", "org.sqlite.JDBC")
    customers = read_dataframe_from_sqlite(spark, "customer_sdf", *sqlite_source)
    sales = read_dataframe_from_sqlite(spark, "sales_sdf", *sqlite_source)
    products = read_dataframe_from_sqlite(spark, "product_sdf", *sqlite_source)
    return customers, sales, products

# Product Variety and Spending Segmentation
def product_variety_segmentation(sales_df, product_df):
    """Per-customer product variety and spend.

    Returns one row per ``Customer_ID`` with:
      * ``Product_Variety``: number of distinct product ``Category`` values bought;
      * ``Total_Spending``: sum of ``Sales_Amount``.
    Only sales whose ``Product_ID`` matches a product row are counted (inner join).
    """
    joined = sales_df.join(product_df, "Product_ID")
    per_customer = joined.groupBy("Customer_ID")
    return per_customer.agg(
        countDistinct("Category").alias("Product_Variety"),
        sum("Sales_Amount").alias("Total_Spending"),
    )

# Preprocess data (assemble and scale features)
def preprocess_data(customer_behavioral_df, feature_columns):
    """Zero-fill, assemble and standardize the behavioural features.

    Args:
        customer_behavioral_df: Spark DataFrame containing ``feature_columns``.
        feature_columns: columns to assemble into the ``features`` vector.

    Returns:
        DataFrame with added ``features`` and ``scaled_features`` columns.

    Raises:
        ValueError: if the input DataFrame has no rows.
    """
    if customer_behavioral_df.count() == 0:
        raise ValueError("The DataFrame is empty. Cannot proceed with preprocessing.")

    # Nulls in numeric columns become 0; rows still invalid after that are skipped.
    filled_df = customer_behavioral_df.fillna(0)
    assembled_df = VectorAssembler(
        inputCols=feature_columns, outputCol="features", handleInvalid="skip"
    ).transform(filled_df)

    scaler_model = StandardScaler(
        inputCol="features", outputCol="scaled_features", withMean=True, withStd=True
    ).fit(assembled_df)
    return scaler_model.transform(assembled_df)

# Train and evaluate clustering model
def train_and_evaluate_model(model, scaled_data, model_name):
    """Fit a clustering estimator, print its silhouette score and per-cluster behavioural means.

    Args:
        model: unfitted Spark ML clustering estimator reading ``scaled_features``.
        scaled_data: DataFrame with ``scaled_features``, ``Product_Variety``
            and ``Total_Spending`` columns.
        model_name: label used in the printed output.

    Returns:
        tuple: ``(silhouette, clusters)``, i.e. the silhouette score (squared
        Euclidean distance) and the DataFrame with the model's ``prediction`` column.
    """
    # This cell's imports do not include `avg`. Import it here so the function
    # does not depend on a name left in the namespace by an earlier cell.
    from pyspark.sql.functions import avg

    # Train the model
    trained_model = model.fit(scaled_data)
    clusters = trained_model.transform(scaled_data)

    # Evaluate the model using Silhouette Score
    evaluator = ClusteringEvaluator(featuresCol="scaled_features", metricName="silhouette", distanceMeasure="squaredEuclidean")
    silhouette = evaluator.evaluate(clusters)

    print(f"{model_name} Silhouette Score: {silhouette}")

    # Analyze each cluster's characteristics
    cluster_summary = clusters.groupBy("prediction").agg(
        avg("Product_Variety").alias("Avg_Product_Variety"),
        avg("Total_Spending").alias("Avg_Total_Spending")
    )
    cluster_summary.show()

    return silhouette, clusters

# Load data (customer_df is loaded but not used by this segmentation)
customer_df, sales_df, product_df = load_data(spark)

# Behavioral Segmentation: product variety vs. total spend per customer
behavioral_features = ["Product_Variety", "Total_Spending"]
behavioral_df = product_variety_segmentation(sales_df, product_df)
scaled_data_behavioral = preprocess_data(behavioral_df, behavioral_features)

# KMeans with k=4 and a fixed seed for reproducibility
kmeans = KMeans(k=4, seed=1, featuresCol="scaled_features")
silhouette_score, clusters = train_and_evaluate_model(kmeans, scaled_data_behavioral, "Behavioral Segmentation")

# Visualize the clusters
plot_columns = behavioral_features + ["prediction"]
pandas_df = clusters.select(*plot_columns).toPandas()
sns.pairplot(pandas_df, hue="prediction", palette="viridis")
plt.show()

# End Spark session
spark.stop()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, countDistinct, max, datediff, current_date, mean, col, avg, year, when, count
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.clustering import KMeans, BisectingKMeans, GaussianMixture
from pyspark.ml.evaluation import ClusteringEvaluator
import matplotlib.pyplot as plt
import seaborn as sns

# Initialize Spark session: getOrCreate() returns the already-active session
# if there is one, otherwise it builds a new one with this app name.
spark = (
    SparkSession.builder
    .appName("Comprehensive Customer Segmentation")
    .getOrCreate()
)

# Function to load data
def read_dataframe_from_sqlite(spark, table_name, db_path, driver):
    """Load one SQLite table into a Spark DataFrame through the JDBC source.

    Args:
        spark: active SparkSession.
        table_name: table to read (``dbtable`` option).
        db_path: SQLite database file path (embedded in the JDBC URL).
        driver: JDBC driver class name.

    Returns:
        The DataFrame produced by ``DataFrameReader.load()``.
    """
    jdbc_options = (
        ("url", f"jdbc:sqlite:{db_path}"),
        ("dbtable", table_name),
        ("driver", driver),
    )
    reader = spark.read.format("jdbc")
    for option_key, option_value in jdbc_options:
        reader = reader.option(option_key, option_value)
    return reader.load()

# Load and preprocess data
def load_data(spark):
    """Read the six retail tables used for comprehensive segmentation.

    Returns:
        tuple: ``(customer_df, sales_df, product_df, store_df, loyalty_df, feedback_df)``.
    """
    db_path = "../data/raw/retail_data.db"
    jdbc_driver = "org.sqlite.JDBC"
    frames = []
    for table_name in ("customer_sdf", "sales_sdf", "product_sdf", "store_sdf", "loyalty_sdf", "feedback_sdf"):
        frames.append(read_dataframe_from_sqlite(spark, table_name, db_path, jdbc_driver))
    return tuple(frames)

# Calculate RFM metrics with Time
def calculate_rfm_with_time(sales_df):
    """Compute per-customer RFM metrics plus tenure.

    Recency      - days since the customer's latest ``Date``.
    Frequency    - number of distinct ``Transaction_ID`` values.
    Monetary     - total ``Sales_Amount``.
    Customer_Age - days since the customer's earliest ``Date`` (tenure).

    Returns:
        Spark DataFrame with one row per ``Customer_ID``.
    """
    # This cell's import list has no `min`. Without an explicit import the name
    # resolves to whatever an earlier cell left in the namespace, or to Python's
    # builtin, which turns min("Date") into the string "D".
    from pyspark.sql.functions import min as spark_min

    rfm_metrics = sales_df.groupBy("Customer_ID").agg(
        datediff(current_date(), max("Date")).alias("Recency"),
        countDistinct("Transaction_ID").alias("Frequency"),
        sum("Sales_Amount").alias("Monetary"),
        datediff(current_date(), spark_min("Date")).alias("Customer_Age")  # customer tenure in days
    )
    return rfm_metrics

# Loyalty Program Engagement Segmentation
def loyalty_program_engagement_segmentation(loyalty_df):
    """Aggregate loyalty points per (customer, membership tier) and index the tier.

    Args:
        loyalty_df: Spark DataFrame with ``Customer_ID``, ``Membership_Tier``,
            ``Points_Earned`` and ``Points_Redeemed`` columns.

    Returns:
        One row per (``Customer_ID``, ``Membership_Tier``) with
        ``Total_Points_Earned``, ``Total_Points_Redeemed`` (as double) and a
        numeric ``Membership_Tier_Index`` from StringIndexer.

    Raises:
        ValueError: if ``Membership_Tier`` is missing from ``loyalty_df``.
    """
    # Validate on the input: grouping by a missing column already fails inside
    # groupBy/agg, so a check placed after the aggregation could never fire.
    if "Membership_Tier" not in loyalty_df.columns:
        raise ValueError("The column 'Membership_Tier' does not exist in the DataFrame.")

    loyalty_metrics = loyalty_df.groupBy("Customer_ID", "Membership_Tier").agg(
        sum("Points_Earned").alias("Total_Points_Earned"),
        sum(col("Points_Redeemed").cast(DoubleType())).alias("Total_Points_Redeemed"),
    )

    # Turn the categorical tier into a numeric feature for clustering.
    indexer = StringIndexer(inputCol="Membership_Tier", outputCol="Membership_Tier_Index")
    return indexer.fit(loyalty_metrics).transform(loyalty_metrics)

# Customer Lifecycle Segmentation
def customer_lifecycle_segmentation(rfm_df):
    """Label each customer Active / Warm / Inactive from Recency and index the label.

    Recency <= 30 -> "Active", Recency <= 90 -> "Warm", anything else
    (including a null Recency) -> "Inactive".

    Args:
        rfm_df: Spark DataFrame with a numeric ``Recency`` column.

    Returns:
        ``rfm_df`` plus ``Lifecycle_Segment`` and ``Lifecycle_Segment_Index``.
    """
    segment_expr = (
        when(col("Recency") <= 30, "Active")
        .when(col("Recency") <= 90, "Warm")
        .otherwise("Inactive")
    )
    labelled = rfm_df.withColumn("Lifecycle_Segment", segment_expr)
    segment_indexer = StringIndexer(inputCol="Lifecycle_Segment", outputCol="Lifecycle_Segment_Index")
    fitted_indexer = segment_indexer.fit(labelled)
    return fitted_indexer.transform(labelled)

# Product Affinity Segmentation
def product_affinity_segmentation(sales_df, product_df):
    """Per (customer, product category) purchase counts and spend.

    Args:
        sales_df: Spark DataFrame with ``Product_ID``, ``Customer_ID``,
            ``Transaction_ID`` and ``Sales_Amount``.
        product_df: Spark DataFrame with ``Product_ID`` and ``Category``.

    Returns:
        One row per (``Customer_ID``, ``Category``) with
        ``Category_Purchase_Count`` and ``Category_Spending``.
    """
    sales_with_category = sales_df.join(product_df, "Product_ID")
    per_category = sales_with_category.groupBy("Customer_ID", "Category")
    return per_category.agg(
        countDistinct("Transaction_ID").alias("Category_Purchase_Count"),
        sum("Sales_Amount").alias("Category_Spending"),
    )

# Store Loyalty Segmentation
def store_loyalty_segmentation(sales_df, store_df):
    """Per (customer, store type) visit frequency and total spend.

    Sales rows are left-joined to stores, so sales with no matching store
    keep a null ``Store_Type``.

    NOTE(review): the join condition compares ``sales_df.Store_ID`` with
    ``store_df.Store_Location``; a later version of this function joins on
    ``Store_ID`` instead. Confirm which key is correct for this schema.

    Returns:
        One row per (``Customer_ID``, ``Store_Type``) with
        ``Store_Visit_Frequency`` and ``Store_Total_Spending``.
    """
    join_condition = sales_df["Store_ID"] == store_df["Store_Location"]
    sales_with_store = sales_df.join(store_df, join_condition, "left")
    return sales_with_store.groupBy("Customer_ID", "Store_Type").agg(
        countDistinct("Transaction_ID").alias("Store_Visit_Frequency"),
        sum("Sales_Amount").alias("Store_Total_Spending"),
    )

# Feedback Sentiment Segmentation
def feedback_sentiment_segmentation(feedback_df):
    """Per-customer average feedback rating and number of feedback entries.

    ``Feedback_Rating`` is used directly as the sentiment score.

    Returns:
        One row per ``Customer_ID`` with ``Avg_Sentiment_Score`` and
        ``Total_Feedbacks``.
    """
    sentiment_aggs = [
        avg("Feedback_Rating").alias("Avg_Sentiment_Score"),
        count("Feedback_ID").alias("Total_Feedbacks"),
    ]
    return feedback_df.groupBy("Customer_ID").agg(*sentiment_aggs)

# Preprocess data (assemble and scale features)
def preprocess_data(customer_df, feature_columns):
    """Cast, mean-impute, assemble and standardize feature columns.

    Steps:
      1. cast every feature column to DoubleType;
      2. replace missing values with the column mean (0.0 when the whole
         column is null);
      3. assemble the features into a ``features`` vector, skipping rows the
         assembler considers invalid;
      4. standardize into ``scaled_features`` (withMean=True, withStd=True).

    Args:
        customer_df: Spark DataFrame containing ``feature_columns``.
        feature_columns: non-empty list of numeric column names.

    Returns:
        The input rows with ``features`` and ``scaled_features`` columns added.

    Raises:
        ValueError: if ``feature_columns`` is empty, the DataFrame has no rows,
            or no row survives vector assembly.
    """
    if not feature_columns:
        raise ValueError("No feature columns given. Cannot proceed with preprocessing.")

    # Cast BigDecimal (JDBC numeric) columns to DoubleType for Spark ML.
    for feature in feature_columns:
        customer_df = customer_df.withColumn(feature, col(feature).cast(DoubleType()))

    # head(1) stops at the first row instead of counting the whole table.
    if not customer_df.head(1):
        raise ValueError("The DataFrame is empty. Cannot proceed with preprocessing.")

    # One aggregation job for all column means instead of one job per feature.
    mean_row = customer_df.select([mean(feature).alias(feature) for feature in feature_columns]).first()
    mean_values = {
        feature: (mean_row[feature] if mean_row[feature] is not None else 0.0)
        for feature in feature_columns
    }
    customer_df = customer_df.fillna(mean_values)

    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features", handleInvalid="skip")
    feature_vector = assembler.transform(customer_df)

    # With handleInvalid="skip" every row may be dropped; head() then returns None.
    first_row = feature_vector.select("features").head()
    if first_row is None:
        raise ValueError("No rows left after assembling features. Cannot proceed with scaling.")
    if first_row[0].size == 0:
        raise ValueError("Feature vector is empty. Cannot proceed with scaling.")

    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
    return scaler.fit(feature_vector).transform(feature_vector)

# Train and evaluate clustering model
def train_and_evaluate_model(model, scaled_data, model_name, feature_columns):
    """Fit ``model``, print its silhouette score and a per-cluster feature summary.

    Args:
        model: unfitted Spark ML clustering estimator reading ``scaled_features``.
        scaled_data: Spark DataFrame with ``scaled_features`` and the raw features.
        model_name: label used in printed messages.
        feature_columns: raw feature columns averaged per cluster in the summary.

    Returns:
        ``(silhouette, clusters)``. ``silhouette`` is ``None`` when the model
        assigned every row to a single cluster (silhouette is undefined then).

    Raises:
        ValueError: if ``scaled_data`` has no rows.
    """
    row_count = scaled_data.count()
    if row_count == 0:
        raise ValueError(f"The DataFrame for {model_name} is empty. Cannot proceed with model training.")

    fitted = model.fit(scaled_data)
    clusters = fitted.transform(scaled_data)

    num_clusters = clusters.select("prediction").distinct().count()
    if num_clusters > 1:
        silhouette_evaluator = ClusteringEvaluator(
            featuresCol="scaled_features",
            metricName="silhouette",
            distanceMeasure="squaredEuclidean",
        )
        silhouette = silhouette_evaluator.evaluate(clusters)
        print(f"{model_name} Silhouette Score: {silhouette}")

        # Mean of each raw feature per predicted cluster.
        per_cluster_means = [avg(col(name)).alias(f"Avg_{name}") for name in feature_columns]
        clusters.groupBy("prediction").agg(*per_cluster_means).show()
        return silhouette, clusters

    print(f"{model_name}: Number of clusters is {num_clusters}. Cannot compute silhouette score.")
    return None, clusters

# Function to plot clusters
def plot_clusters(clusters, feature_columns, seg_name):
    """Plot cluster assignments over the given feature columns.

    With several features a seaborn pair plot of ``feature_columns`` coloured
    by ``prediction`` is drawn; with one feature, a scatter of that feature
    against the predicted cluster.

    Args:
        clusters: Spark DataFrame with ``feature_columns`` and ``prediction``.
        feature_columns: raw feature columns to plot.
        seg_name: segmentation label used in the plot title.
    """
    # Collect only what is plotted. Previously Customer_ID was collected too, and
    # pairplot (which uses every numeric column by default) plotted it as a feature.
    pandas_df = clusters.select(*feature_columns, "prediction").toPandas()

    if len(feature_columns) > 1:
        sns.pairplot(pandas_df, vars=list(feature_columns), hue="prediction", palette="viridis")
        plt.suptitle(f"Cluster Plot for {seg_name}", y=1.02)
        plt.show()
    else:
        fig, ax = plt.subplots(figsize=(10, 6))
        sns.scatterplot(x=feature_columns[0], y="prediction", data=pandas_df,
                        hue="prediction", palette="viridis", ax=ax)
        ax.set_title(f"Cluster Plot for {seg_name}")
        plt.show()

# Load data
customer_df, sales_df, product_df, store_df, loyalty_df, feedback_df = load_data(spark)

# Feature lists per segmentation. Each list is defined once and used both for
# preprocessing and for the model summaries/plots, so the two cannot drift apart.
rfm_features = ["Recency", "Frequency", "Monetary", "Customer_Age"]
loyalty_features = ["Total_Points_Earned", "Total_Points_Redeemed", "Membership_Tier_Index"]
lifecycle_features = ["Recency", "Frequency", "Monetary", "Lifecycle_Segment_Index"]
product_affinity_features = ["Category_Purchase_Count", "Category_Spending"]
store_loyalty_features = ["Store_Visit_Frequency", "Store_Total_Spending"]
feedback_sentiment_features = ["Avg_Sentiment_Score", "Total_Feedbacks"]

# RFM Segmentation Enhanced with Time. The left join keeps customers without
# sales; their null metrics are mean-imputed by preprocess_data.
rfm_df = calculate_rfm_with_time(sales_df)
customer_rfm_df = customer_df.join(rfm_df, "Customer_ID", "left")
scaled_data_rfm = preprocess_data(customer_rfm_df, rfm_features)

# Loyalty Program Engagement Segmentation
loyalty_metrics_df = loyalty_program_engagement_segmentation(loyalty_df)
scaled_data_loyalty = preprocess_data(loyalty_metrics_df, loyalty_features)

# Customer Lifecycle Segmentation
lifecycle_df = customer_lifecycle_segmentation(rfm_df)
scaled_data_lifecycle = preprocess_data(lifecycle_df, lifecycle_features)

# Product Affinity Segmentation
product_affinity_df = product_affinity_segmentation(sales_df, product_df)
scaled_data_product_affinity = preprocess_data(product_affinity_df, product_affinity_features)

# Store Loyalty Segmentation
store_loyalty_df = store_loyalty_segmentation(sales_df, store_df)
scaled_data_store_loyalty = preprocess_data(store_loyalty_df, store_loyalty_features)

# Feedback Sentiment Segmentation
feedback_sentiment_df = feedback_sentiment_segmentation(feedback_df)
scaled_data_feedback_sentiment = preprocess_data(feedback_sentiment_df, feedback_sentiment_features)

# Candidate clustering models (fixed k=4, fixed seed for reproducibility)
models = {
    "KMeans": KMeans(k=4, seed=1, featuresCol="scaled_features"),
    "BisectingKMeans": BisectingKMeans(k=4, seed=1, featuresCol="scaled_features"),
    "GaussianMixture": GaussianMixture(k=4, seed=1, featuresCol="scaled_features")
}

# Segmentation name -> (scaled data, feature columns it was built from)
segmentation_data = {
    "RFM with Time": (scaled_data_rfm, rfm_features),
    "Loyalty Program Engagement": (scaled_data_loyalty, loyalty_features),
    "Customer Lifecycle": (scaled_data_lifecycle, lifecycle_features),
    "Product Affinity": (scaled_data_product_affinity, product_affinity_features),
    "Store Loyalty": (scaled_data_store_loyalty, store_loyalty_features),
    "Feedback Sentiment": (scaled_data_feedback_sentiment, feedback_sentiment_features)
}

best_models = {}
segmented_customers = {}

for seg_name, (data, feature_columns) in segmentation_data.items():
    if data is not None and data.count() > 0:
        silhouette_scores = {}
        cluster_results = {}
        best_model_name = None
        best_score = float('-inf')

        for model_name, model in models.items():
            silhouette_score, clusters = train_and_evaluate_model(model, data, f"{seg_name} - {model_name}", feature_columns)
            if silhouette_score is not None:  # Only consider valid silhouette scores
                silhouette_scores[model_name] = silhouette_score
                cluster_results[model_name] = clusters

                if silhouette_score > best_score:
                    best_score = silhouette_score
                    best_model_name = model_name

        if best_model_name is not None:
            best_models[seg_name] = {
                "model_name": best_model_name,
                "silhouette_score": best_score,
                "clusters": cluster_results[best_model_name]
            }
            print(f"Best Model for {seg_name}: {best_model_name} with Silhouette Score = {best_score}")

            # Plot clusters
            plot_clusters(best_models[seg_name]["clusters"], feature_columns, seg_name)

            # Retrieve and store customer IDs and their clusters
            customer_clusters = best_models[seg_name]["clusters"].select("Customer_ID", "prediction").toPandas()
            segmented_customers[seg_name] = customer_clusters
            print(customer_clusters.head())  # Display the first few rows
        else:
            print(f"No valid clustering model found for {seg_name}.")
    else:
        print(f"No data available for segmentation: {seg_name}")

# End Spark session
spark.stop()


In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, countDistinct, max, min, datediff, current_date, col, to_date, avg, when, count, mean
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.clustering import KMeans, BisectingKMeans, GaussianMixture
from pyspark.ml.evaluation import ClusteringEvaluator

# Create the Spark session for this segmentation run (getOrCreate reuses an active one).
spark = (
    SparkSession.builder
    .appName("Comprehensive Customer Segmentation")
    .getOrCreate()
)

# Function to load data
def read_dataframe_from_sqlite(spark, table_name, db_path, driver):
    """Read ``table_name`` from the SQLite file at ``db_path`` via Spark's JDBC source.

    Returns:
        The loaded Spark DataFrame.
    """
    options = (
        ("url", f"jdbc:sqlite:{db_path}"),
        ("dbtable", table_name),
        ("driver", driver),
    )
    reader = spark.read.format("jdbc")
    for key, value in options:
        reader = reader.option(key, value)
    return reader.load()

def load_data(spark, db_path="../data/raw/retail_data.db", driver="org.sqlite.JDBC"):
    """Read the six source tables used for segmentation from SQLite.

    Args:
        spark: active SparkSession used for the JDBC reads.
        db_path: path of the SQLite database file (defaults to the project's raw DB).
        driver: JDBC driver class name.

    Returns:
        Tuple ``(customer_df, sales_df, product_df, store_df, loyalty_df, feedback_df)``.
    """
    # Order matters: callers unpack the tuple positionally.
    table_names = ("customer_sdf", "sales_sdf", "product_sdf", "store_sdf", "loyalty_sdf", "feedback_sdf")
    return tuple(read_dataframe_from_sqlite(spark, name, db_path, driver) for name in table_names)

# Calculate RFM metrics with Time
def calculate_rfm_with_time(sales_df):
    """Per-customer Recency, Frequency, Monetary and Customer_Age.

    ``Date`` is parsed with the "yyyy-MM-dd" pattern first. Recency and
    Customer_Age are day counts from today back to the latest and earliest
    purchase date respectively.

    Returns:
        Spark DataFrame with one row per ``Customer_ID``.
    """
    parsed_sales = sales_df.withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))
    today = current_date()
    metrics = [
        datediff(today, max("Date")).alias("Recency"),
        countDistinct("Transaction_ID").alias("Frequency"),
        sum("Sales_Amount").alias("Monetary"),
        datediff(today, min("Date")).alias("Customer_Age"),
    ]
    return parsed_sales.groupBy("Customer_ID").agg(*metrics)

# Loyalty Program Engagement Segmentation
def loyalty_program_engagement_segmentation(loyalty_df):
    """Aggregate loyalty points per (customer, membership tier) and index the tier.

    Args:
        loyalty_df: Spark DataFrame with ``Customer_ID``, ``Membership_Tier``,
            ``Points_Earned`` and ``Points_Redeemed`` columns.

    Returns:
        One row per (``Customer_ID``, ``Membership_Tier``) with
        ``Total_Points_Earned``, ``Total_Points_Redeemed`` (as double) and a
        numeric ``Membership_Tier_Index`` from StringIndexer.

    Raises:
        ValueError: if ``Membership_Tier`` is missing from ``loyalty_df``.
    """
    # Validate on the input: grouping by a missing column already fails inside
    # groupBy/agg, so a check placed after the aggregation could never fire.
    if "Membership_Tier" not in loyalty_df.columns:
        raise ValueError("The column 'Membership_Tier' does not exist in the DataFrame.")

    loyalty_metrics = loyalty_df.groupBy("Customer_ID", "Membership_Tier").agg(
        sum("Points_Earned").alias("Total_Points_Earned"),
        sum(col("Points_Redeemed").cast(DoubleType())).alias("Total_Points_Redeemed"),
    )

    # Turn the categorical tier into a numeric feature for clustering.
    indexer = StringIndexer(inputCol="Membership_Tier", outputCol="Membership_Tier_Index")
    return indexer.fit(loyalty_metrics).transform(loyalty_metrics)

# Customer Lifecycle Segmentation
def customer_lifecycle_segmentation(rfm_df):
    """Add a Recency-based lifecycle label and its StringIndexer encoding.

    Recency <= 30 -> "Active", Recency <= 90 -> "Warm", otherwise "Inactive"
    (a null Recency also ends up "Inactive").

    Returns:
        ``rfm_df`` plus ``Lifecycle_Segment`` and ``Lifecycle_Segment_Index``.
    """
    recency = col("Recency")
    label = when(recency <= 30, "Active").when(recency <= 90, "Warm").otherwise("Inactive")
    with_label = rfm_df.withColumn("Lifecycle_Segment", label)
    return (
        StringIndexer(inputCol="Lifecycle_Segment", outputCol="Lifecycle_Segment_Index")
        .fit(with_label)
        .transform(with_label)
    )

# Product Affinity Segmentation
def product_affinity_segmentation(sales_df, product_df):
    """Per (customer, product category) distinct purchases and total spend.

    Returns:
        One row per (``Customer_ID``, ``Category``) with
        ``Category_Purchase_Count`` and ``Category_Spending``.
    """
    joined = sales_df.join(product_df, "Product_ID")
    return joined.groupBy("Customer_ID", "Category").agg(
        countDistinct("Transaction_ID").alias("Category_Purchase_Count"),
        sum("Sales_Amount").alias("Category_Spending"),
    )

# Store Loyalty Segmentation
def store_loyalty_segmentation(sales_df, store_df):
    """Per (customer, store type) visit frequency and spend (left join to stores).

    NOTE(review): joins ``sales_df.Store_ID`` to ``store_df.Store_Location``;
    a later version joins on ``Store_ID``. Confirm the intended key.

    Returns:
        One row per (``Customer_ID``, ``Store_Type``) with
        ``Store_Visit_Frequency`` and ``Store_Total_Spending``.
    """
    on_store = sales_df["Store_ID"] == store_df["Store_Location"]
    grouped = sales_df.join(store_df, on_store, "left").groupBy("Customer_ID", "Store_Type")
    return grouped.agg(
        countDistinct("Transaction_ID").alias("Store_Visit_Frequency"),
        sum("Sales_Amount").alias("Store_Total_Spending"),
    )

# Feedback Sentiment Segmentation
def feedback_sentiment_segmentation(feedback_df):
    """Per-customer mean ``Feedback_Rating`` (as sentiment) and feedback count.

    Returns:
        One row per ``Customer_ID`` with ``Avg_Sentiment_Score`` and
        ``Total_Feedbacks``.
    """
    by_customer = feedback_df.groupBy("Customer_ID")
    return by_customer.agg(
        avg("Feedback_Rating").alias("Avg_Sentiment_Score"),
        count("Feedback_ID").alias("Total_Feedbacks"),
    )

# Preprocess data (assemble and scale features)
def preprocess_data(customer_df, feature_columns):
    """Cast, mean-impute, assemble and standardize feature columns.

    Steps:
      1. cast every feature column to DoubleType;
      2. replace missing values with the column mean (0.0 when the whole
         column is null);
      3. assemble the features into a ``features`` vector;
      4. standardize into ``scaled_features`` (withMean=True, withStd=True).

    Args:
        customer_df: Spark DataFrame containing ``feature_columns``.
        feature_columns: non-empty list of numeric column names.

    Returns:
        The input rows with ``features`` and ``scaled_features`` columns added.

    Raises:
        ValueError: if ``feature_columns`` is empty, the DataFrame has no rows,
            or the assembled feature vector is empty.
    """
    if not feature_columns:
        raise ValueError("No feature columns given. Cannot proceed with preprocessing.")

    for feature in feature_columns:
        customer_df = customer_df.withColumn(feature, col(feature).cast(DoubleType()))

    # head(1) stops at the first row instead of counting the whole table.
    if not customer_df.head(1):
        raise ValueError("The DataFrame is empty. Cannot proceed with preprocessing.")

    # One aggregation job for all column means instead of one job per feature.
    mean_row = customer_df.select([mean(feature).alias(feature) for feature in feature_columns]).first()
    mean_values = {
        feature: (mean_row[feature] if mean_row[feature] is not None else 0.0)
        for feature in feature_columns
    }
    customer_df = customer_df.fillna(mean_values)

    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    feature_vector = assembler.transform(customer_df)

    first_row = feature_vector.select("features").head()
    if first_row is None:
        raise ValueError("No rows left after assembling features. Cannot proceed with scaling.")
    if first_row[0].size == 0:
        raise ValueError("Feature vector is empty. Cannot proceed with scaling.")

    scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
    return scaler.fit(feature_vector).transform(feature_vector)

# Train and evaluate clustering model
def train_and_evaluate_model(model, scaled_data, model_name, feature_columns):
    """Fit ``model`` on ``scaled_data`` and score it with the silhouette metric.

    Args:
        model: unfitted Spark ML clustering estimator reading ``scaled_features``.
        scaled_data: Spark DataFrame with ``scaled_features`` and the raw features.
        model_name: label used in printed messages.
        feature_columns: raw feature columns averaged per cluster in the summary.

    Returns:
        Always a 3-tuple ``(silhouette, clusters, cluster_summary)``. When the
        model assigns every row to a single cluster the silhouette is undefined
        and ``(None, clusters, None)`` is returned.

    Raises:
        ValueError: if ``scaled_data`` has no rows.
    """
    if scaled_data.count() == 0:
        raise ValueError(f"The DataFrame for {model_name} is empty. Cannot proceed with model training.")

    trained_model = model.fit(scaled_data)
    clusters = trained_model.transform(scaled_data)

    num_clusters = clusters.select("prediction").distinct().count()
    if num_clusters <= 1:
        print(f"{model_name}: Number of clusters is {num_clusters}. Cannot compute silhouette score.")
        # Same arity as the success path: callers unpack three values.
        return None, clusters, None

    evaluator = ClusteringEvaluator(featuresCol="scaled_features", metricName="silhouette", distanceMeasure="squaredEuclidean")
    silhouette = evaluator.evaluate(clusters)
    print(f"{model_name} Silhouette Score: {silhouette}")

    # Lazy per-cluster summary of the raw (unscaled) features.
    agg_exprs = [avg(col(feature)).alias(f"Avg_{feature}") for feature in feature_columns]
    cluster_summary = clusters.groupBy("prediction").agg(*agg_exprs)
    return silhouette, clusters, cluster_summary

# Load data
customer_df, sales_df, product_df, store_df, loyalty_df, feedback_df = load_data(spark)


# RFM Segmentation Enhanced with Time
rfm_df = calculate_rfm_with_time(sales_df)
customer_rfm_df = customer_df.join(rfm_df, "Customer_ID", "left")
scaled_data_rfm = preprocess_data(customer_rfm_df, ["Recency", "Frequency", "Monetary", "Customer_Age"])

# Loyalty Program Engagement Segmentation
loyalty_metrics_df = loyalty_program_engagement_segmentation(loyalty_df)
scaled_data_loyalty = preprocess_data(loyalty_metrics_df, ["Total_Points_Earned", "Total_Points_Redeemed", "Membership_Tier_Index"])

# Customer Lifecycle Segmentation
lifecycle_df = customer_lifecycle_segmentation(rfm_df)
scaled_data_lifecycle = preprocess_data(lifecycle_df, ["Recency", "Frequency", "Monetary", "Lifecycle_Segment_Index"])

# Product Affinity Segmentation
product_affinity_df = product_affinity_segmentation(sales_df, product_df)
scaled_data_product_affinity = preprocess_data(product_affinity_df, ["Category_Purchase_Count", "Category_Spending"])

# Store Loyalty Segmentation
store_loyalty_df = store_loyalty_segmentation(sales_df, store_df)
scaled_data_store_loyalty = preprocess_data(store_loyalty_df, ["Store_Visit_Frequency", "Store_Total_Spending"])

# Feedback Sentiment Segmentation
feedback_sentiment_df = feedback_sentiment_segmentation(feedback_df)
scaled_data_feedback_sentiment = preprocess_data(feedback_sentiment_df, ["Avg_Sentiment_Score", "Total_Feedbacks"])

# Dictionary to store models and their names
models = {
    "KMeans": KMeans(k=4, seed=1, featuresCol="scaled_features"),
    "BisectingKMeans": BisectingKMeans(k=4, seed=1, featuresCol="scaled_features"),
    "GaussianMixture": GaussianMixture(k=4, seed=1, featuresCol="scaled_features")
}

# KMeans is already covered by the k grid search below; only the remaining
# estimators are evaluated at their fixed k here.
non_kmeans_models = {name: model for name, model in models.items() if name != "KMeans"}

# Dictionary for grid search parameters
k_values = [2, 3, 4, 5, 6, 7, 8, 9, 10]

# Train and evaluate each model for each segmentation
segmentation_data = {
    "RFM_with_Time": (scaled_data_rfm, ["Recency", "Frequency", "Monetary", "Customer_Age"]),
    "Loyalty_Program_Engagement": (scaled_data_loyalty, ["Total_Points_Earned", "Total_Points_Redeemed", "Membership_Tier_Index"]),
    "Customer_Lifecycle": (scaled_data_lifecycle, ["Recency", "Frequency", "Monetary", "Lifecycle_Segment_Index"]),
    "Product_Affinity": (scaled_data_product_affinity, ["Category_Purchase_Count", "Category_Spending"]),
    "Store_Loyalty": (scaled_data_store_loyalty, ["Store_Visit_Frequency", "Store_Total_Spending"]),
    "Feedback_Sentiment": (scaled_data_feedback_sentiment, ["Avg_Sentiment_Score", "Total_Feedbacks"])
}

for seg_name, (data, feature_columns) in segmentation_data.items():
    print(f"Starting segmentation: {seg_name}")

    # The same data is fitted many times (grid search + other estimators); cache
    # it so the JDBC source and preprocessing are not recomputed for every fit.
    data = data.cache()

    best_silhouette = float('-inf')
    best_model_name = None
    best_clusters = None
    best_summary = None

    # NOTE(review): grid_search_kmeans and save_results_to_csv are not defined in
    # this cell; they must come from another cell that runs first (hidden state).
    kmeans_results = grid_search_kmeans(data, feature_columns, k_values)

    # Save KMeans results
    for k, silhouette, clusters, summary in kmeans_results:
        model_name = f"KMeans_k{k}"
        save_results_to_csv(clusters, summary, seg_name, model_name)
        if silhouette is not None and silhouette > best_silhouette:
            best_silhouette = silhouette
            best_model_name = model_name
            best_clusters = clusters
            best_summary = summary

    # Evaluate and save BisectingKMeans and GaussianMixture
    for model_name, model in non_kmeans_models.items():
        # Unpack defensively: a single-cluster result may come back as a 2-tuple.
        result = train_and_evaluate_model(model, data, f"{seg_name} - {model_name}", feature_columns)
        silhouette, clusters = result[0], result[1]
        summary = result[2] if len(result) > 2 else None
        if silhouette is not None:
            save_results_to_csv(clusters, summary, seg_name, model_name)
            if silhouette > best_silhouette:
                best_silhouette = silhouette
                best_model_name = model_name
                best_clusters = clusters
                best_summary = summary

    # Save the best model's clusters
    if best_clusters is not None:
        save_results_to_csv(best_clusters, best_summary, seg_name, best_model_name, is_best=True)

    data.unpersist()

# End Spark session
spark.stop()


                                                                                

Starting segmentation: RFM_with_Time


                                                                                

KMeans (k=2) Silhouette Score: 0.5055794891292529


                                                                                

KMeans (k=3) Silhouette Score: 0.5086703340672766


                                                                                

KMeans (k=4) Silhouette Score: 0.5526579558454616


ERROR:root:Exception while sending command.                                     
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
RuntimeError: reentrant call inside <_io.BufferedReader name=78>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/anaconda3/lib/python3.12/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.12/

Py4JError: An error occurred while calling o12412.fit

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType
from pyspark.sql.window import Window  # Importing Window
import time
from pyspark.ml.feature import VectorAssembler, StandardScaler, StringIndexer
from pyspark.ml.clustering import KMeans, BisectingKMeans, GaussianMixture
from pyspark.ml.evaluation import ClusteringEvaluator
from datetime import datetime
import pandas as pd
import sqlite3
import os

# Create the Spark session used by every helper in this cell (getOrCreate reuses an active one).
spark = (
    SparkSession.builder
    .appName("Comprehensive Customer Segmentation")
    .getOrCreate()
)

def get_timestamp():
    """Return the current local time as a compact ``YYYYMMDDHHMMSS`` string."""
    now = datetime.now()
    return f"{now:%Y%m%d%H%M%S}"
    
def read_dataframe_from_sqlite(table_name, db_path):
    """Load ``table_name`` from the SQLite file at ``db_path`` using the module-level ``spark``.

    The SQLite JDBC driver class is fixed to "org.sqlite.JDBC".
    """
    jdbc_options = {
        "url": f"jdbc:sqlite:{db_path}",
        "dbtable": table_name,
        "driver": "org.sqlite.JDBC",
    }
    reader = spark.read.format("jdbc")
    for key, value in jdbc_options.items():
        reader = reader.option(key, value)
    return reader.load()

def write_df_to_sqlite(df, table_name, db_path):
    """Append a pandas DataFrame to ``table_name`` in the SQLite DB at ``db_path``.

    The table is created by pandas if it does not exist; the DataFrame index
    is not written.
    """
    from contextlib import closing

    # closing() guarantees the connection is released even if to_sql raises
    # (sqlite3's own context manager only commits/rolls back, it does not close).
    with closing(sqlite3.connect(db_path)) as conn:
        df.to_sql(table_name, conn, if_exists='append', index=False)

def get_next_version_number(model_name, db_path):
    """Retrieve and increment the version number for a given model from the database.

    Args:
        model_name: value matched against ``model_info.Model_Name``.
        db_path: path of the SQLite database.

    Returns:
        ``MAX(Version) + 1`` for that model, or 1 if it has no rows yet.

    Raises:
        sqlite3.OperationalError: if the ``model_info`` table does not exist.
    """
    from contextlib import closing

    # closing() releases the connection even when the query raises.
    with closing(sqlite3.connect(db_path)) as conn:
        (current_version,) = conn.execute(
            "SELECT MAX(Version) FROM model_info WHERE Model_Name = ?", (model_name,)
        ).fetchone()
    return (current_version if current_version is not None else 0) + 1

def save_spark_model(model, model_name, db_path):
    """Persist a fitted Spark ML model under ``./results/models/<YYYYMMDD>/``.

    The file name is ``<model_name>_v<version>``, where the version comes from
    ``get_next_version_number`` (i.e. one past the highest recorded version).

    Returns:
        ``(file_path, version)``.
    """
    version = get_next_version_number(model_name, db_path)
    day_dir = f"./results/models/{datetime.now().strftime('%Y%m%d')}"
    os.makedirs(day_dir, exist_ok=True)
    target_path = f"{day_dir}/{model_name}_v{version}"
    model.save(target_path)
    return target_path, version
    
def save_model_info_to_db(model_name, file_path, db_path, silhouette_score, version, training_time):
    """Append one row describing a saved model to the ``model_info`` table.

    ``Saved_At`` is stamped with the current local time.
    """
    record = {
        "Model_Name": model_name,
        "File_Path": file_path,
        "Saved_At": datetime.now(),
        "Silhouette_Score": silhouette_score,
        "Version": version,
        "Training_Time": training_time,
    }
    write_df_to_sqlite(pd.DataFrame([record]), "model_info", db_path)


from pyspark.sql.functions import when, col, lit

def preprocess_and_cluster(data, feature_columns, k_values, model_types, segmentation_name, db_path):
    """Scale features, grid-search clustering models and persist only the best one.

    For every ``k`` in ``k_values`` and every estimator class in ``model_types``,
    a model is fitted on ``scaled_features`` and scored with the silhouette
    metric. Only the overall best model is saved via ``save_spark_model`` and
    recorded via ``save_model_info_to_db``. Failed fits or evaluations are
    reported and skipped.

    Args:
        data: Spark DataFrame containing ``feature_columns``.
        feature_columns: numeric columns to cluster on; nulls are filled with 0.
        k_values: iterable of cluster counts to try.
        model_types: mapping of label -> Spark ML estimator class accepting
            ``k`` and ``featuresCol`` keyword arguments.
        segmentation_name: label used in messages and saved model names.
        db_path: SQLite database holding the ``model_info`` table.

    Returns:
        The best model's predictions with ``prediction`` renamed to ``cluster``
        and a ``Timestamp`` column added, or ``None`` if nothing could be fitted.
    """
    if data.rdd.isEmpty():
        print(f"No data available for segmentation: {segmentation_name}")
        return None

    assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
    data = assembler.transform(data.na.fill(0))

    # na.fill(0) only fills numeric columns, so this only triggers if some
    # feature column is non-numeric and entirely null.
    if data.select(feature_columns).dropna().rdd.isEmpty():
        print(f"All feature data are null for {segmentation_name}")
        return None

    # StandardScaler defaults (withStd=True, withMean=False): scaled, not centred.
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
    try:
        data = scaler.fit(data).transform(data)
    except Exception as e:
        print(f"Failed to scale data for {segmentation_name}: {str(e)}")
        return None

    evaluator = ClusteringEvaluator(featuresCol="scaled_features", metricName="silhouette")
    # Silhouette lies in [-1, 1]; start below that so any valid score can win.
    best_score = float("-inf")
    best = None  # (model_name, trained_model, predictions, silhouette, training_time)

    for k in k_values:
        for model_type, estimator_cls in model_types.items():
            model_name = f"{segmentation_name}-{model_type}-k{k}"
            try:
                start_time = time.time()
                trained_model = estimator_cls(k=k, featuresCol="scaled_features").fit(data)
                predictions = trained_model.transform(data)
                training_time = time.time() - start_time
            except Exception as e:
                print(f"Failed to fit or transform model {model_type} for {segmentation_name} with k={k}: {str(e)}")
                continue

            # Silhouette evaluation fails when every row lands in one cluster;
            # skip that candidate instead of aborting the whole search.
            try:
                silhouette = evaluator.evaluate(predictions)
            except Exception as e:
                print(f"Failed to evaluate model {model_type} for {segmentation_name} with k={k}: {str(e)}")
                continue

            if silhouette > best_score:
                best_score = silhouette
                best = (model_name, trained_model, predictions, silhouette, training_time)

    if best is None:
        return None

    # Persist only the winner. Saving every intermediate "best" wrote extra model
    # directories that were never versioned in model_info, so a re-run on the
    # same day could collide with an existing path.
    model_name, trained_model, best_predictions, silhouette, training_time = best
    file_path, version = save_spark_model(trained_model, model_name, db_path)
    save_model_info_to_db(model_name, file_path, db_path, silhouette, version, training_time)
    best_predictions = best_predictions.withColumnRenamed("prediction", "cluster")
    return best_predictions.withColumn("Timestamp", current_timestamp())
    

def load_data(db_path="../data/raw/retail_data.db"):
    """Read the six source tables used for segmentation from SQLite.

    Args:
        db_path: path of the SQLite database file (defaults to the project's raw DB).

    Returns:
        Tuple ``(customer_df, sales_df, product_df, store_df, loyalty_df, feedback_df)``.
    """
    # Order matters: callers unpack the tuple positionally.
    table_names = ("customer_sdf", "sales_sdf", "product_sdf", "store_sdf", "loyalty_sdf", "feedback_sdf")
    return tuple(read_dataframe_from_sqlite(name, db_path) for name in table_names)

def calculate_rfm_with_time(sales_df):
    """Per-customer RFM metrics plus first/last purchase dates and tenure.

    Returns one row per ``Customer_ID`` with ``Last_Purchase_Date``,
    ``First_Purchase_Date``, ``Frequency`` (distinct ``Transaction_ID``),
    ``Monetary`` (summed ``Sales_Amount``), ``Recency`` (days since the last
    purchase) and ``Customer_Age`` (days since the first purchase).
    """
    # Parse Date so min/max/datediff work on dates (assumes "yyyy-MM-dd" strings).
    # min/max/sum here are Spark's, via this cell's star import of pyspark functions.
    dated_sales = sales_df.withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))

    rfm_df = dated_sales.groupBy("Customer_ID").agg(
        max("Date").alias("Last_Purchase_Date"),
        min("Date").alias("First_Purchase_Date"),
        countDistinct("Transaction_ID").alias("Frequency"),
        sum("Sales_Amount").alias("Monetary")
    )

    return (
        rfm_df
        .withColumn("Recency", datediff(current_date(), col("Last_Purchase_Date")))
        .withColumn("Customer_Age", datediff(current_date(), col("First_Purchase_Date")))
    )

# Add other segmentation functions here
def loyalty_program_engagement_segmentation(loyalty_df):
    """One row per customer: total points earned/redeemed plus an indexed membership tier.

    ``Points_Redeemed`` is cast to double before summing.

    NOTE(review): ``first("Membership_Tier")`` has no ordering, so a customer
    with rows in several tiers gets an arbitrary one. Confirm this is acceptable.
    """
    points = loyalty_df.withColumn("Points_Redeemed", col("Points_Redeemed").cast(DoubleType()))
    per_customer = points.groupBy("Customer_ID").agg(
        sum("Points_Earned").alias("Total_Points_Earned"),
        sum("Points_Redeemed").alias("Total_Points_Redeemed"),
        first("Membership_Tier").alias("Membership_Tier"),
    )
    tier_indexer = StringIndexer(inputCol="Membership_Tier", outputCol="Membership_Tier_Index")
    return tier_indexer.fit(per_customer).transform(per_customer)

def customer_lifecycle_segmentation(rfm_df):
    """Label customers Active (Recency <= 30), Warm (30 < Recency <= 90) or Inactive.

    The label is also encoded numerically with StringIndexer.

    Returns:
        ``rfm_df`` plus ``Lifecycle_Segment`` and ``Lifecycle_Segment_Index``.
    """
    recency = col("Recency")
    segment = (
        when(recency <= 30, "Active")
        .when((recency > 30) & (recency <= 90), "Warm")
        .otherwise("Inactive")
    )
    labelled = rfm_df.withColumn("Lifecycle_Segment", segment)
    fitted = StringIndexer(inputCol="Lifecycle_Segment", outputCol="Lifecycle_Segment_Index").fit(labelled)
    return fitted.transform(labelled)


def product_affinity_segmentation(sales_df, product_df):
    """Per (customer, category) distinct purchases and spend, via an inner join on Product_ID."""
    grouped = sales_df.join(product_df, "Product_ID").groupBy("Customer_ID", "Category")
    purchase_count = countDistinct("Transaction_ID").alias("Category_Purchase_Count")
    spending = sum("Sales_Amount").alias("Category_Spending")
    return grouped.agg(purchase_count, spending)

def store_loyalty_segmentation(sales_df, store_df):
    """Per (customer, store type) visit frequency and spend, via an inner join on Store_ID."""
    grouped = sales_df.join(store_df, "Store_ID").groupBy("Customer_ID", "Store_Type")
    visits = countDistinct("Transaction_ID").alias("Store_Visit_Frequency")
    spending = sum("Sales_Amount").alias("Store_Total_Spending")
    return grouped.agg(visits, spending)

def feedback_sentiment_segmentation(feedback_df):
    """Per-customer mean ``Feedback_Rating``, exposed as ``Avg_Sentiment_Score``."""
    avg_rating = avg("Feedback_Rating").alias("Avg_Sentiment_Score")
    return feedback_df.groupBy("Customer_ID").agg(avg_rating)
    
def main():
    """Run every segmentation and append each one's best cluster assignments to SQLite.

    For each segmentation, the feature table is built, clustered by
    ``preprocess_and_cluster`` over all ``model_types`` x ``k_values``, and
    the winning assignments (``Customer_ID``, ``cluster``, ``Timestamp``) are
    appended to ``<segmentation>_results``. The Spark session is stopped at
    the end, even if a segmentation raises.
    """
    db_path = "../data/raw/retail_data.db"
    try:
        customer_df, sales_df, product_df, store_df, loyalty_df, feedback_df = load_data()

        segmentations = {
            "RFM_with_Time": {
                "data_func": lambda: calculate_rfm_with_time(sales_df),
                "features": ["Recency", "Frequency", "Monetary", "Customer_Age"]
            },
            "Loyalty_Program_Engagement": {
                "data_func": lambda: loyalty_program_engagement_segmentation(loyalty_df),
                "features": ["Total_Points_Earned", "Total_Points_Redeemed", "Membership_Tier_Index"]
            },
            "Customer_Lifecycle": {
                "data_func": lambda: customer_lifecycle_segmentation(calculate_rfm_with_time(sales_df)),
                "features": ["Recency", "Lifecycle_Segment_Index"]
            },
            "Product_Affinity": {
                "data_func": lambda: product_affinity_segmentation(sales_df, product_df),
                "features": ["Category_Purchase_Count", "Category_Spending"]
            },
            "Store_Loyalty": {
                "data_func": lambda: store_loyalty_segmentation(sales_df, store_df),
                "features": ["Store_Visit_Frequency", "Store_Total_Spending"]
            },
            "Feedback_Sentiment": {
                "data_func": lambda: feedback_sentiment_segmentation(feedback_df),
                "features": ["Avg_Sentiment_Score"]
            }
        }

        model_types = {
            "KMeans": KMeans,
            "BisectingKMeans": BisectingKMeans,
            "GaussianMixture": GaussianMixture
        }
        k_values = [2, 3, 4, 5, 6, 7, 8, 9, 10]

        for seg_name, seg_info in segmentations.items():
            seg_data = seg_info["data_func"]()
            predictions = preprocess_and_cluster(seg_data, seg_info["features"], k_values, model_types, seg_name, db_path)
            # preprocess_and_cluster signals "no result" with None; test that explicitly
            # rather than relying on a DataFrame's truthiness.
            if predictions is not None:
                predictions_pandas = predictions.select("Customer_ID", "cluster", "Timestamp").toPandas()
                write_df_to_sqlite(predictions_pandas, f"{seg_name}_results", db_path)
    finally:
        spark.stop()

def create_or_update_table(db_path):
    """Create the ``model_info`` bookkeeping table if it does not exist yet.

    Idempotent: ``CREATE TABLE IF NOT EXISTS`` leaves an existing table and its
    rows untouched. Despite the name, it does not alter an existing table whose
    columns differ from the ones below.
    """
    from contextlib import closing

    # closing() always releases the connection; `with conn` commits on success
    # and rolls back on error.
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS model_info (
                    Model_Name TEXT,
                    File_Path TEXT,
                    Saved_At TIMESTAMP,
                    Silhouette_Score FLOAT,
                    Version INTEGER,
                    Training_Time FLOAT
                );
            ''')

def write_df_to_sqlite(df, table_name, db_path):
    """Append a pandas DataFrame to ``table_name`` in the SQLite DB at ``db_path``.

    Before writing it calls ``create_or_update_table`` so the ``model_info``
    table exists, because this helper is used for both model metadata and
    segmentation results.

    NOTE(review): this redefines the ``write_df_to_sqlite`` declared earlier in
    this cell; only this later definition is in effect when ``main`` runs.
    """
    from contextlib import closing

    create_or_update_table(db_path)  # Ensure the table exists with correct schema
    # closing() guarantees the connection is released even if to_sql raises.
    with closing(sqlite3.connect(db_path)) as conn:
        df.to_sql(table_name, conn, if_exists='append', index=False)
    
if __name__ == "__main__":
    # Idempotently make sure model_info exists before the pipeline reads or appends to it.
    create_or_update_table(db_path="../data/raw/retail_data.db")
    main()

                                                                                

24/08/15 23:21:25 WARN Utils: Your hostname, Aadarshs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.0.0.50 instead (on interface en0)
24/08/15 23:21:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/08/15 23:21:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024-08-15 23:21:30,311 - INFO - Database table 'model_info' checked or updated.
2024-08-15 23:21:30,311 - INFO - Reading DataFrame from SQLite: Table=customer_sdf, DB=../data/raw/retail_data.db
2024-08-15 23:21:33,897 - INFO - Reading DataFrame from SQLite: Table=sales_sdf, DB=../data/raw/retail_data.db
2024-08-15 23:21:33,947 - INFO - Reading DataFrame from SQLite: Table=product_sdf, DB=../data/raw/retail_data.db
2024-08-15 23:21:34,006 - INFO - Reading DataFrame from SQLite