# 1. Setting up PySpark



In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [3]:
!pip install -q findspark
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()
sc=spark.sparkContext

In [5]:
!wget --continue https://github.com/yoavfreund/UCSD_BigData_2016/raw/master/Data/Weather/stations_projections.pickle -O /content/stations_projections.pickle

--2023-02-27 16:23:10--  https://github.com/yoavfreund/UCSD_BigData_2016/raw/master/Data/Weather/stations_projections.pickle
Resolving github.com (github.com)... 140.82.113.3
Connecting to github.com (github.com)|140.82.113.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/yoavfreund/UCSD_BigData_2016/master/Data/Weather/stations_projections.pickle [following]
--2023-02-27 16:23:10--  https://raw.githubusercontent.com/yoavfreund/UCSD_BigData_2016/master/Data/Weather/stations_projections.pickle
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2816937 (2.7M) [application/octet-stream]
Saving to: ‘/content/stations_projections.pickle’


2023-02-27 16:23:11 (215 MB/s) - ‘/content/stations_projection

# 2. Get weather dataset

In [6]:
# library
# !pip install pathos
# from pathos.multiprocessing import ProcessingPool as Pool

from multiprocessing.pool import ThreadPool as Pool
from functools import partial
import os

import numpy as np 
import pickle
import sys 
import time
from numpy.linalg import norm 
from matplotlib import pyplot as plt

In [7]:
def parse_data(row):
    '''
    Parse each pandas row into a tuple of 
    (station_name,  feature_vec),`l
    where feature_vec is the concatenation of the projection vectors
    of TAVG, TRANGE, and SNWD.
    '''
    return (row[0],
            np.concatenate([row[1], row[2], row[3]]))
## Read data
data = pickle.load(open("stations_projections.pickle", "rb"), encoding='latin1')
rdd = sc.parallelize([parse_data(row[1]) 
          for row in data.iterrows()])\
        .map(lambda p: p[1]) # p = (name, vec)
                      

  data = pickle.load(open("stations_projections.pickle", "rb"), encoding='latin1')


In [8]:
rdd.take(2)

[array([ 3.04796236e+03,  1.97434852e+03,  1.50560792e+02, -2.90363288e+03,
        -2.36907268e+02,  1.47021791e+02,  1.91503001e-01,  1.87262808e-01,
        -4.01379553e-02]),
 array([ 2.07214900e+03,  8.80454659e+02, -1.94039657e+01, -1.58834407e+03,
         2.20915926e+01,  5.39057098e+01,  3.15437799e-01,  1.26292084e-01,
         7.92078997e-01])]

# 3. Kmeans|| implementation

## 3.1 set up util functions and declare hyper-parameters

In [9]:
def print_log(s):
    '''
    Print progress logs
    '''
    sys.stdout.write(s + "\n")
    sys.stdout.flush()

def compute_entropy(d):
    '''
    Compute the entropy given the frequency vector `d`
    '''
    d = np.array(d)
    d = 1.0 * d / d.sum()
    return -np.sum(d * np.log2(d))

def choice(p):
    '''
    Generates a random sample from [0, len(p)],
    where p[i] is the probability associated with i. 
    '''
    random = np.random.random()
    r = 0.0
    for idx in range(len(p)):
        r = r + p[idx]
        if r > random:
            return idx
    assert(False)

def samplePartition(index, pointCosts, total, l, seed):
    #############################################################
    # sample E(l) points from a partition
    #############################################################
    # first set a distinct state
    seed = seed ^ index
    np.random.RandomState(seed)
    samples = filter(lambda p: # p = point, cost
                        np.random.random()*total < l*p[1]
                  ,pointCosts)
    return map(lambda p: p[0], samples)

def get_closest(p, centers):
    '''
    Return the indices the nearest centroids of `p`.
    `centers` contains sets of centroids
    '''
    best = 0
    closest = np.inf
    for j in range(len(centers)):
        temp_dist = norm(p - centers[j])
        if temp_dist < closest:
            closest = temp_dist
            best = j
    return best

def get_loss(rdd, centers):
    costs = rdd.map(lambda p: norm(p-get_closest(p,centers), axis=0)**2) 
    return costs.sum()


In [10]:
# Number of centroids
K = 5  
# For reproducability of results
RANDOM_SEED = 60295531 
# The K-means algorithm is terminated when the change in the 
# location of the centroids is smaller than 0.1
converge_dist = 0.1

## 3.2 MapReduce k-means++ for benchmark

In [11]:
def kmeans_pp_init(rdd, K, seed):
    #######################################################
    # parallelize k-means++
    #######################################################
    local_data = rdd.collect()
    def update_cost(vec, center, dist):
        return np.min([dist, norm(vec - center, axis=0)**2], axis=0)
    # cost vector
    costs = rdd.map(lambda _: np.inf)

    # Randomly select the first point for every run of k-means++
    newCenter = rdd.takeSample(False, 1, seed)[0]
    centers = [newCenter]

    for idx in range(K - 1):
        ########################################################
        # In each iteration, you need to select one point. 
        # For each data point x, let D(x) be the
        # distance between x and the nearest center. Choose a new
        # data point using a weighted probability proportional
        # to D(x)^2
        ########################################################
        bcNewCenter = sc.broadcast(newCenter)
        # Update cost
        costs = (rdd.zip(costs)
                    .map(lambda p: #p = (point, cost)
                                update_cost(p[0], bcNewCenter.value, p[1]))
                    .persist())
        # compute sum(D(x)^2)
        total = costs.sum()
        bcNewCenter.unpersist()
        # Normalize each distance to get the probabilities
        prob = costs.map(lambda p: p/total).collect()

        newCenter = local_data[choice(prob)]
        centers += [newCenter]
    return centers

## 3.3 k-means|| a.k.a kmeans_ss_init implementation

### a) local k-means++ implementation 

In [12]:
def _kmeans_pp_init(data, K, seed, w=None):
    ##########################################################
    # run k-means++ locally
    ##########################################################
    # sample the first center based on weight 
    n_data = len(data)
    if w is None: # uniformize weight vector if not exist
        w = [1]*n_data
        sample = np.random.randint(n_data)
    else: # pick sample based on distribution vector w
        sample = np.random.choice(range(n_data), p=[k/np.sum(w) for k in w])

    costs = [np.inf]*n_data
    # the `centers` variable is what we want to return
    centers = np.zeros((K, data[0].shape[0]))
    # Randomly select the first point for every run of k-means++
    centers[0] = data[sample]

    def update_cost(vec, dist, k):
        new_dist = norm(vec - centers[k], axis=0)**2
        return np.min([dist, new_dist], axis=0)
    for idx in range(K - 1):
        # Update cost D(x)^2
        costs = [update_cost(point, cost, idx) for point, cost in zip(data, costs)]
        # Calculate sum of w(x)*D(x)^2
        total = np.sum(np.multiply(w, costs))
        # Normalize each distance to get the probabilities
        prob = np.divide([weight*cost for weight, cost in zip(w, costs)], total)
        #K'th centroid for each run
        centers[idx+1] = data[choice(prob)]
    return centers

### b) MapReduce k-means|| implementation

In [13]:
# %%file utill.py
# from numpy.linalg import norm 
# def cost_p(center, point): 
#     return norm(point-center, axis=0)**2

In [14]:
# from utill import cost_p
def pointCost(centers, point):
    # with Pool(processes = os.cpu_count()) as pool:
    #     partial_dist = partial(cost_p, point=point)
    #     dist = pool.map(partial_dist, centers)
    #     print(dist)
    # return dist
    return [norm(point - center, axis=0)**2 for center in centers] 

def kmeans_ss_init(rdd, K, seed):
    #############################################################
    # overseeding l = 5K, t = 5  chosen according to Bachem.2017
    #############################################################
    l = 2*K # number of points in each iteration
    t = 5 # number of round
    # Initialize empty centers and point squared distance.
    costs = rdd.map(lambda _: np.inf)

    # Initialize the first center to a random point.
    newCenters = rdd.takeSample(False, 1, seed)
    centers = newCenters

    for idx in range(t):
        bcNewCenters = sc.broadcast(newCenters)
        # update cost vector again new centers
        costs = (rdd.zip(costs)
                    .map(lambda p: #p = (point, cost)
                       np.min(pointCost(bcNewCenters.value, p[0])+[p[1]]))
                    .persist())
        # compute sum(D(x)^2)
        total = costs.sum()
        bcNewCenters.unpersist()

        # sample E(l) new centers
        newCenters = (rdd.zip(costs)
                         .mapPartitionsWithIndex(
                              lambda index, pointCosts: 
                              samplePartition(index, pointCosts, total, l, seed^(idx<<16))
                              )
                         .collect())
        centers += newCenters
    #############################################################
    # Reclustering l*t+1 points into K points
    #############################################################
    costs.unpersist()
    distCenters = np.unique(centers, axis=0)
    bcCenters = sc.broadcast(distCenters) 
    # count number of points within clusters
    countMap = (rdd.map(lambda p: # p = point
                        get_closest(p, bcCenters.value))
                   .countByValue())
    bcCenters.destroy()
    # compute the weight vector
    weights = [countMap.get(i, 0) for i in range(len(distCenters))] 
    # locally perform kmeans++ on the shrinked dataset
    return _kmeans_pp_init(distCenters, K, seed, weights)




## 3.4 MapReduce k-means Lloyd iteration implementation

In [15]:
def kmeans_assign(rdd, k_points):
    label = (rdd
              .map(lambda p: # p = point
                  (get_closest(p, k_points), 
                    (p, 1))) 
              .reduceByKey(lambda x,y: 
                            np.sum([x, y], axis=0))
              .map(lambda p:  # p = (i, (sum(Si), |Si|))
                  (p[0], p[1][0]/p[1][1]))
              .collect())
    return dict(label)


def kmeans(rdd, K, converge_dist, seed, init_method):
    '''
    Run K-means++ algorithm on `rdd`, where `RUNS` is the number of
    initial sets to use.
    '''
    k_points = init_method(rdd, K, seed)
    print_log("Initialized.")
    temp_dist = 1.0

    iters = 0
    st = time.time()
    while temp_dist > converge_dist: 
        ###############################################################
        # Update all `RUNS` sets of centroids using standard k-means 
        ###############################################################
        # assign new centers
        new_points = kmeans_assign(rdd, k_points)

        # shift in centers
        temp_dist = np.max([
                np.sum([norm(k_points[j] - new_points[j]) 
                for j in new_points.keys()])
                            ])

        iters = iters + 1
        if iters % 5 == 0:
            print_log("Iteration %d max shift: %.2f (time: %.2f)" %
                      (iters, temp_dist, time.time() - st))
            st = time.time()

        # update old centroids
        # You modify this for-loop to meet your need
        for (j, p) in new_points.items():
            k_points[j] = p

    return k_points

## 3.5 Benchmark

In [16]:
# bench mark parallel k-means++
st = time.time()

np.random.seed(RANDOM_SEED)
centroids = kmeans(rdd, K, converge_dist, 
                   np.random.randint(1000), kmeans_pp_init)
print("Loss = ", get_loss(rdd, centroids))
# group = rdd.mapValues(lambda p: get_closest(p, centroids)) \
#            .collect()

print("Time takes parallel k-means++ to converge:", time.time() - st)


Initialized.
Iteration 5 max shift: 1923.19 (time: 6.99)
Iteration 10 max shift: 221.77 (time: 3.04)
Iteration 15 max shift: 118.59 (time: 3.97)
Iteration 20 max shift: 53.96 (time: 2.97)
Iteration 25 max shift: 43.26 (time: 2.55)
Iteration 30 max shift: 43.03 (time: 2.59)
Iteration 35 max shift: 8.19 (time: 2.45)
Iteration 40 max shift: 3.65 (time: 3.94)
Loss =  237752343938.56964
Time takes parallel k-means++ to converge: 35.75950884819031


In [17]:
# bench mark parallel k-means||
st = time.time()

np.random.seed(RANDOM_SEED)
centroids = kmeans(rdd, K, converge_dist, 
                  np.random.randint(1000), kmeans_ss_init)
print("Loss = ", get_loss(rdd, centroids))

# group = rdd.mapValues(lambda p: get_closest(p, centroids)) \
#            .collect()

print("Time takes parallel k-means|| to converge:", time.time() - st)

Initialized.
Iteration 5 max shift: 481.24 (time: 3.53)
Iteration 10 max shift: 211.39 (time: 2.51)
Iteration 15 max shift: 40.73 (time: 2.46)
Iteration 20 max shift: 7.39 (time: 2.77)
Loss =  237717607151.1062
Time takes parallel k-means|| to converge: 21.654953002929688


In [18]:
# bench mark random k-means
def kmeans_rand_init(rdd, K, seed):
    return rdd.takeSample(False, K, seed)
st = time.time()

np.random.seed(RANDOM_SEED)
centroids = kmeans(rdd, K, converge_dist, 
                   np.random.randint(1000), kmeans_rand_init)
print("Loss = ", get_loss(rdd, centroids))

# group = rdd.mapValues(lambda p: get_closest(p, centroids)) \
#            .collect()

print("Time takes local k-means++ to converge:", time.time() - st)

Initialized.
Iteration 5 max shift: 1333.51 (time: 2.84)
Iteration 10 max shift: 1432.02 (time: 2.56)
Iteration 15 max shift: 792.09 (time: 2.80)
Iteration 20 max shift: 570.63 (time: 2.39)
Iteration 25 max shift: 119.48 (time: 3.57)
Iteration 30 max shift: 62.97 (time: 2.43)
Iteration 35 max shift: 39.63 (time: 2.85)
Iteration 40 max shift: 37.14 (time: 2.46)
Iteration 45 max shift: 10.41 (time: 2.93)
Iteration 50 max shift: 3.53 (time: 3.17)
Iteration 55 max shift: 0.00 (time: 2.86)
Loss =  237706475377.01685
Time takes local k-means++ to converge: 31.54694437980652
