In [1]:
from pyspark.sql import SparkSession
from IPython.display import display, clear_output
import time
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [2]:
spark = SparkSession.builder \
        .appName('kafka') \
        .getOrCreate()

In [3]:
stream_df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "broker:29092") \
  .option("startingOffsets", "earliest") \
  .option("subscribe", "stock-trades") \
  .load()

In [4]:
stream_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [5]:
raw_stream = stream_df \
    .writeStream \
    .format("memory") \
    .queryName("raw_stocktrade_view") \
    .start()

In [6]:
clear_output(wait=True)

In [7]:
time.sleep(10)

In [8]:
clear_output(wait=True)
display(spark.sql('SELECT * FROM raw_stocktrade_view').show(20))
time.sleep(1)

+----------------+--------------------+------------+---------+-------+--------------------+-------------+
|             key|               value|       topic|partition| offset|           timestamp|timestampType|
+----------------+--------------------+------------+---------+-------+--------------------+-------------+
|      [5A 56 56]|[7B 22 73 63 68 6...|stock-trades|        0|1897636|2021-06-10 10:26:...|            0|
|   [5A 42 5A 58]|[7B 22 73 63 68 6...|stock-trades|        0|1897637|2021-06-10 10:26:...|            0|
|[5A 57 5A 5A 54]|[7B 22 73 63 68 6...|stock-trades|        0|1897638|2021-06-10 10:26:...|            0|
|[5A 56 5A 5A 54]|[7B 22 73 63 68 6...|stock-trades|        0|1897639|2021-06-10 10:26:...|            0|
|   [5A 42 5A 58]|[7B 22 73 63 68 6...|stock-trades|        0|1897640|2021-06-10 10:26:...|            0|
|[5A 4A 5A 5A 54]|[7B 22 73 63 68 6...|stock-trades|        0|1897641|2021-06-10 10:26:...|            0|
|[5A 57 5A 5A 54]|[7B 22 73 63 68 6...|stock-t

None

In [9]:
raw_stream.stop()

In [15]:
string_stream_df = stream_df \
    .withColumn("key", stream_df["key"].cast(StringType())) \
    .withColumn("value", stream_df["value"].cast(StringType()))

In [16]:
string_stream = string_stream_df \
    .writeStream \
    .format("memory") \
    .queryName("string_stocktrade_view") \
    .start()

In [17]:
clear_output(wait=True)
display(spark.sql('SELECT * FROM string_stocktrade_view').show(20, False))
time.sleep(1)

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+



None

In [18]:
string_stream.stop()

In [19]:
schema_stocktrade = StructType([
    StructField('payload', StructType([
        StructField("side", StringType(),  True),
        StructField("quantity", IntegerType(), True),
        StructField("symbol", StringType(),  True),
        StructField("price", IntegerType(),  True),
        StructField("account", StringType(),  True),
        StructField("userid", StringType(),  True)      
    ]))
])

In [20]:
json_stream_df = string_stream_df\
    .withColumn("value", F.from_json("value", schema_stocktrade))

In [21]:
json_stream_df.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- payload: struct (nullable = true)
 |    |    |-- side: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- symbol: string (nullable = true)
 |    |    |-- price: integer (nullable = true)
 |    |    |-- account: string (nullable = true)
 |    |    |-- userid: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [22]:
json_stream = json_stream_df \
    .writeStream \
    .format("memory") \
    .queryName("extract_stock_view") \
    .start()

In [24]:
clear_output(wait=True)
display(spark.sql('SELECT * FROM extract_stock_view').show(20))
time.sleep(1)

+-----+--------------------+------------+---------+-------+--------------------+-------------+
|  key|               value|       topic|partition| offset|           timestamp|timestampType|
+-----+--------------------+------------+---------+-------+--------------------+-------------+
|  ZVV|{{SELL, 4220, ZVV...|stock-trades|        0|1897636|2021-06-10 10:26:...|            0|
| ZBZX|{{BUY, 3823, ZBZX...|stock-trades|        0|1897637|2021-06-10 10:26:...|            0|
|ZWZZT|{{SELL, 1571, ZWZ...|stock-trades|        0|1897638|2021-06-10 10:26:...|            0|
|ZVZZT|{{BUY, 2419, ZVZZ...|stock-trades|        0|1897639|2021-06-10 10:26:...|            0|
| ZBZX|{{BUY, 2499, ZBZX...|stock-trades|        0|1897640|2021-06-10 10:26:...|            0|
|ZJZZT|{{SELL, 1568, ZJZ...|stock-trades|        0|1897641|2021-06-10 10:26:...|            0|
|ZWZZT|{{SELL, 3373, ZWZ...|stock-trades|        0|1897642|2021-06-10 10:26:...|            0|
| ZBZX|{{SELL, 643, ZBZX...|stock-trades|        0

None

In [25]:
json_stream.stop()

In [26]:
stocktrade_stream_df = json_stream_df \
    .select( \
        F.col("key").alias("event_key"), \
        F.col("topic").alias("event_topic"), \
        F.col("timestamp").alias("event_timestamp"), \
        "value.payload.account", \
        "value.payload.symbol", \
        "value.payload.side", \
        "value.payload.price", \
        "value.payload.quantity", \
        "value.payload.userid"
    )

In [27]:
stocktrade_stream_df.printSchema()

root
 |-- event_key: string (nullable = true)
 |-- event_topic: string (nullable = true)
 |-- event_timestamp: timestamp (nullable = true)
 |-- account: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- side: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- userid: string (nullable = true)



In [28]:
stocktrade_stream = stocktrade_stream_df \
    .writeStream \
    .format("memory") \
    .queryName("stocktrade_view") \
    .start()

In [29]:
clear_output(wait=True)
display(spark.sql('SELECT * FROM stocktrade_view').show(20))
time.sleep(1)

+---------+-----------+---------------+-------+------+----+-----+--------+------+
|event_key|event_topic|event_timestamp|account|symbol|side|price|quantity|userid|
+---------+-----------+---------------+-------+------+----+-----+--------+------+
+---------+-----------+---------------+-------+------+----+-----+--------+------+



None

In [30]:
clear_output(wait=True)
display(spark.sql('SELECT event_key, COUNT(1) AS count, round(mean(price),0) as price, round(mean(quantity),0) as qty FROM stocktrade_view GROUP BY 1').show(20))
time.sleep(1)

+---------+-----+-----+------+
|event_key|count|price|   qty|
+---------+-----+-----+------+
|    ZXZZT| 8411|502.0|2497.0|
|    ZVZZT| 8412|499.0|2495.0|
|    ZTEST| 8209|502.0|2496.0|
|    ZWZZT| 8481|502.0|2533.0|
|    ZJZZT| 8388|510.0|2494.0|
|      ZVV| 8268|495.0|2495.0|
|     ZBZX| 8375|497.0|2518.0|
+---------+-----+-----+------+



None

In [31]:
stocktrade_stream.stop()

In [32]:
window_duration = '60 seconds'
slide_duration = '10 seconds'

In [33]:
windowed_count_df = stocktrade_stream_df \
    .withWatermark("event_timestamp", "1 minutes") \
    .groupBy(F.window(stocktrade_stream_df.event_timestamp, window_duration, slide_duration), stocktrade_stream_df.symbol) \
    .count()

In [34]:
count_stream = windowed_count_df \
    .writeStream \
    .format("memory") \
    .outputMode("Complete") \
    .queryName("count_view") \
    .start()

In [35]:
while True:
    clear_output(wait=True)
    display(spark.sql('SELECT * FROM count_view LIMIT 20').show())
    time.sleep(1)

+--------------------+------+-----+
|              window|symbol|count|
+--------------------+------+-----+
|{2021-06-10 10:30...| ZJZZT|  148|
|{2021-06-10 10:36...| ZWZZT|  178|
|{2021-06-10 10:45...| ZJZZT|  146|
|{2021-06-10 10:42...|  ZBZX|  178|
|{2021-06-10 10:30...| ZWZZT|  186|
|{2021-06-10 10:57...| ZWZZT|  149|
|{2021-06-10 10:41...|   ZVV|  162|
|{2021-06-10 11:11...| ZXZZT|  158|
|{2021-06-10 10:52...| ZWZZT|  205|
|{2021-06-10 11:13...| ZTEST|  152|
|{2021-06-10 10:48...|   ZVV|  164|
|{2021-06-10 11:05...| ZXZZT|  190|
|{2021-06-10 10:25...| ZWZZT|  136|
|{2021-06-10 10:53...| ZWZZT|  197|
|{2021-06-10 10:44...| ZVZZT|  188|
|{2021-06-10 10:49...| ZVZZT|  173|
|{2021-06-10 10:32...| ZJZZT|  181|
|{2021-06-10 10:49...| ZVZZT|  174|
|{2021-06-10 10:59...| ZWZZT|  165|
|{2021-06-10 10:53...| ZTEST|  145|
+--------------------+------+-----+



None

KeyboardInterrupt: 

In [None]:
count_stream.stop()