In [1]:
import pyspark
import pyspark.sql
import pyspark.ml
import pyspark.sql.types as pst
import pyspark.sql.functions
import pyspark.ml.stat
import numpy as np
import scipy
import scipy.spatial

In [2]:
sc = pyspark.SparkContext("local", "kNN")
ss = pyspark.sql.SparkSession(sc)

In [3]:
schema = pst.StructType(
    [
        pst.StructField("col_A", pst.FloatType()),
        pst.StructField("col_B", pst.FloatType()),
        pst.StructField("col_C", pst.FloatType()),
        pst.StructField("col_D", pst.FloatType()),
        pst.StructField("col_E", pst.FloatType()),
        pst.StructField("col_F", pst.FloatType()),
        pst.StructField("col_G", pst.FloatType()),
    ]
)

In [4]:
n_clusters = 3

In [5]:
df = ss.read.csv("data.csv", schema=schema, header=True)

In [6]:
va = pyspark.ml.feature.VectorAssembler(inputCols=df.columns, outputCol="features")
df = va.transform(df).select("features")

In [7]:
df.show(5)

+--------------------+
|            features|
+--------------------+
|[-0.7103070020675...|
|[0.50356602668762...|
|[1.11614501476287...|
|[0.24310199916362...|
|[0.48134401440620...|
+--------------------+
only showing top 5 rows



In [10]:
def calc_dists(df, n_clusters, clusters=None):
    if clusters is None:
        clusters = df.take(n_clusters)

    col = pyspark.sql.functions.col("features")
    dist_cols = []
    
    for i, cluster in enumerate(clusters):
        cls_udf = pyspark.sql.functions.udf(
            lambda row: float(scipy.spatial.distance.euclidean(row, cluster)),
            pst.FloatType())
        
        col_name = f"cluster_dist_{i}"
        df = df.withColumn(col_name, cls_udf(col))
        dist_cols.append(col_name)
    
    va = pyspark.ml.feature.VectorAssembler(inputCols=dist_cols, outputCol="clust_dists")
    
    if "clust_dists" in df.columns:
        df = df.drop("clust_dists")
    
    df = va.transform(df).select(["features", "clust_dists"])
    
    return df


def kmeans(df, n_clusters, n_iter=10):
    summ = pyspark.ml.stat.Summarizer.metrics("mean")
    
    argmin_udf = pyspark.sql.functions.udf(
        lambda row: int(np.argmin(row)),
        pst.IntegerType())
    
    clust_id_col = pyspark.sql.functions.col("clust_dists")
    
    fun_assign_cluster = argmin_udf(clust_id_col)
    clusters = None
    
    for i in range(n_iter):
        df = calc_dists(df, n_clusters, clusters=clusters)
        
        df = df.withColumn("cluster_id", fun_assign_cluster)
        
        clusters = (
            df.groupBy("cluster_id")
            .agg(summ.summary(df.features)
            .alias("cluster_coords"))
            .select("cluster_coords").collect()
        )
        
    return df

In [12]:
df = kmeans(df.select("features"), 3, n_iter=10)
df.show(5)

+--------------------+--------------------+----------+
|            features|         clust_dists|cluster_id|
+--------------------+--------------------+----------+
|[-0.7103070020675...|[32.3831634521484...|         2|
|[0.50356602668762...|[120.000160217285...|         1|
|[1.11614501476287...|[46.5675544738769...|         2|
|[0.24310199916362...|[66.8327865600586...|         0|
|[0.48134401440620...|[145.702499389648...|         1|
+--------------------+--------------------+----------+
only showing top 5 rows

