## Clustering using SparkML


----


In [None]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

### Import Required Libraries




In [2]:
import findspark
# findspark.init() makes the pyspark package importable (it adds it to sys.path),
# so it has to run before the pyspark imports below.
findspark.init()

from pyspark.sql import SparkSession

# ML classes used below: KMeans for clustering, VectorAssembler to build the feature vector
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler


## Create a spark session


In [4]:
# Reuse the active Spark session if one exists; otherwise start a new one
# registered under the application name "clustering model".
spark = (
    SparkSession.builder
    .appName("clustering model")
    .getOrCreate()
)

24/04/24 10:44:58 WARN Utils: Your hostname, nasr-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.142.128 instead (on interface ens33)
24/04/24 10:44:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/24 10:44:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Load the data from a CSV file into a DataFrame


Load the dataset into the spark dataframe


In [8]:
# Load the customer orders: the first CSV line holds the column names and
# Spark infers each column's type from the data.
customer_dataset = spark.read.csv(
    path="customers.csv",
    inferSchema=True,
    header=True,
)

Print the schema of the dataset


In [9]:
# Print the column names and the types inferred while reading the CSV
customer_dataset.printSchema()

root
 |-- Fresh_Food: integer (nullable = true)
 |-- Milk: integer (nullable = true)
 |-- Grocery: integer (nullable = true)
 |-- Frozen_Food: integer (nullable = true)



In [None]:
# Each row in this dataset describes one customer. The columns hold the orders placed
# by that customer for Fresh_Food, Milk, Grocery and Frozen_Food.

Show top 5 rows from the dataset


In [11]:
# truncate=False prints every cell value in full instead of shortening long ones
customer_dataset.show(n=5, truncate=False)

+----------+----+-------+-----------+
|Fresh_Food|Milk|Grocery|Frozen_Food|
+----------+----+-------+-----------+
|12669     |9656|7561   |214        |
|7057      |9810|9568   |1762       |
|6353      |8808|7684   |2405       |
|13265     |1196|4221   |6404       |
|22615     |5410|7198   |3915       |
+----------+----+-------+-----------+
only showing top 5 rows



## Create a feature vector


In [13]:
# Pack the four customer-order columns into one vector column named "features",
# which is the input format the clustering estimator consumes.
feature_cols = [
    'Fresh_Food',
    'Milk',
    'Grocery',
    'Frozen_Food',
]
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
customer_transformed_dataset = assembler.transform(customer_dataset)


Print the transformed customer dataset

In [14]:
# truncate=False keeps the full "features" vector visible in the printed table
customer_transformed_dataset.show(n=5, truncate=False)

+----------+----+-------+-----------+------------------------------+
|Fresh_Food|Milk|Grocery|Frozen_Food|features                      |
+----------+----+-------+-----------+------------------------------+
|12669     |9656|7561   |214        |[12669.0,9656.0,7561.0,214.0] |
|7057      |9810|9568   |1762       |[7057.0,9810.0,9568.0,1762.0] |
|6353      |8808|7684   |2405       |[6353.0,8808.0,7684.0,2405.0] |
|13265     |1196|4221   |6404       |[13265.0,1196.0,4221.0,6404.0]|
|22615     |5410|7198   |3915       |[22615.0,5410.0,7198.0,3915.0]|
+----------+----+-------+-----------+------------------------------+
only showing top 5 rows



## Set the number of clusters for KMeans


In [None]:
# Number of clusters (k) passed to the KMeans estimator below
number_of_clusters = 3

## Task 4 - Create a clustering model


Create a KMeans clustering model


In [None]:
# Configure the KMeans estimator.
# - seed is fixed so centroid initialisation, and therefore the cluster
#   assignments, are reproducible across kernel restarts.
# - featuresCol names the vector column produced by the VectorAssembler.
kmeans = KMeans(k=number_of_clusters, seed=42, featuresCol="features")


Train (fit) the model on the dataset


In [None]:
# Fit KMeans on the assembled dataset (the DataFrame that carries the "features" column).
model = kmeans.fit(customer_transformed_dataset)


## Task 5 - Print Cluster Details


Your model is now trained. Time to assign each customer to a cluster and inspect the results.


In [None]:
# Assign every customer to a cluster; the result gains a "prediction" column
# holding the cluster id.
predictions = model.transform(customer_transformed_dataset)

In [None]:
# Display the first 5 rows of the results, including the assigned cluster
predictions.show(5)

Display how many customers there are in each cluster.


In [None]:
# Count customers per cluster. groupBy does not guarantee row order, so sort
# by cluster id to keep the printed table stable between runs.
predictions.groupBy('prediction').count().orderBy('prediction').show()

In [None]:
# Stop the Spark session now that the analysis is finished
spark.stop()