In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_date, to_timestamp, hour
from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType, DoubleType

# Create (or reuse, via getOrCreate) the Spark session for this notebook.
# The Kafka source is not bundled with Spark, so its connector is pulled in
# through spark.jars.packages. The Scala (2.11) and Spark (2.4.8) versions in
# this Maven coordinate should match the Spark build the notebook runs on.
_KAFKA_CONNECTOR_PACKAGE = "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.8"

spark = (
    SparkSession.builder
    .appName("KafkaStreamingToConsoleExample")
    .config("spark.jars.packages", _KAFKA_CONNECTOR_PACKAGE)
    .getOrCreate()
)

In [2]:
# Kafka connection details.
# The broker and topic default to the values this notebook was written against.
# They can be overridden with environment variables. The SASL credentials are
# NEVER stored in the notebook: they must be supplied via the environment.
#
# SECURITY: an API key/secret pair used to be hardcoded in this cell. Treat it
# as compromised: rotate it in Confluent Cloud and scrub it from version-control
# history.
bootstrap_servers = os.environ.get(
    "KAFKA_BOOTSTRAP_SERVERS", "pkc-56d1g.eastus.azure.confluent.cloud:9092"
)
kafka_topic = os.environ.get("KAFKA_TOPIC", "omar_topic")


def _require_env(name: str) -> str:
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset or empty. The message names the
            variable so the missing setting is obvious, instead of failing
            later with an opaque Kafka authentication error.
    """
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name} must be set to the Kafka SASL credential."
        )
    return value


kafka_username = _require_env("KAFKA_USERNAME")
kafka_password = _require_env("KAFKA_PASSWORD")

def _build_event_schema() -> StructType:
    """Build the expected structure of one JSON message on the Kafka topic.

    ``timestamp`` is kept as a raw string here; conversion to a real timestamp
    happens when the parsed data is flattened. ``metadata`` is a nested object
    with ``category`` and ``source`` string fields. All fields use
    ``StructType.add``'s default (nullable).
    """
    metadata_struct = StructType().add("category", StringType()).add("source", StringType())

    # Field order here is the order of the resulting struct.
    field_specs = [
        ("eventType", StringType()),
        ("customerId", StringType()),
        ("productId", StringType()),
        ("timestamp", StringType()),
        ("metadata", metadata_struct),
        ("quantity", IntegerType()),
        ("totalAmount", DoubleType()),
        ("paymentMethod", StringType()),
        ("recommendedProductId", StringType()),
        ("algorithm", StringType()),
    ]

    event_struct = StructType()
    for field_name, field_type in field_specs:
        event_struct.add(field_name, field_type)
    return event_struct


# Schema for the incoming JSON payloads.
schema = _build_event_schema()

# Streaming source: subscribe to the topic over SASL_SSL with the PLAIN
# mechanism, starting from the earliest offsets. The JAAS login line carries
# the credentials defined in the connection-details cell.
_kafka_source_options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "subscribe": kafka_topic,
    "startingOffsets": "earliest",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{kafka_username}" password="{kafka_password}";'
    ),
}

# Streaming DataFrame of raw Kafka records.
df = spark.readStream.format("kafka").options(**_kafka_source_options).load()

In [3]:
# Decode each Kafka message value as a UTF-8 string and parse it as JSON
# against `schema` into a single struct column named `data`.
# NOTE(review): from_json does not validate or adapt to other schemas. Records
# that don't match presumably come through with null fields rather than failing
# the query. Confirm for the Spark version in use.
json_df = df.selectExpr("CAST(value AS STRING)").select(from_json("value", schema).alias("data"))

# Parse the event timestamp string exactly once. The `timestamp` output column
# and both partition columns (event_date, event_hour) are derived from this one
# parsed value. Both partition columns are therefore computed in the session
# time zone and always agree with each other and with `timestamp`.
# (Previously event_date came from to_date() on the raw string, while
# event_hour came from the parsed timestamp.)
event_ts = to_timestamp(col("data.timestamp"))

# Flatten the nested structure into top-level columns, adding the
# date/hour columns used for output partitioning.
flattened_df = json_df.select(
    col("data.eventType"),
    col("data.customerId"),
    col("data.productId"),
    event_ts.alias("timestamp"),
    col("data.metadata.category").alias("category"),
    col("data.metadata.source").alias("source"),
    col("data.quantity"),
    col("data.totalAmount"),
    col("data.paymentMethod"),
    col("data.recommendedProductId"),
    col("data.algorithm"),
    to_date(event_ts).alias("event_date"),
    hour(event_ts).alias("event_hour"),
)

In [None]:
# Sink: append the flattened events as Parquet files, partitioned into
# event_date=.../event_hour=... directories under OUTPUT_PATH.
# The checkpoint directory holds the query's progress; keep it paired with
# OUTPUT_PATH, because changing one without the other breaks resumption.
CHECKPOINT_PATH = "/user/itversity/checkpoints"
OUTPUT_PATH = "/user/itversity/casestudy/kafkastream"

query = (
    flattened_df.writeStream
    .outputMode("append")
    .format("parquet")
    .option("checkpointLocation", CHECKPOINT_PATH)
    .option("path", OUTPUT_PATH)
    .partitionBy("event_date", "event_hour")
    .start()
)

# Block this cell until the query ends. The query itself runs in the
# background, so an interrupted wait (kernel interrupt -> KeyboardInterrupt)
# would otherwise leave it running. Stop it explicitly instead.
try:
    query.awaitTermination()
except KeyboardInterrupt:
    query.stop()

In [None]:
# Stop any streaming queries that are still active (for example, if the
# blocking wait on the parquet query was interrupted). This lets them shut
# down cleanly before the Spark session itself is stopped.
for _active_query in spark.streams.active:
    _active_query.stop()
spark.stop()