In [97]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, DoubleType, StringType, IntegerType
import math
from operator import add
import numpy as np

In [125]:
spark = SparkSession.builder.appName("KMeansWithMapReduce").getOrCreate()

# explicit schema: column types come from here; the CSV header line is only skipped
schema = StructType() \
    .add("rowID", IntegerType(), True) \
    .add("hpwren_timestamp", StringType(), True) \
    .add("air_pressure", DoubleType(), True) \
    .add("air_temp", DoubleType(), True) \
    .add("avg_wind_direction", DoubleType(), True) \
    .add("avg_wind_speed", DoubleType(), True) \
    .add("max_wind_direction", DoubleType(), True) \
    .add("max_wind_speed", DoubleType(), True) \
    .add("min_wind_direction", DoubleType(), True) \
    .add("min_wind_speed", DoubleType(), True) \
    .add("rain_accumulation", DoubleType(), True) \
    .add("rain_duration", DoubleType(), True) \
    .add("relative_humidity", DoubleType(), True)

df = spark.read.format("csv").option("header", True).schema(schema).load("./data/minute_weather.csv")
# `na.drop()` returns a NEW DataFrame; calling it without rebinding `df` removed nothing.
# Only the three clustering features must be non-null (calculateDistance cannot subtract
# None); nulls in unused columns such as rain_accumulation should not shrink the dataset.
df = df.na.drop(subset=["air_pressure", "air_temp", "relative_humidity"])
df.count()



1587257

In [99]:
df.show(n=20, truncate=True)  # preview the first 20 rows, long cell values truncated

+-----+-------------------+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+
|rowID|   hpwren_timestamp|air_pressure|air_temp|avg_wind_direction|avg_wind_speed|max_wind_direction|max_wind_speed|min_wind_direction|min_wind_speed|rain_accumulation|rain_duration|relative_humidity|
+-----+-------------------+------------+--------+------------------+--------------+------------------+--------------+------------------+--------------+-----------------+-------------+-----------------+
|    0|2011-09-10 00:00:49|       912.3|   64.76|              97.0|           1.2|             106.0|           1.6|              85.0|           1.0|             null|         null|             60.5|
|    1|2011-09-10 00:01:49|       912.3|   63.86|             161.0|           0.8|             215.0|           1.5|              43.0|           0.2|              0.0|          0.0|         

In [100]:
# number of clusters
k = 3
# convergence tolerance: a centroid counts as converged when it moves less than this distance
threshold = 0.4

# sample k rows without replacement (fixed seed) to serve as the initial centroids
initial_rows = df.rdd.takeSample(False, k, seed=0)
# centroid structure: (index, [air_pressure, air_temp, relative_humidity]),
# e.g., (1, [913.8, 68.18, 24.9])
centroids = list(enumerate(
    [row.air_pressure, row.air_temp, row.relative_humidity] for row in initial_rows
))

                                                                                

In [101]:
# show centroid structure: one (index, [air_pressure, air_temp, relative_humidity]) per cluster.
# Iterate the list itself rather than range(3) so this works for any k, and avoid
# rebinding `_`, which IPython reserves for the last cell output.
for centroid in centroids:
    print(centroid)

(0, [920.5, 39.02, 89.2])
(1, [913.8, 68.18, 24.9])
(2, [915.9, 69.98, 38.6])


In [102]:
# set points: every row becomes ([air_pressure, air_temp, relative_humidity], 1),
# e.g., ([912.3, 64.76, 60.5], 1); the trailing 1 is the point count used when averaging
points = df.rdd
points_rdd = points.map(
    lambda row: ([row.air_pressure, row.air_temp, row.relative_humidity], 1)
)
# several later actions read this RDD, so keep it cached
points_rdd.cache()

PythonRDD[148] at RDD at PythonRDD.scala:53

In [103]:
points_rdd.take(num=3)  # peek at the first few (features, count) pairs

                                                                                

[([912.3, 64.76, 60.5], 1),
 ([912.3, 63.86, 39.9], 1),
 ([912.3, 64.22, 43.0], 1)]

In [104]:
""" calculate distance """
def calculateDistance(point, centroid):
    distance = 0
    for index in range(len(point)):
        distance += (point[index]-centroid[index])**2
    return math.sqrt(distance)

In [105]:
""" belongs to Centroid """
def belongCluster(point, centroids):
    centroidIndex = 0
    closest = float("+inf")
    for centroid in centroids:
        dist = calculateDistance(point, centroid[1])
        if dist < closest:
            closest = dist
            centroidIndex = centroid[0]
    return centroidIndex

In [106]:
""" Reduce all points in each centroid """
def accumulatedCluster(p1, p2):
    cluster_sum = list(map(add, p1[0], p2[0]))
    cluster_count = p1[1]+p2[1]
    p = (cluster_sum, cluster_count)
    return p

In [107]:
# Map Phase: key every point by the index of its nearest centroid.
# Broadcast a snapshot of the centroids taken NOW. A lambda that reads the global
# `centroids` captures whatever that name holds when a later action runs. Once the
# convergence cell rebinds `centroids`, re-evaluating this (uncached) RDD -- e.g. for
# df_result1 -- would silently label points against the updated centroids instead of
# the ones the reduce phase averaged over.
centroids_bc = spark.sparkContext.broadcast(centroids)
pointMapCentroid_rdd = points_rdd.keyBy(lambda point: belongCluster(point[0], centroids_bc.value))

In [108]:
pointMapCentroid_rdd.take(num=5)  # first few (cluster_index, (features, count)) pairs

[(2, ([912.3, 64.76, 60.5], 1)),
 (2, ([912.3, 63.86, 39.9], 1)),
 (2, ([912.3, 64.22, 43.0], 1)),
 (2, ([912.3, 64.4, 49.5], 1)),
 (2, ([912.3, 64.4, 58.8], 1))]

In [109]:
# Reduce Phase: per cluster index, sum the feature vectors and the point counts
pointReducedCentroid_rdd = pointMapCentroid_rdd.reduceByKey(accumulatedCluster)

In [110]:
pointReducedCentroid_rdd.take(num=3)  # per-cluster (feature_sum, count) totals

                                                                                

[(0, ([387607269.6000054, 20669036.480000146, 35156074.10000006], 423277)),
 (1, ([511055543.3000114, 37387668.000000685, 10949992.000000026], 556539)),
 (2, ([556582223.49993, 40117427.29998607, 29460650.499999877], 607441))]

In [111]:
# Divide each cluster's summed features by its point count -> the new centroid.
# NOTE: this rebinds the same name, so re-running this cell without re-running the
# reduce cell would "average" the already-averaged values and corrupt the result.
pointReducedCentroid_rdd = pointReducedCentroid_rdd.mapValues(
    lambda totals: (np.asarray(totals[0]) / totals[1]).tolist()
)

In [112]:
pointReducedCentroid_rdd.take(num=3)  # per-cluster mean feature vectors

[(0, [915.7295803929942, 48.830993604661124, 83.05689678390289]),
 (1, [918.2744485112659, 67.17888234247857, 19.675156637719954]),
 (2, [916.2737179412158, 66.04333145109743, 48.49960819240037])]

In [113]:
# bring the (cluster_index, mean features) pairs to the driver -- at most one pair per
# cluster index, since they come out of reduceByKey
reduced_points = pointReducedCentroid_rdd.collect()

In [114]:
# one (cluster_index, new centroid features) pair per non-empty cluster.
# A named loop variable avoids rebinding `_`, which IPython uses for the last output.
for reduced_point in reduced_points:
    print(reduced_point)

(0, [915.7295803929942, 48.830993604661124, 83.05689678390289])
(1, [918.2744485112659, 67.17888234247857, 19.675156637719954])
(2, [916.2737179412158, 66.04333145109743, 48.49960819240037])


In [115]:
# order both centroid lists by cluster index so they line up position by position
new_centroids = sorted(reduced_points, key=lambda entry: entry[0])
centroids = sorted(centroids, key=lambda entry: entry[0])

In [116]:
# display the recomputed centroids (mean of the points assigned to each cluster), sorted by index
new_centroids

[(0, [915.7295803929942, 48.830993604661124, 83.05689678390289]),
 (1, [918.2744485112659, 67.17888234247857, 19.675156637719954]),
 (2, [916.2737179412158, 66.04333145109743, 48.49960819240037])]

In [117]:
# display the previous centroids (the ones points were assigned to), sorted by index
centroids

[(0, [920.5, 39.02, 89.2]),
 (1, [913.8, 68.18, 24.9]),
 (2, [915.9, 69.98, 38.6])]

In [118]:
# check convergence or not
# Pair each previous centroid with its recomputed one by CLUSTER INDEX, not list position.
# A cluster that received no points is absent from new_centroids (reduceByKey only emits
# keys that occur), which made the positional lookup raise IndexError and would also have
# dropped that cluster from `centroids`. Such a cluster keeps its previous position.
recomputed = dict(new_centroids)
updated_centroids = []
convergence_percentage = 0  # despite the name: COUNT of centroids that moved < threshold
for index, old_features in centroids:
    new_features = recomputed.get(index, old_features)
    dist = calculateDistance(old_features, new_features)
    print(dist)

    if dist < threshold:
        convergence_percentage += 1
    updated_centroids.append((index, new_features))

centroids = updated_centroids
# count threshold: strictly more than 80% of the centroids must have converged
percentage = len(centroids)*80/100

if convergence_percentage > percentage:
    print("Centroids converged")
else:
    print("not converged, still run")
# NOTE(review): only a single k-means iteration is executed; nothing re-runs the
# map/reduce cells when the centroids have not converged.

12.51999264621308
6.951396565045956
10.660171983910288
not converged, still run


In [119]:
# result schema: cluster index followed by the three clustering features
schema_result = (
    StructType()
    .add("cluster_index", IntegerType(), True)
    .add("air_pressure", DoubleType(), True)
    .add("air_temp", DoubleType(), True)
    .add("relative_humidity", DoubleType(), True)
)


def _flatten_assignment(keyed_point):
    """Turn (cluster_index, ([pressure, temp, humidity], count)) into a flat row tuple."""
    cluster_index, features = keyed_point[0], keyed_point[1][0]
    return (cluster_index, features[0], features[1], features[2])


assignment_rows = pointMapCentroid_rdd.map(_flatten_assignment)
df_result1 = spark.createDataFrame(assignment_rows, schema=schema_result)
df_result1.show()

+-------------+------------+--------+-----------------+
|cluster_index|air_pressure|air_temp|relative_humidity|
+-------------+------------+--------+-----------------+
|            2|       912.3|   64.76|             60.5|
|            2|       912.3|   63.86|             39.9|
|            2|       912.3|   64.22|             43.0|
|            2|       912.3|    64.4|             49.5|
|            2|       912.3|    64.4|             58.8|
|            2|       912.3|    63.5|             62.6|
|            2|       912.3|   62.78|             65.6|
|            2|       912.3|   62.42|             65.2|
|            2|       912.3|   62.24|             65.8|
|            2|       912.3|   62.24|             58.6|
|            2|       912.3|   62.24|             38.5|
|            2|       912.2|   63.14|             42.6|
|            2|       912.2|   64.04|             45.3|
|            2|       912.2|    64.4|             36.1|
|            1|       912.2|   64.94|           

In [120]:
# mode("overwrite") keeps Restart & Run All working: the default save mode ("error")
# raises as soon as ./result_weather_1 exists from a previous run.
df_result1.write.mode("overwrite").option("header", True).csv("./result_weather_1")

                                                                                

In [121]:
def _centroid_row(entry):
    """Turn (cluster_index, [pressure, temp, humidity]) into a flat row tuple."""
    cluster_index, features = entry[0], entry[1]
    return (cluster_index, features[0], features[1], features[2])


# one row per centroid, using the same schema as the per-point result
centroid_rows = spark.sparkContext.parallelize(centroids).map(_centroid_row)
df_result2 = spark.createDataFrame(centroid_rows, schema=schema_result)
df_result2.show()

+-------------+-----------------+------------------+------------------+
|cluster_index|     air_pressure|          air_temp| relative_humidity|
+-------------+-----------------+------------------+------------------+
|            0|915.7295803929942|48.830993604661124| 83.05689678390289|
|            1|918.2744485112659| 67.17888234247857|19.675156637719954|
|            2|916.2737179412158| 66.04333145109743| 48.49960819240037|
+-------------+-----------------+------------------+------------------+



In [122]:
# single output file (coalesce(1)); mode("overwrite") so re-running the notebook does not
# fail with "path already exists" under the default "error" save mode
df_result2.coalesce(1).write.mode("overwrite").option("header", True).csv("./result_weather_2")