# Spark Streaming

In [None]:
import string
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

In [None]:
# Build (or reuse) the local Spark session used throughout this notebook.
# NOTE: in local mode the driver runs all tasks itself, so
# `spark.executor.cores` has no effect; the number of worker threads comes
# from the master URL. Plain "local" means a single thread, so "local[4]"
# is what actually provides the 4 cores the config below asks for.
# The streaming aggregations later on (groupby on `gt`) shuffle every
# micro-batch; the default number of shuffle partitions (200) is far more
# than a small local job needs, so it is lowered to match the core count.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("BIOS-823")
    .config("spark.executor.cores", 4)
    .config("spark.sql.shuffle.partitions", 4)
    .getOrCreate()
)

## High level API

In [None]:
# Streaming DataFrame over the text files in data/inputs: each row carries
# one line of text in the `value` column, and at most one new file is
# picked up per micro-batch so the stream advances gradually.
df = (
    spark.readStream
    .format("text")
    .option("maxFilesPerTrigger", 1)
    .load("data/inputs")
)

In [None]:
# Print the schema of the streaming text DataFrame defined above.
df.printSchema()

In [None]:
from pyspark.sql.functions import explode, split, lower, expr

In [None]:
# Split each line into lower-cased words, one word per output row.
# - Split on runs of whitespace (Java regex `\s+`) rather than a single
#   space, so tabs and repeated spaces do not yield empty "words".
# - Drop the empty token that a leading separator still produces.
# - Name the exploded column `word` instead of Spark's default `col`.
wc = (
    df.select(explode(split(lower(expr('value')), r'\s+')).alias('word'))
    .where("word != ''")
)

In [None]:
# Append each micro-batch of words as CSV files under data/outputs.
# The checkpoint directory records the query's streaming progress; its name
# was previously misspelled as 'data/checkpoits'.
query = (
    wc.writeStream
    .format('csv')
    .option('checkpointLocation', 'data/checkpoints')
    .option('path', 'data/outputs')
    .outputMode('append')
    .start()
)

In [None]:
# Stop the CSV-writing streaming query started in the previous cell.
query.stop()

## Example

We use the example and dataset from [Spark: The Definitive Guide](https://github.com/databricks/Spark-The-Definitive-Guide).

Details about the dataset can be found [here](https://archive.ics.uci.edu/ml/datasets/Heterogeneity+Activity+Recognition#)

To run the example, download and unzip the data into the `data` directory:

```bash
cd data
wget https://www.dropbox.com/s/8zgrpu4o3nqdcgh/activity-data.zip
unzip activity-data.zip
```

### Static DataFrame

In [None]:
# Batch-read a single part file of the activity data; Spark infers the JSON
# schema from the file contents.
first_part_file = 'data/activity-data/part-00000-tid-730451297822678341-1dda7027-2071-4d73-a0e2-7fb6a91e1d1f-0-c000.json'
static = spark.read.json(first_part_file)

In [None]:
# Print the schema Spark inferred from the JSON part file.
static.printSchema()

In [None]:
# Display the first rows of the static (batch) DataFrame.
static.show()

### Dynamic DataFrame

Streaming sources do not infer the schema by default, so we first read a single file statically and reuse its inferred schema for the stream.

#### Source

Typical sources are Apache Kafka, distributed files, or a socket (for testing).

In [None]:
# Reuse the schema Spark already inferred for `static` (read from this same
# part file) instead of reading and re-inferring the file a second time.
schema = static.schema

In [None]:
# Streaming DataFrame over every JSON file in data/activity-data/. The schema
# is supplied up front, and one new file is consumed per micro-batch.
activity_reader = spark.readStream.schema(schema).option('maxFilesPerTrigger', 1)
stream = activity_reader.json('data/activity-data/')

In [None]:
# Running number of rows for each value of the `gt` column.
counts = stream.groupBy('gt').count()

#### Sink and output modes

Typical sinks are Apache Kafka, files, `foreach`, console, and memory (the console and memory sinks are intended for debugging).

There are 3 output modes:

- `complete` writes the full result table on every trigger
- `update` writes only the rows that changed since the last trigger
- `append` writes only new rows; rows already written are never changed

In [None]:
# Keep the complete per-`gt` count table in the in-memory table
# `activity_counts_complete`, rewritten on every trigger.
counts_writer = (
    counts.writeStream
    .queryName('activity_counts_complete')
    .format('memory')
    .outputMode('complete')
)
query = counts_writer.start()

In production, add this line to your job so the driver stays alive while the query runs:

```python
query.awaitTermination()
```

In [None]:
# List the streaming queries currently active in this Spark session.
spark.streams.active

In [None]:
from time import sleep

In [None]:
# Query the in-memory table three times, one second apart, to see how the
# counts change as more files are processed.
for _ in range(3):
    snapshot = spark.sql('''
    SELECT * from activity_counts_complete
    ''')
    snapshot.show()
    sleep(1)

## Transformations on streams

In [None]:
from pyspark.sql.functions import expr

In [None]:
# Keep only stair-related activities (any `gt` containing "stairs") that have
# a label, and project four columns.
s1 = (
    stream
    .withColumn("stairs", expr("gt like '%stairs%'"))
    .where("stairs")
    .where("gt is not null")
    .select("gt", "model", "arrival_time", "creation_time")
)

In [None]:
# Write the filtered stair-activity rows to the in-memory table
# `transform_example`. The query has no aggregation, so each trigger only adds
# new rows. "append" says that directly and is the mode the Structured
# Streaming guide lists for the memory sink; "update" only behaved the same
# here because no row is ever revised.
query = (
    s1.writeStream
    .queryName("transform_example")
    .format("memory")
    .outputMode("append")
    .start()
)

In [None]:
# Give the streaming query about ten seconds to process some files before
# its in-memory table is queried; how many rows appear depends on timing.
sleep(10)

In [None]:
# Inspect the rows collected so far by the `transform_example` query.
transform_rows = spark.sql('''
SELECT * FROM transform_example
''')
transform_rows.show()

In [None]:
from pyspark.sql.functions import mean

In [None]:
# Per-`gt` averages of the x, y and z columns as a streaming aggregation.
s2 = stream.groupBy("gt").agg(
    mean("x"),
    mean("y"),
    mean("z"),
)

In [None]:
# Keep the full table of per-`gt` averages in the in-memory table
# `agg_example`, rewritten on every trigger.
agg_writer = (
    s2.writeStream
    .queryName("agg_example")
    .format("memory")
    .outputMode("complete")
)
query = agg_writer.start()

In [None]:
# Give the aggregation query about fifteen seconds to process some files
# before its in-memory table is queried; results depend on timing.
sleep(15)

In [None]:
# Inspect the current per-`gt` averages held by the `agg_example` query.
agg_rows = spark.sql('''
SELECT * FROM agg_example
''')
agg_rows.show()