# Parallel KMeans implementation
Based on J. Y. Q. H. Z. W. a. J. C. Bowen Wang, “Parallelizing K-means-based Clustering on Spark,” International Conference on Advanced Cloud and Big Data, 2016. 

## Parallel partition-based algorithm outline
1. Initialize centroids by randomly selecting k points from the data set. Broadcast selected centroids to all nodes
1. While centroids are moving:
    1. Broadcast current centroids
    1. For each point
        1. Compute the distance to all centroids
        1. Asign the closest cluster
    1. For each cluster
        1. Compute local mean
    1. Compute the mean for each cluster for each partition
 

## Adaptations made to suggested implementation of the algorithm:
1. The authors suggest using SparseVector, with chosen data sets it is better to use regular arrays
1. We use the random sample for centroids initialization as described in *Scalable K-Means++* because the quality of initial centroids has a major effect on the quality
1. We use crisp clustering only i. e. each point can be a member of one cluster only
1. We use Euclidian data because it is the recommended distance function for dense data.

## Description of production cluster on Azure:
We are using Azure HDIsight in order to run a spark cluster. We are using a cluster with the following configuration:
* two master nodes Standard_D12_V2 4 CPU Cores 28GB RAM
* eight slave nodes Standard_D13_V2 8 CPU Cores 56GB RAM


In [42]:
import os
import requests
import time
from pprint import pprint

from numpy import array
import numpy as np
import pandas as pd
from itertools import groupby

In [21]:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import split, col, size, trim, lit
from pyspark.ml.linalg import Vectors, DenseVector

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("FirstSparkJob") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

sc = spark.sparkContext

In [3]:
RANDOM_SEED = 301191
K = 3 

In [4]:
A3_DATASET_URL = "https://cs.joensuu.fi/sipu/datasets/a3.txt"
DATA_FOLDER = "/home/jovyan/work/data"
A3_LOCAL_PATH = os.path.join(DATA_FOLDER, "a3.txt")

In [5]:
# Download Data
response = requests.get(A3_DATASET_URL)
if not os.path.exists(A3_LOCAL_PATH):
    with open(A3_LOCAL_PATH, 'wb') as file:
        file.write(response.content)

In [9]:
# Load clean data into spark
data = sc.textFile(A3_LOCAL_PATH)
parsed_data = data.map(lambda row: Vectors.dense([float(x) for x in row.strip().split()]))
parsed_data.cache()

parsed_data.take(5)

                                                                                

[DenseVector([53920.0, 42968.0]),
 DenseVector([52019.0, 42206.0]),
 DenseVector([52570.0, 42476.0]),
 DenseVector([54220.0, 42081.0]),
 DenseVector([54268.0, 43420.0])]

In [59]:
def compute_distance(p, centroid):
    return np.sqrt(np.sum((np.array(p) - np.array(centroid))**2))

def kmeans(rdd, centroids, max_iters=10, tolerance=0.1):
    for i in range(max_iters):
        timer = time.time()
        
        # Broadcast the centroids to all nodes
        broadcast_centroids = spark.sparkContext.broadcast(centroids)
        clustered_rdd = rdd.map(lambda p: (np.argmin([compute_distance(p, c) for c in broadcast_centroids.value]), (p, 1)))

        # Recompute centroids by averaging points in each cluster
        new_centroids = (
            clustered_rdd
            .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))  # Sum points and count
            .map(lambda x: (x[0], x[1][0] / x[1][1]))  # Compute new centroids
            .collectAsMap()
        )

        new_centroids_arr = np.array([new_centroids[j] for j in range(len(centroids))], dtype=np.float64)
        
        # Calculate sum of shifts for each centroid
        shifts = np.sum(np.linalg.norm(new_centroids_arr - centroids, axis=1))
        
        print(f"Iteration:{i}\ttime taken:")
        if shifts < tolerance:
            break

        # Update the centroids
        centroids = new_centroids_arr
    
        # Free memory
        broadcast_centroids.unpersist()

    return centroids

# Run the K-Means with broadcasting
centroids = np.array(parsed_data.takeSample(False, K, seed=RANDOM_SEED), dtype=np.float64)
final_centroids = kmeans(parsed_data, centroids)
print("Final centroids:", final_centroids)

24/10/18 16:00:13 WARN TaskSetManager: Lost task 0.0 in stage 148.0 (TID 302) (172.18.0.6 executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1237, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyspark/rdd.py", line 5434, in pipeline_func
  File "/usr/local/lib/python3.11/site-packages/pyspark/rdd.py", line 5434, in pipeline_func
  File "/usr/local/lib/python3.11/site-packages/pyspark/rdd.py", line 840, in func
  File "/usr/local/lib/python3.11/site-packages/pyspark/rdd.py", line 3983, in combineLocally
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 256, in mergeValues
    for k, v in iterator:
  File "/opt/bitnami/spark/python/lib/pyspark.zip/p

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 148.0 failed 4 times, most recent failure: Lost task 0.3 in stage 148.0 (TID 308) (172.18.0.6 executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1237, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyspark/rdd.py", line 5434, in pipeline_func
  File "/usr/local/lib/python3.11/site-packages/pyspark/rdd.py", line 5434, in pipeline_func
  File "/usr/local/lib/python3.11/site-packages/pyspark/rdd.py", line 840, in func
  File "/usr/local/lib/python3.11/site-packages/pyspark/rdd.py", line 3983, in combineLocally
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 256, in mergeValues
    for k, v in iterator:
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_21/4239780651.py", line 8, in <lambda>
  File "/tmp/ipykernel_21/4239780651.py", line 8, in <listcomp>
  File "/tmp/ipykernel_21/4239780651.py", line 2, in compute_distance
  File "/.local/lib/python3.11/site-packages/numpy/linalg/_linalg.py", line 2833, in norm
    return sqrt(add.reduce(s, axis=axis, keepdims=keepdims))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
numpy.exceptions.AxisError: axis 1 is out of bounds for array of dimension 1

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1046)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:407)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1045)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at jdk.internal.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1237, in process
    out_iter = func(split_index, iterator)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/pyspark/rdd.py", line 5434, in pipeline_func
  File "/usr/local/lib/python3.11/site-packages/pyspark/rdd.py", line 5434, in pipeline_func
  File "/usr/local/lib/python3.11/site-packages/pyspark/rdd.py", line 840, in func
  File "/usr/local/lib/python3.11/site-packages/pyspark/rdd.py", line 3983, in combineLocally
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/shuffle.py", line 256, in mergeValues
    for k, v in iterator:
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_21/4239780651.py", line 8, in <lambda>
  File "/tmp/ipykernel_21/4239780651.py", line 8, in <listcomp>
  File "/tmp/ipykernel_21/4239780651.py", line 2, in compute_distance
  File "/.local/lib/python3.11/site-packages/numpy/linalg/_linalg.py", line 2833, in norm
    return sqrt(add.reduce(s, axis=axis, keepdims=keepdims))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
numpy.exceptions.AxisError: axis 1 is out of bounds for array of dimension 1

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
