In [0]:
%run "/config"

In [0]:
# Open a streaming read from Kafka. `ip_address` is not defined in this
# notebook — presumably it comes from the %run "/config" cell (verify).
#   - subscribePattern ".*": subscribe to every topic matching the regex.
#   - startingOffsets "earliest": read from the earliest messages first.
# NOTE: the variable keeps its original spelling (`kakfa_df_raw`) because
# the decode cell below reads it by that name.
kafka_source_options = {
    "kafka.bootstrap.servers": ip_address,
    "subscribePattern": ".*",
    "startingOffsets": "earliest",
}
kakfa_df_raw = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)

In [0]:
# The Kafka source exposes each message payload in the `value` column;
# cast it to a string (named `json_record`) so the JSON text can be parsed.
payload_as_text = kakfa_df_raw["value"].cast("string").alias("json_record")
decoded_df = kakfa_df_raw.select(payload_as_text)

# Preview the first few decoded records.
display(decoded_df.limit(10))

json_record
"{""Record"": ""383ca1aa-1fba-4e30-8019-097a0096e684"", ""Hole"": 1, ""Golfer"": ""Sam"", ""Stroke"": 1, ""Par"": 3, ""Yards_To_Pin"": 119, ""Pin_Distance"": 200, ""Club"": ""8 Iron""}"
"{""Record"": ""aa225dc0-35dc-47be-a3fb-3c497883450d"", ""Hole"": 1, ""Golfer"": ""Sam"", ""Stroke"": 2, ""Par"": 3, ""Yards_To_Pin"": 139, ""Pin_Distance"": 200, ""Club"": ""52 Degree Wedge""}"
"{""Record"": ""100b8bd3-db76-456d-933d-5753e794ce62"", ""Hole"": 1, ""Golfer"": ""Sam"", ""Stroke"": 3, ""Par"": 3, ""Yards_To_Pin"": 162, ""Pin_Distance"": 200, ""Club"": ""Putter""}"
"{""Record"": ""9afd74e6-b45c-45d1-8cdd-85740fdd021a"", ""Hole"": 1, ""Golfer"": ""Sam"", ""Stroke"": 4, ""Par"": 3, ""Yards_To_Pin"": 200, ""Pin_Distance"": 200, ""Club"": ""Putter""}"
"{""Record"": ""3fd0f83c-dd56-4065-b686-8c7469804cf3"", ""Hole"": 2, ""Golfer"": ""Sam"", ""Stroke"": 1, ""Par"": 4, ""Yards_To_Pin"": 217, ""Pin_Distance"": 440, ""Club"": ""Driver""}"
"{""Record"": ""309d8c31-903e-4c76-8f5e-bd1f4092d670"", ""Hole"": 2, ""Golfer"": ""Sam"", ""Stroke"": 2, ""Par"": 4, ""Yards_To_Pin"": 323, ""Pin_Distance"": 440, ""Club"": ""6 Iron""}"
"{""Record"": ""41482f80-ee8e-47ba-8ff5-80c64e035277"", ""Hole"": 2, ""Golfer"": ""Sam"", ""Stroke"": 3, ""Par"": 4, ""Yards_To_Pin"": 390, ""Pin_Distance"": 440, ""Club"": ""54 Degree Wedge""}"
"{""Record"": ""9d0ee576-bcd6-4ff0-a669-18bacdcdd2bf"", ""Hole"": 2, ""Golfer"": ""Sam"", ""Stroke"": 4, ""Par"": 4, ""Yards_To_Pin"": 434, ""Pin_Distance"": 440, ""Club"": ""52 Degree Wedge""}"
"{""Record"": ""19d3e1b8-38cf-45bb-90c6-45fd0d81ffcf"", ""Hole"": 2, ""Golfer"": ""Sam"", ""Stroke"": 5, ""Par"": 4, ""Yards_To_Pin"": 440, ""Pin_Distance"": 440, ""Club"": ""56 Degree Wedge""}"
"{""Record"": ""eaf7ea91-7fdd-404a-b9f6-0798f944072d"", ""Hole"": 3, ""Golfer"": ""Sam"", ""Stroke"": 1, ""Par"": 5, ""Yards_To_Pin"": 215, ""Pin_Distance"": 459, ""Club"": ""Driver""}"


In [0]:
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql.functions import from_json, col

# Field names and types expected in each JSON record; they mirror the keys
# seen in the decoded `json_record` sample (Record, Hole, Golfer, ...).
stroke_fields = [
    ("Record", StringType()),
    ("Hole", IntegerType()),
    ("Golfer", StringType()),
    ("Stroke", IntegerType()),
    ("Par", IntegerType()),
    ("Yards_To_Pin", IntegerType()),
    ("Pin_Distance", IntegerType()),
    ("Club", StringType()),
]

# StructType.add appends the field to this schema (and returns it).
json_schema = StructType()
for field_name, field_type in stroke_fields:
    json_schema.add(field_name, field_type)

# Parse the JSON text into a struct column `data`, then flatten the struct
# so every schema field becomes a top-level column.
parsed_struct = from_json(col("json_record"), json_schema).alias("data")
df_cleaned = decoded_df.select(parsed_struct).select("data.*")

display(df_cleaned.limit(10))

Record,Hole,Golfer,Stroke,Par,Yards_To_Pin,Pin_Distance,Club
383ca1aa-1fba-4e30-8019-097a0096e684,1,Sam,1,3,119,200,8 Iron
aa225dc0-35dc-47be-a3fb-3c497883450d,1,Sam,2,3,139,200,52 Degree Wedge
100b8bd3-db76-456d-933d-5753e794ce62,1,Sam,3,3,162,200,Putter
9afd74e6-b45c-45d1-8cdd-85740fdd021a,1,Sam,4,3,200,200,Putter
3fd0f83c-dd56-4065-b686-8c7469804cf3,2,Sam,1,4,217,440,Driver
309d8c31-903e-4c76-8f5e-bd1f4092d670,2,Sam,2,4,323,440,6 Iron
41482f80-ee8e-47ba-8ff5-80c64e035277,2,Sam,3,4,390,440,54 Degree Wedge
9d0ee576-bcd6-4ff0-a669-18bacdcdd2bf,2,Sam,4,4,434,440,52 Degree Wedge
19d3e1b8-38cf-45bb-90c6-45fd0d81ffcf,2,Sam,5,4,440,440,56 Degree Wedge
eaf7ea91-7fdd-404a-b9f6-0798f944072d,3,Sam,1,5,215,459,Driver


In [0]:
# Continuously write the parsed records (`df_cleaned`, one column per schema
# field) to the ADLS container as Parquet files — not JSON: the sink format
# below is "parquet". `container_path` and `checkpoint_path` are not defined
# in this notebook; presumably they come from the %run "/config" cell.
stream_writer = (
    df_cleaned.writeStream
    .format("parquet")
    .option("path", container_path)                 # ADLS container (output location)
    .option("checkpointLocation", checkpoint_path)  # fault-tolerance checkpoint location
    .outputMode("append")                           # write only newly arrived rows
    .start()
)

# Block this cell until the streaming query terminates, keeping the stream
# running. Note: on a "Run All", cells after this one will not execute until
# the query stops.
stream_writer.awaitTermination()