In [0]:
import math
import random
from pyspark.sql import Row
import numpy as np 
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as f
from sklearn.metrics import calinski_harabasz_score
from sklearn.metrics import adjusted_rand_score
import pandas as pd


In [0]:
def Load_dataset(path):
  """Load a CSV dataset, MinMax-normalize its feature columns, and return three views of it.

  Every column except ``class`` is treated as a feature and normalized
  independently with its own VectorAssembler + MinMaxScaler stage. As a result,
  each feature value in the output is a one-element vector.

  Relies on the notebook-level ``spark`` session (presumably provided by the
  runtime; it is not created in this notebook).

  Args:
    path: location of a CSV file with a header row and a ``class`` column.

  Returns:
    (rdd, features_df, class_df):
      rdd         -- the normalized DataFrame as an RDD of Rows (features + ``class``),
      features_df -- the normalized DataFrame with the ``class`` column dropped,
      class_df    -- a single-column DataFrame holding the ``class`` labels.
  """
  df = (spark.read.format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .option("sep", ",")
        .load(path))

  # Exact comparison on purpose: the previous `col not in 'class'` was a
  # substring test that also discarded features named e.g. "a", "s" or "ss".
  columns_to_scale = [col for col in df.columns if col != "class"]

  # normalize the data using MinMax method, one single-column stage per feature
  assemblers = [VectorAssembler(inputCols=[col], outputCol=col + "_vec") for col in columns_to_scale]
  scalers = [MinMaxScaler(inputCol=col + "_vec", outputCol=col + "_scaled") for col in columns_to_scale]
  pipeline = Pipeline(stages=assemblers + scalers)
  scaledData = pipeline.fit(df).transform(df)

  # Keep only the scaled columns (renamed back to the original feature names) plus the label.
  names = {col + "_scaled": col for col in columns_to_scale}
  names["class"] = "class"
  scaledData = scaledData.select([f.col(c).alias(alias) for c, alias in names.items()])

  return scaledData.rdd, scaledData.drop("class"), scaledData.select("class")

In [0]:
def get_rand_k(k , rdd , seed=1):
  """Sample k rows of rdd (without replacement) to use as the initial centroids.

  Args:
    k: number of centroids; must be at least 1 and smaller than the row count.
    rdd: RDD of Rows to sample from.
    seed: seed forwarded to RDD.takeSample, so runs are reproducible.

  Returns:
    list of k sampled Rows.

  Raises:
    ValueError: if k < 1 or k >= the number of rows in rdd.
  """
  # k <= 0 would produce no centroids, and k-means would later fail with an
  # opaque "min() arg is an empty sequence"; reject it up front instead.
  if k < 1:
    raise ValueError(" K must be a positive integer")
  n_rows = rdd.count()
  if k >= n_rows:
    raise ValueError(" K is too large")
  return rdd.takeSample(False, k, seed=seed)

In [0]:
def clean_cluster(clust_lst):
  """Turn sampled Rows into an RDD of (cluster_id, feature_vector) pairs.

  Args:
    clust_lst: list of Rows (e.g. from get_rand_k); the ``class`` field is ignored.

  Returns:
    RDD (via the notebook-level ``sc``) of (i, numpy array) pairs, where i is the
    Row's position in clust_lst and serves as the cluster id.
  """
  # Exact comparison: the previous `y not in "class"` was a substring test that
  # also dropped features whose names are substrings of "class".
  out = [
    (i, np.array([value for name, value in row.asDict().items() if name != "class"]))
    for i, row in enumerate(clust_lst)
  ]
  return sc.parallelize(out)

In [0]:
def ret_nearest_centroid(row , clust_df):
  """Assign a Row to its nearest centroid by Euclidean distance.

  Args:
    row: a Row whose non-``class`` fields are the feature values.
    clust_df: list of (cluster_id, centroid_vector) pairs, e.g. the collected
      output of clean_cluster or k_means_MR.

  Returns:
    (cluster_id, (row_vector, 1)): the id of the closest centroid, plus the row's
    vector and a count of 1 so vec_addition can sum them per cluster.

  Raises:
    ValueError: if clust_df is empty.
  """
  # Exact comparison: a substring test (`not in "class"`) would also drop features like "a".
  row_vec = np.array([value for name, value in row.asDict().items() if name != "class"])
  # Return the centroid's own id (element [0]), not its list position, so labels
  # stay correct even when ids are not 0..len-1. min() keeps the first centroid on ties.
  nearest = min(clust_df, key=lambda centroid: np.linalg.norm(row_vec - centroid[1]))
  return (nearest[0], (row_vec, 1))


In [0]:
def vec_addition(row1, row2):
  """Merge two partial (coordinate_sum, row_count) aggregates into one.

  This is the reduce function for k-means: element [0] of each argument is a
  vector (anything np.array accepts) and element [1] is a row count.

  Returns:
    (elementwise sum of the two vectors, combined row count).
  """
  summed_coords = np.add(np.array(row1[0]), np.array(row2[0]))
  total_rows = row1[1] + row2[1]
  return summed_coords, total_rows

In [0]:
def get_change_rate(old_clust, new_clust):
  """Total centroid movement between two k-means iterations.

  Both arguments are sequences of (cluster_id, vector) pairs aligned by position.

  Returns:
    the sum, over every position of old_clust, of the Euclidean distance between
    the old and new vectors (0 when old_clust is empty).
  """
  return sum(
      np.linalg.norm(old_clust[idx][1] - new_clust[idx][1])
      for idx in range(len(old_clust))
  )

In [0]:
  
def get_pred(end_cluster, df_rdd):
  """Label every row of df_rdd with the id of its nearest centroid.

  Args:
    end_cluster: list of (cluster_id, centroid_vector) pairs, e.g. k_means_MR's result.
    df_rdd: RDD of Rows to classify.

  Returns:
    a Python list of cluster ids, in the order RDD.collect() yields the rows.
  """
  def _nearest_label(row):
    assignment = ret_nearest_centroid(row, clust_df=end_cluster)
    return assignment[0]

  return df_rdd.map(_nearest_label).collect()

In [0]:
def k_means_MR(df_rdd, K , CT =1.e-4 , I = 30  , Exp = 10):
  """Run MapReduce-style k-means on df_rdd and return the final centroids.

  Args:
    df_rdd: RDD of Rows (features plus a ``class`` field, which is ignored).
    K: number of clusters.
    CT: convergence threshold; stop once the total centroid movement
      (get_change_rate) drops below it.
    I: maximum number of iterations.
    Exp: experiment index; the initial centroids are sampled with seed 1 + Exp.

  Returns:
    list of (cluster_id, centroid_vector) pairs, ordered by cluster id 0..K-1.
    With I == 0, these are the initial sampled centroids.
  """
  # Initial centroids as a driver-side list of (id, vector), with id == list position.
  centroids = clean_cluster(get_rand_k(K, df_rdd, seed=1 + Exp)).collect()

  for _ in range(I):
    old_centroids = centroids
    # Map: assign each row to its nearest centroid -> (cluster_id, (row_vec, 1)).
    classified = df_rdd.map(lambda x: ret_nearest_centroid(x, clust_df=old_centroids))
    # Reduce: per-cluster coordinate sums and counts, then average into new means.
    means = (classified.reduceByKey(vec_addition)
             .mapValues(lambda summed: np.array(summed[0]) / float(summed[1]))
             .collectAsMap())
    # A cluster that received no rows is missing from `means`. Keep its previous
    # centroid so the number and order of centroids stay fixed; otherwise
    # get_change_rate would index past the end or compare misaligned centroids.
    centroids = [(cid, means.get(cid, vec)) for cid, vec in old_centroids]

    if get_change_rate(old_centroids, centroids) < CT:
      break

  return centroids


In [0]:
def K_wrap(path , K , CT =1.e-4 , I = 30  , Exp = 10):
  """Run k-means Exp times on one dataset and summarize clustering quality.

  Run number t (0-based) seeds its initial centroids via k_means_MR(..., Exp=t).

  Args:
    path: CSV location passed to Load_dataset.
    K: number of clusters.
    CT, I: convergence threshold and max iterations, forwarded to k_means_MR.
    Exp: number of independent runs.

  Returns:
    (mean ARI, std ARI, mean CH, std CH), each rounded to 3 decimals. ARI compares
    predictions with the ``class`` labels; CH is the Calinski-Harabasz score of the
    predictions on the normalized features.
  """
  df_rdd, only_features, true_lbl = Load_dataset(path)
  # k-means scans df_rdd once per iteration in every run; cache it so the CSV is
  # not re-read and re-normalized on each pass.
  df_rdd.cache()
  try:
    true_lbl_lst = [r[0] for r in true_lbl.collect()]
    # Each normalized feature is a one-element vector; cell[0] unwraps it to a scalar.
    feature_lst = [np.array([cell[0] for cell in r]) for r in only_features.collect()]

    CH_lst = []
    ARI_lst = []
    for test in range(Exp):
      # getting the centroids the model has found
      end_cluster = k_means_MR(df_rdd, K, CT, I, test)
      pred_lbl = get_pred(end_cluster, df_rdd)
      # score this run's clustering
      CH_lst.append(calinski_harabasz_score(feature_lst, pred_lbl))
      ARI_lst.append(adjusted_rand_score(true_lbl_lst, pred_lbl))
  finally:
    df_rdd.unpersist()

  return round(np.mean(ARI_lst),3), round(np.std(ARI_lst),3), round(np.mean(CH_lst), 3), round(np.std(CH_lst),3)


In [0]:
# Experiment grid: one (dataset, K) pair per row of the results table.
DSnames = ["iris"] * 5 + ["glass"] * 5 + ["parkinsons"] * 5
Kvals = list(range(2, 7)) + list(range(5, 10)) + list(range(2, 7))
assert len(DSnames) == len(Kvals), "each dataset entry needs a matching K value"

# Folder holding the <dataset name>.csv files.
DATA_DIR = "/FileStore/tables/"

ARI_Avg = []
ARI_Std = []
CH_Avg = []
CH_Std = []

# run all the required tests to fill the table
for ds_name, k in zip(DSnames, Kvals):
  ari_mean, ari_std, ch_mean, ch_std = K_wrap(DATA_DIR + ds_name + ".csv", k)
  ARI_Avg.append(ari_mean)
  ARI_Std.append(ari_std)
  CH_Avg.append(ch_mean)
  CH_Std.append(ch_std)

# Build the table once from the collected results. The row count follows the
# experiment grid instead of a hard-coded 15 with placeholder zeros.
result = {"Dataset name": DSnames,
          "The value of K": Kvals,
          "Average of ARI": ARI_Avg,
          "Standard deviation of ARI": ARI_Std,
          "Average of CH": CH_Avg,
          "Standard deviation of CH": CH_Std,
         }
df = pd.DataFrame(result)


In [0]:
# Render the per-(dataset, K) summary table of ARI/CH means and standard deviations.
display(df)

Dataset name,The value of K,Average of ARI,Standard deviation of ARI,Average of CH,Standard deviation of CH
iris,2,0.568,0.0,353.367,0.0
iris,3,0.656,0.114,325.89,61.189
iris,4,0.598,0.031,298.434,17.375
iris,5,0.575,0.082,266.838,19.181
iris,6,0.533,0.087,234.612,20.178
glass,5,0.197,0.046,78.287,10.204
glass,6,0.182,0.035,74.686,7.282
glass,7,0.155,0.019,72.439,8.488
glass,8,0.177,0.013,69.782,6.77
glass,9,0.176,0.035,62.619,3.868
