In [1]:
import os

import pyspark.sql.functions as func
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import dayofmonth, month, hour
from pyspark.sql.session import SparkSession
from pyspark.sql.types import DoubleType, StructType, StructField, TimestampType, StringType

In [2]:
# Start the Spark session used by every later cell; getOrCreate() hands back
# the already-running session if one exists in this kernel.
spark = (
    SparkSession.builder
    .appName('Spark_Streaming_KMeans')
    .getOrCreate()
)

In [3]:
# Explicit read schema: a timestamp `dt`, the `lat` / `lon` columns as doubles
# and a string `base` column. Every field is declared nullable.
schema = StructType(
    [
        StructField("dt", TimestampType(), nullable=True),
        StructField("lat", DoubleType(), nullable=True),
        StructField("lon", DoubleType(), nullable=True),
        StructField("base", StringType(), nullable=True),
    ]
)

In [5]:
# Reading the Data into spark
# Input location. Set the SPARK_KMEANS_INPUT environment variable to use a
# different copy of the CSV without editing the notebook; the default is the
# original local path.
INPUT_CSV_PATH = os.environ.get("SPARK_KMEANS_INPUT", "G:\\Data\\Input_Streaming_Spark.csv")

# The explicit schema fixes column names and types, so no inference pass is
# needed (an "inferSchema" option has no effect once a schema is supplied).
# NOTE(review): no `header` option is set, so the first line of the file is
# read as data - confirm the CSV has no header row.
Dataset = spark.read.schema(schema).csv(INPUT_CSV_PATH)
Dataset.show(5)
print("Raw Dataset")

+-------------------+-------+--------+------+
|                 dt|    lat|     lon|  base|
+-------------------+-------+--------+------+
|2014-08-01 00:00:00| 40.729|-73.9422|B02598|
|2014-08-01 00:00:00|40.7476|-73.9871|B02598|
|2014-08-01 00:00:00|40.7424|-74.0044|B02598|
|2014-08-01 00:00:00| 40.751|-73.9869|B02598|
|2014-08-01 00:00:00|40.7406|-73.9902|B02598|
+-------------------+-------+--------+------+
only showing top 5 rows

Raw Dataset


In [6]:
# Pack the `lat` and `lon` columns into one vector column named "features",
# which is the column KMeans reads by default (featuresCol="features").
assembler = VectorAssembler(outputCol='features', inputCols=["lat", "lon"])
Dataframe = assembler.transform(Dataset)

In [7]:
# Preview the first rows with the newly added "features" vector column
# (explicitly spelling out DataFrame.show's default row count and truncation).
Dataframe.show(n=20, truncate=True)
print("Dataset with feature")

+-------------------+-------+--------+------+------------------+
|                 dt|    lat|     lon|  base|          features|
+-------------------+-------+--------+------+------------------+
|2014-08-01 00:00:00| 40.729|-73.9422|B02598| [40.729,-73.9422]|
|2014-08-01 00:00:00|40.7476|-73.9871|B02598|[40.7476,-73.9871]|
|2014-08-01 00:00:00|40.7424|-74.0044|B02598|[40.7424,-74.0044]|
|2014-08-01 00:00:00| 40.751|-73.9869|B02598| [40.751,-73.9869]|
|2014-08-01 00:00:00|40.7406|-73.9902|B02598|[40.7406,-73.9902]|
|2014-08-01 00:00:00|40.6994|-73.9591|B02617|[40.6994,-73.9591]|
|2014-08-01 00:00:00|40.6917|-73.9398|B02617|[40.6917,-73.9398]|
|2014-08-01 00:00:00|40.7063|-73.9223|B02617|[40.7063,-73.9223]|
|2014-08-01 00:00:00|40.6759|-74.0168|B02617|[40.6759,-74.0168]|
|2014-08-01 00:00:00|40.7617|-73.9847|B02617|[40.7617,-73.9847]|
|2014-08-01 00:00:00|40.6969|-73.9064|B02617|[40.6969,-73.9064]|
|2014-08-01 00:00:00|40.7623|-73.9751|B02617|[40.7623,-73.9751]|
|2014-08-01 00:00:00|40.6

In [8]:
# K-means hyper-parameters: number of clusters, iteration cap and a fixed seed.
# Without setSeed(), PySpark derives the default seed from Python's hash() of
# the class name. Under string-hash randomisation that value can differ between
# interpreter sessions, which changes the initial centres and therefore the
# cluster ids. Pinning the seed makes Restart & Run All reproducible.
KMEANS_K = 20
KMEANS_MAX_ITER = 5
KMEANS_SEED = 42
kmeans = KMeans().setK(KMEANS_K).setMaxIter(KMEANS_MAX_ITER).setSeed(KMEANS_SEED)


In [9]:
# Train the K-means model on the assembled "features" vector column only.
model = kmeans.fit(dataset=Dataframe.select("features"))

In [10]:
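# Optional: persist the trained model so it can be reloaded later without
# re-training (uncomment and point the path at a writable location).
# model.save("F:\\kMeans")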


In [12]:
# Score every row with the trained model. transform() keeps the existing
# columns and appends a "prediction" column holding the assigned cluster index.
clusters = model.transform(Dataframe)
clusters.show(n=20, truncate=True)
print("K means predictions")

+-------------------+-------+--------+------+------------------+----------+
|                 dt|    lat|     lon|  base|          features|prediction|
+-------------------+-------+--------+------+------------------+----------+
|2014-08-01 00:00:00| 40.729|-73.9422|B02598| [40.729,-73.9422]|         2|
|2014-08-01 00:00:00|40.7476|-73.9871|B02598|[40.7476,-73.9871]|        17|
|2014-08-01 00:00:00|40.7424|-74.0044|B02598|[40.7424,-74.0044]|        16|
|2014-08-01 00:00:00| 40.751|-73.9869|B02598| [40.751,-73.9869]|        17|
|2014-08-01 00:00:00|40.7406|-73.9902|B02598|[40.7406,-73.9902]|        17|
|2014-08-01 00:00:00|40.6994|-73.9591|B02617|[40.6994,-73.9591]|         2|
|2014-08-01 00:00:00|40.6917|-73.9398|B02617|[40.6917,-73.9398]|         2|
|2014-08-01 00:00:00|40.7063|-73.9223|B02617|[40.7063,-73.9223]|         2|
|2014-08-01 00:00:00|40.6759|-74.0168|B02617|[40.6759,-74.0168]|        12|
|2014-08-01 00:00:00|40.7617|-73.9847|B02617|[40.7617,-73.9847]|        13|
|2014-08-01 

In [None]:
# Row counts per (month, day, hour, cluster), listed in chronological order.
# "month" leads the sort key, matching the grouping key. Otherwise, when the
# input spans more than one month, rows from different months would be
# interleaved by day-of-month.
(
    clusters
    .select(
        month("dt").alias("month"),
        dayofmonth("dt").alias("day"),
        hour("dt").alias("hour"),
        "prediction",
    )
    .groupBy("month", "day", "hour", "prediction")
    .agg(func.count("prediction").alias("count"))
    .orderBy("month", "day", "hour", "prediction")
    .show()
)
print("Count Total")

In [None]:
# Count rows per (hour-of-day, cluster) pair and list the largest counts first.
(
    clusters
    .select(hour("dt").alias("hour"), "prediction")
    .groupBy("hour", "prediction")
    .agg(func.count("prediction").alias("count"))
    .orderBy(func.desc("count"))
    .show()
)
print("Count Total ordered by count")

In [None]:
# Number of rows assigned to each cluster, listed by cluster id.
# groupBy() output has no guaranteed row order, so sort explicitly to keep the
# table stable across runs and easy to scan.
clusters.groupBy("prediction").count().orderBy("prediction").show()
print("Counts in each cluster")