# **Phase 3**

# Distributed Data Cleaning and Pre-Processing

In [47]:
spark.stop()

In [48]:
import re
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Initialize Spark Session
# Initialize Spark Session
spark = SparkSession.builder.appName("Retail Data Cleaning").getOrCreate()
sc = spark.sparkContext

# Read the CSV file
xls_file = pd.read_excel('Online Retail.xlsx')
xls_file.to_csv('Online_Retail.csv', index=False)
df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('Online_Retail.csv')

# DISTRIBUTED OPERATION 1: Remove duplicates using distributed distinct operation
df = df.distinct()

# DISTRIBUTED OPERATION 2: Clean and transform data using distributed map operations
df = df.dropna() \
    .withColumn("CustomerID", col("CustomerID").cast("integer")) \
    .withColumn("InvoiceDate", to_timestamp("InvoiceDate")) \
    .withColumn("Total_Price", col("Quantity") * col("UnitPrice")) \
    .withColumn("Year", year("InvoiceDate")) \
    .withColumn("Month", month("InvoiceDate")) \
    .withColumn("DayOfWeek", dayofweek("InvoiceDate"))

# DISTRIBUTED OPERATION 3: Season calculation using distributed UDF
@udf(returnType=StringType())
def get_season(month):
    if month in [12, 1, 2]: return 'Winter'
    elif month in [3, 4, 5]: return 'Spring'
    elif month in [6, 7, 8]: return 'Summer'
    else: return 'Autumn'

df = df.withColumn("Season", get_season(col("Month")))

# DISTRIBUTED OPERATION 4: Filter invalid data using distributed filtering
df = df.filter((col("Quantity") > 0) & (col("UnitPrice") > 0))

# DISTRIBUTED OPERATION 5: Text cleaning using distributed UDF
@udf(returnType=StringType())
def clean_text(text):
    if text is None: return None
    # Remove special characters and extra spaces
    cleaned = re.sub(r'[^\w\s]', '', str(text)).strip()
    return cleaned

string_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType)]
for column in string_columns:
    df = df.withColumn(column, clean_text(col(column)))

# DISTRIBUTED OPERATION 6: RFM Analysis using window functions
window_spec = Window.orderBy("InvoiceDate")
max_date = df.agg(max("InvoiceDate")).collect()[0][0]

rfm_df = df.groupBy("CustomerID").agg(
    datediff(lit(max_date), max("InvoiceDate")).alias("Recency"),
    countDistinct("InvoiceNo").alias("Frequency"),
    sum("Total_Price").alias("Monetary")
)

# Calculate quartiles using window functions
window_quartile = Window.orderBy("Recency")
rfm_df = rfm_df.withColumn("R", ntile(4).over(window_quartile))
window_quartile = Window.orderBy("Frequency")
rfm_df = rfm_df.withColumn("F", ntile(4).over(window_quartile))
window_quartile = Window.orderBy("Monetary")
rfm_df = rfm_df.withColumn("M", ntile(4).over(window_quartile))

# Create RFM Score
rfm_df = rfm_df.withColumn("RFM_Score",
    concat(col("R").cast("string"),
          col("F").cast("string"),
          col("M").cast("string")))

# Join RFM metrics back to main dataframe
df = df.join(rfm_df, "CustomerID", "left")

# Cache the DataFrame for better performance in subsequent operations
df.cache()

# Show the results
df.select("CustomerID", "Recency", "Frequency", "Monetary", "RFM_Score").show(5)

+----------+-------+---------+-----------------+---------+
|CustomerID|Recency|Frequency|         Monetary|RFM_Score|
+----------+-------+---------+-----------------+---------+
|     15727|     16|        7|          5159.06|      144|
|     13623|     30|        5|727.7400000000001|      243|
|     13623|     30|        5|727.7400000000001|      243|
|     15727|     16|        7|          5159.06|      144|
|     15727|     16|        7|          5159.06|      144|
+----------+-------+---------+-----------------+---------+
only showing top 5 rows



# Dag Visualization

In [49]:
!pip install -q pyngrok
from pyngrok import ngrok


In [50]:
!ngrok config add-authtoken 2od6RWKaNlm979WXnmxhHKvD6r7_4ENE12xfLibJZ654juXn


Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [51]:
print("Spark UI running on:", sc.uiWebUrl)

Spark UI running on: http://3da39760f058:4040


In [52]:
ngrok.kill()


In [54]:
public_url = ngrok.connect(4040)
print("Spark UI:", public_url)





Spark UI: NgrokTunnel: "https://bf18-34-86-2-41.ngrok-free.app" -> "http://localhost:4040"


# Algorithms

In [55]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.clustering import BisectingKMeans, KMeans, GaussianMixture
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression, LinearSVC, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.sql.functions import col, when
from pyspark.sql.types import DoubleType
import time

# Sample the data
sample_size = 10000
df_sample = df.sample(withReplacement=False, fraction=sample_size/df.count(), seed=42)
# Prepare features
feature_cols = ['Recency', 'Frequency', 'Monetary']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_vector = assembler.transform(df_sample)
# Scale features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(df_vector)
df_scaled = scaler_model.transform(df_vector)

In [56]:
# 1. Hierarchical Clustering (Using BisectingKMeans as alternative since MLlib doesn't have hierarchical)
start_time = time.time()
bkm = BisectingKMeans(k=4, featuresCol="scaled_features")
model_bkm = bkm.fit(df_scaled)
df_bkm = model_bkm.transform(df_scaled)
evaluator = ClusteringEvaluator(predictionCol="prediction", featuresCol="scaled_features")
silhouette_bkm = evaluator.evaluate(df_bkm)
end_time = time.time()
execution_time = end_time - start_time
print(f"Bisecting KMeans Silhouette Score: {silhouette_bkm:.4f}")
print(f"Execution Time: {execution_time:.4f} seconds")

Bisecting KMeans Silhouette Score: 0.8470
Execution Time: 112.6334 seconds


In [57]:
# 2. K-Means
start_time = time.time()
kmeans = KMeans(k=4, featuresCol="scaled_features")
model_kmeans = kmeans.fit(df_scaled)
df_kmeans = model_kmeans.transform(df_scaled)
silhouette_kmeans = evaluator.evaluate(df_kmeans)
end_time = time.time()
execution_time = end_time - start_time
print(f"KMeans Silhouette Score: {silhouette_kmeans:.4f}")
print(f"Execution Time: {execution_time:.4f} seconds")

KMeans Silhouette Score: 0.8614
Execution Time: 43.5863 seconds


In [58]:
# 3. Linear Regression
start_time = time.time()
assembler_lr = VectorAssembler(inputCols=['Recency', 'Frequency'], outputCol="features")
df_lr = assembler_lr.transform(df_sample)
lr = LinearRegression(featuresCol="features", labelCol="Monetary")
lr_model = lr.fit(df_lr)
predictions_lr = lr_model.transform(df_lr)
evaluator_lr = RegressionEvaluator(labelCol="Monetary", predictionCol="prediction")
r2 = evaluator_lr.evaluate(predictions_lr, {evaluator_lr.metricName: "r2"})
mse = evaluator_lr.evaluate(predictions_lr, {evaluator_lr.metricName: "mse"})
end_time = time.time()
execution_time = end_time - start_time
print(f"Linear Regression R2: {r2:.4f}")
print(f"Linear Regression MSE: {mse:.4f}")
print(f"Execution Time: {execution_time:.4f} seconds")

Linear Regression R2: 0.3686
Linear Regression MSE: 620738043.0074
Execution Time: 10.3524 seconds


In [59]:
# 4. Logistic Regression
start_time = time.time()
df_sample = df_sample.withColumn("RFM_Score_numeric", col("RFM_Score").cast(DoubleType()))
feature_cols_log = ['Recency', 'Frequency', 'Monetary', 'Total_Price', 'Quantity']
assembler_log = VectorAssembler(inputCols=feature_cols_log, outputCol="features")
df_log = assembler_log.transform(df_sample)
lr_classifier = LogisticRegression(featuresCol="features", labelCol="RFM_Score_numeric")
lr_model = lr_classifier.fit(df_log)
predictions_lr = lr_model.transform(df_log)
# Evaluation metrics
evaluator_lr = MulticlassClassificationEvaluator(labelCol="RFM_Score_numeric", predictionCol="prediction")
accuracy_lr = evaluator_lr.evaluate(predictions_lr)
end_time = time.time()
execution_time = end_time - start_time
print(f"Logistic Regression Accuracy: {accuracy_lr:.4f}")
print(f"Execution Time: {execution_time:.4f} seconds")

Logistic Regression Accuracy: 0.7578
Execution Time: 210.2223 seconds


In [60]:
# 5. Random Forest Model
start_time = time.time()
distinct_rfm_scores = df_log.select("RFM_Score_numeric").distinct().collect()
rfm_mapping = {row["RFM_Score_numeric"]: idx for idx, row in enumerate(distinct_rfm_scores)}
rfm_mapping_broadcast = sc.broadcast(rfm_mapping)
mapping_expr = when(col("RFM_Score_numeric").isNull(), None)
for key, value in rfm_mapping_broadcast.value.items():
    mapping_expr = mapping_expr.when(col("RFM_Score_numeric") == key, value)
df_log = df_log.withColumn("label", mapping_expr)
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=10)
rf_model = rf.fit(df_log)
predictions_rf = rf_model.transform(df_log)
# Evaluation metrics
evaluator_rf = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy_rf = evaluator_rf.evaluate(predictions_rf)
end_time = time.time()
execution_time = end_time - start_time
print(f"Random Forest Accuracy: {accuracy_rf:.4f}")
print(f"Execution Time: {execution_time:.4f} seconds")

Random Forest Accuracy: 0.8541
Execution Time: 65.3989 seconds


In [61]:
# 6. Gaussian Mixture Model (GMM)
start_time = time.time()
gmm = GaussianMixture(k=4, featuresCol="scaled_features")
model_gmm = gmm.fit(df_scaled)
df_gmm = model_gmm.transform(df_scaled)
silhouette_gmm = evaluator.evaluate(df_gmm)
end_time = time.time()
execution_time = end_time - start_time
print(f"Gaussian Mixture Model Silhouette Score: {silhouette_gmm:.4f}")
print(f"Execution Time: {execution_time:.4f} seconds")

Gaussian Mixture Model Silhouette Score: 0.1937
Execution Time: 106.1357 seconds


## **BONUS**

https://dicproject-yvcnsqljtsyh7tqccnpml9.streamlit.app/

The above link is the streamlit app for the product developed for the customer segmentation using Kmeans Algorithm.

