# Initialization

In [1]:
import pyspark
from pyspark.sql import SparkSession

# Local-mode configuration for this notebook's Spark application.
spark_conf = (
    pyspark.SparkConf()
    .setAppName('Dibimbing')
    .setMaster('local')
)

# Reuse the kernel's SparkContext if one is already running; otherwise start
# one from spark_conf.
sparkcontext = pyspark.SparkContext.getOrCreate(conf=spark_conf)
sparkcontext.setLogLevel("WARN")

# Obtain the session through the builder (the documented entry point) instead
# of calling the SparkSession constructor directly; it attaches to the
# SparkContext created above and is safe to re-run.
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()

In [None]:
# Display the session object as the cell output to confirm it was created.
spark

# Data Load

In [None]:
# Shell out to list the first 10 entries of the activity-data directory.
!ls /resources/data/activity-data/ | head -10

In [None]:
# Batch-read every JSON file under the data directory; no schema is given,
# so Spark infers it from the data.
activity_dir = '/resources/data/activity-data/'
static = spark.read.format('json').load(activity_dir)

# Keep the inferred schema so it can be handed to the streaming reader,
# and print it for inspection.
dataSchema = static.schema
print(dataSchema)

In [None]:
# Print the first 5 rows of the batch DataFrame.
static.show(5)

In [None]:
# Fetch the last 5 rows as the cell output.
static.tail(5)

Metadata for the dataset

| Column | Description |
| --- | ----------- |
| Index         |  The row number.
| Arrival_Time  |  The time the measurement arrived at the sensing application
| Creation_Time |  The timestamp the OS attaches to the sample
| X,Y,Z         |  The values provided by the sensor for the three axes: X, Y, Z
| User          |  The user this sample originates from, the users are named a to i.
| Model         |  The phone/watch model this sample originates from
| Device        |  The specific device this sample is from. They are prefixed with the model name and then the number, e.g., nexus4_1 or nexus4_2.
| Gt            |  The activity the user was performing: bike, sit, stand, walk, stairsup, stairsdown and null

# Structured Streaming

### Mock File Streaming (Throttle)

In [9]:
# Streaming readers need an explicit schema, so reuse the one taken from the
# batch read. maxFilesPerTrigger=1 feeds one file per micro-batch, which
# throttles the directory into a slow mock stream.
stream_reader = (
    spark.readStream
    .schema(dataSchema)
    .options(maxFilesPerTrigger=1)
)
streaming = stream_reader.json('/resources/data/activity-data/')

### Simple Aggregations

In [10]:
# Use 5 shuffle partitions instead of Spark's default of 200, so the small
# local aggregations below do not create many tiny tasks.
spark.conf.set('spark.sql.shuffle.partitions', 5)

In [23]:
# Distinct `index` values observed on the stream.
activityCounts = streaming.select('index').distinct()

# Starting a query under a name that is already active raises an error, so
# stop any leftover run first. This keeps the cell safe to re-run without
# having to invent a new query name each time.
for running_query in spark.streams.active:
    if running_query.name == 'activity_counts_3':
        running_query.stop()

# Write the results to an in-memory table named after the query so they can
# be inspected with spark.sql(...).
activityQuery = (
    activityCounts.writeStream
    .queryName('activity_counts_3')
    .format('memory')
    .outputMode('append')
    .start()
)

# start() returns immediately and the query keeps running in the background.
# Calling activityQuery.awaitTermination() here would block the notebook until
# the query stops, so it is intentionally not called.

In [15]:
# Stop the background query started above.
# NOTE(review): when the notebook is run top to bottom, this executes before
# the polling cell that follows, so that cell only sees whatever was processed
# up to this point. Run this cell after polling when exploring interactively.
activityQuery.stop()

In [None]:
from time import sleep

# Poll the in-memory sink table once per second, five times, printing the
# current row count on each pass.
count_sql = "SELECT COUNT(*) FROM activity_counts_3"
for poll in range(5):
    spark.sql(count_sql).show()
    sleep(1)

### Complex Aggregation

In [22]:
from pyspark.sql.functions import expr

# Starting a query under a name that is already active raises an error, so
# stop any leftover run first; this keeps the cell safe to re-run.
for running_query in spark.streams.active:
    if running_query.name == 'simple_transform':
        running_query.stop()

# Keep only rows whose `gt` label contains "stairs", project four columns,
# and append them to the in-memory table `simple_transform`.
# Note: simpleTransform holds the running StreamingQuery handle, not a DataFrame.
simpleTransform = (
    streaming
    .withColumn('stairs', expr("gt like '%stairs%'"))
    .where('stairs')
    .where('gt is not null')
    .select('gt', 'model', 'arrival_time', 'creation_time')
    .writeStream
    .queryName('simple_transform')
    .format('memory')
    .outputMode('append')
    .start()
)

In [None]:
# Show the contents of the in-memory sink table twice, one second apart.
transform_sql = "SELECT * FROM simple_transform"
for poll in range(2):
    spark.sql(transform_sql).show()
    sleep(1)

In [27]:
# Starting a query under a name that is already active raises an error, so
# stop any leftover run first; this keeps the cell safe to re-run.
for running_query in spark.streams.active:
    if running_query.name == 'device_counts':
        running_query.stop()

# Running count of rows per `gt` value. 'complete' output mode rewrites the
# whole result table `device_counts` on every trigger.
# Note: deviceModelStats holds the running StreamingQuery handle.
deviceModelStats = (
    streaming
    .groupBy('gt')
    .count()
    .writeStream
    .queryName('device_counts')
    .format('memory')
    .outputMode('complete')
    .start()
)

In [26]:
# Stop the background aggregation query.
# NOTE(review): when the notebook is run top to bottom, this executes before
# the polling cell that follows, so the polled table will no longer update.
deviceModelStats.stop()

In [None]:
# Show the aggregated counts table ten times, one second apart.
device_counts_sql = "SELECT * FROM device_counts"
for poll in range(10):
    spark.sql(device_counts_sql).show()
    sleep(1)

In [14]:
# Shut down the Spark session and its underlying SparkContext; no Spark cell
# can be run after this without re-running the initialization cell.
spark.stop()