In [1]:
# Locate the Spark installation under /opt/spark before pyspark is imported.
# findspark.init() is deliberately called ahead of `import pyspark`; keep this
# order (presumably pyspark is not importable until init() has run — confirm
# against the local environment).
import findspark
findspark.init('/opt/spark')
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Obtain the Spark session for this notebook (an existing session is reused
# if one is already running, otherwise a new one named 'assignment4' is made).
spark = (
    SparkSession.builder
    .appName('assignment4')
    .getOrCreate()
)

In [4]:
# Load the wholesale customers CSV: the first row supplies the column names
# and the column types are inferred from the data.
df = spark.read.csv(
    'wholesale-customers-data.csv',
    header=True,
    inferSchema=True,
)

In [5]:
# Print the column names, inferred types and nullability of the loaded data.
df.printSchema()

root
 |-- Channel: integer (nullable = true)
 |-- Region: integer (nullable = true)
 |-- Fresh: integer (nullable = true)
 |-- Milk: integer (nullable = true)
 |-- Grocery: integer (nullable = true)
 |-- Frozen: integer (nullable = true)
 |-- Detergents_Paper: integer (nullable = true)
 |-- Delicassen: integer (nullable = true)



In [6]:
# The dictionary form of agg() would also work here, for example:
# df.groupBy("Region", "Channel").agg({"Delicassen": "count", "Milk": "avg", "Grocery": "avg", "Frozen": "avg"}).show()
# but it gives no control over the names of the resulting columns, so the
# pyspark.sql.functions helpers combined with alias() are used instead.

import pyspark.sql.functions as func

# For every (Region, Channel) pair: the number of Delicassen values and the
# mean spend on Delicassen, Milk and Grocery. Rows are sorted by Region in
# descending order only; the order of channels inside a region is unspecified.
(
    df.groupBy("Region", "Channel")
    .agg(
        func.count("Delicassen").alias("Count"),
        func.mean("Delicassen").alias("Avg(Delicassen)"),
        func.mean("Milk").alias("Avg(Milk)"),
        func.mean("Grocery").alias("Avg(Grocery)"),
    )
    .orderBy(func.desc("Region"))
    .show()
)

+------+-------+-----+------------------+------------------+------------------+
|Region|Channel|Count|   Avg(Delicassen)|         Avg(Milk)|      Avg(Grocery)|
+------+-------+-----+------------------+------------------+------------------+
|     3|      1|  211|1518.2843601895734|3486.9810426540284| 3886.734597156398|
|     3|      2|  105|1826.2095238095237|10981.009523809524|15953.809523809523|
|     2|      2|   19|            1239.0|  9190.78947368421|16326.315789473685|
|     2|      1|   28| 1105.892857142857|           2304.25|            4395.5|
|     1|      2|   18|1871.9444444444443|           10784.0|18471.944444444445|
|     1|      1|   59|1197.1525423728813|3870.2033898305085| 4026.135593220339|
+------+-------+-----+------------------+------------------+------------------+



In [7]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler


# Columns used as clustering features.
feat_cols = ["Fresh", "Detergents_Paper", "Delicassen"]
ml_df = df.select(*feat_cols)

# Pack the feature columns into a single vector column called 'features'.
vec_assembler = VectorAssembler(inputCols=feat_cols, outputCol='features')
vec_data = vec_assembler.transform(ml_df)

# Rescale the features: each one is divided by its standard deviation
# (withStd=True) but is not centred on its mean (withMean=False).
scaler = StandardScaler(
    inputCol="features",
    outputCol="norm_features",
    withStd=True,
    withMean=False,
)
scalerModel = scaler.fit(vec_data)
norm_fin_data = scalerModel.transform(vec_data)

In [8]:
# Show the column layout of the scaled dataset that is clustered below.
print(norm_fin_data)

# Fit k-means for k = 2..5 on the scaled features and report, for each k, the
# clustering cost and the number of rows assigned to each cluster.
# NOTE: despite the "4" in their names, model4 / wssse_4 are overwritten on
# every iteration, so after the loop they hold the results for k=5.
for i in range(2, 6):
    print("For k=" + str(i) + ":")
    kmeans = KMeans(featuresCol='norm_features').setK(i).setSeed(2459501)
    model4 = kmeans.fit(norm_fin_data)
    # KMeansModel.computeCost was deprecated in Spark 2.4 and removed in
    # Spark 3.0. Keep using it where it exists (unchanged results on older
    # Spark) and otherwise read the training cost from the model summary so
    # the cell does not fail with AttributeError on newer Spark.
    if hasattr(model4, "computeCost"):
        wssse_4 = model4.computeCost(norm_fin_data)
    else:
        wssse_4 = model4.summary.trainingCost
    print("Cost is " + str(wssse_4))
    print("Prediction distribution table: ")
    # Build the predictions DataFrame once per model and reuse it.
    predictions = model4.transform(norm_fin_data)
    predictions.groupBy('prediction').count().show()

# Cluster assignments of the last fitted model (k=5).
predictions.show()
# just to check values of prediction 1
print(predictions.where("prediction == 1").collect())
    

DataFrame[Fresh: int, Detergents_Paper: int, Delicassen: int, features: vector, norm_features: vector]
For k=2:
Cost is 1041.2845273708995
Prediction distribution table: 
+----------+-----+
|prediction|count|
+----------+-----+
|         1|    1|
|         0|  439|
+----------+-----+

For k=3:
Cost is 761.2088587153988
Prediction distribution table: 
+----------+-----+
|prediction|count|
+----------+-----+
|         1|    1|
|         2|   70|
|         0|  369|
+----------+-----+

For k=4:
Cost is 509.2171027652562
Prediction distribution table: 
+----------+-----+
|prediction|count|
+----------+-----+
|         1|    1|
|         3|   35|
|         2|   64|
|         0|  340|
+----------+-----+

For k=5:
Cost is 465.39255314808526
Prediction distribution table: 
+----------+-----+
|prediction|count|
+----------+-----+
|         1|    1|
|         3|   76|
|         4|  327|
|         2|    1|
|         0|   35|
+----------+-----+

+-----+----------------+----------+------------------