In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, IntegerType

# Build (or reuse) the Spark session for this notebook. The Kafka SQL
# connector is requested through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("KafkaStreamToParquet")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
    )
    .getOrCreate()
)


In [2]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, IntegerType

# Schema applied to the JSON payload of each event when it is parsed with
# from_json. Note that "timestamp" is declared as a plain string here.
schema = (
    StructType()
    .add("user_id", IntegerType())
    .add("event_type", StringType())
    .add("target_id", IntegerType())
    .add("timestamp", StringType())
    .add("device", StringType())
    .add("location", StringType())
)


In [3]:
# Streaming source: subscribe to the "social_events" Kafka topic, starting
# from the latest offsets.
df_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host.docker.internal:9092")
    .option("subscribe", "social_events")
    .option("startingOffsets", "latest")
    .load()
)

# Cast the message value to a string, parse it as JSON using `schema`, and
# flatten the resulting struct into top-level columns.
df_json = (
    df_raw
    .selectExpr("CAST(value AS STRING)")
    .select(from_json(col("value"), schema).alias("data"))
    .select("data.*")
)


In [None]:
# Streaming sink: append the parsed events as Parquet files.
# NOTE(review): the checkpoint directory is nested inside the output path;
# consider moving it to a sibling directory so non-Spark readers of the
# Parquet folder do not encounter checkpoint files — confirm before changing,
# since relocating it discards the existing checkpoint state.
query = (
    df_json.writeStream
    .format("parquet")
    .option("path", "/home/jovyan/work/parquet_output/")
    .option("checkpointLocation", "/home/jovyan/work/parquet_output/checkpoint/")
    .outputMode("append")
    .start()
)

# Block this cell until the query ends. Interrupting the cell only raises in
# Python; without an explicit stop() the streaming query would keep running
# in the background and still hold the checkpoint/output directory.
try:
    query.awaitTermination()
finally:
    query.stop()
