In [0]:
#imports
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql.functions  import from_unixtime
from pyspark.sql.functions  import to_date
from pyspark.sql import Row
from pyspark.sql.functions import to_json, struct
from pyspark.sql import functions as F

In [0]:
# Explicit schema for the vehicle JSON carried in each Kafka message value.
# Every field is nullable (StructField's default); the struct produced by from_json
# exposes the fields in this order: id, timestamp, rpm, speed, kms.
jsonschema = StructType([
    StructField("id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("rpm", IntegerType()),
    StructField("speed", IntegerType()),
    StructField("kms", IntegerType()),
])

In [0]:
# Optional starting position for the Kafka reader, in Spark's `startingOffsets` JSON format:
#   {"<topic>": {"<partition>": <offset>}}
# Passing this string as the "startingOffsets" option (instead of "latest") starts the stream at
# offset 1 of partition 0 of the VehicleDetails topic. This only works if that offset is still
# retained by the Kafka broker. NOTE(review): the reader cell that follows currently passes
# "latest", so this value is not used there.
offset = (
    "\n"
    "  {\n"
    '  "VehicleDetails":{"0": 1}\n'
    "  }\n"
)

In [0]:
# Kafka connection settings. The broker address is environment-specific, so change it here
# rather than inside the reader chain.
KAFKA_BOOTSTRAP_SERVERS = "20.172.199.135:9092"
KAFKA_TOPIC = "VehicleDetails"
KAFKA_GROUP_ID_PREFIX = "Kafka-Demo"

# Read the topic as a streaming DataFrame. The `key` and `value` columns arrive as binary and are
# cast to strings in the next cell.
# - Kafka consumer properties must carry the "kafka." prefix. The bare "group.id" option used
#   previously is not a recognized source option, so it was silently ignored.
#   `groupIdPrefix` keeps Spark's unique per-query group ids, which avoids concurrent queries
#   interfering with each other, while making those ids identifiable as "Kafka-Demo-...".
#   Switch to "kafka.group.id" only if a fixed consumer group is required; see the caveats in the
#   Spark Kafka integration guide.
# - For a new query, startingOffsets="latest" reads only messages produced after the query starts.
#   To replay from a specific position, pass the `offset` JSON string defined earlier instead.
kafkaDF = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", KAFKA_TOPIC)
    .option("groupIdPrefix", KAFKA_GROUP_ID_PREFIX)
    .option("startingOffsets", "latest")
    .load()
)

In [0]:
# Kafka delivers `key` and `value` as binary, but from_json needs a string column, so cast both.
# The explicit aliases keep the original column names (`key`, `value`).
newkafkaDF = kafkaDF.select(
    col("key").cast("string").alias("key"),
    col("value").cast("string").alias("value"),
)
# Preview a single decoded record from the stream.
display(newkafkaDF.limit(1))

key,value
,"{""id"": ""e335803a-5237-4eaf-bb70-f3172b445dc6"", ""timestamp"": ""2022-11-24 12:27:14.133936"", ""rpm"": 26, ""speed"": 77, ""kms"": 727}"


In [0]:
# Parse the JSON text in `value` into a struct column `vehiclejson` whose fields come from
# `jsonschema` (id, timestamp, rpm, speed, kms). withColumn replaces an existing column with the
# same name, so re-running this cell does not create a duplicate `vehiclejson` column.
newkafkaDF = newkafkaDF.withColumn(
    "vehiclejson",
    F.from_json(F.col("value"), jsonschema),
)

# Preview a few parsed records.
display(newkafkaDF.limit(3))

key,value,vehiclejson
,"{""id"": ""a49d8a19-59da-4d2f-bb3a-ffad909ea4e1"", ""timestamp"": ""2022-11-24 12:30:10.729934"", ""rpm"": 58, ""speed"": 73, ""kms"": 870}","List(a49d8a19-59da-4d2f-bb3a-ffad909ea4e1, 2022-11-24T12:30:10.729+0000, 58, 73, 870)"
,"{""id"": ""71c9227c-6028-4aa3-a674-9dfdbf54bc7d"", ""timestamp"": ""2022-11-24 12:30:11.388848"", ""rpm"": 50, ""speed"": 89, ""kms"": 211}","List(71c9227c-6028-4aa3-a674-9dfdbf54bc7d, 2022-11-24T12:30:11.388+0000, 50, 89, 211)"
,"{""id"": ""d3997c20-358e-4133-a2f4-133064f5028d"", ""timestamp"": ""2022-11-24 12:30:11.609593"", ""rpm"": 70, ""speed"": 100, ""kms"": 798}","List(d3997c20-358e-4133-a2f4-133064f5028d, 2022-11-24T12:30:11.609+0000, 70, 100, 798)"
