In [0]:
from sklearn.preprocessing import MinMaxScaler
import pandas as pd
import numpy as np



In [0]:
def minMax_Scale(data, drop_last=True):
    """
    Function to Min Max Scale the Data using sklearn MinMaxScaler.

    The Spark DataFrame is collected to the driver (``toPandas``), every column is
    scaled independently to [0, 1], and the result is converted back to Spark.
    By default the last column (e.g. a class label) is dropped *before* scaling,
    so it does not need to be numeric.

    :param data: (pyspark.sql.dataframe.DataFrame) The data.
    :param drop_last: (bool) Drop the last column before scaling (default True).
    :return: pyspark.sql.dataframe.DataFrame: Normalized Data (without the last
             column when drop_last is True)
    """
    # Cast to Pandas DataFrame (collects the whole dataset onto the driver)
    pandas_df = data.toPandas()
    if drop_last:
        # Drop the label first: MinMaxScaler scales each column independently, so the
        # remaining columns get identical values, and a non-numeric label column no
        # longer makes fit_transform fail.
        pandas_df = pandas_df.iloc[:, :-1]
    # Scale
    scaled_data = pd.DataFrame(MinMaxScaler().fit_transform(pandas_df), columns=pandas_df.columns)
    # Prefer the session that owns `data` (DataFrame.sparkSession, PySpark >= 3.3) so the
    # helper does not depend on a notebook-global; fall back to the global `spark` otherwise.
    session = getattr(data, "sparkSession", None)
    if session is None:
        session = spark
    return session.createDataFrame(scaled_data)


def Kmeans(data, k, ct=0.0001, iterations=30, initial_centroids=None, seed=None):
    """
    Perform K-means clustering using MapReduce.

    The data is min-max scaled (via minMax_Scale, which also drops the last column).
    Each iteration assigns every point to its nearest centroid (Map) and recomputes
    each centroid as the mean of its points (Reduce).

    Parameters:
        data (pyspark.sql.dataframe.DataFrame): A dataset in pyspark.sql.dataframe.DataFrame format
        k: the number of clusters (only used when initial_centroids is None)
        ct: Convergence threshold (parameter - default is set to 0.0001). Iteration stops once
            every centroid moved by less than ct (Euclidean distance).
        iterations: Number of iteration per experiment (parameter - default is set to 30)
        initial_centroids (list): - List of initial centroid locations where each centroid is represented
            by a tuple of the location. When given, it takes precedence over k.
        seed: Optional seed for takeSample when initial centroids are sampled (default None = random).

    Returns:
        list: A list of the centroids calculated by the algorithm so that each centroid is represented
        by a tuple of its location (each coordinate rounded to 5 decimals). A cluster that receives no
        points in an iteration keeps its previous centroid, so the list always has one entry per
        initial centroid.
    """
    # Min Max Scale
    data = minMax_Scale(data)
    # Cast to RDD of NumPy arrays; cached because every iteration scans it again
    rdd = data.rdd.map(lambda row: np.array(row)).cache()
    spark_context = rdd.context

    def sum_points(a, b):
        """Reduce: combine two (vector_sum, count) pairs of the same cluster."""
        return a[0] + b[0], a[1] + b[1]

    try:
        # Initialize centroids if not entered by the user by takeSample(False, k)
        if initial_centroids is None:
            centroids = rdd.takeSample(False, k, seed)
        else:
            centroids = initial_centroids
        centroids = [np.asarray(c, dtype=float) for c in centroids]

        for _ in range(iterations):
            # Send the current centroids to all workers once per iteration
            bc_centroids = spark_context.broadcast(centroids)

            def assign_nearest(point, bc=bc_centroids):
                """Map: key the point by the index of its nearest centroid, paired with count 1."""
                distances = [np.linalg.norm(point - centroid) for centroid in bc.value]
                return int(np.argmin(distances)), (point, 1)

            try:
                # {cluster index: (sum of assigned points, number of assigned points)}
                cluster_stats = rdd.map(assign_nearest).reduceByKey(sum_points).collectAsMap()
            finally:
                # Release executor copies; a fresh broadcast is created next iteration
                bc_centroids.unpersist()

            # New centroid = mean of assigned points. Clusters with no points are absent
            # from cluster_stats; keep their old centroid so k stays fixed and the
            # convergence check below compares matching clusters.
            new_centroids = [
                cluster_stats[j][0] / cluster_stats[j][1] if j in cluster_stats else old
                for j, old in enumerate(centroids)
            ]

            # Converged when no centroid moved by ct or more
            converged = all(
                np.linalg.norm(new - old) < ct for new, old in zip(new_centroids, centroids)
            )
            centroids = new_centroids
            if converged:
                break
    finally:
        rdd.unpersist()

    return [tuple(round(num, 5) for num in centroid) for centroid in centroids]

In [0]:
# ******************* Importing the data *******************

Iris = "/FileStore/tables/Iris.csv"
file_type = "csv"
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","
iris = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(Iris)
)

# ******************* Example for testing *******************


def _check_kmeans(data, k, initial_centroids, expected_centroids):
    """
    Run Kmeans on `data` and print whether the result matches `expected_centroids`.

    Cluster order is not guaranteed, so rounded centroids are compared as sets.
    Returns the centroids produced by Kmeans.
    """
    result = Kmeans(data, k, initial_centroids=initial_centroids)
    rounded = [tuple(round(num, 5) for num in centroid) for centroid in result]
    if len(result) != len(expected_centroids):
        print("Failed - Number of clusters is different than requested")
    if set(rounded) == set(expected_centroids):
        print("The test passed successfully")
    else:
        print("The test failed")
    return result


# Each case: (k, initial centroids, expected centroids rounded to 5 decimals)
KMEANS_TEST_CASES = [
    (
        2,
        [(0.5, 0.5, 0.5, 0.5, 0.5), (0.3, 0.3, 0.3, 0.3, 0.3)],
        [(0.16871, 0.19553, 0.58252, 0.08475, 0.06618),
         (0.67067, 0.54882, 0.36532, 0.66478, 0.65951)],
    ),
    (
        3,
        [(0.7, 0.7, 0.7, 0.7, 0.7), (0.5, 0.5, 0.5, 0.5, 0.5), (0.3, 0.3, 0.3, 0.3, 0.3)],
        [(0.84235, 0.65366, 0.41933, 0.77894, 0.81117),
         (0.51298, 0.44864, 0.31368, 0.55836, 0.51965),
         (0.16443, 0.19611, 0.59083, 0.07864, 0.06)],
    ),
]

for case_k, case_initial_centroids, expected_new_centroids in KMEANS_TEST_CASES:
    new_centroids = _check_kmeans(iris, case_k, case_initial_centroids, expected_new_centroids)

The test passed successfully
The test passed successfully
