In [1]:
import findspark
# Runs before the pyspark imports in the next cell, presumably so a local Spark install
# becomes importable as `pyspark`.
# NOTE(review): with no arguments findspark presumably locates Spark via SPARK_HOME —
# confirm that variable is set on machines other than the author's.
findspark.init()

In [2]:
import os
import warnings

from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.sql import SparkSession
warnings.filterwarnings("ignore")

In [3]:
# Start (or reuse) a local Spark session running on 4 threads.
# In local mode the executors live inside the driver JVM, so the usable heap is set by
# spark.driver.memory; spark.executor.memory has no effect with master("local[...]").
# NOTE(review): spark.driver.memory is only applied when the JVM is launched — if a session
# already exists, getOrCreate() returns it unchanged, so restart the kernel to pick it up.
spark = (
    SparkSession.builder
    .appName("MallCustomerClustering")
    .master("local[4]")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

In [4]:
# Load the mall-customers CSV with a header row and inferred column types.
# The location can be overridden with the MALL_CUSTOMERS_CSV environment variable so the
# notebook runs outside the author's machine; the default keeps the original path.
MALL_CUSTOMERS_CSV = os.environ.get(
    "MALL_CUSTOMERS_CSV",
    "C:/Users/htcso/OneDrive/Masaüstü/pySpark/dataset/MallCustomers.csv",
)
df = spark.read.csv(MALL_CUSTOMERS_CSV, header=True, sep=",", inferSchema=True)

In [5]:
# Preview: only five rows are pulled to the driver and rendered as a pandas frame.
df.limit(5).toPandas()

Unnamed: 0,CustomerID,Gender,Age,AnnualIncome,SpendingScore
0,1,Male,19,15,39
1,2,Male,21,15,81
2,3,Female,20,16,6
3,4,Female,23,16,77
4,5,Female,31,17,40


In [6]:
# Show the column names and the types Spark inferred while reading the CSV.
df.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- AnnualIncome: integer (nullable = true)
 |-- SpendingScore: integer (nullable = true)



In [7]:
# Per-column summary statistics (count, mean, stddev, min, max).
summary_stats = df.describe()
summary_stats.show()

+-------+------------------+------+-----------------+-----------------+------------------+
|summary|        CustomerID|Gender|              Age|     AnnualIncome|     SpendingScore|
+-------+------------------+------+-----------------+-----------------+------------------+
|  count|               200|   200|              200|              200|               200|
|   mean|             100.5|  NULL|            38.85|            60.56|              50.2|
| stddev|57.879184513951124|  NULL|13.96900733155888|26.26472116527124|25.823521668370173|
|    min|                 1|Female|               18|               15|                 1|
|    max|               200|  Male|               70|              137|                99|
+-------+------------------+------+-----------------+-----------------+------------------+



In [9]:
# Combine the three numeric inputs into one vector column, as Spark ML estimators expect.
vector_assembler = VectorAssembler(
    inputCols=["Age", "AnnualIncome", "SpendingScore"],
    outputCol="features",
)

In [10]:
# Divide each feature by its standard deviation without centering it.
# withMean=False / withStd=True are StandardScaler's defaults, written out for the reader.
standard_scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
    withMean=False,
    withStd=True,
)

In [11]:
# K-means with 5 clusters on the scaled features; predictions land in the "cluster" column.
kmeans_obj = KMeans(
    k=5,
    seed=142,  # fixed seed so cluster assignments are reproducible across runs
    featuresCol="scaled_features",
    predictionCol="cluster",
)

In [12]:
# Chain assemble -> scale -> cluster so a single fit/transform runs all three stages.
pipeline_obj = Pipeline(stages=[vector_assembler, standard_scaler, kmeans_obj])

In [13]:
# Fit every stage on the full dataset (scaler statistics, then cluster centers).
pipeline_model = pipeline_obj.fit(dataset=df)

In [14]:
# Append the features, scaled_features and cluster columns to every row.
transformed_df = pipeline_model.transform(dataset=df)

In [15]:
# Preview five clustered rows as a pandas frame.
transformed_df.limit(5).toPandas()

Unnamed: 0,CustomerID,Gender,Age,AnnualIncome,SpendingScore,features,scaled_features,cluster
0,1,Male,19,15,39,"[19.0, 15.0, 39.0]","[1.360153914235199, 0.5711082903036444, 1.5102...",2
1,2,Male,21,15,81,"[21.0, 15.0, 81.0]","[1.503328010470483, 0.5711082903036444, 3.1366...",0
2,3,Female,20,16,6,"[20.0, 16.0, 6.0]","[1.431740962352841, 0.6091821763238874, 0.2323...",2
3,4,Female,23,16,77,"[23.0, 16.0, 77.0]","[1.6465021067057672, 0.6091821763238874, 2.981...",0
4,5,Female,31,17,40,"[31.0, 17.0, 40.0]","[2.2191984916469036, 0.6472560623441304, 1.548...",2


In [16]:
# Number of customers per cluster, sorted by cluster id — groupBy alone returns groups
# in an arbitrary order, which makes the table hard to compare between runs.
transformed_df.groupBy("cluster").count().orderBy("cluster").show()

+-------+-----+
|cluster|count|
+-------+-----+
|      1|   58|
|      3|   34|
|      4|   39|
|      2|   47|
|      0|   22|
+-------+-----+



In [20]:
# optimal k (Silhouette Score)

def runKMeans(df, k, seed=142, assembler=None, scaler=None):
    """Fit the assemble -> scale -> KMeans pipeline on ``df`` with ``k`` clusters.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input data containing the columns the assembler reads.
    k : int
        Number of clusters for KMeans.
    seed : int, default 142
        KMeans random seed, fixed so repeated runs give the same clustering.
    assembler, scaler : pyspark.ml stages, optional
        Feature-preparation stages. When omitted they fall back to the notebook-level
        ``vector_assembler`` and ``standard_scaler``, matching the original behavior.
        A custom scaler must write a ``scaled_features`` column, which KMeans reads.

    Returns
    -------
    pyspark.ml.PipelineModel
        The fitted pipeline; its ``transform`` adds a ``cluster`` prediction column.
    """
    # Resolve the shared stages at call time rather than relying on them silently.
    if assembler is None:
        assembler = vector_assembler
    if scaler is None:
        scaler = standard_scaler

    kmeans = KMeans(
        k=k,
        seed=seed,
        featuresCol="scaled_features",
        predictionCol="cluster",
    )
    pipeline = Pipeline(stages=[assembler, scaler, kmeans])
    return pipeline.fit(df)


# Candidate cluster counts to score with the silhouette metric.
K_CANDIDATES = range(2, 11)

# The evaluator is identical for every k, so build it once outside the loop.
silhouette_evaluator = ClusteringEvaluator(
    featuresCol="scaled_features",
    predictionCol="cluster",
    metricName="silhouette",
)

# Loop-local names keep this sweep from overwriting the k=5 `pipeline_model` /
# `transformed_df` built earlier in the notebook.
silhouette_scores = {}
for k in K_CANDIDATES:
    candidate_model = runKMeans(df, k)
    candidate_df = candidate_model.transform(df)
    score = silhouette_evaluator.evaluate(candidate_df)
    silhouette_scores[k] = score
    print("k:" , k, score)

# Highest silhouette wins; on ties max() keeps the first (smallest) k.
best_k = max(silhouette_scores, key=silhouette_scores.get)

k: 2 0.4941138177999588
k: 3 0.4551097395569455
k: 4 0.584499291657808
k: 5 0.5836269996802264
k: 6 0.6133451611507313
k: 7 0.5800427933568477
k: 8 0.577588821499726
k: 9 0.5755257731206147
k: 10 0.5432447861610747


In [21]:
# Stop the Spark session now that the analysis is finished; `spark` is unusable afterwards.
spark.stop()