<a href="https://colab.research.google.com/github/Haadhi-Mohammed/Projects/blob/main/Customer_Segmentation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# installing PySpark

!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=e89ee1578f39b0e13be8f7f07f97e0c5f9f59274e4f15f1a857ee24ea5cb91d4
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) a Spark session running in local mode on all available
# cores, with the Spark UI bound to port 4040.
session_builder = (
    SparkSession.builder
    .master("local[*]")
    .appName("Colab")
    .config('spark.ui.port', '4040')
)
spark = session_builder.getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f55e023a5f0>


In [None]:
# Importing necessary packages

from pyspark.sql.functions import col, isnan, when, count
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler, PCA
from pyspark.ml import Pipeline

from pyspark.ml.feature import PCA
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Load the dataset: the first CSV row is used as the header and column types
# are inferred from the data.
df = spark.read.csv(
    "/content/Cleaned_CusSeg.csv",
    header=True,
    inferSchema=True,
)

# Preview the first five rows
df.show(5)

+----+---+----------+--------------+------+-------+--------+-----------+-------+-----+------+------------+------------+-------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+------------+------------+------------+------------+------------+--------+--------+
|  ID|Age| Education|Marital_Status|Income|Kidhome|Teenhome|Dt_Customer|Recency|Wines|Fruits|MeatProducts|FishProducts|SweetProducts|GoldProducts|NumDealsPurchases|NumWebPurchases|NumCatalogPurchases|NumStorePurchases|NumWebVisitsMonth|AcceptedCmp3|AcceptedCmp4|AcceptedCmp5|AcceptedCmp1|AcceptedCmp2|Complain|Response|
+----+---+----------+--------------+------+-------+--------+-----------+-------+-----+------+------------+------------+-------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+------------+------------+------------+------------+------------+--------+--------+
|5524| 57|Graduation|        Single| 581

In [None]:
# Categorical columns are converted to numbers in two stages per column:
#   1. StringIndexer  : <name>        -> <name>_index
#   2. OneHotEncoder  : <name>_index  -> <name>_vec
categorical_columns = ['Education', 'Marital_Status']

indexer = [
    StringIndexer(inputCol=name, outputCol=name + "_index")
    for name in categorical_columns
]
encoder = [
    OneHotEncoder(inputCols=[name + "_index"], outputCols=[name + "_vec"])
    for name in categorical_columns
]

In [None]:
# Columns passed to the feature vector as-is (no indexing / encoding stage).
# NOTE(review): 'Dt_Customer' is listed here, so it is presumably already a
# numeric value in the cleaned CSV — confirm against the data preparation step.
numeric_columns = [
    'Age', 'Income', 'Kidhome', 'Teenhome', 'Dt_Customer', 'Recency',
    'Wines', 'Fruits', 'MeatProducts', 'FishProducts', 'SweetProducts', 'GoldProducts',
    'NumDealsPurchases', 'NumWebPurchases', 'NumCatalogPurchases',
    'NumStorePurchases', 'NumWebVisitsMonth',
    'AcceptedCmp3', 'AcceptedCmp4', 'AcceptedCmp5', 'AcceptedCmp1', 'AcceptedCmp2',
    'Complain', 'Response',
]

In [None]:
# Assemble the one-hot vectors followed by the numeric columns into a single
# "features" vector column.
feature_inputs = [name + "_vec" for name in categorical_columns] + numeric_columns
assembler = VectorAssembler(inputCols=feature_inputs, outputCol="features")

# Standardize the assembled vector into "scaled_features"
# (StandardScaler is used with its default settings).
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

In [None]:
# Preprocessing pipeline, in execution order:
# string indexers -> one-hot encoders -> vector assembler -> scaler
preprocessing_stages = [*indexer, *encoder, assembler, scaler]
pipeline = Pipeline(stages=preprocessing_stages)

# Learn the stage parameters from the data, then apply them to the same data
model = pipeline.fit(df)
df_transformed = model.transform(df)

In [None]:
# PCA over the full scaled feature vector, used only to inspect how the
# variance is spread across components.
#
# Each one-hot encoded column occupies several slots in the assembled vector,
# so the vector width is not the number of input columns. Read it from the
# data instead of deriving it from the column lists.
n_features = len(df_transformed.select("scaled_features").first()["scaled_features"])
pca = PCA(k=n_features, inputCol="scaled_features", outputCol="pca_features")
pca_model = pca.fit(df_transformed)

# Explained variance (one proportion per fitted component)
explained_variance = pca_model.explainedVariance.toArray()

# Cumulative explained variance
cumulative_variance = np.cumsum(explained_variance)

# Number of components to keep to explain 90% of the variance
threshold = 0.90
reached = np.flatnonzero(cumulative_variance >= threshold)
# If the cumulative sum never reaches the threshold (e.g. a threshold very
# close to 1.0 combined with floating-point rounding), keep every component
# instead of failing with an IndexError.
num_components = int(reached[0]) + 1 if reached.size else len(cumulative_variance)

print('Number of components to retain',threshold*100,'% variance:',num_components)

Number of components to retain 90.0 % variance: 21


In [None]:
# Apply PCA keeping the number of components computed from the variance
# threshold (21 in the recorded run). Using the variable rather than a literal
# keeps this cell in sync if the data or the threshold changes.
pca = PCA(k=int(num_components), inputCol="scaled_features", outputCol="pca_features")
pca_model = pca.fit(df_transformed)
df_pca = pca_model.transform(df_transformed)

In [None]:
# Silhouette score for each candidate number of clusters (k = 2..9).
# The evaluator does not depend on k, so it is created once outside the loop.
evaluator = ClusteringEvaluator(predictionCol='cluster', featuresCol='pca_features', metricName='silhouette')

# k -> silhouette score, kept so the scores can be inspected or plotted later
# instead of only being printed.
silhouette_scores = {}
for k in range(2, 10):
    kmeans = KMeans(k=k, seed=1, featuresCol="pca_features", predictionCol="cluster")
    kmeans_model = kmeans.fit(df_pca)
    df_clusters = kmeans_model.transform(df_pca)
    silhouette_score = evaluator.evaluate(df_clusters)
    silhouette_scores[k] = silhouette_score
    print('Silhouette Score for k=',k,':',silhouette_score)

Silhouette Score for k= 2 : 0.3292707148738423
Silhouette Score for k= 3 : 0.33916962913549553
Silhouette Score for k= 4 : 0.22228086143399467
Silhouette Score for k= 5 : 0.23437840352882616
Silhouette Score for k= 6 : 0.24289909487283445
Silhouette Score for k= 7 : 0.15771609718939686
Silhouette Score for k= 8 : 0.15929187270698264
Silhouette Score for k= 9 : 0.16149693063315124


In [None]:
# Final K-Means model with k=3, the candidate with the highest silhouette
# score in the recorded run.
k = 3
kmeans = KMeans(
    k=k,
    seed=1,
    featuresCol="pca_features",
    predictionCol="cluster",
)
kmeans_model = kmeans.fit(df_pca)
df_clusters = kmeans_model.transform(df_pca)

# Display the cluster assigned to each customer id (first 20 rows)
assignments = df_clusters.select("Id", "cluster")
assignments.show(truncate=False)

+----+-------+
|Id  |cluster|
+----+-------+
|5524|0      |
|2174|1      |
|4141|0      |
|6182|1      |
|5324|1      |
|7446|0      |
|965 |0      |
|6177|1      |
|4855|1      |
|5899|1      |
|387 |1      |
|2125|0      |
|8180|1      |
|2569|1      |
|2114|0      |
|9736|1      |
|4939|1      |
|6565|0      |
|2278|1      |
|9360|1      |
+----+-------+
only showing top 20 rows



In [None]:
# Number of rows assigned to each cluster
(
    df_clusters
    .groupBy('cluster')
    .count()
    .show()
)

+-------+-----+
|cluster|count|
+-------+-----+
|      1| 1317|
|      2|   20|
|      0|  867|
+-------+-----+



In [None]:
# extract components from the Vector
def extract_pca_components(v):
    """Convert a Spark ML vector into a plain Python list of floats.

    Args:
        v: Vector value taken from the ``pca_features`` column.

    Returns:
        list[float]: The vector's components, in their original order.
    """
    return v.toArray().tolist()

extract_pca_components_udf = udf(extract_pca_components, ArrayType(DoubleType()))

# The length of each pca_features vector is the k the PCA model was fitted
# with, so take the column count from the model itself.
n_pcs = pca_model.getK()

# Add PC1..PCn in a single projection. Calling withColumn once per component
# inside a loop adds one projection per call and inflates the query plan;
# withColumns adds them all at once and, like withColumn, replaces columns of
# the same name, so re-running this cell stays safe.
df_clusters = (
    df_clusters
    # temporary array column holding the vector's components
    .withColumn("pca_array", extract_pca_components_udf(col("pca_features")))
    .withColumns({f"PC{i+1}": col("pca_array")[i] for i in range(n_pcs)})
    # the temporary array column is no longer needed
    .drop("pca_array")
)

In [None]:
# Columns kept in the exported dataset: the columns of the loaded CSV, followed
# by the cluster label and the first three principal components.
source_columns = [
    'ID', 'Age', 'Education', 'Marital_Status', 'Income', 'Kidhome', 'Teenhome',
    'Dt_Customer', 'Recency',
    'Wines', 'Fruits', 'MeatProducts', 'FishProducts', 'SweetProducts', 'GoldProducts',
    'NumDealsPurchases', 'NumWebPurchases', 'NumCatalogPurchases',
    'NumStorePurchases', 'NumWebVisitsMonth',
    'AcceptedCmp3', 'AcceptedCmp4', 'AcceptedCmp5', 'AcceptedCmp1', 'AcceptedCmp2',
    'Complain', 'Response',
]
derived_columns = ['cluster', 'PC1', 'PC2', 'PC3']
columns = source_columns + derived_columns

df_final = df_clusters.select(columns)

In [None]:
# Collect the Spark DataFrame to the driver as a pandas DataFrame, then write
# it to a CSV file without the pandas index column.
clustered_pdf = df_final.toPandas()
clustered_pdf.to_csv('Clustered_Data.csv', index=False)