In [6]:
from pyspark import SparkContext, SparkConf
from pyspark.mllib.clustering import KMeans
from sklearn.preprocessing import StandardScaler
import numpy as np

In [4]:
conf = SparkConf().setMaster("local").setAppName("SparkKMeans")
sc = SparkContext(conf=conf)

In [8]:
def create_clustered_data(n, k):
    np.random.seed(0)
    points_per_cluster = float(n / k)
    X = []
    for i in range(k):
        income_centroid = np.random.uniform(20000.0, 200000.0)
        age_centroid = np.random.uniform(20.0, 70.0)
        for j in range(int(points_per_cluster)):
            X.append([np.random.normal(income_centroid, 10000.0), 
                     np.random.normal(age_centroid, 2.0)])
    return np.array(X)

In [12]:
K = 5
scaler = StandardScaler()
data = scaler.fit_transform(create_clustered_data(100, K))
data = sc.parallelize(data)

In [13]:
clusters = KMeans.train(data, K, maxIterations=10, runs=10, initializationMode="random")



In [14]:
resultRDD = data.map(lambda point: clusters.predict(point)).cache()

In [16]:
print("Counts by Value")
counts = resultRDD.countByValue()
print(counts)

Counts by Value
defaultdict(<class 'int'>, {1: 20, 3: 27, 0: 18, 4: 23, 2: 12})


In [17]:
print("Cluster assignments:")
results = resultRDD.collect()
print(results)

Cluster assignments:
[1, 1, 1, 3, 3, 1, 3, 1, 3, 3, 1, 1, 3, 1, 3, 3, 3, 1, 3, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 4, 4, 0, 0, 0, 4, 4, 3, 3, 4, 4, 3, 4, 2, 3, 3, 0, 2, 2, 3, 3, 4, 4, 4, 4, 0, 2, 4, 4, 4, 2, 2, 4, 2, 2, 4, 4, 2, 4, 2, 2, 4, 4, 2, 4, 4, 1, 1, 3, 3, 3, 3, 3, 1, 3, 3, 3, 1, 1, 1, 1, 3, 1, 1, 1, 1]


In [28]:
def wcss_error(point):
    center = clusters.centers[clusters.predict(point)]
    return np.sqrt(np.sum([x ** 2 for x in (point - center)]))

WCSS = data.map(lambda point: wcss_error(point)).reduce(lambda x, y: x + y)
print(f"WCSS: {WCSS}")

WCSS: 55.642078339133086
