## KMeans with MapReduce


Here is an implementation of KMeans written from scratch using the MapReduce approach.

---

Definition of auxiliary functions:

In [None]:
#Import libraries
import numpy as np

#Set global variables
#Both are read by stop_condition to decide when the KMeans loop ends
MAX_ITERATIONS = 50   #iteration count at which the loop is stopped
epsilon = 0.02        #stop once the mean centroid shift is at or below this value

#Function to remove first line
def remove_header(rdd):
    """Return *rdd* without its header line.

    The first element of the RDD is taken as the header, and every line equal
    to it is filtered out (so a repeated header line is dropped as well).
    """
    first_line = rdd.first()

    def is_not_header(line):
        return line != first_line

    return rdd.filter(is_not_header)

#Function to prepare the dataset
def preprocessing(rdd):
    """Turn an RDD of raw CSV lines into an RDD of numpy feature vectors.

    The header line (column names) is removed first. Each remaining line is
    split on commas, its first column (the row index) is discarded, and the
    other fields are cast to floats and packed into a numpy array.
    """
    def parse_row(line):
        #skip column 0: it only holds the row index
        fields = line.split(",")[1:]
        return np.array([float(field) for field in fields])

    return remove_header(rdd).map(parse_row)

#Function to initialize centroids to random points in our dataset
def init_centroids(dataset, k, seed=None):
    """Pick *k* distinct points of *dataset* to serve as initial centroids.

    Args:
        dataset: RDD of points to sample from.
        k: number of centroids to draw (sampling is without replacement).
        seed: optional seed forwarded to ``takeSample`` so that a run can be
            reproduced; ``None`` (the default) keeps the sampling random.

    Returns:
        A list with the sampled points.
    """
    return dataset.takeSample(False, k, seed)

#Function to compute euclidean distance between points
def eu_dist(v1, v2):
    """Return the Euclidean (L2) distance between the vectors *v1* and *v2*."""
    return np.linalg.norm(v1 - v2)

#Function to be called by map
def mapper(point, centroids):
    """Return the index of the centroid closest to *point*.

    The distance to every centroid is computed with ``eu_dist``; ``np.argmin``
    then yields the position of the smallest one (the first one on ties).
    """
    distances = [eu_dist(point, centroid) for centroid in centroids]
    return np.argmin(distances)

#Function to calculate new centroids after the reduce step
#reduce_result = (i, (sum x_i, |S_i|)) for i = 0, ..., k - 1
def update_centroids(reduce_result, k, previous_centroids=None):
    """Compute the new centroids from the output of the reduce step.

    Args:
        reduce_result: RDD of ``(cluster_id, (vector_sum, count))`` pairs, one
            per non-empty cluster.
        k: number of clusters; cluster ids are expected to be ``0 .. k - 1``.
        previous_centroids: optional sequence of the current centroids. When
            given, a cluster that received no points keeps its previous
            centroid instead of raising.

    Returns:
        A list of ``k`` centroids where position ``i`` holds the mean of the
        points assigned to cluster ``i``.

    Raises:
        ValueError: if a cluster received no points and no
            ``previous_centroids`` were supplied.
    """
    #reduceByKey gives no ordering guarantee, so index the (sum, count) pairs
    #by cluster id instead of trusting their position in the taken list
    totals = dict(reduce_result.take(k))

    new_centroids = []
    for i in range(k):
        if i in totals:
            vector_sum, count = totals[i]
            new_centroids.append(vector_sum / count)
        elif previous_centroids is not None:
            #empty cluster: there is nothing to average, keep the old centroid
            new_centroids.append(previous_centroids[i])
        else:
            raise ValueError(
                "cluster %d received no points; pass previous_centroids to "
                "keep its old centroid" % i
            )
    return new_centroids

#Function to broadcast the new centroids at every iteration
def broadcasting(current_centroids):
    """Broadcast *current_centroids* via the SparkContext and return their value.

    NOTE(review): the broadcast's ``.value`` is read back right here, so the
    caller receives the centroid values rather than the Broadcast handle.
    Confirm whether tasks were meant to read the handle's ``.value`` instead.
    """
    return sc.broadcast(current_centroids).value

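#Define our convergence measure to check how "different" the new centroids are:
#the mean Euclidean distance between each old centroid and its updated version
#(this is the centroid shift, not the KMeans loss L(c1,...,ck) = ∑_i=1,..,n min_j||xi − cj||2)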
def error_function(old_centroids, new_centroids, k):
    """Return the mean distance between matching old and new centroids.

    Only the first *k* positions of both sequences are compared, pairing
    ``old_centroids[i]`` with ``new_centroids[i]``.
    """
    shifts = [eu_dist(old_centroids[i], new_centroids[i]) for i in range(k)]
    return np.mean(shifts)

#Function to define the stopping condition of our program
#(looks at the global variables epsilon and MAX_ITERATIONS)
def stop_condition(mean_distance, n_iter):
    """Tell whether the KMeans loop should stop.

    Args:
        mean_distance: mean shift between old and new centroids.
        n_iter: number of iterations completed so far.

    Returns:
        ``True`` when the centroids moved by at most the global ``epsilon`` or
        when the global ``MAX_ITERATIONS`` budget has been used up.
    """
    converged = mean_distance <= epsilon
    #">=" rather than "==": the loop must still end if the counter ever
    #starts at, or jumps past, the limit
    out_of_budget = n_iter >= MAX_ITERATIONS
    return bool(converged or out_of_budget)

#Function to be applied on the final RDD before saving to file
def toCSVRow_components(data_row):
    """Format a ``(vector, cluster)`` pair as one comma-separated line.

    Each component is written individually with its full float precision.
    Formatting through the printed form of the numpy array is avoided on
    purpose: that text pads values with a variable number of spaces (which
    turned into empty CSV fields) and rounds them to the print precision.

    Args:
        data_row: pair whose first item is an iterable of numbers and whose
            second item is the cluster label.

    Returns:
        The components followed by the label, joined by commas.
    """
    vector, cluster = data_row[0], data_row[1]
    #float() first, so numpy scalars are rendered as plain numbers
    fields = [repr(float(value)) for value in vector]
    fields.append(str(cluster))
    return ','.join(fields)

#Function to be applied on the header of the RDD before saving to file
def toCSVRow_header(data_row):
    """Join the column names in *data_row* into one comma-separated line."""
    return ','.join(data_row)

---------
<br>

Executing our KMeans:

In [None]:
#MAIN PROGRAM

#Load data and preprocess it
components = sc.textFile('s3://admhw4/principal5.csv')
#Cache the parsed points: every iteration of the loop below runs a new job over
#them, and without caching each job would re-read and re-parse the CSV
components_prep = preprocessing(components).cache()

#KMEANS-------------------------------------------------------------------------------------------------
#Initialization
k = 9                                                      #init number of clusters
initial_centroids = init_centroids(components_prep, k)     #init centroids to random points
centroids_broadcast = broadcasting(initial_centroids)      #broadcast initial centroids
n = 0                                                      #init number of iterations
stop = False                                               #init boolean flag

#Start iterating
while not stop:

    #Map
    #vector -> (cluster, (vector, 1))
    components_rdd = components_prep.map(lambda row: (mapper(row, centroids_broadcast), (row, 1)))

    #Reduce
    #(cluster_i, (vector_a, 1)), (cluster_i, (vector_b, 1)) -> (cluster_i, (vector_a + vector_b, 1 + 1))
    result = components_rdd.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

    #Update number of iterations
    n += 1

    #Update centroids
    new_centroids = update_centroids(result, k)

    #Check distance between centroids
    error = error_function(initial_centroids, new_centroids, k)

    #Evaluate stop condition
    stop = stop_condition(error, n)

    #If we can't stop, broadcast the new centroids and update the initial ones
    if not stop:
        centroids_broadcast = broadcasting(new_centroids)
        initial_centroids = centroids_broadcast

#-------------------------------------------------------------------------------------------------------

#Keep the labelling produced by the last map step: (cluster, (vector, 1)) -> (vector, cluster)
components_with_clusters = components_rdd.map(lambda row: (row[1][0], row[0]))

---------
<br>

Save the final RDD to file:

In [None]:
#Add back the header
#One-row RDD holding the column names of the output file
header = sc.parallelize(np.array([["PC1", "PC2", "PC3", "PC4", "PC5", "cluster"]]))
#Fetch the row back to the driver (in a notebook this displays it)
header.take(1)

In [None]:
#Format the lines of components RDD
#Each (vector, cluster) pair becomes one comma-separated text line
components_to_save = components_with_clusters.map(toCSVRow_components)

In [None]:
#Format the header
#The row of column names becomes one comma-separated text line
header_to_save = header.map(toCSVRow_header)

In [None]:
#Concatenate the header and the components RDD
#The header RDD is the receiver of union(), the data lines are its argument
#NOTE(review): presumably this keeps the header line first in the output - confirm
df = header_to_save.union(components_to_save)

In [None]:
#Finally save to S3 bucket our file
#NOTE(review): saveAsTextFile presumably writes a directory of part files at this
#path rather than a single .csv file - confirm that downstream readers expect that
df.saveAsTextFile('s3://admhw4/mapreduce_kmeans.csv')