In [None]:
# general imports
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import os
import pandas as pd

# dataset
from sklearn.datasets import fetch_kddcup99
from sklearn.preprocessing import StandardScaler

# pyspark module
from pyspark.rdd import RDD

# src module
from src.utils import sparkSetup
from src.kmeans import compute_centroidDistances, get_clusterId, get_minDistance

: 

In [None]:
# creating the zipped environment if it doesn't already exist
! if [ ! -f "environment.tar.gz" ]; then venv-pack -o "environment.tar.gz" ; fi
# creating the zipped module src
! if [ -f "src.tar.gz" ]; then rm src.tar.gz ; fi
! tar -czf src.tar.gz src

: 

In [None]:
# starting the cluster
! $SPARK_HOME/sbin/start-all.sh

: 

In [None]:
# telling spark where to find the python binary
os.environ["PYSPARK_PYTHON"] = "./environment/bin/python"

: 

In [None]:
# creating a sparkSession
spark = sparkSetup("kMeans")
sc = spark.sparkContext
# exporting the src module
sc.addPyFile("src.tar.gz") 

: 

### Naming conventions

The single datum is named as `datumName`, while the RDD that is a collection of one or more data is called `datumName_rdd`.

Example: `compute_clusterDistances` returns `clusterDistances` (i.e. a numpy array of distances between a point `x` and the `centroids`). 
The RDD that collects all the `clusterDistances` will be called `clusterDistances_rdd`, and here is a sample implementation of that:
```python
def compute_centroidDistances(x, centroids):
    return np.sum((centroids - x)**2, axis = 1)

# `data_rdd` is an RDD
# `centroids` is a numpy array
clusterDistances_rdd = data_rdd \
    .map(lambda x: compute_clusterDistances(x, centroids))
```

### Load and preprocess the dataset

In [None]:
#Real dataset

# Change percent10 to 'False' to fetch the full dataset (4M rows)
# for local works is better to leave it as 'True'
kdd = fetch_kddcup99(shuffle=True,percent10=True) 
kdd_data = kdd.data

# Remove string features and standardize them
data = np.delete(kdd_data,np.arange(1,4,1),axis = 1) 
scaler = StandardScaler()
data = scaler.fit_transform(data)

#parallelize
data_rdd = sc.parallelize([row for row in data])
data_rdd = data_rdd.persist()

: 

In [None]:
def kMeansPlusPlus_init(
    data: npt.NDArray,
    k: int,
    weights: npt.NDArray = np.array([])
) -> npt.NDArray:
    """
    Standard kMeans++ initialization method:
    given `data` (eventually weighted), returns `k` cluster centroids
    """
    if weights.shape[0] == 0:
        weights = np.ones(shape=(data.shape[0],1))
    
    centroids = data[np.random.randint(0, data.shape[0]),:].reshape(1, -1) # reshaping for easier stacking
    
    while (centroids.shape[0] < k):
        # since the original functions are made for map
        # we need to loop over the data
        minDistance_array = np.array(
            [get_minDistance(compute_centroidDistances(datum, centroids)) for datum in data]
        ) * weights # multiplyling by the weight simulates multiple copies of the same datum
        total_minDistance = np.sum(minDistance_array)
        # sampling probability proportional to minDistance
        new_centroid_idx = np.random.choice(minDistance_array.shape[0], size = 1, p = minDistance_array / total_minDistance)
        new_centroid = data[new_centroid_idx,:].reshape(1, -1)

        # edge case in which the same centroid is selected twice:
        # redo the iteration without saving the centroid
        if any(np.array_equal(new_centroid, row) for row in centroids): continue
        centroids = np.concatenate((centroids, new_centroid), axis = 0)

    return centroids

: 

In [None]:
def kMeansNaive(
    data: npt.NDArray,
    centroids: npt.NDArray,
    epochs: int = 5
) -> npt.NDArray:
    """
    Standard kMeans algorithm:
    given `data`, updates the (k) `centroids` for `epochs` times,
    improving the clustering each time
    """
    k = centroids.shape[0]
    for _ in range(epochs):
        assignments = np.array(
            [get_clusterId(compute_centroidDistances(x, centroids)) for x in data]
        )
        centroids = np.array(
            [np.mean(data[assignments==i,:], axis = 0) for i in range(k)]
        )
    return centroids

: 

In [None]:
def kMeansParallel_init(
    data_rdd: RDD,
    k: int,
    l: float
) -> npt.NDArray:
    """
    kMeans|| initialization method:
    returns `k` good `centroids`.
    `l` controls the probability of each point
    in `data_rdd` of being sampled as a pre-processed centroid.
    """

    centroids = np.array(
        data_rdd.takeSample(num=1, withReplacement=False)
    )
    
    minDistance_rdd = data_rdd \
        .map(lambda x: (x, get_minDistance(compute_centroidDistances(x, centroids)))) \
        .persist()

    cost = minDistance_rdd \
        .map(lambda x: x[1]) \
        .sum()

    iterations = int(np.ceil(np.log(cost))) if (cost > 1) else 1
    for _ in range(iterations):
        new_centroids = np.array(
            minDistance_rdd \
                .filter(lambda x: np.random.rand() < np.min((l * x[1] / cost, 1))) \
                .map(lambda x: x[0]) \
                .collect()
        )
        # edge case in which no new centroid is sampled:
        # this avoids the following `np.concatenate` to fail
        if len(new_centroids.shape) < 2:
            continue

        minDistance_rdd.unpersist()
        centroids = np.unique(
            np.concatenate((centroids, new_centroids), axis = 0), 
            axis = 0
        )

        minDistance_rdd = data_rdd \
            .map(lambda x: (x, get_minDistance(compute_centroidDistances(x, centroids)))) \
            .persist()
        cost = minDistance_rdd \
            .map(lambda x: x[1]) \
            .sum()
    
    minDistance_rdd.unpersist()
    clusterCounts = data_rdd \
        .map(lambda x: (get_clusterId(compute_centroidDistances(x, centroids)), 1)) \
        .countByKey()
    
    clusterCounts = np.array([w[1] for w in clusterCounts.items()])
    centroids = kMeansNaive(
        centroids, 
        kMeansPlusPlus_init(centroids, k, clusterCounts)
    )
    
    return centroids

: 

In [None]:
k = 10
l = k * 10
centroids = kMeansParallel_init(data_rdd, k, l)

: 

In [None]:
def miniBatchKMeans(
    data_rdd: RDD,
    centroids: npt.NDArray,
    iterations: int = 10,
    batch_fraction: float = 0.1
) -> npt.NDArray:
    k = centroids.shape[0] # number of centroids
    clusterCounters = np.zeros((k,)) # 1 / learning_rate
    for iter in range(iterations):
        # sample uniformly from the partitions a batch_fraction of the total rdd dataset
        miniBatch_rdd = data_rdd \
            .sample(withReplacement=False, fraction=batch_fraction)
        # map procedure: assign each point to a cluster based on the distance with respect to the centroids and persist it 
        miniBatch_rdd = miniBatch_rdd \
            .map(lambda x: (get_clusterId(compute_centroidDistances(x, centroids)), 1, x)) \
            .persist()
        
        # reduce procedure: counting how many assigments per cluster
        clusterCounts_dict = miniBatch_rdd \
            .map(lambda x: (x[0], x[1])) \
            .countByKey()
        
        # count how many points per cluster in this minibatch
        clusterCounts = np.array(
            [clusterCounts_dict[i] if i in clusterCounts_dict.keys() else 0 for i in range(k)]
        )
        # update the counts for the global counter
        clusterCounters += clusterCounts
        
        # edge case in which a cluster has no assignments:
        # if also its counter is zero the whole iteration is repeated
        if any(np.isclose(v, 0) for v in clusterCounters): 
            iter -= 1
            miniBatch_rdd.unpersist()
            continue
        # otherwise its count will be set to 1 to avoid division by 0 in the update step
        clusterCounts = np.where(clusterCounts >= 1, clusterCounts, 1)

        # summing all points assigned to the same cluster
        # (in the update step this will be divided by the counts 
        # in order to get the mean for every cluster).
        # reduce procedure: in this case a dict is used for convenience and consistency with clusterCounts 
        clusterSums_dict = dict(miniBatch_rdd \
            .map(lambda x: (x[0], x[2])) \
            .reduceByKey(lambda x, y: x + y) \
            .collect()
        )
        # edge case in which a cluster has no assignments:
        # the centroid is returned instead of 0 
        # (which would have been the sum of its assigned points) 
        # in order to not update its position 
        # (note how the terms cancel out in the update step)
        clusterSums = np.array(
            [clusterSums_dict[i] if i in clusterSums_dict.keys() else centroids[i,:] for i in range(k)]
        )

        # update step: c <- (1 - eta) * c + eta * x_mean
        # (note x_mean = x_sums / c_count)
        centroids = (1 - 1 / clusterCounters).reshape(-1, 1) * centroids + \
                    (1 / (clusterCounters * clusterCounts)).reshape(-1, 1) * clusterSums
        
        # free the memory 
        miniBatch_rdd.unpersist()
        
    return centroids

: 

In [None]:
final_centroids = miniBatchKMeans(data_rdd, centroids, 10, 0.1)

: 

In [None]:
sc.stop()
spark.stop()

: 

In [None]:
# stopping the cluster
! $SPARK_HOME/sbin/stop-all.sh

: 