# MAPD-B: Pyspark implementation of kmeans || - kmeans++ and kmeans 
### Antonio Feltrin (2097126), Giosuè Sardo Infirri (2094560), Simone Toso (2095484)

In this project we will implement the ```kmeans||```, ```kmeans++``` and ```kmeans``` algorithms in PySpark as described in Bahmani's paper. We will use the ```kddCup99``` dataset as a testing ground.

The overall code will follow the procedure presented in the paper:
 - ```kmeans||```:  algorithm for the selection of possible initial centroids. It heavily relies on oversampling and will therefore pick on average more candidate centers than desired.
 - ```kmeans++```: used to select $k$ initial centroids out of the candidates proposed by ```kmeans||```
 - ```kmeans```: final algorithm for clusterization starting from the centroids obtained with ```kmeans++```.


# Spark context initialization

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("spark://10.67.22.124:7077")\
    .appName("prova iniziale")\
    .config("spark.executor.memory", "1g")\
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

23/09/17 08:45:43 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
spark

In [4]:
# Create a spark context
sc = spark.sparkContext
sc

In [5]:
sc.setLogLevel("ERROR")

In [8]:
#Import libraries
import numpy as np
import pandas as pd
import random
import sklearn.datasets 
import time
import matplotlib.pyplot as plt

from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.functions import col
from pyspark.sql.functions import mean
from pyspark.sql.functions import stddev
from pyspark.sql.functions import rand
from pyspark.sql.functions import lit
from pyspark.sql.functions import least
from pyspark.sql import Row
from pyspark.ml.feature import StandardScaler
from pyspark.sql.types import StringType
from pyspark.sql.functions import row_number
from pyspark.sql import Window
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import when
from pyspark.sql.functions import udf, array
from pyspark.sql.types import IntegerType, FloatType

from operator import add
from functools import reduce

In [None]:
time0 = time.time()

## Hyperparameters
Here we set the various hyperparameters. 

- ```k``` is the number of centroids we want to obtain
- ```len_df``` is the number of rows considered out of the whole dataset. The maximum value we consider is 494 021, corresponding to 10% of the entire dataset (note that the ```fetch_kddcup99()``` command takes automatically 10% of the dataset if not specified otherwise).
- ```G``` is used to evaluate the oversampling factor ```l```. In particular we will have $l = G\cdot \frac{k}{\log \phi}$, where $\phi$ is the initial cost. We expect that, after the implementation of ```kmeans||```, we will have around $k\cdot G$ possible centroids. The implementation of ```kmeans++``` will select $k$ centroids out of these possible candidates.

In [9]:
# Hyperparameters
k = 140
G = 3 
len_df = 494_021

# Import data and create a Spark dataframe

We now import the dataset and select randomly ```len_df``` rows. This will be the dataframe we will work on.

In [11]:
spark_X = spark.createDataFrame(sklearn.datasets.fetch_kddcup99(percent10 = True, as_frame = True)['frame']\
                                .iloc[random.sample(range(0, len_df), len_df)])

CPU times: user 1min 20s, sys: 1.57 s, total: 1min 21s
Wall time: 1min 23s


In [12]:
spark_X = spark_X.persist()

In [13]:
spark_X.rdd.getNumPartitions()
#spark_X = spark_X.repartition(4) # We use repartition and coalesce during benchmarking to choose the number of partitions
#spark_X = spark_X.coalesce(4)

In [15]:
# Check the number of partitions
spark_X.rdd.getNumPartitions()

4

In [16]:
# Define a function that is able to extract a single row from the dataframe
def getrows(df, rownums=None):
    return df.rdd.zipWithIndex().filter(lambda x: x[1] in rownums).map(lambda x: x[0])   

In [17]:
# Delete the non-numerical values
col_type = np.array(spark_X.dtypes)
types = col_type[:,1] 
colnames = col_type[:,0]
clean_X = spark_X.select([col(colnames[i]) for i in range(len(colnames)) if not types[i] == 'binary'])

In [18]:
# Define the number of rows and columns in the dataframe
n_rows = len_df
n_cols = len(getrows(clean_X, rownums=[0]).collect()[0])

                                                                                

In [19]:
n_cols

38

We print the two schemas to check that the non-numerical variables were removed correctly

In [None]:
spark_X.printSchema()

In [20]:
clean_X.printSchema()

root
 |-- duration: long (nullable = true)
 |-- protocol_type: binary (nullable = true)
 |-- service: binary (nullable = true)
 |-- flag: binary (nullable = true)
 |-- src_bytes: long (nullable = true)
 |-- dst_bytes: long (nullable = true)
 |-- land: long (nullable = true)
 |-- wrong_fragment: long (nullable = true)
 |-- urgent: long (nullable = true)
 |-- hot: long (nullable = true)
 |-- num_failed_logins: long (nullable = true)
 |-- logged_in: long (nullable = true)
 |-- num_compromised: long (nullable = true)
 |-- root_shell: long (nullable = true)
 |-- su_attempted: long (nullable = true)
 |-- num_root: long (nullable = true)
 |-- num_file_creations: long (nullable = true)
 |-- num_shells: long (nullable = true)
 |-- num_access_files: long (nullable = true)
 |-- num_outbound_cmds: long (nullable = true)
 |-- is_host_login: long (nullable = true)
 |-- is_guest_login: long (nullable = true)
 |-- count: long (nullable = true)
 |-- srv_count: long (nullable = true)
 |-- serror_rate: d

# kmeans|| - step 0: Useful functions

In [21]:
# Define the function for the squared distance between the cluster centers and the pandas dataframe (no more used)
def distance(xrow, centers, num_cols=n_cols):
    x = np.array(xrow)[:num_cols]
    the_ds = np.zeros(len(centers))
    
    for c in range(len(centers)):
        c_array = np.array(centers[c])[:num_cols]
        dist2 = np.linalg.norm(x - c_array)**2
        the_ds[c] = dist2
        
    return np.min(the_ds)

# Euclidean distance between two rows (limited to the first n_cols)
def euclid(xrow0, xrow1, num_cols = n_cols):
    return np.linalg.norm( np.array(xrow0[0:num_cols]) - np.array(xrow1[0:num_cols]) )

# Formula to evaluate the oversamping factor with the pre-factor G
def evaluate_l(log_phi, k, G):
    return G * k/log_phi # G = over-oversampling factor

# Function to select a row based on its probability
def select_row(x):
    if x > np.random.uniform(low = 0, high = 1):
        return True
    else:
        return False



# Kmeans|| - step 1. First sample and initial cost
To start things off, we randomly choose the first centroid and evaluate the cost function.

In [22]:
# Choose the first sample randomly: select the random row
random_n = [np.random.randint(0, n_rows)]
random_sample = getrows(clean_X, random_n).collect()

                                                                                

In [2]:
# Names of the dataset's columns.
colnames = list(clean_X.dtypes[i][0] for i in range(len(clean_X.dtypes)))

NameError: name 'clean_X' is not defined

In [25]:
# Add the colunm to keep the minimum distance between each point and the closest center
clean_X = (clean_X.select('*')
           .withColumn('minimum_cost', sum((col(colname)-random_sample[0][colname])**2 for colname in colnames)))

In [27]:
# Initial cost
initial_cost = np.log(clean_X.agg({"minimum_cost": "sum"}).collect()[0][0])
initial_cost

                                                                                

40.71877145069312

In [28]:
# Add the "minimum_cost" column to the center (with value 0), to avoid having different lengths in the final center list.
temp = random_sample[0].asDict()
temp["minimum_cost"] = 0.
random_sample[0] = Row(**temp)

We wanted to avoid having the centers scattered across multiple nodes, so we chose to broadcast the list of selected centers. If we didn't do it, each computation of euclidean distance would have been very heavy on the network communication.

In [29]:
# Broadcasting the row over the workers
bCent = sc.broadcast(random_sample)

# kmeans|| - step 3: Loop implementation
We now evaluate the number of iterations we want to cycle through and implement the for loop. In each iteration we will independently select centers based on their distance from those already selected.

In the loop, we evaluate the distance from the newly selected centers only and compare it to the previous minimum distance. This way, we avoid computing multiple times the distance from centers which we already know are not the closest ones from a given row.

In [30]:
# n_iterations = log(phi)
n_iter = int(initial_cost)
n_iter

40

In [31]:
#Initialize cost 
phi_iter = initial_cost

#Evaluate oversampling factor
l = evaluate_l(phi_iter, k, G)
l

10.314653046656463

In [None]:
i = 0 # count iterations
last_centers = 1 # how many centers were selected in the previous iteration. Initialized to 1

while i < n_iter: 

        '''
        Nel ciclo for:
            - Evaluate for each row l * d()^2 / phi
            - Sample with that probability
            - Broadcast centers to nodes
            - Evaluate new cost

        '''

        # Evaluate the probability and select the new center rows
        mod_phi = l/np.exp(phi_iter) #Normalization factor to obtain selection probability 
        new_rows = clean_X.select('*').withColumn('random_number', rand(seed=int(time.time())))\
                          .filter(col('random_number') < col('minimum_cost')*mod_phi).drop('random_number').collect()
        
        # Update the broadcast with newest at the beginning
        old_centers = bCent.value
        bCent.destroy()
        bCent = sc.broadcast(new_rows + old_centers)
    
        # Update the minimum distance
        if len(new_rows) == 1:
            clean_X = clean_X.select('*').\
                      withColumn('minimum_cost', least('minimum_cost', reduce(add, [(col(colname)-bCent.value[0][colname])**2 for colname in colnames]))).cache()

        elif len(new_rows) > 1:
            clean_X = clean_X.select('*').withColumn('dummy', least(*[reduce(add, [(col(colname)-bCent.value[center][colname])**2 for colname in colnames]) for center in range(len(new_rows))] )).\
                      withColumn('minimum_cost', least('minimum_cost', 'dummy')).\
                      drop('dummy').cache()

        last_centers = len(bCent.value)

        # Evaluate new cost
        phi_iter = np.log(clean_X.agg({"minimum_cost": "sum"}).collect()[0][0])

        i += 1

        print("\n at iteration", i ,", #values: ",len(bCent.value), " \n")

                                                                                

--- 1.3021507263183594 seconds ---
--- 0.019869565963745117 seconds ---


                                                                                

--- 2.0087106227874756 seconds ---

 at iteration 1 , #values:  2  



                                                                                

--- 0.8010632991790771 seconds ---
--- 0.02894139289855957 seconds ---


                                                                                

--- 5.651654958724976 seconds ---

 at iteration 2 , #values:  11  

--- 0.7130675315856934 seconds ---
--- 0.01677107810974121 seconds ---


                                                                                

--- 9.560967683792114 seconds ---

 at iteration 3 , #values:  28  

--- 0.7638812065124512 seconds ---
--- 0.019711732864379883 seconds ---


                                                                                

--- 7.5398359298706055 seconds ---

 at iteration 4 , #values:  41  

--- 0.7120873928070068 seconds ---
--- 0.029645919799804688 seconds ---


                                                                                

--- 12.50135326385498 seconds ---

 at iteration 5 , #values:  63  

--- 0.80338454246521 seconds ---
--- 0.014749288558959961 seconds ---


                                                                                

--- 8.033488750457764 seconds ---

 at iteration 6 , #values:  76  

--- 0.8093929290771484 seconds ---
--- 0.02459406852722168 seconds ---


                                                                                

--- 6.922570705413818 seconds ---

 at iteration 7 , #values:  86  

--- 0.8105731010437012 seconds ---
--- 0.020987987518310547 seconds ---


                                                                                

--- 6.072460412979126 seconds ---

 at iteration 8 , #values:  94  

--- 0.8704545497894287 seconds ---
--- 0.03604435920715332 seconds ---


                                                                                

--- 5.470468044281006 seconds ---

 at iteration 9 , #values:  101  



                                                                                

--- 0.890662431716919 seconds ---
--- 0.017138957977294922 seconds ---


                                                                                

--- 6.223142623901367 seconds ---

 at iteration 10 , #values:  110  

--- 0.8430938720703125 seconds ---
--- 0.02824568748474121 seconds ---


                                                                                

--- 7.190376043319702 seconds ---

 at iteration 11 , #values:  120  

--- 0.9445655345916748 seconds ---
--- 0.012666940689086914 seconds ---


                                                                                

--- 8.702638149261475 seconds ---

 at iteration 12 , #values:  133  



                                                                                

--- 1.0273563861846924 seconds ---
--- 0.028737783432006836 seconds ---


                                                                                

--- 5.99812126159668 seconds ---

 at iteration 13 , #values:  140  

--- 1.0069444179534912 seconds ---
--- 0.015172243118286133 seconds ---


                                                                                

--- 8.207391262054443 seconds ---

 at iteration 14 , #values:  152  

--- 1.0210638046264648 seconds ---
--- 0.007494688034057617 seconds ---


                                                                                

--- 6.4544031620025635 seconds ---

 at iteration 15 , #values:  160  



                                                                                

--- 1.0767366886138916 seconds ---
--- 0.00979161262512207 seconds ---


                                                                                

--- 7.37482762336731 seconds ---

 at iteration 16 , #values:  170  

--- 1.0891304016113281 seconds ---
--- 0.0114898681640625 seconds ---


                                                                                

--- 6.844731569290161 seconds ---

 at iteration 17 , #values:  179  



                                                                                

--- 1.151904821395874 seconds ---
--- 0.019900083541870117 seconds ---


                                                                                

--- 7.77961802482605 seconds ---

 at iteration 18 , #values:  189  

--- 1.1612231731414795 seconds ---
--- 0.009702920913696289 seconds ---


                                                                                

--- 8.744345664978027 seconds ---

 at iteration 19 , #values:  201  

--- 1.243210792541504 seconds ---
--- 0.02343297004699707 seconds ---


                                                                                

--- 12.11919093132019 seconds ---

 at iteration 20 , #values:  221  

--- 1.2077486515045166 seconds ---
--- 0.028949260711669922 seconds ---


                                                                                

--- 5.831791162490845 seconds ---

 at iteration 21 , #values:  227  

--- 1.1989903450012207 seconds ---
--- 0.0175168514251709 seconds ---


                                                                                

--- 9.444331407546997 seconds ---

 at iteration 22 , #values:  240  



                                                                                

--- 1.2848546504974365 seconds ---
--- 0.013524532318115234 seconds ---


                                                                                

--- 7.95656681060791 seconds ---

 at iteration 23 , #values:  249  

--- 1.285496711730957 seconds ---
--- 0.013084888458251953 seconds ---


                                                                                

--- 8.75012493133545 seconds ---

 at iteration 24 , #values:  261  



                                                                                

--- 1.4242756366729736 seconds ---
--- 0.01563286781311035 seconds ---


                                                                                

--- 9.232994556427002 seconds ---

 at iteration 25 , #values:  274  

--- 1.3975632190704346 seconds ---
--- 0.016738176345825195 seconds ---


                                                                                

--- 7.550790786743164 seconds ---

 at iteration 26 , #values:  282  

--- 1.485058307647705 seconds ---
--- 0.0286257266998291 seconds ---


                                                                                

--- 8.905394554138184 seconds ---

 at iteration 27 , #values:  292  

--- 1.6279282569885254 seconds ---
--- 0.006941795349121094 seconds ---


                                                                                

--- 8.82502555847168 seconds ---

 at iteration 28 , #values:  302  

--- 1.5473978519439697 seconds ---
--- 0.01551675796508789 seconds ---


                                                                                

--- 8.576607465744019 seconds ---

 at iteration 29 , #values:  311  

--- 1.605872392654419 seconds ---
--- 0.016425609588623047 seconds ---


                                                                                

--- 8.730584859848022 seconds ---

 at iteration 30 , #values:  321  

--- 1.6173851490020752 seconds ---
--- 0.008423566818237305 seconds ---


                                                                                

--- 13.12281322479248 seconds ---

 at iteration 31 , #values:  340  

--- 1.7660374641418457 seconds ---
--- 0.010366439819335938 seconds ---


                                                                                

--- 7.834041595458984 seconds ---

 at iteration 32 , #values:  347  

--- 1.8577003479003906 seconds ---
--- 0.012343883514404297 seconds ---


                                                                                

--- 11.143404960632324 seconds ---

 at iteration 33 , #values:  356  

--- 1.8668835163116455 seconds ---
--- 0.02301192283630371 seconds ---


                                                                                

--- 9.766727209091187 seconds ---

 at iteration 34 , #values:  366  

--- 1.9275741577148438 seconds ---
--- 0.0177462100982666 seconds ---


                                                                                

--- 7.805544137954712 seconds ---

 at iteration 35 , #values:  371  

--- 2.050144672393799 seconds ---
--- 0.02292919158935547 seconds ---


                                                                                

--- 11.059290409088135 seconds ---

 at iteration 36 , #values:  382  

--- 2.238307476043701 seconds ---
--- 0.007734775543212891 seconds ---


                                                                                

--- 11.896390199661255 seconds ---

 at iteration 37 , #values:  396  



                                                                                

--- 2.1909267902374268 seconds ---
--- 0.008402585983276367 seconds ---


In [None]:
print("Centers found: ", len(bCent.value))

# kmeans++ - step 0: Useful functions
We define a few functions which will be useful in the following code. They allow us, given a certain ```Row```, to find the closest center and the minimum distance.

In [None]:
# Find the closest center to a certain row
def find_min_euclid(xrow, broadC):
    distances = []
    for i in range(len(broadC.value)):
        distances.append(euclid(xrow, broadC.value[i]))
    return distances.index(min(distances))

# Find the minimum distance between row and centers
def min_euclid(xrow, broadC):
    distances = []
    for i in range(len(broadC.value)):
        distances.append(euclid(xrow, broadC.value[i])**2)
    return min(distances).item()

In [None]:
#Create a dataframe with the candidate centers
dfCent = spark.createDataFrame(bCent.value) 

In [None]:
dfCent.rdd.getNumPartitions()

# kmeans++ - step1: Evaluate weights

The weight $w$ of a center is the number of points in ```clean_X``` which are closest to it than to any other center

In [None]:
over_sampled_centers = dfCent.count()

In [None]:
wx = clean_X.rdd.map(lambda row: (find_min_euclid(row,bCent), 1)).\
        reduceByKey(lambda x,y: x+y).\
        takeOrdered(over_sampled_centers)

### Technical note

<em>Now we could have a problem. During the row sampling in the `kmeans||` process, it could happen that two identical rows (with indeces $i$ and $j$, $i<j$) are selected at the same loop iteration. This happens quite often if we have $G\gg1$. This results in one of the two centroids not appearing in ```wx```: all rows closest to $j$ are redirected towards $i$, even $j$ itself. So wx has a vacancy: the sequence goes 1,2,3, ..., $i$, ..., $j-1$, $j+1$, ...  . In order to solve this issue, we manually add missing entries and set their value to 0. This implies that the "twin" center has zero probability to be selected. All of this kerfuffle is just due to prevent a size error in the `rows_prob@wx` product, some cells below this line.<em/>

In [None]:
np.array(wx)[:,0]

In [None]:
counter = 0

while counter < len(wx):
    if wx[int(counter)][0] == counter:
        counter += 1
    else: 
        wx.insert(int(counter),(counter,0))
        counter += 1

In [None]:
#Take the weights and cast to float
wx = (np.array(wx)[:,1]).astype(float)

In [None]:
#Normalize weights
wx /= np.sum(wx)

# kmeans++ - step 2: Loop

In [None]:
# Draw 1 random center
first_index = np.random.choice(a=range(wx.shape[0]), size=1, p=wx) 
first_index

In [None]:
first_center = getrows(dfCent, first_index).collect()

In [None]:
# Adds the first center to the ultimate center broadcast
bCent_ultimate = sc.broadcast(first_center)
bCent_ultimate.value

In [None]:
# Define the total distance from other centers (minimum cost) over the chosen center
dfCent = (dfCent.select('*')
         .withColumn('minimum_cost', sum((col(colname)-first_center[0][colname])**2 for colname in colnames)))

Eliminiamo?

In [None]:
# Initialize the total cost value
#phi0 = np.log(dfCent.agg({"minimum_cost": "sum"}).collect()[0][0])
#phi0

In [None]:
ultimate_sample_n = len(bCent_ultimate.value)

while ultimate_sample_n < k: #n_iter or len(bCent.value) < k:

    # Evaluate the probability and select the new rows
    rows_prob = np.array(dfCent.rdd\
                .map(lambda row: distance(row,bCent_ultimate.value)).collect())
    
    # Sample new weighted random center
    another_index = np.random.choice(a=range(wx.shape[0]), size=1, p = rows_prob*wx/(rows_prob@wx) ) 
    another_center = getrows(dfCent, another_index).collect()
    
    # Update the broadcast
    old_centers = bCent_ultimate.value
    bCent_ultimate.destroy()    
    bCent_ultimate = sc.broadcast(another_center + old_centers)

    # For consistency reason:    
    # Update minimum cost
    #dfCent = dfCent.select('*').\
    #withColumn('minimum_cost', least('minimum_cost', sum((col(colname)-another_center[0][colname])**2 for colname in colnames) )).cache()
    
    # Evaluate new cost
    #phi0 = np.log(dfCent.agg({"minimum_cost": "sum"}).collect()[0][0])
    
    ultimate_sample_n = len(bCent_ultimate.value)
    print("i: ",ultimate_sample_n)
    

# k means
Finally, we implement ```kmeans``` on our dataset with the initial centroids obtained thus far. We stop when the relative variation of the cost function 

In [None]:
clean_X = clean_X.persist()

In [None]:
#Evaluate the initial value of the cost function
min_cost_udf = udf(lambda row: min_euclid(row, bCent_ultimate), FloatType())
clean_X = clean_X.withColumn("minimum_cost",min_cost_udf(array([col for col in colnames]))).persist()
phi0 = clean_X.agg({"minimum_cost": "sum"}).collect()[0][0]

In [None]:
n = 0
phi1 = phi0
still_different = True
eps = 1e-4

while still_different:

    # Create the weights and switch to indexed dataframe
    find_closest_udf = udf(lambda row: find_min_euclid(row, bCent_ultimate), IntegerType())
    final_df = clean_X.withColumn("closest", find_closest_udf(array([col for col in colnames]))).persist()

    # Update new centers:
    new_centers = final_df.groupBy('closest').agg(*[mean(c).alias(c) for c in colnames]).drop('closest').collect()
    #bCent_ultimate.destroy()
    bCent_ultimate = sc.broadcast(new_centers)

    # Update the minimum cost function
    min_cost_udf = udf(lambda row: min_euclid(row, bCent_ultimate), FloatType())
    clean_X = clean_X.withColumn("minimum_cost",min_cost_udf(array([col for col in colnames]))).persist()

    # Evaluate new cost
    phi1 = clean_X.agg({"minimum_cost": "sum"}).collect()[0][0]
    print('Relative cost variation:', abs(phi1 - phi0)/phi0)
    if abs(phi1 - phi0)/phi0 < eps:
        still_different = False
    else:
        phi0 = phi1

    n += 1

    #Persist to avoid stack overflow
    clean_X = clean_X.persist()

print('It took', n, 'iterations to converge')

# Plot results
We plot the final centers, projecting on two variables.

In [None]:
dim1 = "dst_bytes"
dim2 = "duration"

In [None]:
src_np = clean_X.select(dim1).collect()

In [None]:
dst_np = clean_X.select(dim2).collect()

In [None]:
plt.scatter(src_np[1:], dst_np[1:])

for i in range(len(bCent_ultimate.value)):
   plt.scatter(bCent_ultimate.value[i][dim1],bCent_ultimate.value[i][dim2],color='black', alpha = 0.5)
#plt.yscale('log')


In [None]:
np.log10(np.exp(26.7))

In [None]:
print('final time', time.time() - time0)

# TO DO:
- usare udf ovunque (fatto?)
- persist/unpersist sui vari dataframe in giro per non intasare la memoria

# Stop worker and master
Stop the running Spark context (sc) and Spark session (spark)

In [None]:
# sc.stop()
# spark.stop()