# BELLAKHAL Mohamed KMEANS DEMO

In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

'C:\\spark-2.4.7-bin-hadoop2.7'

In [2]:
from pyspark.mllib.clustering import KMeans
from numpy import array, random
from math import sqrt
from pyspark import SparkConf, SparkContext
from sklearn.preprocessing import scale

In [17]:
K = 5

In [9]:
# Boilerplate Spark stuff:
conf = SparkConf().setMaster("local").setAppName("SparkKMeans")
sc = SparkContext(conf = conf)
sc

In [10]:
#Create fake income/age clusters for N people in k clusters
def createClusteredData(N, k):
    random.seed(10)
    pointsPerCluster = float(N)/k
    X = []
    for i in range (k):
        incomeCentroid = random.uniform(20000.0, 200000.0)
        ageCentroid = random.uniform(20.0, 70.0)
        for j in range(int(pointsPerCluster)):
            X.append([random.normal(incomeCentroid, 10000.0), random.normal(ageCentroid, 2.0)])
    X = array(X)
    return X


In [18]:
# Load the data; note I am normalizing it with scale() - very important!
#scale: standardize the date along axis
data = sc.parallelize(scale(createClusteredData(100, K)))

In [19]:
# Build the model (cluster the data)
clusters = KMeans.train(data, K, maxIterations=10,
        runs=10, initializationMode="random")



In [20]:
# Print out the cluster assignments
resultRDD = data.map(lambda point: clusters.predict(point)).cache()

print("Counts by value:")
counts = resultRDD.countByValue()
print(counts)

print("Cluster assignments:")
results = resultRDD.collect()
print(results)


Counts by value:
defaultdict(<class 'int'>, {0: 14, 1: 6, 2: 40, 4: 19, 3: 21})
Cluster assignments:
[0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]


In [21]:
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = data.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))


Within Set Sum of Squared Error = 33.286343946178945


### What happens to WSSSE as you increase or decrease K? Why? 


In [23]:
# let's try decreasing k 
K = 2
data = sc.parallelize(scale(createClusteredData(100, K)))
clusters = KMeans.train(data, K, maxIterations=10,
        runs=10, initializationMode="random")
resultRDD = data.map(lambda point: clusters.predict(point)).cache()

print("Counts by value:")
counts = resultRDD.countByValue()
print(counts)

print("Cluster assignments:")
results = resultRDD.collect()
print(results)
WSSSE = data.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))



Counts by value:
defaultdict(<class 'int'>, {1: 50, 0: 50})
Cluster assignments:
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Within Set Sum of Squared Error = 23.79538138211942


In [24]:
# let's try increasing k 
K = 6
data = sc.parallelize(scale(createClusteredData(100, K)))
clusters = KMeans.train(data, K, maxIterations=10,
        runs=10, initializationMode="random")
resultRDD = data.map(lambda point: clusters.predict(point)).cache()

print("Counts by value:")
counts = resultRDD.countByValue()
print(counts)

print("Cluster assignments:")
results = resultRDD.collect()
print(results)
WSSSE = data.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))



Counts by value:
defaultdict(<class 'int'>, {5: 12, 4: 9, 0: 11, 3: 16, 2: 32, 1: 16})
Cluster assignments:
[5, 4, 5, 5, 5, 5, 4, 5, 4, 5, 4, 0, 5, 5, 5, 5, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 4, 0, 0, 4, 4, 0, 4, 0, 0, 0, 0, 0, 5, 0, 0, 4, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
Within Set Sum of Squared Error = 20.684159154887354


### Observation : 


Different local optimas for the same value of K ==> k-means is non-deterministic with respect to the initial cluster.
Variation of WSSSE based on the values of K ==> the variation can be rather positive or negative if we increase the K values. 
==> In order to find the perfect value for K we need to compute a lot of different K values, plot the WSSSE curve and find the optimal value. 


## What happens if you don't normalize the input data before clustering?

In [25]:
# Non normalized data
K = 5
data = sc.parallelize(createClusteredData(100, K))
clusters = KMeans.train(data, K, maxIterations=10,
        runs=10, initializationMode="random")
resultRDD = data.map(lambda point: clusters.predict(point)).cache()

print("Counts by value:")
counts = resultRDD.countByValue()
print(counts)

print("Cluster assignments:")
results = resultRDD.collect()
print(results)
WSSSE = data.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

Counts by value:
defaultdict(<class 'int'>, {4: 20, 0: 15, 1: 17, 2: 28, 3: 20})
Cluster assignments:
[4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1, 2, 2, 2, 2, 2, 2, 1, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 1, 2, 1, 2, 1, 2, 2, 1, 2, 2, 2, 1, 2, 1, 1, 2, 2, 2, 1, 1]
Within Set Sum of Squared Error = 597770.0334061437


### Observation 

WSSSE=597770.033 which is a very high value ==> the model loses it's credibility
==> this experiment highlights the importance of normalizing our data

## What happens if you change the maxIterations or runs parameters?

In [30]:
# Max iterations = 200
K = 5
data = sc.parallelize(scale(createClusteredData(100, K)))
clusters = KMeans.train(data, K, maxIterations=200,
        runs=10, initializationMode="random")
resultRDD = data.map(lambda point: clusters.predict(point)).cache()

print("Counts by value:")
counts = resultRDD.countByValue()
print(counts)

print("Cluster assignments:")
results = resultRDD.collect()
print(results)
WSSSE = data.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))



Counts by value:
defaultdict(<class 'int'>, {0: 20, 2: 20, 3: 23, 1: 20, 4: 17})
Cluster assignments:
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 4, 4, 4, 4, 4, 4, 4, 4, 4, 3, 4, 4, 4, 4, 4, 3, 3, 4, 4, 4]
Within Set Sum of Squared Error = 20.385885709661082


In [31]:
# Max iterations = 500
K = 5
data = sc.parallelize(scale(createClusteredData(100, K)))
clusters = KMeans.train(data, K, maxIterations=500,
        runs=10, initializationMode="random")
resultRDD = data.map(lambda point: clusters.predict(point)).cache()

print("Counts by value:")
counts = resultRDD.countByValue()
print(counts)

print("Cluster assignments:")
results = resultRDD.collect()
print(results)
WSSSE = data.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))



Counts by value:
defaultdict(<class 'int'>, {1: 20, 3: 20, 2: 40, 4: 12, 0: 8})
Cluster assignments:
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 4, 4, 4, 0, 0, 0, 4, 4, 0, 0, 4, 0, 4, 4, 4, 0, 0, 4, 4, 4, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
Within Set Sum of Squared Error = 21.77217779843022


In [32]:
# Max iterations = 1500
K = 5
data = sc.parallelize(scale(createClusteredData(100, K)))
clusters = KMeans.train(data, K, maxIterations=1500,
        runs=10, initializationMode="random")
resultRDD = data.map(lambda point: clusters.predict(point)).cache()

print("Counts by value:")
counts = resultRDD.countByValue()
print(counts)

print("Cluster assignments:")
results = resultRDD.collect()
print(results)
WSSSE = data.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))



Counts by value:
defaultdict(<class 'int'>, {0: 20, 4: 20, 1: 20, 3: 20, 2: 20})
Cluster assignments:
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 1, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3]
Within Set Sum of Squared Error = 20.33374187668045


### Observation 

When increasing the maxiterations value, we got a better convergence for our model but when we reached a certain value of iterations, the maxiterations would'nt change the WSSSE to a better value