# Initialization

In [6]:
import pyspark
sparkcontext = pyspark.SparkContext.getOrCreate(conf=(
        pyspark
        .SparkConf()
        .setAppName('Dibimbing')
        .setMaster('local')
    ))
sparkcontext.setLogLevel("WARN")

spark = pyspark.sql.SparkSession(sparkcontext.getOrCreate())

In [7]:
spark

# Data Load

In [4]:
import json
import uuid
import os
import json
from dotenv import load_dotenv
from pathlib import Path
from kafka import KafkaProducer
from faker import Faker
from time import sleep
from datetime import datetime, timedelta

dotenv_path = Path("/resources/.env")
load_dotenv(dotenv_path=dotenv_path)

kafka_host = os.getenv("KAFKA_HOST")
kafka_topic = os.getenv("KAFKA_TOPIC_NAME")

print(kafka_host)
print(kafka_topic)

streaming-kafka
assignment-streaming-spark


In [8]:
!ls /resources/data/activity-data/ | head -10

_committed_730451297822678341
part-00000-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00001-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00002-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00003-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00004-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00005-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00006-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00007-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json
part-00008-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json


In [9]:
static = spark.read.json('/resources/data/activity-data/')
dataSchema = static.schema
dataSchema

StructType([StructField('Arrival_Time', LongType(), True), StructField('Creation_Time', LongType(), True), StructField('Device', StringType(), True), StructField('Index', LongType(), True), StructField('Model', StringType(), True), StructField('User', StringType(), True), StructField('gt', StringType(), True), StructField('x', DoubleType(), True), StructField('y', DoubleType(), True), StructField('z', DoubleType(), True)])

In [10]:
static.show(5)

+-------------+-------------------+--------+-----+------+----+-----+------------+-------------+------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|           x|            y|           z|
+-------------+-------------------+--------+-----+------+----+-----+------------+-------------+------------+
|1424686735011|1424686733015076670|nexus4_1|    3|nexus4|   g|stand|0.0014038086|   0.03147888|  0.01109314|
|1424686735214|1424688581265321168|nexus4_2|   50|nexus4|   g|stand|-0.008926392|  -0.04034424|0.0034332275|
|1424686735420|1424688581471925172|nexus4_2|   91|nexus4|   g|stand|-3.814697E-4|-0.0018920898|-0.015792847|
|1424686735618|1424686733619416269|nexus4_1|  123|nexus4|   g|stand| 3.356934E-4| -0.030471802|-0.025222778|
|1424686735821|1424688581874604615|nexus4_2|  171|nexus4|   g|stand|0.0038909912| -0.013641357|  0.01411438|
+-------------+-------------------+--------+-----+------+----+-----+------------+-------------+------------+
only showing top 5 

Metadata for the dataset

| Column | Description |
| --- | ----------- |
| Index         |  The row number.
| Arrival_Time  |  The time the measurement arrived to the sensing application
| Creation_Time |  The timestamp the OS attaches to the sample
| X,Y,Z | The values provided by the sensor for the three axes, X,y,z
| User          |  The user this sample originates from, the users are named a to i.
| Model         |  The phone/watch model this sample originates from
| Device        |  The specific device this sample is from. They are prefixed with the model name and then the number, e.g., nexus4_1 or nexus4_2.
| Gt            |  The activity the user was performing: bike sit, stand, walk, stairsup, stairsdown and null

# Structured Streaming

### Mock File Streaming (Throttle)

In [11]:
spark.conf.set('spark.sql.shuffle.partitions', 5)
streaming = (
    spark
    .readStream
    .schema(dataSchema)
    .option('maxFilesPerTrigger', 1)
    .json('/resources/data/activity-data/')
)
streaming.printSchema()

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



# Windowing

### Window on Event Time

In [12]:
withEventTime = streaming.selectExpr(
    "*",
    "cast(cast(Creation_time as double)/1000000000 as timestamp) as event_time"
)


### Tumbling Window

In [13]:
from pyspark.sql.functions import window, col

(
    withEventTime
    .groupBy(window(col('event_time'), '10 minutes'))
    .count()
    .writeStream
    .queryName('pyevents_per_window')
    .format('memory')
    .outputMode('complete')
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0x7f0c0d989e10>

In [17]:
spark.sql("SELECT * FROM pyevents_per_window").show(5)

+--------------------+-----+
|              window|count|
+--------------------+-----+
|{2015-02-24 11:50...|20681|
|{2015-02-24 13:00...|18336|
|{2015-02-23 12:30...|13836|
|{2015-02-23 10:20...|13634|
|{2015-02-24 12:30...|17229|
+--------------------+-----+
only showing top 5 rows



In [15]:
spark.sql("SELECT * FROM pyevents_per_window").printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



### Sliding Windows

In [11]:
from pyspark.sql.functions import window, col

(
    withEventTime
    .groupBy(window(col('event_time'), '10 minutes', '5 minutes'))
    .count()
    .writeStream
    .queryName('pyevents_per_slidingWindow')
    .format('memory')
    .outputMode('complete')
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0xffff8007c040>

In [12]:
spark.sql('SELECT * FROM pyevents_per_slidingWindow').show(5)

+--------------------+-----+
|              window|count|
+--------------------+-----+
|{2015-02-23 14:15...| 1398|
|{2015-02-24 11:50...| 1843|
|{2015-02-24 13:00...| 1644|
|{2015-02-23 12:30...| 1226|
|{2015-02-23 10:20...| 1165|
+--------------------+-----+
only showing top 5 rows



In [13]:
spark.sql("SELECT * FROM pyevents_per_slidingWindow").printSchema()

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



# Watermarks

In [14]:
(
    withEventTime
    .withWatermark('event_time', '30 minutes')
    .groupBy(window(col('event_time'), '10 minutes', '5 minutes'))
    .count()
    .writeStream
    .queryName('pyevents_per_window2')
    .format('memory')
    .outputMode('complete')
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0xffff597a5c90>

In [15]:
spark.sql('SELECT * FROM pyevents_per_window2').show(5)

+--------------------+-----+
|              window|count|
+--------------------+-----+
|{2015-02-23 14:15...| 1398|
|{2015-02-24 11:50...| 1843|
|{2015-02-24 13:00...| 1644|
|{2015-02-23 12:30...| 1226|
|{2015-02-23 10:20...| 1165|
+--------------------+-----+
only showing top 5 rows



# Drop Duplicates

In [16]:
(
    withEventTime
    .withWatermark('event_time', '5 seconds')
    .dropDuplicates(['User', 'event_time'])
    .groupBy('User')
    .count()
    .writeStream
    .queryName('pydeduplicated')
    .format('memory')
    .outputMode('complete')
    .start()
)

<pyspark.sql.streaming.StreamingQuery at 0xffff597a6a70>

In [17]:
spark.sql('SELECT * FROM pydeduplicated').show(5)

+----+-----+
|User|count|
+----+-----+
|   a| 8085|
|   b| 9123|
|   c| 7715|
|   g| 9167|
|   h| 7733|
+----+-----+
only showing top 5 rows



# Checkpoints

In [19]:
static = spark.read.json('/resources/data/activity-data/')

streaming = (
    spark
    .readStream
    .schema(static.schema)
    .option('maxFilesPerTrigger', 10)
    .json('/resources/data/activity-data/')
    .groupBy('gt')
    .count()
)


query = (
    streaming
    .writeStream
    .outputMode('complete')
    .option('checkpointlocation', '/resources/logs')
    .queryName('test_python_stream')
    .format('memory')
    .start()
)

In [18]:
spark.stop()