In [0]:
from pyspark import *
import random
import pandas as pd
import statistics as st
from pyspark.sql import *
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import calinski_harabasz_score, adjusted_rand_score

In [0]:
def kmeans_algorithm(data, k, convergence_threshold=0.0001, num_iterations=30):
    # Convert DataFrame to Pandas DataFrame and apply MinMaxScaler
    pandas_df = MinMaxScaler().fit_transform(data.toPandas())
    pandas_df = pd.DataFrame(pandas_df, columns=data.columns)

    # Create Spark DataFrame from Pandas DataFrame
    df = spark.createDataFrame(pandas_df)

    # Convert df to rdd
    data_rdd = df.rdd
    
    # Change data types with map
    data_rdd = data_rdd.map(lambda row: [float(x) for x in row])
    
    # Step 1: Randomly pick K data points from the data as initial centroids
    initial_centroids = data_rdd.takeSample(False, k)
    
    # Distance between two points
    euclidean_distance = lambda p, center: (next(i for i, c in enumerate(initial_centroids) if c == center), sum((p[i] - center[i])**2 for i in range(len(p)-1))**0.5)

    check = lambda observation, j: 1 if observation[-1] == j else 0

    for i in range(num_iterations):
        is_converged = 1
        # Step 2 b: Assign data points to the nearest centroid
        assign_cluster = lambda p: [item for item in p] + [int(min((euclidean_distance(p,center) for center in initial_centroids), key = lambda x: x[1])[0])]
        classification = data_rdd.map(assign_cluster)


        # Step 3: Calculate new centroids
        sum_points = lambda p1, p2: [p1[i] + p2[i] for i in range(len(p1))]
        for j in range(k):
            cluster_j = classification.filter(lambda obs: check(obs, j))
            sum_cluster = cluster_j.reduce(sum_points)
            last_center_at_j = initial_centroids[j]
            initial_centroids[j] = [x / cluster_j.count() for x in sum_cluster]


            if euclidean_distance(last_center_at_j, initial_centroids[j])[1] >= convergence_threshold:
                is_converged = 0
            # STOP because is converged 
            else:
                is_converged = 1
                break
                

        # Stop
        if is_converged == 1:
            break

    return classification


In [0]:
def check_each_data(data, k_values, exp):
    for k in k_values:
        print(f"K = {k}")
        ch_scores = []  # Store CH scores for each experiment
        ari_scores = []  # Store ARI scores for each experiment
        for r in range(exp):  # Run 'exp' experiments
            classification = kmeans_algorithm(data, k)
            ch_true = classification.map(lambda row: row[:-2]).collect()
            ch_pred = classification.map(lambda row: row[-1]).collect()
            ari_true = data.select("Class").toPandas()["Class"].tolist()
            ari_pred = classification.map(lambda row: row[-1]).collect()

            # Calculate CH and ARI scores
            ch_score = calinski_harabasz_score(ch_true, ch_pred)
            ari_score = adjusted_rand_score(ari_true, ari_pred)

            ch_scores.append(ch_score)
            ari_scores.append(ari_score)

        avg_ch_score = sum(ch_scores) / len(ch_scores)
        std_ch_score = st.stdev(ch_scores)
        avg_ari_score = sum(ari_scores) / len(ari_scores)
        std_ari_score = st.stdev(ari_scores)
        print(f"The average and standard deviation values of the CH measure across all {exp} experiments: ({avg_ch_score:.2f} ; {std_ch_score:.2f})")
        print(f"The average and standard deviation values of the ARI measure across all {exp} experiments: ({avg_ari_score:.2f} ; {std_ari_score:.2f})")
        print("")



In [0]:
dataset_path = "/FileStore/tables/iris.csv"

# values of K
k_values = [2, 3, 4, 5, 6]

exp = 10
print ("iris")
file_type = "csv"
data = spark.read.csv(dataset_path, header=True)
check_each_data(data, k_values, exp)


iris
K = 2
The average and standard deviation values of the CH measure across all 10 experiments: (353.37 ; 0.00)
The average and standard deviation values of the ARI measure across all 10 experiments: (0.57 ; 0.00)

K = 3
The average and standard deviation values of the CH measure across all 10 experiments: (302.55 ; 71.51)
The average and standard deviation values of the ARI measure across all 10 experiments: (0.62 ; 0.14)

K = 4
The average and standard deviation values of the CH measure across all 10 experiments: (260.41 ; 71.50)
The average and standard deviation values of the ARI measure across all 10 experiments: (0.60 ; 0.11)

K = 5
The average and standard deviation values of the CH measure across all 10 experiments: (241.13 ; 42.56)
The average and standard deviation values of the ARI measure across all 10 experiments: (0.56 ; 0.10)

K = 6
The average and standard deviation values of the CH measure across all 10 experiments: (212.57 ; 43.55)
The average and standard deviation

In [0]:

dataset_path = "/FileStore/tables/glass.csv"

# values of K
k_values = [2, 3, 4, 5, 6]

exp = 10
print ("glass")
file_type = "csv"
data = spark.read.csv(dataset_path, header=True)
check_each_data(data, k_values, exp)



glass
K = 2
The average and standard deviation values of the CH measure across all 10 experiments: (121.71 ; 47.55)
The average and standard deviation values of the ARI measure across all 10 experiments: (0.16 ; 0.10)

K = 3
The average and standard deviation values of the CH measure across all 10 experiments: (101.10 ; 4.59)
The average and standard deviation values of the ARI measure across all 10 experiments: (0.15 ; 0.04)

K = 4
The average and standard deviation values of the CH measure across all 10 experiments: (84.48 ; 11.19)
The average and standard deviation values of the ARI measure across all 10 experiments: (0.16 ; 0.04)

K = 5
The average and standard deviation values of the CH measure across all 10 experiments: (71.45 ; 11.68)
The average and standard deviation values of the ARI measure across all 10 experiments: (0.14 ; 0.03)

K = 6
The average and standard deviation values of the CH measure across all 10 experiments: (65.91 ; 9.94)
The average and standard deviation va

In [0]:

dataset_path = "/FileStore/tables/parkinsons.csv"

# values of K
k_values = [2, 3, 4, 5, 6]

exp = 10
print ("parkinsons")
file_type = "csv"
data = spark.read.csv(dataset_path, header=True)
check_each_data(data, k_values, exp)



parkinsons
K = 2
The average and standard deviation values of the CH measure across all 10 experiments: (84.22 ; 0.00)
The average and standard deviation values of the ARI measure across all 10 experiments: (0.05 ; 0.00)

K = 3
The average and standard deviation values of the CH measure across all 10 experiments: (75.74 ; 1.38)
The average and standard deviation values of the ARI measure across all 10 experiments: (0.07 ; 0.02)

K = 4
The average and standard deviation values of the CH measure across all 10 experiments: (60.87 ; 7.28)
The average and standard deviation values of the ARI measure across all 10 experiments: (0.08 ; 0.04)

K = 5
The average and standard deviation values of the CH measure across all 10 experiments: (58.04 ; 6.30)
The average and standard deviation values of the ARI measure across all 10 experiments: (0.07 ; 0.05)

K = 6
The average and standard deviation values of the CH measure across all 10 experiments: (55.54 ; 2.65)
The average and standard deviation va