## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [3]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764027 sha256=64845c8e1f4c668f42f64e937befddc78db5f7b130e5d8d2a8cddc3dac7ae80f
  Stored in directory: c:\users\amitf\appdata\local\pip\cache\wheels\05\75\73\81f84d174299abca38dd6a06a5b98b08ae25fce50ab8986fa1
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0
Note: you may need to restart the kernel to use updated packages.


In [None]:
# File location and type
# File location and type
file_location_iris = "/FileStore/tables/iris.csv"
file_location_glass = "/FileStore/tables/glass.csv"
file_location_parkisons = "/FileStore/tables/parkinsons.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location_parkisons)

display(df)

In [5]:
import numpy as np
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.functions import vector_to_array
from pyspark.ml import Pipeline
import math
from sklearn.metrics.cluster import adjusted_rand_score
from sklearn.cluster import KMeans
from sklearn import metrics


In [None]:
def scale_data(df):
    # convert to vector for calculations
    assembler = VectorAssembler(inputCols=df.columns[:-1], outputCol="features")
    # scale with min max scale
    scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
    pipeline = Pipeline(stages=[assembler, scaler])
    scalerModel = pipeline.fit(df)
    scaledData = scalerModel.transform(df)
    points_scale = scaledData.select("scaledFeatures")
    points_with_array = points_scale.withColumn("scaledFeatures", vector_to_array("scaledFeatures"))
    points_change = points_with_array.rdd.map(lambda p: p[0])
    return_points = points_change.map(lambda p: tuple(p))
    return return_points

In [None]:
def sum_points(pointsA,pointsB):
    final_list = list(np.array(pointsA) + np.array(pointsB))
    return final_list
 

def distance(centroid, point):
    sub_list = list(np.array(centroid) - np.array(point))
    distance_vector_square = np.power(sub_list, 2)
    final = np.sqrt(np.sum(distance_vector_square))
    return final
        
def divide(all_g, c):
    return [val / c for val in all_g]


In [6]:
# allocate centroids
def assign_centroids(points,centroids):
    min_dist = math.inf
    nearest_centroid = 0
    for i in range(len(centroids)):
        distance_point = distance(centroids[i], points)
        if(distance_point < min_dist):
            min_dist = distance_point
            nearest_centroid = i
    return (nearest_centroid, points)


# function that checks that all the clusters are in the right the threshold 
def threshold_check(last_c, new_c, threshold):
    avg_difference = []
    for i in range(len(new_c)):
        lst = []
        for j in range(len(new_c[0])):
            lst.append(np.sqrt(np.power(new_c[i][j] - last_c[i][j],2)))
        avg_difference.append(np.mean(lst))
    counter = 0
    for avg in avg_difference:
        if avg < threshold:
            counter += 1
    # means that all centers diffrence were minor (less than 0.0001)
    return True if counter == len(avg_difference) else False


In [None]:
def K_means(data,k,CT=0.0001,I=30):
    # take all scaled points from the data
    points = scale_data(data)
    # initial centroids
    centroids = points.takeSample(False, k)
    count=0
    while (count < I):
        # map raduce functions
        map_func = lambda val: (val[0], (val[1], 1))
        reduce_func = lambda val1, val2: ((sum_points(val1[0], val2[0]), val1[1] + val2[1]))
        points2 = points.map(lambda val: assign_centroids(val,centroids))
        last_center = centroids
        # do the map reduce using rdd 
        summ  = points2.map(map_func)
        now_centroids = summ.reduceByKey(reduce_func).mapValues(lambda p: divide(p[0],p[1])).collect()
        # ectracr the centroids
        centroids = [p[1] for p in now_centroids]
        # if all pass the threshold- return the centers
        if threshold_check(last_center,centroids,CT):
            return now_centroids
        # else count = count +1 and continue to next iteration
        count += 1
    return now_centroids

In [None]:
# calculate the Calinski and harabasz score of the new centroids and the division of the points.
def get_ch(end_center_points, data):
    # get new class
    new_class = end_center_points.map(lambda d: d[0])
    points = end_center_points.map(lambda d: d[1])
    # evaluate the algorithem 
    get_ch = metrics.calinski_harabasz_score(points.collect(), new_class.collect())
    return get_ch

# Calculate the Adjusted rand index. Computes a similarity measure between two clusterings, the original and the new one.
def get_ari(end_center_points, data):
    # get original class 
    classes = data.select("class")
    class_original = classes.rdd.map(lambda d: d[0])
    # get new class
    new_class = end_center_points.map(lambda d: d[0])
    #calculate similarity
    get_ari = adjusted_rand_score(new_class.collect(), class_original.collect())         
    return get_ari

In [None]:
def K_means_scores(datapath,k,CT=0.0001,I=30,Exp=10):
    # read the data
    data = spark.read.csv(datapath, header = True, inferSchema = True)
    CH,ARI = [], []
    centroids = K_means(data,k,CT,I)
    points = scale_data(data)
    for _ in range(Exp):
        centroids = K_means(data,k,CT,I)
        centroids_vals = [i[1] for i in centroids]
        points_rdd = points.map(lambda d: assign_centroids(d,centroids_vals))
        get_ch_check = get_ch(points_rdd, data)
        CH.append(get_ch_check)
        get_ari_check = get_ari(points_rdd, data)
        ARI.append(get_ari_check)
    mean_CH,mean_ARI = np.mean(CH),np.mean(ARI)
    std_CH, std_ARI = np.std(CH),np.std(ARI)
    CH_values = mean_CH, std_CH
    ARI_values = mean_ARI, std_ARI
    result = [CH_values, ARI_values]
    return result

In [None]:
print(K_means_scores(file_location_iris,4))

[(294.5619181149922, 15.516159235247006), (0.6027895424100469, 0.018428318966144205)]


In [None]:
k_range_1, k_range_2, k_range_3 = [3,5,7,9,11],[2,4,6,8,10], [2,3,5,7,9]

# file_location_iris
for k in k_range_1:
    result = K_means_scores(file_location_iris, k = k)
    print(f"the scores for {k} clusters in iris:")
    print(f" mean of chalinski is {round(result[0][0],2)}, std chalinski is {round(result[0][1],2)}")
    print(f" mean of ari is {round(result[1][0],2)}, std ari is {round(result[1][1],2)}")

the scores for 3 clusters in iris:
 mean of chalinski is 320.67, std chalinski is 60.55
 mean of ari is 0.65, std ari is 0.11
the scores for 5 clusters in iris:
 mean of chalinski is 250.9, std chalinski is 25.28
 mean of ari is 0.56, std ari is 0.07
the scores for 7 clusters in iris:
 mean of chalinski is 223.41, std chalinski is 19.09
 mean of ari is 0.46, std ari is 0.07
the scores for 9 clusters in iris:
 mean of chalinski is 215.51, std chalinski is 17.51
 mean of ari is 0.41, std ari is 0.05
the scores for 11 clusters in iris:
 mean of chalinski is 186.89, std chalinski is 18.39
 mean of ari is 0.36, std ari is 0.06


In [None]:
# file_location_glass
for k in k_range_2:
    result = K_means_scores(file_location_glass, k = k)
    print(f"the scores for {k} clusters in glass:")
    print(f" mean of chalinski is {round(result[0][0],2)}, std chalinski is {round(result[0][1],2)}")
    print(f" mean of ari is {round(result[1][0],2)},  std  ari is {round(result[1][1],2)}")
    

the scores for 2 clusters in glass:
 mean of chalinski is 173.28, std chalinski is 0.24
 mean of ari is 0.21,  std  ari is 0.01
the scores for 4 clusters in glass:
 mean of chalinski is 93.83, std chalinski is 6.99
 mean of ari is 0.16,  std  ari is 0.06
the scores for 6 clusters in glass:
 mean of chalinski is 83.17, std chalinski is 9.11
 mean of ari is 0.18,  std  ari is 0.03
the scores for 8 clusters in glass:
 mean of chalinski is 75.06, std chalinski is 9.0
 mean of ari is 0.17,  std  ari is 0.03
the scores for 10 clusters in glass:
 mean of chalinski is 63.27, std chalinski is 7.53
 mean of ari is 0.19,  std  ari is 0.03


In [None]:
# file_location_parkinsons
for k in k_range_3:
    result = K_means_scores(file_location_glass, k = k)
    print(f"the scores for {k} clusters in glass:")
    print(f" mean of chalinski is {round(result[0][0],2)}, std ari is {round(result[0][1],2)}")
    print(f" mean of ari is {round(result[1][0],2)},  std ari is {round(result[1][1],2)}")

the scores for 2 clusters in glass:
 mean of chalinski is 173.39, std ari is 0.23
 mean of ari is 0.2,  std ari is 0.01
the scores for 3 clusters in glass:
 mean of chalinski is 112.77, std ari is 8.88
 mean of ari is 0.2,  std ari is 0.06
the scores for 5 clusters in glass:
 mean of chalinski is 88.22, std ari is 9.74
 mean of ari is 0.19,  std ari is 0.03
the scores for 7 clusters in glass:
 mean of chalinski is 79.79, std ari is 12.01
 mean of ari is 0.17,  std ari is 0.02
the scores for 9 clusters in glass:
 mean of chalinski is 69.06, std ari is 9.89
 mean of ari is 0.19,  std ari is 0.02
