## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# File location and type
file_location1 = "/FileStore/tables/iris__1_-3.csv"
file_location2 = "/FileStore/tables/glass__1_-3.csv"

file_location3 = "/FileStore/tables/parkinsons__1_-3.csv"

file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
iris = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location1)

glass = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location2)

parkinsons = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location3)



In [0]:
import math
import numpy as np
from sklearn import metrics
from sklearn.cluster import KMeans
from pyspark.sql.functions import lit
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import regexp_replace
from sklearn.metrics import calinski_harabasz_score, adjusted_rand_score

In [0]:
def K_means(data,k,CT=0.0001,I=30):
    
    # scale the data
    tovector = VectorAssembler(inputCols=data.columns[:-1], outputCol="features").transform(data)     # convert each row to vector- create the function
    pointsscale = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(tovector).transform(tovector).select("scaledFeatures")      # create MinMaxScaler
    array_points = pointsscale.withColumn("scaledFeatures", vector_to_array("scaledFeatures"))     # change back to list
    points = array_points.rdd.map(lambda x: x[0]).map(lambda x: tuple(x))    # show only the vectors and convert to tuple
       
#   
    def WCSS(point ,centroids):
         # Saves the distance of the closest center and the center index
        min_distance, index_center = float("inf"),0
        for i in range(len(centroids)):
            lst = 0
            for m in range(len(point)):
                lst += (centroids[i][m] - point[m])**2
            new_distance = lst**0.5   
            if(new_distance < min_distance): # update min_distance to be the minimum distance from all the centers 
                min_distance, index_center = new_distance, i
        return (index_center, point)
    
    
    centroids = points.takeSample(False, k) # peak randomly the first samples 
    for i in range(I): 
        center_old = centroids
        points_wcss = points.map(lambda x: WCSS(x,centroids))
        lst_centers = points_wcss.map(lambda x: x[0])

        # create new distance    
        prapare_to_reduce = points_wcss.map(lambda x: (x[0], (x[1], 1)))   # Prapare to - reduce, the left value will be the key
        avg_reduce = prapare_to_reduce.reduceByKey(lambda x,y: ([x1 + y1 for (x1, y1) in zip(x[0], y[0])], x[1] + y[1]))  # Count how much points in each center and sum together the rows
        new_centers = avg_reduce.mapValues(lambda x: [(h / x[1]) for h in x[0]]).collect()  # Divide each value by the number of points to create the new center
        
        centroids = [i[1] for i in new_centers]
        
        # checks the distance between old to new center -less then CT
        lst = 0
        count =0
        for i in range(len(center_old)):
            count += 1
            for m in range(len(center_old[0])):                
                lst += (centroids[i][m] - center_old[i][m])**2
                distance_oldnew = lst**0.5  # Distance between new to old
            if distance_oldnew > CT: # if Distance bigger then CT continue
                    break
            if count == len(center_old):                             
                return (new_centers, lst_centers.collect())  # stop and return
            
    return (new_centers, lst_centers.collect())
       
    


In [0]:
def results(location, k, name, CT = 0.0001, I = 10, Exp = 10):
    
    data = location.toPandas()
    true_clusterings = data['class'].tolist() # create list with the true centers
    lst_ch = []
    lst_ari = []
    for i in range(Exp):  # foe each k iter 10 times

        points = K_means(location, k, 0.0001,30)
        pred_center = points[1]   # list of the new centers
       
        ari = adjusted_rand_score(true_clusterings, pred_center)# calcualte the ari measure
        lst_ari.append(ari) #  append adjusted_rand_score to list
        ch = calinski_harabasz_score(data, pred_center) # calcualte the ch measure
        lst_ch.append(ch) # append calinski_harabasz_score to list
       
        
    lst_ch = np.array(lst_ch)
    mean_ch = np.mean(lst_ch)  # calculate the mean 
    std_ch = np.std(lst_ch)  # calculate the std
    
    lst_ari = np.array(lst_ari)
    mean_ari = np.mean(lst_ari) # calculate the mean 
    std_ari = np.std(lst_ari)  # calculate the std 
    # print the results for each dataset
    print(f'Dataset: {name} | K: {k} | CH: ({mean_ch:.2f}, {std_ch:.2f}) | ARI: ({mean_ari:.2f}, {std_ari:.2f}))')
    



In [0]:

for k in range(2, 7):
    results(iris, k, 'IRIS')

for k in range(2, 7):
    results(glass, k, 'GLASS')
    
for k in range(2, 7):
    results(parkinsons, k, 'PARKINSON')
  

    
  

Dataset: IRIS | K: 2 | CH: (493.88, 0.00) | ARI: (0.57, 0.00))
Dataset: IRIS | K: 3 | CH: (496.73, 80.95) | ARI: (0.68, 0.08))
Dataset: IRIS | K: 4 | CH: (430.42, 51.65) | ARI: (0.60, 0.03))
Dataset: IRIS | K: 5 | CH: (391.20, 44.36) | ARI: (0.53, 0.05))
