## Overview

This notebook shows how to read and query a table or DataFrame that you have uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) (the Databricks File System) lets you store data for querying inside Databricks. The notebook assumes that the files you want to read are already in DBFS. The code below loads labelled CSV datasets from DBFS, clusters them with a K-means implementation built on Spark RDDs, and reports the Calinski-Harabasz (CH) score and the Adjusted Rand Index (ARI) of the resulting clusters.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
#imports
import numpy as np
import os
import json
import findspark
import random
findspark.init()
import pyspark
import random
from sklearn.preprocessing import MinMaxScaler
from sklearn import metrics
from sklearn.metrics import pairwise_distances
from sklearn import datasets
import statistics as st

In [0]:
#read csv function, gets the location of the csv
def read_csv(file_location):
  """Load a labelled CSV through Spark and return its normalised feature vectors.

  The file is read with a header row and schema inference. Feature values are
  taken from the columns named ``f1`` .. ``f<n-1>`` (``n`` being the total
  number of columns) and the ground-truth label from the column named
  ``class``. Every feature is then rescaled with scikit-learn's
  ``MinMaxScaler`` using its default settings.

  Args:
    file_location: location of the CSV file, passed to ``spark.read.csv``.

  Returns:
    A tuple ``(points, Pointsrdd, labels_true)`` where ``points`` is the list
    of normalised feature vectors (one list per row), ``Pointsrdd`` is the
    same list parallelized into an RDD, and ``labels_true`` holds the
    ``class`` value of every row, in row order.
  """
  df = spark.read.csv(file_location, inferSchema = True, header = True)
  # The whole dataset is pulled to the driver because the scaler works locally.
  rows = df.rdd.collect()
  # One column is the label, so there are len(df.columns) - 1 feature columns.
  feature_cols = ['f' + str(i) for i in range(1, len(df.columns))]
  # True labels are kept for the ARI evaluation.
  labels_true = [row["class"] for row in rows]
  raw_points = [[row[col] for col in feature_cols] for row in rows]
  # normalize the points
  scaler = MinMaxScaler()
  scaler.fit(raw_points)
  points = scaler.transform(raw_points).tolist()
  # convert the points list to an RDD object
  Pointsrdd = spark.sparkContext.parallelize(points)
  return (points, Pointsrdd, labels_true)

def kRandom(K, points):
  """Return K randomly chosen entries of `points` to use as initial centroids.

  Positions are drawn with ``random.sample`` (sampling without replacement),
  so no position of `points` is selected twice.
  """
  picked = []
  for position in random.sample(range(len(points)), K):
    picked.append(points[position])
  return picked

def euclidean_distance(p, c):
  """Return the sum of squared coordinate differences between `p` and `c`.

  NOTE: no square root is taken, so the result is the *squared* Euclidean
  distance. The number of coordinates compared is ``len(c)``; `p` must have
  at least that many entries.
  """
  total = 0
  for i, c_i in enumerate(c):
    diff = p[i] - c_i
    total += diff ** 2
  return total

def centroidForPoint(p, C):
  """Return the index in `C` of the centroid closest to point `p`.

  Closeness is measured with ``euclidean_distance``. When several centroids
  are equally close, ``np.argmin`` returns the first of them.

  Args:
    p: the point, a sequence of coordinates.
    C: the list of centroids, each a sequence of coordinates.

  Returns:
    The position (as returned by ``np.argmin``) of the nearest centroid.
  """
  # The original called `oklidi`, a name that is not defined anywhere in the
  # file; the distance helper is `euclidean_distance`.
  dist = [euclidean_distance(p, cent) for cent in C]
  # each point's centroid is the closest one of all the centroids
  return np.argmin(np.array(dist))

def addPoints(x1, x2):
  """Return the coordinate-wise sum of `x1` and `x2` as a new list.

  The result has ``len(x1)`` entries; `x2` must be at least that long.
  """
  return [coord + x2[i] for i, coord in enumerate(x1)]

def dividePoints(x1, num):
  """Return a new list with every coordinate of `x1` divided by the scalar `num`."""
  quotient = []
  for coord in x1:
    quotient.append(coord / num)
  return quotient
    

In [0]:
# K-means function
def kMeans(dataset, K, CT = 0.0001, I = 30, Exp = 10):
  """Cluster a CSV dataset with K-means on Spark and report CH / ARI statistics.

  Each experiment starts from K randomly chosen points and alternates between
  assigning every point to its nearest centroid (map) and averaging the points
  of every cluster (reduceByKey) until the centroids stop moving or the
  iteration limit is reached. The final assignment is scored with the
  Calinski-Harabasz index and the Adjusted Rand Index.

  Args:
    dataset: location of the CSV file, forwarded to ``read_csv``.
    K: number of clusters.
    CT: convergence threshold; iterating stops once the summed centroid
      displacement, as measured by ``euclidean_distance``, falls below it.
    I: maximum number of iterations per experiment.
    Exp: number of independent experiments (random restarts).

  Returns:
    ``(mean_CH, std_CH, mean_ARI, std_ARI)`` computed over the experiments.
    The standard deviations are 0.0 when fewer than two experiments were run.
  """
  points, points_rdd, labels_true = read_csv(dataset)
  # The points never change between experiments, so the feature matrix used
  # for scoring is built once from the local copy instead of being collected
  # from Spark again for every row.
  X = np.array(points)
  # lists for the evaluation results
  CH = []
  ARI = []
  # repeating the experiment
  for _ in range(Exp):
    KCentroids = kRandom(K, points)
    iteration = 0
    dist = 1 # the convergence rate
    while iteration < I and dist >= CT:
      iteration += 1
      # map each point to the index of its nearest centroid; the centroids are
      # bound as a default argument so the lambda carries this iteration's values
      mapper = points_rdd.map(lambda p, cents=KCentroids: (centroidForPoint(p, cents), (p, 1)))
      # per cluster: sum the points coordinate-wise and count them
      reducer = mapper.reduceByKey(lambda a, b: (addPoints(a[0], b[0]), a[1] + b[1]))
      # the new centroid of a cluster is the average of its points
      averaged = reducer.map(lambda kv: (kv[0], dividePoints(kv[1][0], kv[1][1])))
      # Collect once and index by cluster key: the order of reduceByKey output
      # is not guaranteed to follow the cluster index, and a cluster that
      # received no points does not appear in it at all.
      newCents = {int(index): centroid for index, centroid in averaged.collect()}
      # a cluster without points keeps its previous centroid, so K stays constant
      updated = [newCents.get(index, old) for index, old in enumerate(KCentroids)]
      # convergence rate: total displacement between old and new centroids
      dist = sum(euclidean_distance(new, old) for new, old in zip(updated, KCentroids))
      KCentroids = updated
    # predictions according to the final centroids, in the same order as `points`
    pred_labels = np.array(
      points_rdd.map(lambda p, cents=KCentroids: centroidForPoint(p, cents)).collect()
    )
    # calculating CH
    CH.append(metrics.calinski_harabasz_score(X, pred_labels))
    # calculating ARI
    ARI.append(metrics.adjusted_rand_score(labels_true, pred_labels))
  mean_CH = st.mean(CH)
  # statistics.stdev needs at least two samples
  std_CH = st.stdev(CH) if len(CH) > 1 else 0.0
  mean_ARI = st.mean(ARI)
  std_ARI = st.stdev(ARI) if len(ARI) > 1 else 0.0
  return (mean_CH, std_CH, mean_ARI, std_ARI)
    

In [0]:
# Run
dataset1 = "/FileStore/tables/iris-1.csv"
dataset2 = "/FileStore/tables/glass-1.csv"
dataset3 = "/FileStore/tables/parkinsons-3.csv"
# Display name paired with each file. The list is deliberately not called
# `datasets`, which would shadow the `datasets` module imported from sklearn.
named_datasets = [("Iris", dataset1), ("Glass", dataset2), ("Parkinson", dataset3)]

# Run K-means on every dataset for K = 2..6 and print the (mean, stdev) pair
# of each evaluation measure.
for name, path in named_datasets:
  print("Dataset: " + name)
  for K in range(2, 7):
    print("K: " + str(K))
    mean_CH, std_CH, mean_ARI, std_ARI = kMeans(path, K)
    print("CH results: " + str((mean_CH, std_CH)))
    print("ARI results: " + str((mean_ARI, std_ARI)))
    print("----------------")
