Read data from Kafka

In [None]:
from pyspark.sql.functions import *

# Kafka connection settings for the trade feed.
kafka_brokers = "b-1.detrainingmsk.66lq6h.c10.kafka.us-east-1.amazonaws.com:9092"
kafka_topic = 'coincap_trade'

# Streaming reader over the topic.
# The Kafka source always exposes the same fixed columns (key, value, topic,
# partition, offset, timestamp, timestampType), so there is no schema to infer;
# the file-source option `inferSchema` has no effect here and is not set.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_brokers)
    .option("subscribe", kafka_topic)
    # When the query starts without a checkpoint, begin at the newest offsets.
    .option("startingOffsets", "latest")
    .load()
)

# Keep only the message payload, cast to string so it can be parsed as JSON later.
df1 = df.select(col("value").cast("string"))
df1.printSchema()

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, LongType

# Layout of one trade message carried in the Kafka `value` column.
# All fields are nullable (the default); `timestamp` is divided by 1000 before
# being converted to a time downstream, i.e. it is treated as epoch milliseconds.
json_schema = (
    StructType()
    .add("exchange", StringType())
    .add("base", StringType())
    .add("quote", StringType())
    .add("direction", StringType())
    .add("price", DoubleType())
    .add("volume", DoubleType())
    .add("timestamp", LongType())
    .add("priceUsd", DoubleType())
)

# Parse the JSON payload and promote the parsed fields to top-level columns.
# Selecting only the struct (aliased "record") would leave `timestamp`,
# `priceUsd`, `volume` and `base` unreachable by bare name in the steps below.
df2 = (
    df1
    .select(from_json(col("value"), json_schema).alias("record"))
    .select("record.*")
)

# Keep trades that carry a USD price and derive the event-time column.
# Rows whose JSON failed to parse come back with every field NULL and are
# dropped by the same filter.
# `timestamp` is in epoch milliseconds: dividing by 1000 gives epoch seconds,
# and casting a numeric to timestamp interprets it as seconds since the epoch.
# This yields a real timestamp column (required by withWatermark / window)
# without a round-trip through a formatted string in the session time zone.
df3 = (
    df2
    .filter("priceUsd is not NULL")
    .withColumn("readable_time", (col("timestamp") / 1000).cast("timestamp"))
)

# Per-asset statistics over 1-minute tumbling windows of event time.
# Events arriving more than 5 minutes behind the watermark may be discarded.
trades_with_amount = (
    df3
    .withWatermark("readable_time", "5 minutes")
    # USD value of each trade
    .withColumn('amount', col('volume') * col('priceUsd'))
)

per_minute_stats = (
    trades_with_amount
    .groupBy("base", window("readable_time", "1 minute"))
    .agg(
        sum('volume').alias('total_volume'),
        sum('amount').alias('total_amount'),
        avg("priceUsd").alias("average_price"),   # unweighted mean of trade prices
    )
)

tumbling_window = (
    per_minute_stats
    # volume-weighted average price for the window
    .withColumn('actual_avg', col('total_amount') / col('total_volume'))
    # sorting a streaming aggregate is only allowed in complete output mode
    .orderBy("base", "window")
)

Write result to memory

In [None]:
# Start the aggregation as a streaming query backed by the in-memory sink.
# The sink is exposed as a temp table named after the query, and the full
# result table is rewritten on every 1-minute trigger (complete mode).
# `start()` returns a pyspark.sql.streaming.query.StreamingQuery handle.
memory_writer = tumbling_window.writeStream.trigger(processingTime='1 minute')

tumbling_window_streaming_query = memory_writer.start(
    format="memory",
    outputMode="complete",
    queryName="events_per_tumbling_window",
)

Query the result in memory

In [None]:
import time
from pyspark.sql.window import Window

# Previous-row lookup for lag(): within one asset, ordered by time window.
windowSpec = Window.partitionBy("base").orderBy("window")

# Only windows that started within this interval before "now" are displayed.
window_size = expr("INTERVAL 6 MINUTES")

# Poll the in-memory result table once a minute, ten times.
for i in range(10):
    print(f"Run index {i}:")
    # The table name must match the queryName of the memory-sink streaming query.
    table_df = spark.sql("SELECT * FROM events_per_tumbling_window ORDER BY base, window")

    price_trend_df = (table_df
        # volume-weighted average price of the preceding window for the same asset
        .withColumn("prev_avg", lag('actual_avg').over(windowSpec))
        # "up"/"down" versus the previous window; the literal string "null" when the
        # price is unchanged or there is no previous window to compare against
        .withColumn("price_trend", when(col("actual_avg") > col("prev_avg"), "up")
                                .when(col("actual_avg") < col("prev_avg"), "down")
                                .otherwise("null"))
        .withColumn("current_time", current_timestamp())
        # NOTE: despite the "_5m" in the column name, the cutoff is `window_size` (6 minutes) back
        .withColumn("current_time_minus_5m", col("current_time") - window_size)
        # filter after lag() so the oldest displayed window still sees its predecessor
        .filter(col("window.start") > col("current_time_minus_5m"))
    )
    (price_trend_df
        .select('base', 'window', 'total_volume', 'actual_avg', 'price_trend')
        # the window function does not guarantee output order, so sort for display
        .orderBy('base', 'window')
        .show(20, truncate=False))
    time.sleep(60)

Write result to a Delta table

In [None]:
# Storage locations for the Delta sink: streaming checkpoint and table data.
stream_chkpoint_loc = 's3a://asc-de-training-destination-s3/roger/checkpint/coincap_streaming'
table_path = 's3a://asc-de-training-destination-s3/roger/data/coincap_streaming_result'

# Persist the windowed aggregation to a Delta table.
# - The source must be the *streaming* DataFrame `tumbling_window`. `price_trend_df`
#   is a static DataFrame produced by spark.sql() and cannot be used with writeStream.
#   Its lag()-based trend column cannot be computed on a stream either, so derive
#   the trend when reading the table, as done for the in-memory result.
# - Complete mode rewrites the whole result table on each 1-minute trigger.
# - toTable() starts the query itself and returns the StreamingQuery handle, so
#   no further start() call is made.
tumbling_window_streaming_query_delta = (
    tumbling_window.writeStream
    .format('delta')
    .outputMode("complete")
    .trigger(processingTime='1 minute')
    .option("checkpointLocation", stream_chkpoint_loc)
    .option("path", table_path)
    .toTable("coincap_streaming_result")
)