In [None]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    collect_list,
    explode,
    from_json,
    lit,
    struct,
    to_json,
    when,
    window,
)
from pyspark.sql.types import ArrayType, DoubleType, StringType, StructType

### Create Spark session

In [None]:
# Create (or reuse) a local Spark session that uses every available core.
# The Kafka source/sink used below needs its connector jars on the classpath, which is
# passed through "spark.jars". Instead of relying on an undefined `jars` global, read
# the comma-separated jar paths from the SPARK_JARS environment variable.
# NOTE(review): confirm the environment sets SPARK_JARS (or supplies the Kafka connector
# another way, e.g. spark.jars.packages).
jars = os.environ.get("SPARK_JARS", "")

builder = SparkSession.builder \
    .appName("Check Spark Version") \
    .master("local[*]")
if jars:
    builder = builder.config("spark.jars", jars)

spark = builder.getOrCreate()

### Read data from `btc-price` topic

Read and parse the JSON records from the `btc-price` topic.

In [None]:
# Read the raw Kafka records of the btc-price topic as a stream.
# NOTE(review): "broker:9092" must be reachable from where Spark runs (presumably the
# broker's listener inside the Docker network) — confirm against the compose setup.
df_price = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "btc-price") \
    .load()

# Kafka delivers `value` as binary; decode it to a JSON string.
df_string_price = df_price.selectExpr("CAST(value AS STRING) as json_value")

# Schema of each JSON message. Fields are read as strings and typed after parsing,
# so a producer that quotes numeric values still parses.
schema_price = StructType() \
    .add("symbol", StringType()) \
    .add("price", StringType()) \
    .add("timestamp", StringType())

# Parse the JSON, then type the columns: the z-score arithmetic needs a numeric price,
# and withWatermark()/window() need a timestamp column (a string column is rejected).
# NOTE(review): cast("timestamp") expects an ISO-8601-like string; adjust if the
# producer emits another format (e.g. epoch millis).
parsed_price_df = df_string_price \
    .select(from_json(col("json_value"), schema_price).alias("data")) \
    .select(
        col("data.symbol").alias("symbol"),
        col("data.price").cast("double").alias("price"),
        col("data.timestamp").cast("timestamp").alias("timestamp"),
    )

Optional: to make debugging easier, run this cell to print the parsed data to the console.

In [None]:
# Print to console (OPTIONAL) to check whether the data is read correctly or not.
# awaitTermination() without a timeout blocks forever, so a stop() after it is never
# reached. Instead, wait a bounded time and always stop the debug stream.
DEBUG_RUN_SECONDS = 30

query_price = parsed_price_df.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

try:
    query_price.awaitTermination(DEBUG_RUN_SECONDS)
finally:
    # Stop the debug stream, even if the wait was interrupted.
    query_price.stop()

### Read data from `btc-price-moving` topic

Read and parse the JSON records from the `btc-price-moving` topic. Each record carries an array of sliding-window statistics.

In [None]:
# Read the raw Kafka records of the btc-price-moving topic as a stream.
# NOTE(review): "broker:9092" must be reachable from where Spark runs — confirm.
df_moving = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "btc-price-moving") \
    .load()

# Kafka delivers `value` as binary; decode it to a JSON string.
df_string_moving = df_moving.selectExpr("CAST(value AS STRING) as json_value")

# Schema of one sliding-window statistic inside a record.
schema_window = StructType() \
    .add("window", StringType()) \
    .add("avg_price", DoubleType()) \
    .add("std_price", DoubleType())

# Schema of a whole record: timestamp, symbol and an array of per-window statistics.
schema_moving = StructType() \
    .add("timestamp", StringType()) \
    .add("symbol", StringType()) \
    .add("windows", ArrayType(schema_window))

# Parse the JSON and explode the `windows` array into one row per sliding window.
# Keep `symbol` and expose the record timestamp as `moving_ts`: the flattening cell
# selects both columns, and the join needs `moving_ts` typed as a timestamp.
# NOTE(review): cast("timestamp") expects an ISO-8601-like string — confirm format.
parsed_moving_df = df_string_moving \
    .select(from_json(col("json_value"), schema_moving).alias("data")) \
    .select(
        col("data.timestamp").cast("timestamp").alias("moving_ts"),
        col("data.symbol").alias("symbol"),
        explode("data.windows").alias("win"),
    )

Flatten each exploded sliding window into its own columns.

In [None]:
# Each row holds one exploded sliding window in the `win` struct. Lift its fields into
# top-level columns next to the record's timestamp and symbol.
# Mapping: struct field -> output column name (dict order fixes the column order).
win_fields = {
    "window": "window_type",  # window label: "30s", "1m", ...
    "avg_price": "mean",
    "std_price": "std",
}
flat_moving_df = parsed_moving_df.select(
    col("moving_ts"),
    col("symbol"),
    *[col("win").getField(field).alias(out_name) for field, out_name in win_fields.items()],
)

Optional: to make debugging easier, run this cell to print the flattened data to the console.

In [None]:
# Print to console (OPTIONAL) to check whether the data is read correctly or not.
# awaitTermination() without a timeout blocks forever, so a stop() after it is never
# reached. Instead, wait a bounded time and always stop the debug stream.
DEBUG_RUN_SECONDS = 30

query_moving = flat_moving_df.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

try:
    query_moving.awaitTermination(DEBUG_RUN_SECONDS)
finally:
    # Stop the debugging stream, even if the wait was interrupted.
    query_moving.stop()

### Join the two topics by sliding window

Join records from the `btc-price` and `btc-price-moving` topics on their corresponding sliding window.

In [None]:
# Label -> duration of every sliding window published in btc-price-moving.
windows = [("30s", "30 seconds"),
           ("1m", "1 minute"),
           ("5m", "5 minutes"),
           ("15m", "15 minutes"),
           ("30m", "30 minutes"),
           ("1h", "1 hour")]

# How far behind the latest seen event time a record may arrive before it is dropped.
# This also lets Spark purge old join state.
WATERMARK_DELAY = "10 seconds"

# Loop-invariant: type and watermark each input stream once. The casts are no-ops when
# the parsing cells already produce timestamp/double columns. Both sides get a
# watermark so the stream-stream join state does not grow without bound.
price_watermarked = parsed_price_df \
    .withColumn("timestamp", col("timestamp").cast("timestamp")) \
    .withColumn("price", col("price").cast("double")) \
    .withWatermark("timestamp", WATERMARK_DELAY)

moving_watermarked = flat_moving_df \
    .withColumn("moving_ts", col("moving_ts").cast("timestamp")) \
    .withWatermark("moving_ts", WATERMARK_DELAY)

# z = (price - mean) / std, returning 0 when the standard deviation is zero
# instead of dividing by zero.
z_col = when(col("std") == 0, 0).otherwise((col("price") - col("mean")) / col("std"))

window_zscore = None
for label, duration in windows:
    # Bucket price ticks and moving statistics into the same tumbling window of this
    # duration, keeping only the statistics computed for this window label.
    price = price_watermarked.withColumn("window", window("timestamp", duration))

    moving = moving_watermarked.filter(col("window_type") == label) \
        .withColumn("window", window("moving_ts", duration))

    # Join on symbol too: both sides carry a `symbol` column, so joining on `window`
    # alone would leave `symbol` ambiguous in the select below.
    joined = price.join(moving, on=["window", "symbol"])

    result = joined.select(
        "timestamp",
        "symbol",
        lit(label).alias("window"),
        z_col.alias("zscore_price")
    )

    # All per-window results share the same column order, so a positional union is safe.
    window_zscore = result if window_zscore is None else window_zscore.union(result)

Group the per-window z-scores into a single JSON record per timestamp and symbol.

In [None]:
# Collect the z-scores of every window for the same (symbol, timestamp) into a single
# record, and serialise it as the JSON `value` column expected by the Kafka sink.
grouped = window_zscore.groupBy("symbol", "timestamp").agg(
    collect_list(
        struct(
            col("window"),
            col("zscore_price")
        )
    ).alias("windows")
).select(
    to_json(
        struct(
            # The alias is required: struct() names an un-aliased cast expression
            # "col1", which would replace the "timestamp" key in the emitted JSON.
            col("timestamp").cast("string").alias("timestamp"),
            col("symbol"),
            col("windows")
        )
    ).alias("value")
)

Write the results into the Kafka topic `btc-price-zscore`.

In [None]:
# Write the JSON records into the Kafka topic btc-price-zscore.
# Append mode is required: Spark only supports stream-stream joins (used to build
# `grouped`) in append output mode, and rejects "update" when the query starts.
# NOTE(review): a stream-stream join followed by an aggregation in append mode needs
# Spark 3.4+ — confirm the cluster version.
ZSCORE_CHECKPOINT_DIR = "/tmp/btc-zscore-checkpoint"

zscore_query = grouped.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("topic", "btc-price-zscore") \
    .option("checkpointLocation", ZSCORE_CHECKPOINT_DIR) \
    .outputMode("append") \
    .start()

# Block this cell for as long as the sink query runs.
zscore_query.awaitTermination()