In [1]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook. The Kafka source is not
# bundled with Spark, so the connector artifact is requested via
# spark.jars.packages when the session is configured.
spark = (
    SparkSession.builder
    .appName("KafkaLoginCounter")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1",
    )
    .getOrCreate()
)


In [None]:
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import StructType, StringType, TimestampType

# Kafka connection settings used by the streaming reader.
kafka_bootstrap_servers = "localhost:9092"
kafka_topic = "user-events"

# Fields extracted from each JSON message value; "timestamp" is parsed as a
# timestamp so it can drive the time-window aggregation.
schema = (
    StructType()
    .add("user_id", StringType())
    .add("type", StringType())
    .add("timestamp", TimestampType())
)

# Subscribe to the Kafka topic as a streaming source; "latest" means only
# records that arrive after the query starts are read.
raw_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic)
    .option("startingOffsets", "latest")
    .load()
)

# Turn the message value into a string, parse it as JSON against `schema`,
# and flatten the resulting struct into top-level columns.
json_df = (
    raw_df
    .select(col("value").cast("string").alias("json_str"))
    .select(from_json(col("json_str"), schema).alias("data"))
    .select("data.*")
)

# Keep only login events and count them per one-minute window of the
# event's "timestamp" column.
login_counts = (
    json_df
    .where(col("type") == "login")
    .groupBy(window(col("timestamp"), "1 minute"))
    .count()
)

# Start the streaming query that prints the per-window login counts to the
# console. "complete" output mode re-emits the full aggregated result table on
# every trigger; truncate=False keeps the window column from being cut off.
query = login_counts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", False) \
    .start()

# Block this cell until the stream terminates. The try/finally ensures the
# query is stopped when the cell is interrupted (e.g. "Interrupt Kernel"
# raises KeyboardInterrupt) or the stream fails; without it the query keeps
# running in the background and re-running this cell would start a second,
# concurrent query against the same topic.
try:
    query.awaitTermination()
finally:
    query.stop()
