In [1]:
import numpy as np 
from math import radians, sin, cos, sqrt, atan2
import timeit

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9,application_1544581478054_0010,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
def closest_point(p, ps, m):
    """
    Find which of the current centers lies nearest to a point.

    :param p:  the query point
    :param ps: sequence of current center points; it is converted to a
               NumPy array before being handed to the distance function
    :param m:  distance function, called as ``m(p, centers)``; its result
               is passed to ``np.argmin``
    :return:   index (into ``ps``) of the smallest distance returned by ``m``
    """
    centers = np.asarray(ps)
    distances = m(p, centers)
    return np.argmin(distances)


def add_points(p1, p2):
    """
    Component-wise sum of two 2-D points.

    Only the first two components of each argument are read, and the
    result is always returned as a 2-tuple.
    """
    return tuple(p1[axis] + p2[axis] for axis in range(2))


def euclidean_distance(p1, p2):
    """
    Calculate the Euclidean distance from ``p1`` to one or many points.

    :param p1: a coordinate pair (any array-like that broadcasts against ``p2``)
    :param p2: either a single coordinate pair or an array-like of pairs with
               one point per row
    :return: a scalar when ``p2`` is a single point, otherwise a 1-D array
             holding the distance from ``p1`` to each row of ``p2``
    """
    diff = np.asarray(p2) - np.asarray(p1)
    # Reduce over the last (coordinate) axis: identical to axis=1 for a 2-D
    # array of points, but also valid when p2 is a single 1-D point.
    return np.sqrt(np.sum(diff ** 2, axis=-1))


def great_circle_distance(p1, p2):
    """
    Angular great-circle distance from ``p1`` to one or many points.

    Points are read as ``(longitude, latitude)`` in degrees: index 0 is used
    as the longitude and index 1 as the latitude.

    :param p1: a single ``(lon, lat)`` pair
    :param p2: a single ``(lon, lat)`` pair or an array-like of pairs with
               one point per row (lists are accepted)
    :return: the central angle in radians (i.e. the distance on a unit
             sphere; it is NOT scaled by an Earth radius) -- a scalar for a
             single point, otherwise one value per row of ``p2``
    """
    # Coerce so plain lists/tuples work; `...` indexing below handles both a
    # single point and a 2-D array of points.
    p2 = np.asarray(p2)

    lat1, lon1 = radians(p1[1]), radians(p1[0])
    lat2, lon2 = np.radians(p2[..., 1]), np.radians(p2[..., 0])

    sin_lat1, cos_lat1 = sin(lat1), cos(lat1)
    sin_lat2, cos_lat2 = np.sin(lat2), np.cos(lat2)

    d_lon = np.subtract(lon2, lon1)
    cos_d_lon, sin_d_lon = np.cos(d_lon), np.sin(d_lon)

    # arctan2 form of the spherical distance formula: the numerator is the
    # norm of the cross product and the denominator the dot product of the
    # two unit position vectors.
    numerator = np.sqrt((cos_lat2 * sin_d_lon) ** 2 +
                        (cos_lat1 * sin_lat2 -
                         sin_lat1 * cos_lat2 * cos_d_lon) ** 2)
    denominator = sin_lat1 * sin_lat2 + cos_lat1 * cos_lat2 * cos_d_lon
    return np.arctan2(numerator, denominator)


def WCSS(ps, cps):
    """
    Sum of squared differences between indexed points and reference centers.

    :param ps:  iterable of ``(i, p)`` pairs
    :param cps: indexable collection of centers; ``cps[i]`` is compared to ``p``
    :return:    the total of ``np.sum((cps[i] - p) ** 2)`` over all pairs
                (``0`` when ``ps`` is empty)

    NOTE(review): despite the name, kmeans_cluster calls this with the newly
    computed means and the previous centers, so there it measures how far
    the centers moved rather than a within-cluster sum of squares.
    """
    total = 0
    for idx, point in ps:
        offset = cps[idx] - point
        total = total + np.sum(offset ** 2)
    return total


def kmeans_cluster(data, converge_dist, m, k, seed=20181209, max_iter=None):
    """
    Iterative k-means over a Spark RDD of 2-D points.

    :param data:          RDD of points (must support ``takeSample``, ``map``)
    :param converge_dist: stop once the summed squared movement of the
                          centers in one iteration is no larger than this
    :param m:             distance function, called as ``m(point, centers)``
    :param k:             number of centers to sample and fit
    :param seed:          seed passed to ``takeSample`` for the initial
                          centers (defaults to the previously hard-coded value)
    :param max_iter:      optional cap on the number of iterations;
                          ``None`` (default) means iterate until convergence
    :return:              list of ``k`` centers; a center whose cluster
                          received no points keeps its previous value
    """
    cp = data.takeSample(False, k, seed)
    # Start at +inf so at least one iteration always runs, even when
    # converge_dist >= 1.0 (a finite start value would skip the loop and
    # return the raw sample).
    temp_dist = float('inf')
    iterations = 0
    while temp_dist > converge_dist and (max_iter is None or iterations < max_iter):
        # Tag each point with its nearest center index and a count of 1.
        closest = data.map(lambda p: (closest_point(p, cp, m), (p, 1)))
        # Per center: sum coordinates and counts, then divide to get the mean.
        point_stats = closest \
            .reduceByKey(lambda p1, p2: (add_points(p1[0], p2[0]), p1[1] + p2[1])) \
            .map(lambda o: (o[0], np.array(o[1][0]) / o[1][1])) \
            .collect()

        # Measure how far the centers moved before overwriting them.
        temp_dist = WCSS(point_stats, cp)
        for (i, p) in point_stats:
            cp[i] = p
        iterations += 1
    return cp

VBox()

In [3]:
# Load the accident records, keep CSV columns 1 and 2 as a pair of floats
# (presumably longitude, latitude -- confirm against the dataset schema),
# and drop rows where either of those columns is an empty string.
accidents = (
    spark.sparkContext.textFile("s3://cse427-kmeans/Accidents.txt")
    .map(lambda line: line.split(','))
    .map(lambda fields: (fields[1], fields[2]))
    .filter(lambda pair: len(pair[0]) > 0 and len(pair[1]) > 0)
    .map(lambda pair: (float(pair[0]), float(pair[1])))
    .cache()
)

VBox()

# Euclidean distance with 3 clusters (k = 3)

In [14]:
# Time the whole run: fit k-means (k = 3, Euclidean metric), then assign
# every accident to its nearest center and write the grouped clusters out.
start_time = timeit.default_timer()
centers_euclidean_3 = kmeans_cluster(accidents, 0.01, euclidean_distance, 3)
print(centers_euclidean_3)
# groupByKey + mapValues(list) emits one (cluster index, [points]) record per cluster.
(accidents
    .map(lambda p: (closest_point(p, centers_euclidean_3, euclidean_distance), p))
    .groupByKey()
    .mapValues(list)
    .saveAsTextFile('s3://cse427-kmeans/output/clusters_euclidean_3'))
elapsed = timeit.default_timer() - start_time
# Single-argument form: same text as the old Python 2 print statement, and
# also valid as a Python 3 function call.
print('time cost:  %s' % elapsed)

VBox()

[array([-0.13638543, 51.697445  ]), array([-2.48161016, 51.78915769]), array([-2.32147019, 54.23874128])]
time cost:  108.196929932

# Great-circle distance with 3 clusters (k = 3)

In [6]:
# Time the whole run: fit k-means (k = 3, great-circle metric), then assign
# every accident to its nearest center and write the grouped clusters out.
start_time = timeit.default_timer()
centers_great_circle_3 = kmeans_cluster(accidents, 0.01, great_circle_distance, 3)
print(centers_great_circle_3)
# groupByKey + mapValues(list) emits one (cluster index, [points]) record per cluster.
(accidents
    .map(lambda p: (closest_point(p, centers_great_circle_3, great_circle_distance), p))
    .groupByKey()
    .mapValues(list)
    .saveAsTextFile('s3://cse427-kmeans/output/clusters_great_circle_3'))
elapsed = timeit.default_timer() - start_time
# Single-argument form: same text as the old Python 2 print statement, and
# also valid as a Python 3 function call.
print('time cost:  %s' % elapsed)

VBox()

[array([-0.1229614 , 51.62027232]), array([-2.53474571, 51.74085758]), array([-2.21878297, 54.14401557])]
time cost:  141.767199993

# Euclidean distance with 4 clusters (k = 4)

In [7]:
# Time the whole run: fit k-means (k = 4, Euclidean metric), then assign
# every accident to its nearest center and write the grouped clusters out.
start_time = timeit.default_timer()
centers_euclidean_4 = kmeans_cluster(accidents, 0.01, euclidean_distance, 4)
print(centers_euclidean_4)
# groupByKey + mapValues(list) emits one (cluster index, [points]) record per cluster.
(accidents
    .map(lambda p: (closest_point(p, centers_euclidean_4, euclidean_distance), p))
    .groupByKey()
    .mapValues(list)
    .saveAsTextFile('s3://cse427-kmeans/output/clusters_euclidean_4'))
elapsed = timeit.default_timer() - start_time
# Single-argument form: same text as the old Python 2 print statement, and
# also valid as a Python 3 function call.
print('time cost:  %s' % elapsed)

VBox()

[array([-1.83773237, 53.40048323]), array([-3.64748206, 56.0342849 ]), array([-3.13009712, 51.2819534 ]), array([-0.1407763 , 51.58322611])]
time cost:  265.239196062

# Great-circle distance with 4 clusters (k = 4)

In [8]:
# Time the whole run: fit k-means (k = 4, great-circle metric), then assign
# every accident to its nearest center and write the grouped clusters out.
start_time = timeit.default_timer()
centers_great_circle_4 = kmeans_cluster(accidents, 0.01, great_circle_distance, 4)
print(centers_great_circle_4)
# groupByKey + mapValues(list) emits one (cluster index, [points]) record per cluster.
(accidents
    .map(lambda p: (closest_point(p, centers_great_circle_4, great_circle_distance), p))
    .groupByKey()
    .mapValues(list)
    .saveAsTextFile('s3://cse427-kmeans/output/clusters_great_circle_4'))
elapsed = timeit.default_timer() - start_time
# Single-argument form: same text as the old Python 2 print statement, and
# also valid as a Python 3 function call.
print('time cost:  %s' % elapsed)

VBox()

[array([-1.84545094, 53.232605  ]), array([-2.94337779, 55.62649661]), array([-3.08877061, 51.21737185]), array([-0.13707709, 51.55559929])]
time cost:  229.547693014

# Euclidean distance with 5 clusters (k = 5)

In [9]:
# Time the whole run: fit k-means (k = 5, Euclidean metric), then assign
# every accident to its nearest center and write the grouped clusters out.
start_time = timeit.default_timer()
centers_euclidean_5 = kmeans_cluster(accidents, 0.01, euclidean_distance, 5)
print(centers_euclidean_5)
# groupByKey + mapValues(list) emits one (cluster index, [points]) record per cluster.
(accidents
    .map(lambda p: (closest_point(p, centers_euclidean_5, euclidean_distance), p))
    .groupByKey()
    .mapValues(list)
    .saveAsTextFile('s3://cse427-kmeans/output/clusters_euclidean_5'))
elapsed = timeit.default_timer() - start_time
# Single-argument form: same text as the old Python 2 print statement, and
# also valid as a Python 3 function call.
print('time cost:  %s' % elapsed)

VBox()

[array([-1.85416283, 53.40253243]), array([-3.6518545 , 56.03750859]), array([-3.19594034, 51.2786411 ]), array([-0.41698218, 51.465947  ]), array([ 0.584626  , 51.97964327])]
time cost:  248.011786938

# Great-circle distance with 5 clusters (k = 5)

In [10]:
# Time the whole run: fit k-means (k = 5, great-circle metric), then assign
# every accident to its nearest center and write the grouped clusters out.
start_time = timeit.default_timer()
centers_great_circle_5 = kmeans_cluster(accidents, 0.01, great_circle_distance, 5)
print(centers_great_circle_5)
# groupByKey + mapValues(list) emits one (cluster index, [points]) record per cluster.
(accidents
    .map(lambda p: (closest_point(p, centers_great_circle_5, great_circle_distance), p))
    .groupByKey()
    .mapValues(list)
    .saveAsTextFile('s3://cse427-kmeans/output/clusters_great_circle_5'))
elapsed = timeit.default_timer() - start_time
# Single-argument form: same text as the old Python 2 print statement, and
# also valid as a Python 3 function call.
print('time cost:  %s' % elapsed)

VBox()

[array([-1.97635468, 53.34726255]), array([-3.06644638, 55.72194304]), array([-3.09394951, 51.23577487]), array([-0.19887222, 51.42031999]), array([-0.30460039, 52.52020118])]
time cost:  305.429926157

# Euclidean distance with 7 clusters (k = 7)

In [11]:
# Time the whole run: fit k-means (k = 7, Euclidean metric), then assign
# every accident to its nearest center and write the grouped clusters out.
start_time = timeit.default_timer()
centers_euclidean_7 = kmeans_cluster(accidents, 0.01, euclidean_distance, 7)
print(centers_euclidean_7)
# groupByKey + mapValues(list) emits one (cluster index, [points]) record per cluster.
(accidents
    .map(lambda p: (closest_point(p, centers_euclidean_7, euclidean_distance), p))
    .groupByKey()
    .mapValues(list)
    .saveAsTextFile('s3://cse427-kmeans/output/clusters_euclidean_7'))
elapsed = timeit.default_timer() - start_time
# Single-argument form: same text as the old Python 2 print statement, and
# also valid as a Python 3 function call.
print('time cost:  %s' % elapsed)

VBox()

[array([-1.37273575, 52.93095813]), array([-3.69659338, 56.11906908]), array([-0.16274468, 51.51124376]), array([-2.24581146, 53.82580546]), array([-1.53326402, 51.23931357]), array([-3.5343953 , 51.25979581]), array([ 0.85708887, 52.00264821])]
time cost:  350.775312901

# Great-circle distance with 7 clusters (k = 7)

In [12]:
# Time the whole run: fit k-means (k = 7, great-circle metric), then assign
# every accident to its nearest center and write the grouped clusters out.
start_time = timeit.default_timer()
centers_great_circle_7 = kmeans_cluster(accidents, 0.01, great_circle_distance, 7)
print(centers_great_circle_7)
# groupByKey + mapValues(list) emits one (cluster index, [points]) record per cluster.
(accidents
    .map(lambda p: (closest_point(p, centers_great_circle_7, great_circle_distance), p))
    .groupByKey()
    .mapValues(list)
    .saveAsTextFile('s3://cse427-kmeans/output/clusters_great_circle_7'))
elapsed = timeit.default_timer() - start_time
# Single-argument form: same text as the old Python 2 print statement, and
# also valid as a Python 3 function call.
print('time cost:  %s' % elapsed)

VBox()

[array([-1.70126452, 52.61458387]), array([-3.27776987, 55.86968434]), array([ 0.3615493 , 51.37762174]), array([-1.98305861, 53.70366341]), array([-0.49409788, 51.44862749]), array([-3.25214163, 51.14699026]), array([ 0.46408047, 52.48537297])]
time cost:  287.853466988