In [1]:
#we are working with heterogeneity human activity recognition dataset
#the folder containing the data is fairly large 1.18 GB

In [2]:
static = spark.read.json("E:/big data courses/Spark-The-Definitive-Guide-master/data/activity-data/")
#read this data as simple dataframes "patch processing applications"

In [3]:
dataschema = static.schema

In [4]:
dataschema

StructType(List(StructField(Arrival_Time,LongType,true),StructField(Creation_Time,LongType,true),StructField(Device,StringType,true),StructField(Index,LongType,true),StructField(Model,StringType,true),StructField(User,StringType,true),StructField(gt,StringType,true),StructField(x,DoubleType,true),StructField(y,DoubleType,true),StructField(z,DoubleType,true)))

In [5]:
static.take(1)
# the columns contain timestamp columns , model, user , device information , gt :specify activity

[Row(Arrival_Time=1424686735090, Creation_Time=1424686733090638193, Device='nexus4_1', Index=18, Model='nexus4', User='g', gt='stand', x=0.0003356934, y=-0.0005645752, z=-0.018814087)]

In [6]:
#streaming should take schema explicitly 
streaming = spark.readStream.schema(dataschema).option("maxFilePerTrigger",1)\
.json("E:/big data courses/Spark-The-Definitive-Guide-master/data/activity-data/")

In [7]:
activitycounts = streaming.groupBy("gt").count()

In [8]:
spark.conf.set("spark.sql.shuffle.partitions" , 5)
#the default shullfe partitions = 200 

In [9]:
activityquery  = activitycounts.writeStream.queryName("activity_counts")\
.format("memory").outputMode("complete").start()
#this will set activity_counts table in memory , we will query it later , 
#mode: complete to overwrite the results everytime

In [10]:
#activityquery.awaitTermination()

In [11]:
spark.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x2a87c3d2588>]

In [12]:
from time import sleep
for x in range(5):
    spark.sql("select * from activity_counts").show()
    sleep(2)
#this loop to domenstarte complete mode , and how it will overwrite everything    

+---+-----+
| gt|count|
+---+-----+
+---+-----+

+---+-----+
| gt|count|
+---+-----+
+---+-----+

+---+-----+
| gt|count|
+---+-----+
+---+-----+

+----------+-------+
|        gt|  count|
+----------+-------+
|       sit| 984714|
|     stand| 910783|
|stairsdown| 749059|
|      walk|1060402|
|  stairsup| 836598|
|      null| 835725|
|      bike| 863710|
+----------+-------+

+----------+-------+
|        gt|  count|
+----------+-------+
|       sit| 984714|
|     stand| 910783|
|stairsdown| 749059|
|      walk|1060402|
|  stairsup| 836598|
|      null| 835725|
|      bike| 863710|
+----------+-------+



In [13]:
#selection and filtering 
from pyspark.sql.functions import expr
simpletransfrom = streaming.withColumn("stairs",expr("gt like '%stairs%'"))\
                  .where("stairs")\
                  .where("gt is not null")\
                  .select("gt","model", "Arrival_Time","Creation_Time")\
                  .writeStream.queryName("simple_Transform")\
                  .format("memory").outputMode("append").start()

In [16]:
spark.sql("select * from simple_Transform").show(5)

+--------+------+-------------+-------------------+
|      gt| model| Arrival_Time|      Creation_Time|
+--------+------+-------------+-------------------+
|stairsup|nexus4|1424687983631|1424689829685305118|
|stairsup|nexus4|1424687983837|1424687981832546126|
|stairsup|nexus4|1424687984222|1424687982225307357|
|stairsup|nexus4|1424687984626|1424687982633321003|
|stairsup|nexus4|1424687985029|1424687983036153035|
+--------+------+-------------+-------------------+
only showing top 5 rows



In [17]:
deviceModelStats = streaming.cube("gt" , "model").avg()\
                   .drop("avg(Arrival_Time)").drop("avg(Creation_Time)").drop("avg(Index)")\
                   .writeStream.queryName("device_counts2").format("memory")\
                   .outputMode("complete").start()

In [24]:
#you should wait untill the above query finished from processing 
# or using device_counts2.awaitTermination() , but only for production
spark.sql("select * from device_counts2").show(5)

+-----+------+--------------------+--------------------+--------------------+
|   gt| model|              avg(x)|              avg(y)|              avg(z)|
+-----+------+--------------------+--------------------+--------------------+
|  sit|  null|-5.49433244039557...| 2.79144628170004E-4|-2.33994461689905...|
|stand|  null|-3.11082189691711...|3.218461665975361...|2.141300040636498E-4|
|  sit|nexus4|-5.49433244039557...| 2.79144628170004E-4|-2.33994461689905...|
|stand|nexus4|-3.11082189691711...|3.218461665975361...|2.141300040636498E-4|
| null|  null|-0.00847688860109...|-7.30455258739188...|0.003090601491419...|
+-----+------+--------------------+--------------------+--------------------+
only showing top 5 rows



## joins in streaming 

In [19]:
# here we will join static with streaming streams 
historicalagg = static.groupBy("gt","model").avg()
devicemodelstats = streaming.drop("Arrival_Time" ,"Creation_Time","Index")\
                   .cube("gt","model").avg()\
                   .join(historicalagg , ["gt","model"])\
                   .writeStream.queryName("device_Counts3").format("memory")\
                   .outputMode("complete").start()

In [25]:
spark.sql("select * from device_counts3").show(5)

+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
|        gt| model|              avg(x)|              avg(y)|              avg(z)|   avg(Arrival_Time)|  avg(Creation_Time)|        avg(Index)|              avg(x)|              avg(y)|              avg(z)|
+----------+------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
|      bike|nexus4| 0.02268875955086685|-0.00877912156368...|-0.08251001663412344|1.424751134339985...|1.424752127369589...| 326459.6867328154| 0.02268875955086685|-0.00877912156368...|-0.08251001663412344|
|      null|nexus4|-0.00847688860109...|-7.30455258739188...|0.003090601491419...|1.424749002876339...|1.424749919482127...| 219276.9663669269|-0.00847688860109...|-7.30455

## event time and statful processing

In [36]:
#we will continue on the beow configuration
#spark.conf.set("spark.sql.shuffle.partitions" , 5)
#static = spark.read.json("E:/big data courses/Spark-The-Definitive-Guide-master/data/activity-data/")
spark.conf.set("spark.sql.streaming.schemaInference","true")
streaming = spark.readStream.option("maxFilePerTrigger",10)\
.json("E:/big data courses/Spark-The-Definitive-Guide-master/data/activity-data/")

In [37]:
#inferring schema is disabled for streaming datasources , we can enable it with 
streaming.printSchema()

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



In [38]:
# in streaming processing we work with event time not processing time 
#convert time column to proper spark time stamp.
witheventtime  = streaming.selectExpr("*", "cast (cast(Creation_Time as double)/1000000000 as timestamp)as event_time")

In [39]:
from pyspark.sql.functions import window , col
witheventtime.groupBy(window (col("event_time") , "10 minutes")).count()\
.writeStream\
.queryName("pyevent_per_window")\
.format("memory")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x2a87f4f6248>

In [41]:
spark.sql("select * from pyevent_per_window").printSchema()
# window in struct type (complex)

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



In [44]:
spark.sql("select * from pyevent_per_window").show(5)

+--------------------+------+
|              window| count|
+--------------------+------+
|[2015-02-23 12:40...| 88681|
|[2015-02-24 13:50...|150773|
|[2015-02-24 15:00...|133323|
|[2015-02-23 15:20...|106075|
|[2015-02-22 02:40...|    35|
+--------------------+------+
only showing top 5 rows



In [46]:
#perform aggregations in multiple columns 
from pyspark.sql.functions import window , col
witheventtime.groupBy(window (col("event_time") , "10 minutes") , "user").count()\
.writeStream\
.queryName("pyevent_per_window_user")\
.format("memory")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x2a87f4f6448>

## sliding windows

In [47]:
#we will running windows every 10 minutes , starting every 5 minutes
from pyspark.sql.functions import window , col
witheventtime.groupBy(window (col("event_time") , "10 minutes" , "5 minutes")).count()\
.writeStream\
.queryName("pyevent_per_window_user_5")\
.format("memory")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x2a87f477708>

In [51]:
#spark.sql("select * from pyevent_per_window_user_5").show(5)

## handling late data with watermark

In [54]:
#we will make latency 30 minutes , after this will ignore states within corresponding time frame
from pyspark.sql.functions import window , col
witheventtime.withWatermark("event_time" , "30 minutes")\
.groupBy(window (col("event_time") , "10 minutes" , "5 minutes")).count()\
.writeStream\
.queryName("pyevent_per_window_user_w")\
.format("memory")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x2a87fd0da08>

## drop duplicate if exist it is most important feature when working with record-at-atime

In [56]:
# make the record unique with user , and event_time 
from pyspark.sql.functions import expr
witheventtime.withWatermark("event_time" , "30 minutes")\
.dropDuplicates(["user","event_time"])\
.groupBy(window (col("event_time") , "10 minutes" , "5 minutes")).count()\
.writeStream\
.queryName("pyevent_per_window_user_d")\
.format("memory")\
.outputMode("complete")\
.start()

<pyspark.sql.streaming.StreamingQuery at 0x2a87fa167c8>