In [0]:
# Installation of pyspark 
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u252-b09-1~18.04).
0 upgraded, 0 newly installed, 0 to remove and 29 not upgraded.


In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
sc = SparkContext(appName="PythonKMeansOnIris") 
sqlContext = SQLContext(sc)

In [4]:
import numpy as np
import pandas as pd 
import pickle 
import sys 
import time
from numpy.linalg import norm 
from matplotlib import pyplot as plt

In [5]:
path = 'hdfs://localhost:9000/uber.csv'
spdf = sqlContext.read.csv(path, header=True) # requires spark 2.0
spdf.show()

+----------------+-------+--------+------+
|       Date/Time|    Lat|     Lon|  Base|
+----------------+-------+--------+------+
|5/1/2014 0:02:00|40.7521|-73.9914|B02512|
|5/1/2014 0:06:00|40.6965|-73.9715|B02512|
|5/1/2014 0:15:00|40.7464|-73.9838|B02512|
|5/1/2014 0:17:00|40.7463|-74.0011|B02512|
|5/1/2014 0:17:00|40.7594|-73.9734|B02512|
|5/1/2014 0:20:00|40.7685|-73.8625|B02512|
|5/1/2014 0:21:00|40.7637|-73.9962|B02512|
|5/1/2014 0:21:00|40.7252|-74.0023|B02512|
|5/1/2014 0:25:00|40.7607|-73.9625|B02512|
|5/1/2014 0:25:00|40.7212|-73.9879|B02512|
|5/1/2014 0:29:00|40.7255|-73.9986|B02512|
|5/1/2014 0:32:00|40.6467|-73.7901|B02512|
|5/1/2014 0:40:00|40.7613|-73.9788|B02512|
|5/1/2014 0:56:00|40.7807|-73.9497|B02512|
|5/1/2014 1:00:00|40.7585|-73.9708|B02512|
|5/1/2014 1:02:00|40.7163|-73.9895|B02512|
|5/1/2014 1:06:00|40.7265|-73.9958|B02512|
|5/1/2014 1:13:00|40.7559|-73.9867|B02512|
|5/1/2014 1:13:00|40.7671|-73.9956|B02512|
|5/1/2014 1:20:00|40.7707|-73.9944|B02512|
+----------

In [6]:
df = spdf.toPandas()
df.head()

Unnamed: 0,Date/Time,Lat,Lon,Base
0,5/1/2014 0:02:00,40.7521,-73.9914,B02512
1,5/1/2014 0:06:00,40.6965,-73.9715,B02512
2,5/1/2014 0:15:00,40.7464,-73.9838,B02512
3,5/1/2014 0:17:00,40.7463,-74.0011,B02512
4,5/1/2014 0:17:00,40.7594,-73.9734,B02512


In [7]:
df.dropna(axis=0,inplace=True)
df.head()

Unnamed: 0,Date/Time,Lat,Lon,Base
0,5/1/2014 0:02:00,40.7521,-73.9914,B02512
1,5/1/2014 0:06:00,40.6965,-73.9715,B02512
2,5/1/2014 0:15:00,40.7464,-73.9838,B02512
3,5/1/2014 0:17:00,40.7463,-74.0011,B02512
4,5/1/2014 0:17:00,40.7594,-73.9734,B02512


In [8]:
def parse_data(row):
    return (row[3],np.concatenate([(float(row[1]),), (float(row[2]),)]))
rdd = sc.parallelize([parse_data(row[1]) for row in df.iterrows()])
rdd.take(1)    

[('B02512', array([ 40.7521, -73.9914]))]

In [9]:
# Number of centroids
K = 3  
# Number of K-means runs that are executed in parallel. Equivalently, number of sets of initial points
RUNS = 1  
# For reproducability of results
RANDOM_SEED = 6000 
# The K-means algorithm is terminated when the change in the 
# location of the centroids is smaller than 0.1
converge_dist = 0.1

In [10]:
#Utility Functions
def print_log(s):
    '''
    Print progress logs
    '''
    sys.stdout.write(s + "\n")
    sys.stdout.flush()

def compute_entropy(d):
    '''
    Compute the entropy given the frequency vector `d`
    '''
    d = np.array(d)
    d = 1.0 * d / d.sum()
    return -np.sum(d * np.log2(d))


def choice(p):
    '''
    Generates a random sample from [0, len(p)),
    where p[i] is the probability associated with i. 
    '''
    random = np.random.random()
    r = 0.0
    for idx in range(len(p)):
        r = r + p[idx]
        if r > random:
            return idx
    assert(False)

In [11]:
#Initialization of Centroids
from operator import add
def kmeans_init(rdd, K, RUNS, seed):
    '''
    Select `RUNS` sets of initial points for `K`-means++
    '''
    # the `centers` variable is what we want to return
    n_data = rdd.count()
    shape = rdd.take(1)[0][1].shape[0]
    centers = np.zeros((RUNS, K, shape))
    def update_dist(vec, dist, k):
        new_dist = norm(vec - centers[:, k], axis=1)**2
        return np.min([dist, new_dist], axis=0)
    # The second element `dist` in the tuple below is the
    # closest distance from each data point to the selected
    # points in the initial set, where `dist[i]` is the
    # closest distance to the points in the i-th initial set
    data = (rdd
            .map(lambda p: (p, [np.inf] * RUNS)) \
            .cache())
    # Collect the feature vectors of all data points
    # beforehand, might be useful in the following
    # for-loop
    local_data = (rdd
                    .map(lambda t: t[1])
                    .collect())
    # Randomly select the first point for every run of
    # k-means++, i.e. randomly select `RUNS` points
    # and add it to the `centers` variable
    sample = [local_data[k] for k in
        np.random.randint(0, len(local_data), RUNS)]
    centers[:, 0] = sample
    for idx in range(K - 1):
        ########################################################
        # In each iteration, you need to select one point for
        # each set of initial points (so select `RUNS` points
        # in total). For each data point x, let D_i(x) be the
        # distance between x and the nearest center that has
        # already been added to the i-th set. Choose a new
        # data point for i-th set using a weighted probability
        # where point x is chosen with probability proportional
        # to D_i(x)^2 . Repeat each data point by 25 times
        # (for each RUN) to get 12140x25
        ########################################################
        #Update distance
        data = (data
            .map(lambda t:
                    ((t[0][0],t[0][1]),update_dist(t[0][1],t[1],idx)))
            .cache())
        #print(data.take(1)[1].shape[1])
        #Calculate sum of D_i(x)^2
        d1 = data.map(lambda t: (1,t[1]))
        d2 = d1.reduceByKey(lambda x,y: np.sum([x,y], axis=0))
        total = d2.collect()[0][1]
        #Normalize each distance to get the probabilities and
        #reshapte to 12140x25
        prob = (data
            .map(lambda t:
                np.divide(t[1],total))
            .collect())
        print(prob)
        prob = np.reshape(prob,(len(local_data), RUNS))
        #K'th centroid for each run
        data_id = [choice(prob[:,i]) for i in xrange(RUNS)]
        sample = [local_data[i] for i in data_id]
        centers[:, idx+1] = sample
    return centers

In [12]:
#KMeans++ Function
def get_closest(p, centers):
    '''
    Return the indices the nearest centroids of `p`.
    `centers` contains sets of centroids, where `centers[i]` is
    the i-th set of centroids.
    '''
    best = [0] * len(centers)
    closest = [np.inf] * len(centers)
    for idx in range(len(centers)):
        for j in range(len(centers[0])):
            temp_dist = norm(p - centers[idx][j])
            if temp_dist < closest[idx]:
                closest[idx] = temp_dist
                best[idx] = j
    return best


def kmeans(rdd, K, RUNS, converge_dist, seed):
    '''
    Run K-means++ algorithm on `rdd`, where `RUNS` is the number of
    initial sets to use.
    '''
    k_points = kmeans_init(rdd, K, RUNS, seed)
    print_log("Initialized.")
    temp_dist = 1.0

    iters = 0
    st = time.time()
    while temp_dist > converge_dist: 
        # Update all `RUNS` sets of centroids using standard k-means 
        # algorithm
        # Outline:
        #   - For each point x, select its nearest centroid in i-th 
        # centroids set
        #   - Average all points that are assigned to the same 
        # centroid
        #   - Update the centroid with the average of all points 
        # that are assigned to it
        temp_dist = np.max([
                np.sum([norm(k_points[idx][j] - new_points[(idx, 
                    j)]) for idx,j in new_points.keys()])
                            ])

        iters = iters + 1
        if iters % 5 == 0:
            print_log("Iteration %d max shift: %.2f (time: %.2f)" %
                      (iters, temp_dist, time.time() - st))
            st = time.time()

        # update old centroids
        # You modify this for-loop to meet your need
        for ((idx, j), p) in new_points.items():
            k_points[idx][j] = p

    return k_points

In [13]:
st = time.time()

np.random.seed(RANDOM_SEED)
centroids = kmeans(rdd, K, RUNS, converge_dist,np.random.randint(1000))
group = rdd.mapValues(lambda p: get_closest(p, centroids)) \
           .collect()

print("Time takes to converge:", time.time() - st)

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



NameError: name 'xrange' is not defined

In [None]:
def get_cost(rdd, centers):
    '''
    Compute the square of l2 norm from each data point in `rdd`
    to the centroids in `centers`
    '''
    def _get_cost(p, centers):
        best = [0] * len(centers)
        closest = [np.inf] * len(centers)
        for idx in range(len(centers)):
            for j in range(len(centers[0])):
                temp_dist = norm(p - centers[idx][j])
                if temp_dist < closest[idx]:
                    closest[idx] = temp_dist
                    best[idx] = j
        return np.array(closest)**2
    
    cost = rdd.map(lambda (name, v): _get_cost(v, 
           centroids)).collect()
    return np.array(cost).sum(axis=0)

cost = get_cost(rdd, centroids)
log2 = np.log2print "Min Cost:\t"+str(log2(np.max(cost)))
print "Max Cost:\t"+str(log2(np.min(cost)))
print "Mean Cost:\t"+str(log2(np.mean(cost)))