In [8]:
import findspark 
findspark.init()

from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans

In [9]:
spark = SparkSession.builder.appName('Clustering mall customers into Gold - Silver - Bronze customers depending on their spending score ').getOrCreate()

## Reading dataset 

In [10]:
df = spark.read.csv('/Users/ihebd/PycharmProjects/Spark-with-machine-learning-/datasets/Mall_Customers.csv',
                    header=True, inferSchema=True)

### Let's explore our dataset

In [11]:
df.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Annual Income (k$): integer (nullable = true)
 |-- Spending Score (1-100): integer (nullable = true)



In [12]:
df.describe()

DataFrame[summary: string, CustomerID: string, Gender: string, Age: string, Annual Income (k$): string, Spending Score (1-100): string]

In [13]:
df.take(5)

[Row(CustomerID=1, Gender='Male', Age=19, Annual Income (k$)=15, Spending Score (1-100)=39),
 Row(CustomerID=2, Gender='Male', Age=21, Annual Income (k$)=15, Spending Score (1-100)=81),
 Row(CustomerID=3, Gender='Female', Age=20, Annual Income (k$)=16, Spending Score (1-100)=6),
 Row(CustomerID=4, Gender='Female', Age=23, Annual Income (k$)=16, Spending Score (1-100)=77),
 Row(CustomerID=5, Gender='Female', Age=31, Annual Income (k$)=17, Spending Score (1-100)=40)]

In [15]:
df.count()

200

### create features vector 

In [17]:
# This dataset contains only one feature which is spending score as we aim to cluster customers depending on their speding score 
# Import VectorAssembler to create our features vector 
from pyspark.ml.feature import VectorAssembler
features = ['Spending Score (1-100)']

feature_vector = VectorAssembler(inputCols=features, outputCol='features')

ds = feature_vector.transform(df)

In [18]:
ds.columns

['CustomerID',
 'Gender',
 'Age',
 'Annual Income (k$)',
 'Spending Score (1-100)',
 'features']

## Create our clustering kmeans model 

In [44]:
# Create kmeans model
kmeans = KMeans(featuresCol='features', k=3)

# fit kmeans model 
model = kmeans.fit(ds)



In [45]:
print(model.summary)

<pyspark.ml.clustering.KMeansSummary object at 0x0000016AC43F12E0>


In [46]:
# make predictions  

predictions = model.transform(ds)
from pyspark.ml.evaluation import ClusteringEvaluator
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

In [47]:
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.8172327476215493


In [48]:
model.transform(ds).groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|   47|
|         2|   59|
|         0|   94|
+----------+-----+



We managed to cluster our customers into three groups depending on their spending score 

                * Cluster 1 : 47 customers
                * Cluster 2 : 59 customers
                * Cluster 3 : 94 customers 


## Analyse and visualize our clusters 

In [49]:
model.transform(ds).groupBy('prediction').max('Annual Income (k$)').show()

+----------+-----------------------+
|prediction|max(Annual Income (k$))|
+----------+-----------------------+
|         1|                    137|
|         2|                    137|
|         0|                     99|
+----------+-----------------------+



In [50]:
model.transform(ds).groupBy('prediction').min('Annual Income (k$)').show()

+----------+-----------------------+
|prediction|min(Annual Income (k$))|
+----------+-----------------------+
|         1|                     16|
|         2|                     15|
|         0|                     15|
+----------+-----------------------+



In [51]:
model.transform(ds).groupBy('prediction').sum('Annual Income (k$)').show()

+----------+-----------------------+
|prediction|sum(Annual Income (k$))|
+----------+-----------------------+
|         1|                   3158|
|         2|                   3826|
|         0|                   5128|
+----------+-----------------------+



In [52]:
model.transform(ds).groupBy('prediction').mean('Age').show()

+----------+-----------------+
|prediction|         avg(Age)|
+----------+-----------------+
|         1|42.95744680851064|
|         2|29.89830508474576|
|         0|42.41489361702128|
+----------+-----------------+



In [53]:
model.transform(ds).groupBy('prediction').sum('Spending Score (1-100)').show()

+----------+---------------------------+
|prediction|sum(Spending Score (1-100))|
+----------+---------------------------+
|         1|                        686|
|         2|                       4825|
|         0|                       4529|
+----------+---------------------------+



In [59]:
model.transform(ds).groupBy('prediction').avg('Spending Score (1-100)').show()

+----------+---------------------------+
|prediction|avg(Spending Score (1-100))|
+----------+---------------------------+
|         1|         14.595744680851064|
|         2|          81.77966101694915|
|         0|         48.180851063829785|
+----------+---------------------------+

