In [1]:
# Create the Spark Session
from pyspark.sql import SparkSession

# Build (or reuse) a local session. The Kafka connector is requested through
# spark.jars.packages so the "kafka" source/sink format can be resolved.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Triggers in Spark Streaming")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
    .config("spark.sql.shuffle.partitions", 2)
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .getOrCreate()
)

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy.optimize import minimize
from scipy.stats import norm
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import count, lit, window, avg, stddev, sum, pow
        
# Streaming source: subscribe to both Kafka topics on the ed-kafka broker,
# with startingOffsets set to "earliest".
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "ed-kafka:29092",
        "subscribe": "MatchTrades-data,Lob-data",
        "startingOffsets": "earliest",
    })
    .load()
)

# Convert binary to string value column
from pyspark.sql.functions import expr

# Replace the binary Kafka `value` column with its string representation.
kafka_json_df = kafka_df.withColumn("value", kafka_df["value"].cast("string"))

from pyspark.sql.functions import from_json, col, split, explode, window

# DDL-style schema describing the JSON payload carried in `value`.
json_schema = "date timestamp, spread string, LDispersion string, price string, topic string"

# Parse the JSON string into a struct column named `values_json`.
json_df = kafka_json_df.withColumn(
    "values_json",
    from_json(kafka_json_df["value"], json_schema),
)

# Promote the payload fields to top-level columns. `topic` is taken from the
# top-level Kafka column, not from the parsed struct.
flattened_df = json_df.select(
    col("values_json.date"),
    col("values_json.price"),
    col("values_json.spread"),
    col("values_json.LDispersion"),
    col("topic"),
)

# Prepare the flattened stream for windowing. spread, LDispersion, price and
# topic pass through unchanged; only `date` is explicitly cast to timestamp so
# the watermark/window step downstream always receives a timestamp column.
# Written as a single parenthesis-free call so no trailing line continuation
# is left dangling after the last transformation.
data_df = flattened_df.withColumn("date", col("date").cast("timestamp"))

# Compute the average and standard deviation of price over sliding event-time windows
from pyspark.sql.functions import count, lit, window, avg, stddev

# 10-minute windows sliding every 5 minutes over `date`, with a 30-minute
# watermark on the same column; one avg/stddev of price per window.
df_agg = (
    data_df
    .withWatermark("date", "30 minutes")
    .groupBy(
        window(timeColumn="date", windowDuration="10 minutes", slideDuration="5 minutes")
    )
    .agg(
        avg("price").alias("avg_price"),
        stddev("price").alias("stddev_price"),
    )
)

# Flatten the `window` struct into explicit start/end columns alongside the
# aggregates.
# NOTE(review): no ordering is applied. Spark documents sorting of a streaming
# aggregate as supported only in complete output mode, so an orderBy on
# start_time/end_time would presumably require changing the sinks' output
# mode - confirm before adding one.
df_final = df_agg.selectExpr(
    "window.start as start_time",
    "window.end as end_time",
    "avg_price",
    "stddev_price",
)

# Cast every output column to string, then add a `value` column holding the
# whole row serialised as a JSON string.
result = df_final.selectExpr(
    *(
        f"CAST({column_name} AS STRING)"
        for column_name in ("start_time", "end_time", "avg_price", "stddev_price")
    )
).withColumn("value", to_json(struct("*")).cast("string"))

In [None]:
# Kafka sink: publish the JSON `value` column to the "sliding_window" topic
# every 10 seconds in update mode.
# awaitTermination() blocks this cell until the query stops or fails.
(
    result.select("value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers",  "ed-kafka:29092")
    .option("topic", "sliding_window")
    .option("checkpointLocation", "sliding_window")
    .outputMode("update")
    .trigger(processingTime="10 seconds")
    .start()
    .awaitTermination()
)

In [None]:
# Console sink: print updated window rows every 10 seconds. The cell evaluates
# to the started StreamingQuery (no awaitTermination here).
(
    result.writeStream
    .outputMode("update")
    .format("console")
    .option("checkpointLocation", "checkpoint stream 2")
    .trigger(processingTime="10 seconds")
    .start()
)