In [1]:
from pyspark.sql import SparkSession

In [2]:
# Create (or reuse) the SparkSession used throughout this notebook.
# "local[4]" runs Spark in-process with 4 worker threads. A bare "local"
# master runs with a single worker thread, so the 4 cores requested via
# `spark.executor.cores` would never be used for parallel tasks.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("BIOS-821")
    .config("spark.executor.cores", 4)
    .getOrCreate()
)

In [3]:
# Set the `spark.sql.shuffle.partitions` runtime option to 5 for this session.
spark.conf.set('spark.sql.shuffle.partitions', 5)

# Structured Streaming

![img](https://tse2.mm.bing.net/th?id=OIP.sMrdnOlx6YJdnl6DU8RyswHaDz&pid=Api&w=1037&h=533&rs=1&p=0)

Reference: [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)

## Basic concepts

- sources
- sinks
- output mode
- triggers
- event time and watermarks

## Example

We use the example and data set from [Spark: The Definitive Guide](https://github.com/databricks/Spark-The-Definitive-Guide)

Details about the dataset can be found [here](https://archive.ics.uci.edu/ml/datasets/Heterogeneity+Activity+Recognition#)

### Static DataFrame

In [4]:
# Load a single JSON part file of the activity data as a regular
# (non-streaming) DataFrame so that it can be inspected directly.
static = spark.read.json(
    'data/activity-data/part-00000-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json'
)

In [5]:
# Print the column names, types and nullability of the static DataFrame.
static.printSchema()

root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



In [6]:
# Display the first rows of the static DataFrame as a text table.
static.show()

+-------------+-------------------+--------+-----+------+----+-----+-------------+-------------+-------------+
| Arrival_Time|      Creation_Time|  Device|Index| Model|User|   gt|            x|            y|            z|
+-------------+-------------------+--------+-----+------+----+-----+-------------+-------------+-------------+
|1424686735175|1424686733176178965|nexus4_1|   35|nexus4|   g|stand| 0.0014038086|    5.0354E-4|-0.0124053955|
|1424686735378|1424686733382813486|nexus4_1|   76|nexus4|   g|stand|-0.0039367676|  0.026138306|  -0.01133728|
|1424686735577|1424686733579072031|nexus4_1|  115|nexus4|   g|stand|  0.003540039| -0.034744263| -0.019882202|
|1424686735779|1424688581834321412|nexus4_2|  163|nexus4|   g|stand|  0.002822876|  0.005584717|  0.017318726|
|1424686735982|1424688582035859498|nexus4_2|  203|nexus4|   g|stand| 0.0017547607| -0.018981934| -0.022201538|
|1424686736186|1424686734188508066|nexus4_1|  236|nexus4|   g|stand| 0.0014038086|  0.010116577|  4.119873E-4|
|

### Dynamic DataFrame

Streaming does not infer the schema by default, so we first read a single file statically and reuse its schema.

#### Source

Typical sources are Apache Kafka, distributed files, or a socket (for testing).

In [7]:
# Reuse the schema already inferred for the `static` DataFrame, which was
# loaded from this same JSON part file earlier in the notebook. This avoids
# scanning the file a second time and keeps the long path in one place.
schema = static.schema

In [8]:
# Streaming reader over the whole activity-data directory, using the schema
# obtained above; `maxFilesPerTrigger` is set to 1 so files are consumed
# one per trigger.
stream = (
    spark.readStream
    .schema(schema)
    .option('maxFilesPerTrigger', 1)
    .json('data/activity-data/')
)

In [9]:
# Number of streamed records for each distinct value of the `gt` column.
counts = (
    stream
    .groupby('gt')
    .count()
)

#### Sink and output modes

Typical sinks are Apache Kafka, files, `foreach`, console, memory.

There are 3 output modes:

- `complete` writes full output
- `update` in-place update of changed records
- `append` only adds new records

In [10]:
# Start a streaming query that writes the complete `counts` result to an
# in-memory table named `activity_counts_complete`.
query = (
    counts.writeStream
    .queryName('activity_counts_complete')
    .format('memory')
    .outputMode('complete')
    .start()
)

In production, you should add this line to your job so the driver stays alive.

```python
query.awaitTermination()
```

In [11]:
# List the streaming queries that are currently active on this session.
spark.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x11ccfd320>]

In [12]:
from time import sleep

In [13]:
# Query the in-memory sink three times, one second apart, to watch the
# result table fill in while the stream is being processed.
for poll in range(3):
    snapshot = spark.sql('''
    SELECT * from activity_counts_complete
    ''')
    snapshot.show()
    sleep(1)

+---+-----+
| gt|count|
+---+-----+
+---+-----+

+---+-----+
| gt|count|
+---+-----+
+---+-----+

+----------+-----+
|        gt|count|
+----------+-----+
|       sit|12308|
|     stand|11384|
|stairsdown| 9363|
|      walk|13255|
|  stairsup|10461|
|      null|10446|
|      bike|10797|
+----------+-----+



## Transformations on streams

In [14]:
from pyspark.sql.functions import expr

In [15]:
# Flag records whose `gt` value contains "stairs", keep only those records
# (with a non-null `gt`), and project four columns.
s1 = (
    stream
    .withColumn("stairs", expr("gt like '%stairs%'"))
    .where("stairs")
    .where("gt is not null")
    .select("gt", "model", "arrival_time", "creation_time")
)

In [16]:
# Start a streaming query that writes the filtered stream `s1` to an
# in-memory table named `trensform_example` in "update" output mode.
# NOTE(review): this rebinds `query`, so any previously started query is no
# longer reachable through that name (see `spark.streams.active` instead).
query = (
    s1.writeStream
    .queryName("trensform_example")
    .format("memory")
    .outputMode("update")
    .start()
)

In [17]:
# Pause for 3 seconds before querying the sink (presumably to let the stream
# process some data first; the wait is a heuristic, not a guarantee).
sleep(3)

In [18]:
# Show whatever the `trensform_example` in-memory table holds right now.
transform_rows = spark.sql('''
SELECT * FROM trensform_example
''')
transform_rows.show()

+--------+------+-------------+-------------------+
|      gt| model| arrival_time|      creation_time|
+--------+------+-------------+-------------------+
|stairsup|nexus4|1424687983801|1424689829851420571|
|stairsup|nexus4|1424687984163|1424687982169917952|
|stairsup|nexus4|1424687984571|1424687982572835163|
|stairsup|nexus4|1424687984972|1424687982975667195|
|stairsup|nexus4|1424687985370|1424687983379305060|
|stairsup|nexus4|1424687985773|1424687983776247199|
|stairsup|nexus4|1424687986176|1424687984179201300|
|stairsup|nexus4|1424687986585|1424687984587465460|
|stairsup|nexus4|1424687986982|1424687984990016762|
|stairsup|nexus4|1424687987391|1424687985392940346|
|stairsup|nexus4|1424687987786|1424687985795693382|
|stairsup|nexus4|1424687988071|1424689834116578018|
|stairsup|nexus4|1424687988275|1424687986284249290|
|stairsup|nexus4|1424687988475|1424689834519410049|
|stairsup|nexus4|1424687988677|1424687986681893333|
|stairsup|nexus4|1424687988875|1424689834922242080|
|stairsup|ne

In [19]:
from pyspark.sql.functions import mean

In [20]:
# Per-`gt` averages of the three columns x, y and z.
s2 = (
    stream
    .groupby("gt")
    .agg(*[mean(axis) for axis in ("x", "y", "z")])
)

In [21]:
# Start a streaming query that writes the complete aggregated result `s2`
# to an in-memory table named `agg_example`.
# NOTE(review): this rebinds `query`, so any previously started query is no
# longer reachable through that name (see `spark.streams.active` instead).
query = (
    s2.writeStream
    .queryName("agg_example")
    .format("memory")
    .outputMode("complete")
    .start()
)

In [22]:
# Pause for 3 seconds before querying the sink (presumably to let the stream
# process some data first; the wait is a heuristic, not a guarantee).
sleep(3)

In [23]:
# Show whatever the `agg_example` in-memory table holds right now.
agg_rows = spark.sql('''
SELECT * FROM agg_example
''')
agg_rows.show()

+----------+--------------------+--------------------+--------------------+
|        gt|              avg(x)|              avg(y)|              avg(z)|
+----------+--------------------+--------------------+--------------------+
|       sit|-5.48643225000002...|-1.75231850243742...|-2.21252465063370...|
|     stand|-1.91564553417079...|-1.20081210216091...|-6.67507354181460...|
|stairsdown|0.022206868836056676|-0.03261251395847493| 0.11849359875695864|
|      walk|-0.00533268257910...|0.007791046676386267|9.245203665258084E-4|
|  stairsup|-0.02502670570758...|-0.00196794930013...|-0.09861646979972294|
|      null|-0.00534777761728...|-0.00471625131602...|0.001053548924143...|
|      bike| 0.02652570661093831| -0.0112163392482819|-0.08351623094110396|
+----------+--------------------+--------------------+--------------------+

