In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, to_timestamp, window, avg, timestamp_millis
)
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

In [None]:
# Build (or reuse) the Spark session shared by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("spark_structured_streaming")
    .config(map={
        # Maven coordinates for Delta Lake and the Kafka source. If these dependencies
        # are not present in your classpath yet they will be downloaded automatically.
        # It is important to have an adequate Java environment: having more than one
        # version installed, or env variables not properly set, may lead to errors.
        "spark.jars.packages":
            "io.delta:delta-spark_2.13:4.0.0,org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0",
        # Together with the packages above, these two settings configure Spark
        # to work with Delta tables.
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        # Set this to prevent Spark from creating a directory called spark-warehouse
        # where data is persisted.
        "spark.sql.warehouse.dir": "data/",
    })
    .getOrCreate()
)

In [None]:
# Storage locations used by the streaming queries: one Delta table directory and one
# checkpoint directory per layer. Plain string literals are used because nothing is
# interpolated into them.
paths = {
    "bronze": "data/delta/bronze_raw",
    "silver_minavg": "data/delta/silver_minute_avg",
    "ckpt_bronze": "data/_checkpoints/bronze_raw",
    "ckpt_silver": "data/_checkpoints/silver_minute_avg",
}

# Kafka broker address and the topic the stream subscribes to.
kafka_bootstrap = "127.0.0.1:9092"
topic = "iot"


In [None]:
# Streaming source: subscribe to the Kafka topic, starting from the earliest offsets.
raw_kafka = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": kafka_bootstrap,
        "subscribe": topic,
        "startingOffsets": "earliest",
    })
    .load()
)

In [None]:
# Keep the message value (decoded to a string) together with the Kafka metadata
# columns that are carried through to the bronze table.
json_df = raw_kafka.select(
    col("value").cast("string").alias("json"),
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp").alias("kafka_timestamp"),
)

In [None]:

# Schema of the JSON payload carried in each Kafka message; every field is nullable.
payload_schema = (
    StructType()
    .add("ts", LongType(), True)  # epoch millis from the device or gateway
    .add("device_id", StringType(), True)
    .add("metric", DoubleType(), True)
    .add("unit", StringType(), True)
)

In [None]:
# Parse the JSON payload with payload_schema and flatten it next to the Kafka
# metadata columns (prefixed with `_kafka_`), then derive the event-time column.
parsed = (
    json_df
    .select(
        from_json(col("json"), payload_schema).alias("d"),
        "topic", "partition", "offset", "kafka_timestamp",
    )
    .select(
        col("d.ts").alias("event_ts_ms"),
        col("d.device_id"),
        col("d.metric"),
        col("d.unit"),
        col("topic").alias("_kafka_topic"),
        col("partition").alias("_kafka_partition"),
        col("offset").alias("_kafka_offset"),
        col("kafka_timestamp").alias("_kafka_timestamp"),
    )
    # event_ts_ms is epoch milliseconds. Convert it with integer arithmetic instead of
    # dividing by 1000 into a double first: the floating-point route can come out one
    # microsecond short of the true millisecond value.
    .withColumn("event_time", timestamp_millis(col("event_ts_ms")))
)


In [None]:
# Start the bronze streaming query: append every parsed record to the bronze Delta table.
bronze_q = (
    parsed.writeStream
    .format("delta")
    .outputMode("append")
    .options(
        checkpointLocation=paths["ckpt_bronze"],
        mergeSchema="true",
    )
    .start(paths["bronze"])
)

In [None]:
# Per-device average of `metric` over 1-minute event-time windows.
minute_avg = (
    parsed
    # tolerate late events up to 2 min
    .withWatermark("event_time", "2 minutes")
    .groupBy(
        "device_id",
        window("event_time", "1 minute").alias("win"),
    )
    .agg(avg(col("metric")).alias("avg_metric"))
    # Flatten the window struct into explicit start/end columns.
    .select(
        "device_id",
        col("win.start").alias("window_start"),
        col("win.end").alias("window_end"),
        "avg_metric",
    )
)

In [None]:
# Start the silver streaming query: write the per-minute averages to the silver Delta table.
silver_q = (
    minute_avg.writeStream
    .format("delta")
    # with watermark + tumbling window, we can append final results
    .outputMode("append")
    .options(
        checkpointLocation=paths["ckpt_silver"],
        mergeSchema="true",
    )
    .start(paths["silver_minavg"])
)

In [None]:
# Report whether the silver streaming query is still running, then stop it.
print(silver_q.isActive)
silver_q.stop()
# Left disabled: uncomment to stop the bronze query as well.
# bronze_q.stop()

# Left disabled: alternative that waits (timeout argument of 10) for any active
# streaming query to terminate.
# spark.streams.awaitAnyTermination(10)

In [None]:
# Register the Delta directories as catalog tables so they can be queried by name.
# The LOCATION passed to Spark is the path with its leading "data/" removed; the
# session's warehouse dir is configured as "data/" (presumably a relative LOCATION
# is resolved against the warehouse dir -- confirm).
# str.removeprefix (Python 3.9+) replaces the positional [5:] slice, so a path that
# does not start with "data/" is left untouched instead of being silently truncated.
for table_name, path_key in (
    ("iot_bronze", "bronze"),
    ("iot_silver_minavg", "silver_minavg"),
):
    table_location = paths[path_key].removeprefix("data/")
    spark.sql(
        f"CREATE TABLE IF NOT EXISTS {table_name} USING DELTA LOCATION '{table_location}'"
    )

In [None]:
# Print the extended catalog description of the silver table without truncating values.
# Plain string literal: nothing is interpolated, so no f-prefix is needed.
spark.sql("describe extended iot_silver_minavg").show(truncate=False)

In [None]:
# Print rows of the silver table (all columns) without truncating cell values.
spark.table("iot_silver_minavg").show(truncate=False)

In [None]:
# Batch-read the bronze and silver outputs. Both directories are written by the
# streaming queries in Delta format, so they must be read through the Delta reader:
# reading the directory as plain Parquet bypasses the Delta transaction log and can
# pick up data files that are not part of the current table version.
df_bronze = spark.read.format("delta").load(paths["bronze"])
df_silver = spark.read.format("delta").load(paths["silver_minavg"])

In [None]:
# Number of rows currently readable from the bronze path (shown as the cell output).
df_bronze.count()

In [None]:
# Print the first rows read from the silver path using show()'s default settings.
df_silver.show()