In [5]:
from functools import reduce
import random
import math
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import adjusted_rand_score, calinski_harabasz_score
import statistics
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
# Create (or, when this cell is re-run, reuse) a local Spark session.
# Constructing SparkContext directly a second time in the same kernel fails
# because only one SparkContext may be active; getOrCreate() keeps the cell
# idempotent under re-execution.
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

In [18]:
# Dataset locations and the format Spark should parse them as.
file_glass = "/content/glass.csv"
file_iris = "/content/iris.csv"
file_parkinsons = "/content/parkinsons.csv"
file_type = "csv"

# CSV reader options, passed to Spark as strings.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Load the iris dataset for a quick preview. These options only matter for
# CSV sources; other formats ignore them.
df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_iris)
)

df.show()

+---+---+---+---+-----+
| f1| f2| f3| f4|class|
+---+---+---+---+-----+
|5.1|3.5|1.4|0.2|    0|
|4.9|3.0|1.4|0.2|    0|
|4.7|3.2|1.3|0.2|    0|
|4.6|3.1|1.5|0.2|    0|
|5.0|3.6|1.4|0.2|    0|
|5.4|3.9|1.7|0.4|    0|
|4.6|3.4|1.4|0.3|    0|
|5.0|3.4|1.5|0.2|    0|
|4.4|2.9|1.4|0.2|    0|
|4.9|3.1|1.5|0.1|    0|
|5.4|3.7|1.5|0.2|    0|
|4.8|3.4|1.6|0.2|    0|
|4.8|3.0|1.4|0.1|    0|
|4.3|3.0|1.1|0.1|    0|
|5.8|4.0|1.2|0.2|    0|
|5.7|4.4|1.5|0.4|    0|
|5.4|3.9|1.3|0.4|    0|
|5.1|3.5|1.4|0.3|    0|
|5.7|3.8|1.7|0.3|    0|
|5.1|3.8|1.5|0.3|    0|
+---+---+---+---+-----+
only showing top 20 rows



In [19]:
def csv_to_df(datapath):
    """Read a headered, comma-separated CSV file into a Spark DataFrame.

    The first row supplies the column names and column types are inferred
    from the data (``inferSchema``).

    Args:
        datapath: path of the CSV file to load.

    Returns:
        The DataFrame produced by ``spark.read ... .load(datapath)``.
    """
    csv_options = {
        "inferSchema": "true",
        "header": "true",
        "sep": ",",
    }
    reader = spark.read.format("csv")
    for key, value in csv_options.items():
        reader = reader.option(key, value)
    return reader.load(datapath)

In [20]:
def find_nearest_centroid(centroids, point):
    """Return ``(id of the centroid closest to point, point)``.

    Distance is Euclidean over the centroid's dimensions. On ties the lowest
    centroid id wins (strict ``<`` comparison).

    Args:
        centroids: non-empty sequence of coordinate sequences.
        point: coordinate sequence at least as long as each centroid.

    Returns:
        Tuple ``(centroid_id, point)``, the key/value pair consumed by the
        k-means map step.

    Raises:
        ValueError: if ``centroids`` is empty.
    """
    if len(centroids) == 0:
        raise ValueError("find_nearest_centroid() needs at least one centroid")

    # Fall back to the first centroid when no distance compares below inf
    # (e.g. a NaN coordinate), instead of returning an unbound variable.
    centroid_id = 0
    min_dist = float("inf")
    for cid, centroid in enumerate(centroids):
        distance = math.sqrt(sum((point[i] - c) ** 2 for i, c in enumerate(centroid)))
        if distance < min_dist:
            min_dist = distance
            centroid_id = cid
    return (centroid_id, point)

In [21]:
def convergence_check(new, curr, threshold):
    """Return True when no centroid moved farther than ``threshold``.

    Centroids are matched by position (cluster id). Movement is the Euclidean
    distance between ``new[i]`` and ``curr[i]``; a movement exactly equal to
    ``threshold`` still counts as converged.

    Args:
        new: centroids produced by the latest update step.
        curr: centroids used for the latest assignment step.
        threshold: maximum allowed per-centroid movement.

    Returns:
        True if converged, False otherwise. Lists of different lengths are
        never considered converged, because positional matching would pair
        up unrelated clusters.
    """
    if len(new) != len(curr):
        return False
    for new_c, curr_c in zip(new, curr):
        shift = math.sqrt(sum((a - b) ** 2 for a, b in zip(new_c, curr_c)))
        if shift > threshold:
            return False
    return True

In [22]:
def _assign_points(points, centroids):
    """Lazily key each point by its nearest centroid id: RDD of ``(cid, point)``."""
    # Copy into a local that is never rebound. Spark serialises the closure only
    # when an action runs, so capturing a variable the caller later reassigns
    # would silently evaluate against the wrong centroids.
    frozen = list(centroids)
    return points.map(lambda p: find_nearest_centroid(frozen, p))


def _recompute_centroids(assignments, centroids):
    """Return per-cluster mean points, keeping the old centroid for empty clusters."""
    sums = (
        assignments
        .mapValues(lambda p: (p, 1))
        .reduceByKey(lambda a, b: (tuple(x + y for x, y in zip(a[0], b[0])), a[1] + b[1]))
    )
    means = dict(sums.mapValues(lambda s: tuple(x / s[1] for x in s[0])).collect())
    # reduceByKey emits nothing for a cluster that received no points; reusing
    # its previous centroid keeps the list indexed by cluster id and k fixed.
    return [means.get(cid, centroids[cid]) for cid in range(len(centroids))]


def kmeans_map_reduce(dataset, kclusters, ct = 0.0001, maxiter = 30, n = 10, seed = None):
    """Cluster a CSV dataset with k-means expressed as Spark map/reduce steps.

    All columns except the last are used as features, min-max scaled with
    Spark's MinMaxScaler. Ground-truth labels are read from the column named
    'class'.

    Args:
        dataset: path of the CSV file (header row expected).
        kclusters: number of clusters k.
        ct: convergence threshold; iteration stops once no centroid moves
            farther than this between iterations.
        maxiter: maximum number of assign/update iterations.
        n: unused; accepted for backward compatibility with existing callers.
        seed: optional seed for sampling the initial centroids (None = random).

    Returns:
        ``(calinski_harabasz_score, adjusted_rand_score)`` for the assignment
        made in the last iteration.
    """
    data = csv_to_df(dataset)
    data_features = data.select(data.columns[:-1])
    data_class = data.select('class')

    # Normalize data using MinMaxScaler
    assembler = VectorAssembler(inputCols=data_features.columns, outputCol="features")
    feature_vector = assembler.transform(data_features)
    scaler_model = MinMaxScaler(inputCol="features", outputCol="scaled_features").fit(feature_vector)
    scaled_data = scaler_model.transform(feature_vector)

    # Every iteration re-scans the points, so cache them instead of re-reading
    # and re-scaling the CSV on each pass.
    points = scaled_data.select("scaled_features").rdd.map(lambda row: tuple(row[0])).cache()
    try:
        # Sample WITHOUT replacement: with replacement the same row can be
        # drawn twice, giving duplicate centroids one of which never wins a point.
        centroids = points.takeSample(False, kclusters, seed)

        # Pre-set so that maxiter <= 0 still scores the initial assignment.
        assignments = _assign_points(points, centroids)
        for _ in range(maxiter):
            assignments = _assign_points(points, centroids)
            new_centroids = _recompute_centroids(assignments, centroids)
            if convergence_check(new_centroids, centroids, ct):
                break
            centroids = new_centroids

        # Loop-invariant collections are gathered once, after clustering.
        og_labels = [row['class'] for row in data_class.collect()]
        predicted_labels = assignments.keys().collect()
        data_features_ch = [list(p) for p in points.collect()]
    finally:
        points.unpersist()

    ch_score = calinski_harabasz_score(data_features_ch, predicted_labels)
    rand_score = adjusted_rand_score(og_labels, predicted_labels)
    return ch_score, rand_score



# Quick smoke run on iris: k=7 clusters, a single iteration.
print(kmeans_map_reduce(file_iris, kclusters=7, maxiter=1))

(225.53552597341655, 0.436715767759047)


In [25]:
def kmeans_k_many_experiments(dataset, ct = 0.0001, maxiter = 30, n = 10, k_clusters = (2, 3, 4, 5, 6)):
    """Repeat k-means ``n`` times for each k and summarise both scores.

    Each run calls ``kmeans_map_reduce`` with randomly sampled initial
    centroids, so the runs differ and the standard deviation measures the
    sensitivity to initialisation.

    Args:
        dataset: path of the CSV file forwarded to ``kmeans_map_reduce``.
        ct: convergence threshold forwarded to ``kmeans_map_reduce``.
        maxiter: iteration cap forwarded to ``kmeans_map_reduce``.
        n: number of runs per k; must be at least 1.
        k_clusters: the k values to try (defaults to 2..6).

    Returns:
        List of ``(k, (mean_ch, std_ch), (mean_ari, std_ari))`` tuples, one
        per k. The list is also printed.

    Raises:
        ValueError: if ``n`` is less than 1.
    """
    if n < 1:
        raise ValueError("n must be at least 1, got %r" % (n,))

    def _mean_std(values):
        # statistics.stdev needs two or more samples; a single run has no
        # spread estimate, so report NaN rather than crashing.
        spread = statistics.stdev(values) if len(values) > 1 else float("nan")
        return statistics.mean(values), spread

    ret = []
    for k in k_clusters:
        runs = [kmeans_map_reduce(dataset, k, ct, maxiter) for _ in range(n)]
        ch_scores = [scores[0] for scores in runs]
        rand_scores = [scores[1] for scores in runs]
        ret.append((k, _mean_std(ch_scores), _mean_std(rand_scores)))
    print(ret)
    return ret



# Ten runs per k on iris; the trailing ';' keeps the cell's output to the printed summary.
kmeans_k_many_experiments(dataset=file_iris, n=10);

[(2, (353.36740323251195, 0.0), (0.5681159420289855, 0.0)), (3, (327.59160689878, 65.30232030289393), (0.6588639134918161, 0.12117468350111307)), (4, (298.81355614031577, 31.720556329328307), (0.6313195353060308, 0.04680681104323954)), (5, (250.74076590366252, 21.784278287214967), (0.5940452163833395, 0.059477874694107466)), (6, (238.4158212060635, 34.90048154343617), (0.522023796160944, 0.07357835865887383))]
