## Spark Streaming - Demo
This notebook walks through some examples of the **Spark Streaming API**.
In particular, Spark interacts here with **Apache Kafka** (a message broker) and consumes a stream of messages.

As in the Spark demo, a **SparkSession** object must be created before working with streams. Note that, in order to use Kafka, the matching connector libraries must be loaded.

In [1]:
# This cell creates a SparkSession object that is used to interact with Spark
from pyspark.sql import SparkSession
ss = SparkSession.builder \
.config('spark.jars.packages', 'org.apache.kafka:kafka_2.11:1.1.1,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4') \
.getOrCreate()
ss.version
# version 2.4.4 uses Scala 2.11

'2.4.4'
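
The connector coordinate passed to `spark.jars.packages` embeds both the Scala binary version and the Spark version, and the latter is expected to match the value reported by `ss.version`. As a minimal sketch, the coordinate can be assembled from those two values; the helper name `kafka_connector_coordinate` is hypothetical and not part of any Spark API.

```python
def kafka_connector_coordinate(spark_version: str, scala_version: str) -> str:
    """Build the Maven coordinate of the Spark SQL Kafka connector.

    Hypothetical helper: it only formats a string and does not check
    that the resulting artifact actually exists.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


# Values taken from the cell above: Spark 2.4.4 built against Scala 2.11.
coordinate = kafka_connector_coordinate("2.4.4", "2.11")
print(coordinate)
```

The resulting string is the second entry of the `spark.jars.packages` value used when the session was built.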

### Infinite dataframe

The Spark Streaming API allows us to work with infinite streams and manage them as if they were relational tables. In the following cell the SparkSession maps a stream of messages from Kafka (topic *my_topic* from the broker available at *kafka:9092*) onto a dataframe object.

In [2]:
brokers = 'kafka:9092'
df=(ss
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers",brokers)
  .option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  .option("subscribe", "my_topic")
  .option("startingOffsets", "earliest") # read data from the beginning of the stream
  .load())

The structure of a Kafka message includes:

- a key
- a value
- the name of the topic
- the stream partition
- the offset within the partition
- the timestamp
- the timestamp type

It can be easily checked using the `printSchema()` method.

In [3]:
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



### Imposing a schema

To handle the data stream, Spark needs to access the `value` field of every Kafka message. In this scenario, a message is a string in JSON format. The following cell defines a data schema that is used to extract the message fields from `df` into a new dataframe.

In [4]:
from pyspark.sql.types import StringType, StructField, StructType, IntegerType, TimestampType, BooleanType
mySchema = StructType([
            StructField("name", StringType(), True),
            StructField("id", StringType(), True),
            StructField("firstname", IntegerType(), True),
            StructField("lastname", StringType(), True),
            StructField("address", StringType(), True),
            StructField("timestamp", TimestampType(), True),
            StructField("gender", StringType(), True),
            StructField("arrested", BooleanType(), True),
            StructField("age", IntegerType(), True),
            StructField("race", StringType(), True)
])

We can now extract the `value` field and apply the schema using the `from_json()` function. A new dataframe `df1` is created. Note that, since Spark evaluates lazily, no action is undertaken and no process is deployed at this point.

In [5]:
from pyspark.sql.functions import from_json

df1 =(df.selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
  .select(from_json("value", mySchema).alias("data"), "timestamp")
  .select("data.race",
          "data.gender", 
          "data.lastname", 
          "data.firstname", 
          "data.arrested", 
          "data.age", "timestamp"))

Printing the schema we can check that the structure of `df1` is correct. 

In [6]:
df1.printSchema()

root
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- firstname: integer (nullable = true)
 |-- arrested: boolean (nullable = true)
 |-- age: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)
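
To make the extraction step more concrete, here is a plain-Python sketch of what happens to a single message: the binary `value` is decoded to a string, parsed as JSON, and the selected fields are picked out. It does not use Spark and performs no type checking, so it is only an analogy for `CAST(value AS STRING)` followed by `from_json()`. The function name `parse_message` and the sample message are made up for illustration.

```python
import json

# Field names copied from the final select() of the df1 definition.
SELECTED_FIELDS = ["race", "gender", "lastname", "firstname", "arrested", "age"]


def parse_message(value: bytes, kafka_timestamp):
    """Turn one Kafka message value into a flat row (plain-Python analogy)."""
    text = value.decode("utf-8")   # analogous to CAST(value AS STRING)
    data = json.loads(text)        # analogous to from_json(), without type checks
    # Fields that are absent from the message become None.
    row = {name: data.get(name) for name in SELECTED_FIELDS}
    # df1 keeps the Kafka message timestamp, not the "timestamp" field of the payload.
    row["timestamp"] = kafka_timestamp
    return row


# Made-up sample message, for illustration only.
sample = b'{"lastname": "Doe", "age": 31, "arrested": false}'
print(parse_message(sample, "2020-01-01 00:00:00"))
```

Note that the `timestamp` column of `df1` comes from the Kafka record itself, because the `select` picks `"timestamp"` rather than `"data.timestamp"`.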



### SQL over stream

The Spark SQL API also applies to infinite dataframes. In the following cell the `df1` dataframe is registered as the table `mytable`. An SQL query is also defined; note that the result is another dataframe: `df2`.

In [7]:
df1.createOrReplaceTempView("mytable")
df2 =ss.sql('''
SELECT age, count(age) from mytable group by age'''
           )

### Starting the stream data processing

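In the following cell, the streaming query defined by `df2` is started with a processing-time trigger of 15 seconds, so new data is processed in micro-batches every 15 seconds. The query sink is an in-memory table (here named `test`).
The output mode **complete** means that the whole result table is written to the sink at every trigger.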

In [8]:
streamingQuery1 = df2.writeStream \
  .outputMode("complete") \
  .format("memory") \
  .queryName("test") \
  .option("truncate", "false") \
  .trigger(processingTime = "15 seconds")\
  .start()

We can read the output stream from memory using sql over the output stream, named `test`.

In [9]:
import time
for i in range(5):
    print(f'Batch number {i}')
    ss.sql('select * from test').show()
    time.sleep(15)


Batch number 0
+---+----------+
|age|count(age)|
+---+----------+
| 31|       235|
| 65|       249|
| 53|       262|
| 34|       254|
| 28|       265|
| 76|       250|
| 26|       244|
| 27|       268|
| 44|       221|
| 12|       266|
| 22|       241|
| 47|       276|
| 52|       226|
| 13|       248|
| 16|       282|
| 40|       237|
| 20|       251|
| 57|       270|
| 54|       245|
| 48|       280|
+---+----------+
only showing top 20 rows

Batch number 1
+---+----------+
|age|count(age)|
+---+----------+
| 31|       236|
| 65|       251|
| 53|       265|
| 34|       257|
| 28|       265|
| 76|       253|
| 26|       247|
| 27|       269|
| 44|       222|
| 12|       268|
| 22|       243|
| 47|       279|
| 52|       227|
| 13|       251|
| 16|       286|
| 40|       239|
| 20|       254|
| 57|       273|
| 54|       248|
| 48|       281|
+---+----------+
only showing top 20 rows

Batch number 2
+---+----------+
|age|count(age)|
+---+----------+
| 31|       236|
| 65|       252|
| 

### Stopping the streaming query

The application just created could potentially run forever. To stop it we can use the `stop()` method.

In [10]:
streamingQuery1.stop()

### Update mode and Console output format

The following cell deploys the previous stream data analysis again, this time writing the resulting stream to the console in `update` output mode.
In the update output mode, only the rows of the result table that were updated since the last trigger are written to the sink.

Use the `docker logs -f pyspark-jupyter` command to check that the application is running.


In [11]:
streamingQuery2=df2.writeStream \
  .outputMode("update") \
  .option("truncate", "false") \
  .format("console") \
  .trigger(processingTime = "15 seconds")  \
  .start()
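
To contrast the `complete` mode used earlier with the `update` mode used here, the following plain-Python sketch simulates a running count per age over a few micro-batches. It does not use Spark: the function `run_batches` and the sample ages are assumptions made for illustration, and ages are assumed to be non-null.

```python
from collections import Counter


def run_batches(batches):
    """Yield (complete_output, update_output) for each micro-batch of ages."""
    state = Counter()  # running result table: age -> count
    for ages in batches:
        changed = set()
        for age in ages:
            state[age] += 1
            changed.add(age)
        complete = dict(state)                     # complete: the whole table
        update = {a: state[a] for a in changed}    # update: only changed rows
        yield complete, update


# Made-up micro-batches of ages, for illustration only.
batches = [[31, 65, 31], [65]]
for i, (complete, update) in enumerate(run_batches(batches)):
    print(f"batch {i}: complete={complete} update={update}")
```

In the second batch only age 65 changes, so the `update` output contains a single row while the `complete` output still contains both ages.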


Let's check that the query is active with the `isActive` attribute.

In [12]:
streamingQuery2.isActive

True

Let's stop the application and check that it is not active anymore.

In [13]:
streamingQuery2.stop()

In [14]:
streamingQuery2.isActive

False

### Merging infinite columns

The next cell generates a new dataframe (`df3`) concatenating the two columns of the `df2` dataframe.

In [15]:
from pyspark.sql.functions import concat, col, lit 
df3=df2.select(concat(col("age"), lit(" "), col("count(age)")).alias("value"))
df3.printSchema()

root
 |-- value: string (nullable = true)
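
As a rough per-row illustration of the concatenation, the following plain-Python sketch builds the same `value` string from an age and its count. It is not Spark code; the function name `concat_value` is hypothetical. It also mirrors the fact that Spark's `concat` returns null when any of its inputs is null.

```python
def concat_value(age, count):
    """Mimic concat(col("age"), lit(" "), col("count(age)")) for one row."""
    if age is None or count is None:
        return None  # concat yields null if any input is null
    return f"{age} {count}"


# First row of the memory-sink output shown earlier: age 31, count 235.
print(concat_value(31, 235))
```

A group whose `age` is null would therefore produce a null `value`.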



### Sinking into Kafka

The next cell deploys the streaming application defined by `df3` using Kafka as the sink. Results are written to the topic `topic_out` in `complete` output mode. Note that the stream is checkpointed through the `checkpointLocation` option.

To check the output stream use the `peeping_data_stream_out.sh` script.

In [16]:
streamingQuery3=df3 \
        .writeStream \
          .format("kafka") \
          .outputMode("complete") \
          .option("kafka.bootstrap.servers",brokers) \
          .option("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") \
          .option("topic", "topic_out") \
          .option("failOnDataLoss","false")\
          .option("checkpointLocation", "checkpoint") \
          .start()

The notebook concludes by stopping the active streaming application.

In [21]:
streamingQuery3.stop()

In [22]:
streamingQuery3.isActive

False
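
When several queries have been started in one session, stopping each by hand is easy to forget. A minimal sketch of a cleanup helper follows, assuming the object passed in is a SparkSession, whose `streams.active` attribute lists the queries that are still running. The helper name `stop_active_queries` is hypothetical.

```python
def stop_active_queries(spark):
    """Stop every streaming query still active on the given session.

    Returns the names of the stopped queries (None for unnamed ones).
    """
    stopped = []
    # Copy the list first, since stopping a query removes it from the active set.
    for query in list(spark.streams.active):
        query.stop()
        stopped.append(query.name)
    return stopped
```

In this notebook the call would be `stop_active_queries(ss)`. Queries started without `queryName`, such as `streamingQuery2`, are reported with the name `None`.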