In [None]:
import os
from pyspark.sql import SparkSession

# Extra arguments handed to spark-submit so the Kafka connector packages are
# available to the session. Set before the SparkSession is built below.
# NOTE(review): the package list ends with a trailing comma and extra spaces
# before "pyspark-shell"; reproduced verbatim here — confirm it is intentional.
submit_args = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-token-provider-kafka-0-10_2.12:3.0.0,  pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = submit_args

# Build (or reuse) the notebook's session against the standalone master,
# requesting 2g of memory per executor.
spark = (
    SparkSession.builder
    .appName("pyspark-notebook")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Settings for the Kafka source: the broker to connect to and the topic to follow.
kafka_options = {
    "kafka.bootstrap.servers": "kafka:9092",
    "subscribe": "crypto",
}

# Streaming DataFrame over the subscribed topic; later steps read its
# `key`, `value`, `topic` and `timestamp` columns.
df = spark.readStream.format("kafka").options(**kafka_options).load()

# Attributes of the nested "data" object that are declared as plain strings,
# in the order they appear in the schema.
string_field_names = [
    "id",
    "rank",
    "symbol",
    "name",
    "supply",
    "maxSupply",
    "marketCapUsd",
    "volumeUsd24Hr",
    "priceUsd",
    "previousPriceUsd",
    "changePercent24Hr",
    "vwap24Hr",
    "explorer",
]

data_fields = [StructField(field_name, StringType()) for field_name in string_field_names]
# The only non-string attribute: the event time carried inside the payload.
data_fields.append(StructField("timestamp", TimestampType()))

# Schema used to parse the message value: a single top-level "data" struct.
json_schema = StructType([StructField("data", StructType(data_fields))])


# Keep every source column and add string-cast copies of the message key and value.
decoded = df.select(
    "*",
    F.col("key").cast("string").alias("key_casted"),
    F.col("value").cast("string").alias("value_casted"),
)

# Replace the value string with the struct parsed according to json_schema.
parsed = decoded.withColumn(
    "value_casted", F.from_json(F.col("value_casted"), json_schema)
)

# Attributes of value_casted.data to lift to top-level columns, in output order.
payload_fields = [
    "id",
    "rank",
    "symbol",
    "name",
    "supply",
    "maxSupply",
    "marketCapUsd",
    "volumeUsd24Hr",
    "priceUsd",
    "previousPriceUsd",
    "changePercent24Hr",
    "vwap24Hr",
    "explorer",
    "timestamp",
]
payload_columns = [F.col("value_casted.data." + field) for field in payload_fields]

# Flattened frame: the payload's own `timestamp` takes that column name, while
# the source row's `timestamp` column is exposed as `kafka_timestamp`.
df = parsed.select(
    F.col("key_casted"),
    *payload_columns,
    F.col("topic"),
    F.col("timestamp").alias("kafka_timestamp"),
)





# Declare the payload timestamp as event time with a one-second lateness bound.
watermarked = df.withWatermark("timestamp", "1 seconds")

# One-second windows over the payload timestamp.
one_second_window = F.window(df.timestamp, "1 seconds")

# NOTE(review): the raw `timestamp` is a grouping key alongside its window, so
# rows only share a group when their timestamps are identical — confirm that
# per-timestamp averages (rather than per-window) are what is wanted.
grouped = watermarked.groupBy("timestamp", "key_casted", "topic", one_second_window)

# Averages of the current and previous price within each group.
price_averages = grouped.agg(
    F.avg(df.priceUsd).alias('avg_current_price'),
    F.avg(df.previousPriceUsd).alias('avg_previous_price'),
)

# Add the window bounds as string columns and sort the result by event time.
StreamDF = (
    price_averages
    .withColumn('window_start_time', F.col("window.start").cast("string"))
    .withColumn('window_end_time', F.col("window.end").cast("string"))
    .orderBy("timestamp")
)


In [None]:

# Start a console sink in complete output mode, printing up to 10000
# untruncated rows of the aggregated result per batch.
console_query = (
    StreamDF.writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", False)
    .option("numRows", 10000)
    .start()
)

try:
    # Blocks this cell until the query terminates or the kernel is interrupted.
    console_query.awaitTermination()
finally:
    # Without an explicit stop, interrupting the cell leaves the query running
    # in the JVM, and re-running the cell would start a second one beside it.
    console_query.stop()