In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [18]:
# Build (or reuse) a local SparkSession for this notebook.
# The "spark.sql.streaming.schemaInference" setting is switched on here so the
# streaming reads further down can be created without passing an explicit schema.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Structured_Streamming")
    .config("spark.sql.streaming.schemaInference", True)
    .getOrCreate()
)

In [19]:
# Show the session object as the cell's output.
spark

### Static version

In [20]:
# Batch (non-streaming) read of the JSON files under the activity-data directory.
static = spark.read.json("../data/activity-data/")

In [21]:
# Print the column names and types of the static DataFrame.
static.printSchema()

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



In [22]:
# Preview the first rows of the static DataFrame.
static.show()

+-------------+-------------------+--------+-----+------+----+-----+-------------+------------+------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|            x|           y|           z|
+-------------+-------------------+--------+-----+------+----+-----+-------------+------------+------------+
|1424686735090|1424686733090638193|nexus4_1|   18|nexus4|   g|stand|  3.356934E-4|-5.645752E-4|-0.018814087|
|1424686735292|1424688581345918092|nexus4_2|   66|nexus4|   g|stand| -0.005722046| 0.029083252| 0.005569458|
|1424686735500|1424686733498505625|nexus4_1|   99|nexus4|   g|stand|    0.0078125|-0.017654419| 0.010025024|
|1424686735691|1424688581745026978|nexus4_2|  145|nexus4|   g|stand| -3.814697E-4|   0.0184021|-0.013656616|
|1424686735890|1424688581945252808|nexus4_2|  185|nexus4|   g|stand| -3.814697E-4|-0.031799316| -0.00831604|
|1424686736094|1424686734097840342|nexus4_1|  218|nexus4|   g|stand| -7.324219E-4|-0.013381958|  0.01109314|
|1424686736294|1424

### Streaming version

In [25]:
# A streaming read needs a schema up front. Either enable schema inference in
# the session configuration, or obtain the schema from a valid file first.
# "maxFilesPerTrigger" is set to 1 on this source.
streaming = (
    spark.readStream
    .option("maxFilesPerTrigger", 1)
    .json("../data/activity-data")
)

In [32]:
# Define the transformation only (nothing runs yet): row count per "gt" value.
activityCounts = (
    streaming
    .groupBy("gt")
    .count()
)

In [33]:
# Set the number of shuffle partitions to 5.
# NOTE(review): presumably chosen to keep the local-mode job small — confirm.
spark.conf.set("spark.sql.shuffle.partitions", 5)

In [34]:
# Calling start() is the action that launches the streaming query.
# Results go to the in-memory sink under the name "activity_counts", and
# "complete" output mode is requested for this aggregation.
activityQuery = (
    activityCounts.writeStream
    .queryName("activity_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [43]:
# activityQuery.awaitTermination()

In [42]:
from time import sleep

# Query the in-memory "activity_counts" table five times, one second apart,
# to watch the streamed counts as the query progresses.
for _poll in range(5):
    countsSnapshot = spark.sql("SELECT * FROM activity_counts")
    countsSnapshot.show()
    sleep(1)

+----------+-------+
|        gt|  count|
+----------+-------+
|       sit| 984714|
|     stand| 910783|
|stairsdown| 749059|
|      walk|1060402|
|  stairsup| 836598|
|      null| 835725|
|      bike| 863710|
+----------+-------+

+----------+-------+
|        gt|  count|
+----------+-------+
|       sit| 984714|
|     stand| 910783|
|stairsdown| 749059|
|      walk|1060402|
|  stairsup| 836598|
|      null| 835725|
|      bike| 863710|
+----------+-------+

+----------+-------+
|        gt|  count|
+----------+-------+
|       sit| 984714|
|     stand| 910783|
|stairsdown| 749059|
|      walk|1060402|
|  stairsup| 836598|
|      null| 835725|
|      bike| 863710|
+----------+-------+

+----------+-------+
|        gt|  count|
+----------+-------+
|       sit| 984714|
|     stand| 910783|
|stairsdown| 749059|
|      walk|1060402|
|  stairsup| 836598|
|      null| 835725|
|      bike| 863710|
+----------+-------+

+----------+-------+
|        gt|  count|
+----------+-------+
|       s

### Transformation on stream

In [44]:
from pyspark.sql.functions import expr

# Selection and filtering on the stream:
#   1. add a boolean "stairs" column (true when gt contains "stairs"),
#   2. keep only rows where that flag is true and gt is not null,
#   3. project four columns,
#   4. append the results to the in-memory table "simple_transform".
simpleTransform = (
    streaming
    .withColumn("stairs", expr("gt like '%stairs%'"))
    .where("stairs")
    .where("gt is not null")
    .select("gt", "model", "arrival_time", "creation_time")
    .writeStream
    .queryName("simple_transform")
    .format("memory")
    .outputMode("append")
    .start()
)

In [47]:
# spark.sql("SELECT * FROM simple_transform").show()

In [49]:
# Aggregation on the stream: cube over ("gt", "model") with the average of
# every numeric column, then discard the averages of the time/index columns.
# The result is written to the in-memory table "device_counts" in "complete"
# output mode.
deviceModelStats = (
    streaming
    .cube("gt", "model")
    .avg()
    .drop("avg(Arrival_time)")
    .drop("avg(Creation_Time)")
    .drop("avg(Index)")
    .writeStream
    .queryName("device_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [54]:
# Read the current contents of the in-memory "device_counts" table.
# NOTE(review): the captured output is empty; presumably the query had not
# finished a trigger yet when this ran — confirm by re-running after a pause.
spark.sql("SELECT * FROM device_counts").show()

+---+-----+------+------+------+
| gt|model|avg(x)|avg(y)|avg(z)|
+---+-----+------+------+------+
+---+-----+------+------+------+



In [55]:
# Join: compare the streaming per-(gt, model) averages with the same averages
# computed once over the static (historical) DataFrame.
historicalAgg = static.groupBy("gt", "model").avg()

# The earlier aggregation cell already started a query named "device_counts",
# and the name is reused below. Active query names must be unique within a
# SparkSession, so start() would raise while that query is still running.
# Stop any active query with this name first; this also makes the cell safe
# to re-run.
for runningQuery in spark.streams.active:
    if runningQuery.name == "device_counts":
        runningQuery.stop()

# NOTE(review): both sides of the join expose columns named avg(x), avg(y) and
# avg(z), so the resulting table has duplicate column names. SELECT * works,
# but selecting those columns by name will be ambiguous — consider renaming
# the historical columns if they need to be referenced individually.
deviceModelStats = (
    streaming
    .drop("Arrival_Time", "Creation_Time", "Index")
    .cube("gt", "model")
    .avg()
    .join(historicalAgg, ["gt", "model"])
    .writeStream
    .queryName("device_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [56]:
# Read the in-memory "device_counts" table, now fed by the joined query.
# NOTE(review): the captured output has the joined columns but no rows;
# presumably no trigger had completed yet — confirm by re-running after a pause.
spark.sql("SELECT * FROM device_counts").show()

+---+-----+------+------+------+-----------------+------------------+----------+------+------+------+
| gt|model|avg(x)|avg(y)|avg(z)|avg(Arrival_Time)|avg(Creation_Time)|avg(Index)|avg(x)|avg(y)|avg(z)|
+---+-----+------+------+------+-----------------+------------------+----------+------+------+------+
+---+-----+------+------+------+-----------------+------------------+----------+------+------+------+

