## Overview

This notebook shows you how to create and query a table or DataFrame from a file that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is the Databricks File System, which lets you store data for querying inside of Databricks. This notebook assumes that the file you would like to read from is already in DBFS.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
import numpy as np
import os
import pandas as pd
from scipy.spatial.distance import euclidean
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import calinski_harabasz_score, adjusted_rand_score

In [0]:
def map_dist(point, centroids):
    """Pair a point with the index of its nearest centroid.

    Distances are Euclidean. When several centroids are equally close, the
    one with the lowest index is chosen.

    Args:
        point: Coordinates of the sample (any 1-D array-like).
        centroids: Sequence of centroid coordinates.

    Returns:
        A ``(index, point)`` tuple: ``index`` is the position in
        ``centroids`` of the closest centroid, and ``point`` is the input
        passed through unchanged.

    Raises:
        ValueError: If ``centroids`` is empty.
    """
    target = np.asarray(point)
    gaps = [euclidean(target, np.asarray(center)) for center in centroids]
    # min() keeps the first of equal candidates, so ties go to the lowest index.
    nearest = min(range(len(gaps)), key=gaps.__getitem__)

    return (nearest, point)

In [0]:
def calinski(rdd, centroids):
    """Compute the Calinski-Harabasz score of a nearest-centroid clustering.

    Every element of ``rdd`` is assigned to its closest centroid with
    ``map_dist``; the resulting labels and the points themselves are then
    passed to ``sklearn.metrics.calinski_harabasz_score`` on the driver.

    Args:
        rdd: Spark RDD whose elements are the points to score.
        centroids: Sequence of centroid coordinates used for the assignment.

    Returns:
        The score returned by ``calinski_harabasz_score``.
    """
    # map_dist returns each point alongside its label, so one collect() gives
    # both. This avoids a second Spark job and guarantees that labels and
    # points come from the same pass over the data.
    assigned = rdd.map(lambda x: map_dist(x, centroids)).collect()
    labels = [label for label, _ in assigned]
    points = [point for _, point in assigned]

    return calinski_harabasz_score(np.asarray(points), np.asarray(labels))

In [0]:
def ari(class_labels, rdd, centroids):
    """Compute the adjusted Rand index of a nearest-centroid clustering.

    Each element of ``rdd`` is labelled with the index of its closest
    centroid (via ``map_dist``), and those labels are compared against the
    reference labels with ``sklearn.metrics.adjusted_rand_score``.

    Args:
        class_labels: Reference labels, in the same order as the elements
            collected from ``rdd``.
        rdd: Spark RDD whose elements are the points to label.
        centroids: Sequence of centroid coordinates used for the assignment.

    Returns:
        The value returned by ``adjusted_rand_score``.
    """
    assignments = rdd.map(lambda x: map_dist(x, centroids)).collect()
    # Only the cluster index of each (index, point) pair is needed here.
    predicted = np.asarray([pair[0] for pair in assignments])
    expected = np.asarray(class_labels)
    return adjusted_rand_score(expected, predicted)


In [0]:
def k_Means_one_exp(csv, K, CT=0.0001, I=30, Exp=10):
    """Run one k-means experiment on a CSV stored in DBFS and score the result.

    The file is read through the notebook's ``spark`` session, the ``class``
    column is split off as reference labels, the remaining columns are
    min-max scaled, and k-means is iterated from ``K`` randomly sampled
    starting points.

    Args:
        csv: Path to the CSV file. Only its base name is used; the file is
            read from ``/FileStore/tables/<base name>``.
        K: Number of clusters.
        CT: Convergence threshold. Iteration stops once no centroid moves
            by more than this Euclidean distance.
        I: Maximum number of iterations.
        Exp: Not used by this function; kept so existing callers that pass
            it keep working.

    Returns:
        A ``(calinski_harabasz, adjusted_rand_index)`` tuple for the final
        centroids.
    """
    # Only the file name matters: the data is read from /FileStore/tables.
    file_name = os.path.basename(csv)
    file_location = f'/FileStore/tables/{file_name}'
    df = spark.read.csv(file_location, header=True, inferSchema=True)

    # Reference labels, in row order.
    class_labels = df.select('class').rdd.map(lambda x: x[0]).collect()
    # Drop the label column by name so features and labels stay consistent
    # even if 'class' is not the last column of the file.
    features = df.drop('class')

    # Min-max scaling is done on the driver, then the scaled rows are
    # redistributed as an RDD.
    rdd = spark.sparkContext.parallelize(
        MinMaxScaler().fit_transform(features.rdd.collect())
    )

    # Initial centroids: K points sampled without replacement.
    centroids = rdd.takeSample(False, K)

    for _ in range(I):
        # (cluster index, (point, 1)) so that sums and counts reduce together.
        summed = (
            rdd.map(lambda x: map_dist(x, centroids))
            .map(lambda x: (x[0], (x[1], 1)))
            .reduceByKey(
                lambda x, y: (
                    tuple(a + b for a, b in zip(x[0], y[0])),
                    x[1] + y[1],
                )
            )
        )
        means = summed.mapValues(lambda x: tuple(v / x[1] for v in x[0]))

        # collect() gives no ordering guarantee, so index the new centroids by
        # their cluster key. A cluster that received no points keeps its
        # previous centroid instead of silently disappearing.
        updated = dict(means.collect())
        new_centroids = [
            updated.get(j, previous) for j, previous in enumerate(centroids)
        ]

        # Converged when no centroid moved by more than CT.
        moved = any(
            euclidean(np.array(new), np.array(old)) > CT
            for new, old in zip(new_centroids, centroids)
        )

        centroids = new_centroids

        if not moved:
            break

    ch1 = calinski(rdd, centroids)
    ari1 = ari(class_labels, rdd, centroids)

    return (ch1, ari1)
    
    


In [0]:
def k_means_total(csv, CT=0.0001, I=30, Exp=10):
    """Summarise repeated k-means experiments for k = 2..6.

    For every k, ``k_Means_one_exp`` is run ``Exp`` times and the mean and
    (population) standard deviation of both returned scores are recorded.

    Args:
        csv: Path of the CSV file, forwarded to ``k_Means_one_exp``.
        CT: Convergence threshold, forwarded to ``k_Means_one_exp``.
        I: Maximum iterations per experiment, forwarded to
            ``k_Means_one_exp``.
        Exp: Number of experiments to run for each k.

    Returns:
        A pandas DataFrame with one row per k and the columns ``k``,
        ``ch_mean``, ``ch_std``, ``ari_mean`` and ``ari_std``.
    """
    columns = ['k', 'ch_mean', 'ch_std', 'ari_mean', 'ari_std']
    rows = []
    for k in (2, 3, 4, 5, 6):
        # Forward the caller's settings; they used to be overridden by
        # hard-coded defaults at this call.
        scores = [
            k_Means_one_exp(csv, k, CT=CT, I=I, Exp=Exp) for _ in range(Exp)
        ]
        ch_values = [score[0] for score in scores]
        ari_values = [score[1] for score in scores]
        rows.append({
            'k': k,
            'ch_mean': np.mean(ch_values),
            'ch_std': np.std(ch_values),
            'ari_mean': np.mean(ari_values),
            'ari_std': np.std(ari_values),
        })

    # One summary row per k, with a fixed column order.
    return pd.DataFrame(rows, columns=columns)

In [0]:
# Run the full experiment sweep on each dataset and print its summary table.
for dataset_path in (
    "C:/Users/admin/Desktop/iris.csv",
    "C:/Users/admin/Desktop/glass.csv",
    "C:/Users/admin/Desktop/parkinsons.csv",
):
    print(k_means_total(dataset_path))

   k     ch_mean        ch_std  ari_mean       ari_std
0  2  353.367403  5.684342e-14  0.568116  1.110223e-16
1  3  354.314311  4.252907e+00  0.708604  7.737707e-03
2  4  289.014067  2.024856e+01  0.600832  4.317271e-02
3  5  250.313582  3.124448e+01  0.527754  5.673505e-02
4  6  249.341078  8.667907e+00  0.444443  4.906509e-02
   k     ch_mean     ch_std  ari_mean   ari_std
0  2  121.718931  45.111072  0.160822  0.091818
1  3   87.335520  22.081006  0.150814  0.062779
2  4   86.264952   9.018010  0.162450  0.048207
3  5   79.861992   9.267530  0.154188  0.018694
4  6   76.304776   8.756932  0.182704  0.022389
   k    ch_mean    ch_std  ari_mean   ari_std
0  2  84.215403  0.002291  0.049991  0.003000
1  3  76.159637  1.144218  0.073659  0.015327
2  4  71.207502  4.415010  0.098153  0.043647
3  5  63.592962  3.763051  0.097184  0.054201
4  6  59.200200  2.232983  0.079758  0.036050
