## Stream Processing Exercise 4 - Consuming from Kafka

Goals:

* Perform different computations on an input stream: read, aggregation, windowed aggregation
* Additional references
    * [Spark Streaming](https://spark.apache.org/streaming/)
    * [Structured Spark Streaming documentation](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
    * [Spark and Kafka integration guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)


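Let's inspect the content of the `pageviews` topic, printing each batch every 5 seconds. The code below uses the DStream API (`pyspark.streaming.kafka`), which is available in Spark 2.x only; it was removed in Spark 3.0: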

In [1]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PageViewsConsumer")

# Micro-batch interval of 5 seconds
ssc = StreamingContext(sc, 5)

topics = ['pageviews']

kafkaParams = {'bootstrap.servers': 'broker:29092',
               'group.id': 'test'}

# Direct (receiver-less) stream; each record is a (key, value) tuple
stream = KafkaUtils.createDirectStream(ssc, topics, kafkaParams)

# Print the first elements of every batch
stream.map(lambda record: (record[0], record[1])).pprint()

# Start the computation and block until it is stopped (e.g. interrupted)
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2021-01-28 18:37:00
-------------------------------------------

-------------------------------------------
Time: 2021-01-28 18:37:05
-------------------------------------------

-------------------------------------------
Time: 2021-01-28 18:37:10
-------------------------------------------

-------------------------------------------
Time: 2021-01-28 18:37:15
-------------------------------------------

-------------------------------------------
Time: 2021-01-28 18:37:20
-------------------------------------------
('1', '1611859030254,User_2,Page_34')
('11', '1611859036879,User_5,Page_64')
('21', '1611859037096,User_3,Page_66')
('31', '1611859037960,User_9,Page_83')
('41', '1611859038050,User_2,Page_58')
('51', '1611859038655,User_8,Page_21')
('61', '1611859039176,User_8,Page_86')
('71', '1611859039483,User_7,Page_56')
('81', '1611859039967,User_7,Page_66')
('91', '1611859040049,User_9,Page_55')
...

--------------------------------

KeyboardInterrupt: 
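
Each printed record is a `(key, value)` tuple whose value is a comma-separated string. As a minimal plain-Python sketch (no Spark needed), the value can be split into its three fields. The field names `viewtime`, `userid` and `pageid` follow the column names used later in this exercise, and the helper `parse_pageview` is a hypothetical name introduced only for illustration:

```python
def parse_pageview(record):
    """Split one (key, value) Kafka record into named fields."""
    key, value = record
    viewtime, userid, pageid = value.split(",")
    return {
        "key": key,
        "viewtime": int(viewtime),  # epoch time in milliseconds
        "userid": userid,
        "pageid": pageid,
    }


# First record from the output above.
print(parse_pageview(('1', '1611859030254,User_2,Page_34')))
```

A function like this could also be passed to `stream.map(...)` in place of the lambda, so that each batch prints dictionaries instead of raw tuples.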

Now inspect the content of the `users` topic as well:

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PageViewsConsumer")

# Micro-batch interval of 5 seconds
ssc = StreamingContext(sc, 5)

topicUsers = ['users']

kafkaParams = {'bootstrap.servers': 'broker:29092',
               'group.id': 'test'}

# Direct (receiver-less) stream; each record is a (key, value) tuple
stream = KafkaUtils.createDirectStream(ssc, topicUsers, kafkaParams)

# Print the first elements of every batch
stream.map(lambda record: (record[0], record[1])).pprint()

# Start the computation and block until it is stopped (e.g. interrupted)
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2021-01-28 18:40:50
-------------------------------------------
('User_6', '1519013368530,User_6,Region_2,OTHER')
('User_2', '1516444378607,User_2,Region_7,OTHER')
('User_5', '1512373170274,User_5,Region_6,FEMALE')
('User_4', '1514640010990,User_4,Region_4,FEMALE')
('User_6', '1499371853710,User_6,Region_1,FEMALE')
('User_5', '1513578775862,User_5,Region_5,OTHER')
('User_6', '1494641976481,User_6,Region_8,FEMALE')
('User_8', '1503677985092,User_8,Region_7,FEMALE')
('User_8', '1504700142647,User_8,Region_2,OTHER')
('User_8', '1515510948549,User_8,Region_1,MALE')
...

-------------------------------------------
Time: 2021-01-28 18:40:55
-------------------------------------------
('User_8', '1502347843740,User_8,Region_5,OTHER')
('User_7', '1514332347842,User_7,Region_1,OTHER')
('User_4', '1500111995736,User_4,Region_5,MALE')
('User_3', '1511533300960,User_3,Region_9,OTHER')
('User_8', '1505762955472,User_8,Region_6,MALE')
('User_5', '149
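
In the output above the record key repeats the user id, and the same user can appear several times with different values. The following plain-Python sketch groups a few of the printed records by user to make that visible. The field names `registertime`, `regionid` and `gender` are assumptions (the output only shows raw values), and `parse_user` and `regions_by_user` are hypothetical helpers:

```python
from collections import defaultdict


def parse_user(record):
    """Split one (key, value) record from the users topic into named fields."""
    key, value = record
    registertime, userid, regionid, gender = value.split(",")
    return {
        "key": key,
        "registertime": int(registertime),  # assumed name for the first field
        "userid": userid,
        "regionid": regionid,               # assumed name
        "gender": gender,                   # assumed name
    }


def regions_by_user(records):
    """Collect, in arrival order, the regions seen for each user id."""
    grouped = defaultdict(list)
    for record in records:
        user = parse_user(record)
        grouped[user["userid"]].append(user["regionid"])
    return dict(grouped)


# Three records copied from the first batch above.
sample = [
    ('User_6', '1519013368530,User_6,Region_2,OTHER'),
    ('User_2', '1516444378607,User_2,Region_7,OTHER'),
    ('User_6', '1499371853710,User_6,Region_1,FEMALE'),
]
print(regions_by_user(sample))
```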

Next we consume streaming data from the `pageviews` Kafka topic with Structured Streaming in order to count the number of visits per page.
First, we define the input stream:

In [1]:
from pyspark.sql import SparkSession
# Explicit imports avoid shadowing Python built-ins such as sum, min and max
from pyspark.sql.functions import col, lit, split, window

spark = SparkSession \
    .builder \
    .appName("PageViewsConsumer") \
    .getOrCreate()


dfPageViewsStream = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("subscribe", "pageviews")
    .load()
)

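# Split the comma-separated value into viewtime (epoch ms), userid and pageid;
# `timestamp` is the record timestamp provided by the Kafka source
dfPageViews = (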
    dfPageViewsStream
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    .withColumn("_tmp", split(col("value"), "\\,"))
    .select((col("_tmp").getItem(0).cast("long") / lit(1000)).cast("timestamp").alias("viewtime"),
            col("_tmp").getItem(1).alias("userid"),
            col("_tmp").getItem(2).alias("pageid"),
            col("timestamp"))
)

dfPageViews.printSchema()


root
 |-- viewtime: timestamp (nullable = true)
 |-- userid: string (nullable = true)
 |-- pageid: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
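
The `viewtime` expression divides by 1000 because the raw field is an epoch time in milliseconds, whereas casting a number to `timestamp` in Spark interprets it as seconds. A plain-Python sketch of the same conversion, using the first `pageviews` record shown earlier (the helper name `millis_to_utc` is hypothetical):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def millis_to_utc(ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    # Integer milliseconds are added as a timedelta to avoid float rounding.
    return EPOCH + timedelta(milliseconds=ms)


viewtime = millis_to_utc(1611859030254)
print(viewtime.isoformat())  # 2021-01-28T18:37:10.254000+00:00
```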



Now let's create an in-memory table to store the query output:



In [2]:
dfPageViews.writeStream.format("memory").outputMode("append").queryName("PageViews").start()


<pyspark.sql.streaming.StreamingQuery at 0x7f939409bb50>

Here you can see the table structure:

In [3]:
spark.sql("describe pageviews").show()

+---------+---------+-------+
| col_name|data_type|comment|
+---------+---------+-------+
| viewtime|timestamp|   null|
|   userid|   string|   null|
|   pageid|   string|   null|
|timestamp|timestamp|   null|
+---------+---------+-------+



Now select the events whose `timestamp` falls in an odd minute:

In [4]:
spark.sql('select * from PageViews where (minute(timestamp)%2) != 0').show()

+--------------------+------+-------+--------------------+
|            viewtime|userid| pageid|           timestamp|
+--------------------+------+-------+--------------------+
|2021-01-28 18:47:...|User_4|Page_37|2021-01-28 18:47:...|
|2021-01-28 18:47:...|User_5|Page_13|2021-01-28 18:47:...|
|2021-01-28 18:47:...|User_3|Page_43|2021-01-28 18:47:...|
|2021-01-28 18:47:...|User_9|Page_35|2021-01-28 18:47:...|
|2021-01-28 18:47:...|User_1|Page_28|2021-01-28 18:47:...|
|2021-01-28 18:47:...|User_9|Page_29|2021-01-28 18:47:...|
|2021-01-28 18:47:...|User_3|Page_50|2021-01-28 18:47:...|
|2021-01-28 18:47:...|User_6|Page_14|2021-01-28 18:47:...|
|2021-01-28 18:47:...|User_1|Page_32|2021-01-28 18:47:...|
|2021-01-28 18:47:...|User_2|Page_16|2021-01-28 18:47:...|
|2021-01-28 18:47:...|User_1|Page_44|2021-01-28 18:47:...|
|2021-01-28 18:47:...|User_1|Page_67|2021-01-28 18:47:...|
|2021-01-28 18:47:...|User_3|Page_22|2021-01-28 18:47:...|
|2021-01-28 18:47:...|User_3|Page_29|2021-01-28 18:47:..
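
Note that the filter is applied to `timestamp` (the record timestamp coming from Kafka), not to `viewtime`. The predicate itself is a simple parity check on the minute, sketched here in plain Python. The sample events and the helper `in_odd_minute` are illustrative assumptions, not data read from the topic:

```python
from datetime import datetime


def in_odd_minute(ts):
    """Mirror of the SQL predicate (minute(timestamp) % 2) != 0."""
    return ts.minute % 2 != 0


# Hypothetical (timestamp, userid, pageid) events around a minute boundary.
events = [
    (datetime(2021, 1, 28, 18, 46, 59), "User_4", "Page_37"),
    (datetime(2021, 1, 28, 18, 47, 3), "User_5", "Page_13"),
    (datetime(2021, 1, 28, 18, 48, 0), "User_3", "Page_43"),
]

odd_events = [event for event in events if in_odd_minute(event[0])]
print(odd_events)
```

Only the event at 18:47 passes the filter, which matches the query output above, where every row falls in minute 47.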

Now try the same query ordered by `userid`:

In [5]:
spark.sql('select * from PageViews where (minute(timestamp)%2) != 0 order by userid').show()

+--------------------+------+-------+--------------------+
|            viewtime|userid| pageid|           timestamp|
+--------------------+------+-------+--------------------+
|2021-01-28 18:49:...|User_1|Page_40|2021-01-28 18:49:...|
|2021-01-28 18:49:...|User_1|Page_75|2021-01-28 18:49:...|
|2021-01-28 18:49:...|User_1|Page_43|2021-01-28 18:49:...|
|2021-01-28 18:49:...|User_1|Page_90|2021-01-28 18:49:...|
|2021-01-28 18:49:...|User_1|Page_49|2021-01-28 18:49:...|
|2021-01-28 18:49:...|User_1|Page_30|2021-01-28 18:49:...|
|2021-01-28 18:49:...|User_1|Page_78|2021-01-28 18:49:...|
|2021-01-28 18:49:...|User_1|Page_74|2021-01-28 18:49:...|
|2021-01-28 18:49:...|User_1|Page_87|2021-01-28 18:49:...|
|2021-01-28 18:49:...|User_1|Page_13|2021-01-28 18:49:...|
|2021-01-28 18:49:...|User_1|Page_97|2021-01-28 18:49:...|
|2021-01-28 18:49:...|User_1|Page_29|2021-01-28 18:49:...|
|2021-01-28 18:49:...|User_1|Page_44|2021-01-28 18:49:...|
|2021-01-28 18:49:...|User_1|Page_94|2021-01-28 18:49:..


Now count the number of visits to each page:

* use the source stream `dfPageViews`
* group by `pageid`
* use `count` as the aggregation operation
* store the output stream as an in-memory table named `CountsByPage`

Then describe the table and show part of its content.

In [6]:
dfCountsByPage = dfPageViews.groupBy('pageid').count()

dfCountsByPage.printSchema()

root
 |-- pageid: string (nullable = true)
 |-- count: long (nullable = false)



In [10]:
dfCountsByPage \
.writeStream \
.format("memory") \
.outputMode("complete") \
.queryName("CountsByPage") \
.start()

<pyspark.sql.streaming.StreamingQuery at 0x7f93940941d0>

In [11]:
spark.sql('describe CountsByPage').show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|  pageid|   string|   null|
|   count|   bigint|   null|
+--------+---------+-------+
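
With `outputMode("complete")` the whole result table is rewritten after every micro-batch, so `CountsByPage` holds running totals rather than only the newest rows (the `PageViews` table earlier used `append` instead). A minimal plain-Python sketch of that behaviour, assuming two hypothetical micro-batches of `(userid, pageid)` pairs; `complete_mode_snapshots` is an illustrative name, not a Spark API:

```python
from collections import Counter


def complete_mode_snapshots(micro_batches):
    """Return the full count table as it would look after each micro-batch."""
    totals = Counter()
    snapshots = []
    for batch in micro_batches:
        # Aggregation state is kept across batches and updated incrementally.
        totals.update(pageid for _userid, pageid in batch)
        # Complete mode emits the entire table, not just the changed rows.
        snapshots.append(dict(totals))
    return snapshots


# Hypothetical micro-batches of (userid, pageid) pairs.
batches = [
    [("User_2", "Page_34"), ("User_5", "Page_64"), ("User_3", "Page_34")],
    [("User_9", "Page_64"), ("User_2", "Page_58")],
]

for i, table in enumerate(complete_mode_snapshots(batches), start=1):
    print(f"after batch {i}: {table}")
```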



In [12]:
spark.sql('select * from CountsByPage').show()

+------+-----+
|pageid|count|
+------+-----+
+------+-----+

An empty result here most likely means the query had only just started: the Kafka source reads from the latest offsets by default, so the table fills in once the first micro-batch with new records completes. Re-run the cell after a few seconds.



Now we want the number of visits per page over the last 10 minutes, recomputed every 5 minutes:

* 10 minutes is the window duration
* 5 minutes is the slide duration

Additional references for windowing in Spark can be found [here](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time).


In [7]:
dfWindow = dfPageViews.groupBy(window('timestamp','10 minutes','5 minutes'), 'pageid').count()

dfWindow.printSchema()

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- pageid: string (nullable = true)
 |-- count: long (nullable = false)



In [8]:
dfWindow.writeStream.format("memory").outputMode("complete").queryName("pageWindow").start()

<pyspark.sql.streaming.StreamingQuery at 0x7f936fcda890>

In [9]:
spark.sql('select * from pageWindow').show()

+--------------------+-------+-----+
|              window| pageid|count|
+--------------------+-------+-----+
|[2021-01-28 18:50...|Page_18|    1|
|[2021-01-28 18:55...|Page_18|    1|
+--------------------+-------+-----+
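
Because the slide (5 minutes) is half the window length (10 minutes), every event falls into two overlapping windows, which is consistent with `Page_18` appearing above in the windows starting at 18:50 and 18:55, each with a count of 1. A plain-Python sketch of the assignment, assuming windows aligned to the Unix epoch and half-open `[start, end)` intervals; the event time 18:57:30 and the helper `sliding_windows` are hypothetical:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def sliding_windows(ts, window=timedelta(minutes=10), slide=timedelta(minutes=5)):
    """Return every [start, end) window that contains the timestamp ts."""
    # Latest window start that is not after ts, aligned to the slide interval.
    start = ts - ((ts - EPOCH) % slide)
    windows = []
    # Walk backwards while the window still covers ts (end is exclusive).
    while start + window > ts:
        windows.append((start, start + window))
        start -= slide
    return sorted(windows)


event_time = datetime(2021, 1, 28, 18, 57, 30, tzinfo=timezone.utc)
for start, end in sliding_windows(event_time):
    print(start.strftime("%H:%M"), "->", end.strftime("%H:%M"))
```

For the hypothetical event at 18:57:30 this yields the windows 18:50 to 19:00 and 18:55 to 19:05.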

