# Structured Streaming Demo

In [None]:
# Imports cell

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, BooleanType

from pyspark.sql.functions import window, col, count

# from_unixtime renders Unix epoch seconds as a formatted timestamp *string* (not a TimestampType column)
from pyspark.sql.functions import from_unixtime




In [None]:
# Build (or reuse) the Spark session for this demo. The Kafka source for
# Structured Streaming is requested via spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("KafkaStreamingPrototype")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.13:4.1.0",
    )
    .getOrCreate()
)


In [None]:
# Raw streaming DataFrame: subscribe to the "wikistreams" Kafka topic and
# start reading from the earliest available offsets.
raw_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:19092")
    .option("subscribe", "wikistreams")
    .option("startingOffsets", "earliest")
    .load()
)


In [None]:
# Simplified schema: only the fields needed for the analysis below.
# Every field is nullable; "length" is a nested struct with old/new sizes.
simple_wikimedia_schema = (
    StructType()
    .add("title", StringType(), True)
    .add("user", StringType(), True)
    .add("timestamp", LongType(), True)
    .add("type", StringType(), True)
    .add("wiki", StringType(), True)
    .add("bot", BooleanType(), True)
    .add("minor", BooleanType(), True)
    .add("comment", StringType(), True)
    .add(
        "length",
        StructType()
        .add("old", IntegerType(), True)
        .add("new", IntegerType(), True),
        True,
    )
)

# Decode the Kafka message value as a string, parse it as JSON with the
# schema above, then flatten the parsed struct into top-level columns.
parsed_df = (
    raw_df
    .select(
        from_json(col("value").cast("string"), simple_wikimedia_schema).alias("data")
    )
    .select("data.*")
)

In [None]:
# Count edits per one-hour window, split by the `bot` flag.
#
# `timestamp` is declared as LongType in the schema and holds Unix epoch
# seconds, so it is cast straight to a timestamp. from_unixtime() would instead
# produce a *string* formatted in the session time zone and leave window() to
# cast it back implicitly; that round trip is ambiguous for local times that
# repeat at a DST fall-back, so the direct cast keeps the exact instant.
windowed_counts_df = (
    parsed_df
    .withColumn("event_time", col("timestamp").cast("timestamp"))
    .groupBy(
        window(col("event_time"), "1 hour"),
        col("bot"),
    )
    .agg(count("*").alias("edit_count"))
    # Flatten the window struct into explicit start/end columns for output.
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("bot"),
        col("edit_count"),
    )
)

In [None]:

# Start the streaming query with the console sink (output ends up in the
# Docker logs). "complete" mode re-emits the full aggregation result on each
# trigger; truncation is disabled so column values are printed in full.
streaming_query_df = (
    windowed_counts_df.writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", False)
    .start()
)