# Initialization

In [1]:
import pyspark

# Create (or reuse) the session through SparkSession.builder rather than
# calling the SparkSession constructor directly. The application name and
# master are the same values this notebook has always used.
spark = (
    pyspark.sql.SparkSession.builder
    .appName('Dibimbing')
    .master('local')
    .getOrCreate()
)

# Keep the underlying context available under its existing name and limit
# log output to WARN and above.
sparkcontext = spark.sparkContext
sparkcontext.setLogLevel("WARN")

In [2]:
# Evaluate the session as the cell's last expression so the notebook displays it.
spark

# Data Load

In [None]:
# Shell command (IPython `!`): list the first ten entries of the input directory.
!ls /resources/data/activity-data/ | head -10

In [None]:
# Batch-read the JSON files under the activity-data directory.
static = spark.read.json('/resources/data/activity-data/')
# Keep the batch frame's schema under its own name; the streaming reader
# further down in this notebook is given this schema explicitly.
dataSchema = static.schema
# Last expression: display the schema as the cell output.
dataSchema

In [None]:
# Print the first five rows of the batch DataFrame.
static.show(5)

Metadata for the dataset

| Column | Description |
| --- | ----------- |
| Index         |  The row number.
| Arrival_Time  |  The time the measurement arrived at the sensing application
| Creation_Time |  The timestamp the OS attaches to the sample
| X,Y,Z | The values provided by the sensor for the three axes: X, Y, and Z
| User          |  The user this sample originates from; the users are named a to i.
| Model         |  The phone/watch model this sample originates from
| Device        |  The specific device this sample is from. They are prefixed with the model name and then the number, e.g., nexus4_1 or nexus4_2.
| Gt            |  The activity the user was performing: bike, sit, stand, walk, stairsup, stairsdown, or null

# Structured Streaming

### Mock File Streaming (Throttle)

In [None]:
# Use 5 shuffle partitions for the queries started from this session.
spark.conf.set('spark.sql.shuffle.partitions', 5)

# Configure a streaming file reader with the schema taken from the batch read,
# limited to one file per trigger (the "throttle" of this section).
throttled_reader = (
    spark.readStream
    .option('maxFilesPerTrigger', 1)
    .schema(dataSchema)
)

# Point the reader at the JSON directory and show the resulting schema.
streaming = throttled_reader.json('/resources/data/activity-data/')
streaming.printSchema()

# Windowing

### Window on Event Time

In [7]:
# SQL expression for the derived column: Creation_time is cast to double,
# divided by 1e9, and cast to a timestamp named event_time.
# NOTE(review): the division suggests Creation_time is in nanoseconds — confirm.
event_time_expr = "cast(cast(Creation_time as double)/1000000000 as timestamp) as event_time"

# Keep every original column ("*") and append event_time.
withEventTime = streaming.selectExpr("*", event_time_expr)


### Tumbling Window

In [None]:
from pyspark.sql.functions import window, col

# Group the stream into 10-minute windows on event_time and count the rows
# that fall into each window.
tumbling_window = window(col('event_time'), '10 minutes')
tumbling_counts = withEventTime.groupBy(tumbling_window).count()

# Start a streaming query that writes the counts, in 'complete' output mode,
# to the in-memory table `pyevents_per_window`.
(
    tumbling_counts
    .writeStream
    .format('memory')
    .queryName('pyevents_per_window')
    .outputMode('complete')
    .start()
)

In [None]:
# Print up to five rows from the `pyevents_per_window` in-memory table.
spark.sql("SELECT * FROM pyevents_per_window").show(5)

In [None]:
# Print the schema of the `pyevents_per_window` query result.
spark.sql("SELECT * FROM pyevents_per_window").printSchema()

### Sliding Windows

In [None]:
from pyspark.sql.functions import window, col

# 10-minute windows on event_time that start every 5 minutes, so consecutive
# windows overlap; count the rows per window.
overlapping_window = window(col('event_time'), '10 minutes', '5 minutes')
sliding_counts = withEventTime.groupBy(overlapping_window).count()

# Start a streaming query that writes the counts, in 'complete' output mode,
# to the in-memory table `pyevents_per_slidingWindow`.
(
    sliding_counts
    .writeStream
    .format('memory')
    .queryName('pyevents_per_slidingWindow')
    .outputMode('complete')
    .start()
)

In [None]:
# Print up to five rows from the `pyevents_per_slidingWindow` in-memory table.
spark.sql('SELECT * FROM pyevents_per_slidingWindow').show(5)

In [None]:
# Print the schema of the `pyevents_per_slidingWindow` query result.
spark.sql("SELECT * FROM pyevents_per_slidingWindow").printSchema()

# Watermarks

In [None]:
# Declare a 30-minute watermark on event_time before aggregating.
# NOTE(review): Spark's documentation ties watermark-based state cleanup to
# the append/update output modes; with 'complete' mode below the watermark
# presumably has no effect — confirm.
watermarked_events = withEventTime.withWatermark('event_time', '30 minutes')

# 10-minute windows on event_time that start every 5 minutes.
watermarked_window = window(col('event_time'), '10 minutes', '5 minutes')

# Count rows per window and start a streaming query that writes the counts,
# in 'complete' output mode, to the in-memory table `pyevents_per_window2`.
(
    watermarked_events
    .groupBy(watermarked_window)
    .count()
    .writeStream
    .format('memory')
    .queryName('pyevents_per_window2')
    .outputMode('complete')
    .start()
)

In [None]:
# Print up to five rows from the `pyevents_per_window2` in-memory table.
spark.sql('SELECT * FROM pyevents_per_window2').show(5)

# Drop Duplicates

In [None]:
# Declare a 5-second watermark on event_time, then drop rows that repeat the
# same (User, event_time) pair.
deduplicated_events = (
    withEventTime
    .withWatermark('event_time', '5 seconds')
    .dropDuplicates(['User', 'event_time'])
)

# Count the remaining rows per user and start a streaming query that writes
# the counts, in 'complete' output mode, to the in-memory table `pydeduplicated`.
(
    deduplicated_events
    .groupBy('User')
    .count()
    .writeStream
    .format('memory')
    .queryName('pydeduplicated')
    .outputMode('complete')
    .start()
)

In [None]:
# Print up to five rows from the `pydeduplicated` in-memory table.
spark.sql('SELECT * FROM pydeduplicated').show(5)

# Checkpoints

In [19]:
# Input directory, shared by the batch read and the streaming read below.
activity_dir = '/resources/data/activity-data/'

# Batch read, used here to obtain the schema for the streaming reader.
static = spark.read.json(activity_dir)

# Streaming row count per `gt` value, reading at most 10 files per trigger.
streaming = (
    spark
    .readStream
    .schema(static.schema)
    .option('maxFilesPerTrigger', 10)
    .json(activity_dir)
    .groupBy('gt')
    .count()
)

# Start the query with an explicit checkpoint directory and keep the handle
# in `query`. The option key is spelled `checkpointLocation`, as in Spark's
# documentation.
# NOTE(review): some Spark releases appear to look this key up
# case-sensitively, in which case the previous all-lowercase spelling would
# have been ignored — confirm for the deployed version.
query = (
    streaming
    .writeStream
    .outputMode('complete')
    .option('checkpointLocation', '/resources/logs')
    .queryName('test_python_stream')
    .format('memory')
    .start()
)

In [20]:
# Stop every streaming query that is still active before tearing down the
# session, so the queries started in this notebook are shut down explicitly
# rather than left running when the session goes away.
for active_query in spark.streams.active:
    active_query.stop()

spark.stop()