In [3]:
# create spark session

from pyspark.sql import SparkSession

spark=(
    SparkSession
    .builder
    .appName("Streaming from Kafka")
    .config("spark.streaming.stopGracefullyonshutdown", True)
    .config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
    .config("spark.sql.shuffle.partitions",4)
    .master("local[*]")
    .getOrCreate()
    )
    
spark


In [4]:
# create the kafka_df to read form kafka

kafka_df = (
    spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers","ed-kafka:29092")
    .option("subscribe","device-data")
    .option("startingoffsets","earliest")
    .load()

)

In [6]:
# view  schema for raw kafka_df

kafka_df.printSchema()
kafka_df.show()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|                  []|device-data|        0|     0|2026-02-03 12:21:...|            0|
|null|[7B 22 65 76 65 6...|device-data|        0|     1|2026-02-03 12:21:...|            0|
+----+--------------------+-----------+---------+------+--------------------+-------------+



In [10]:
# parse value from binary to string into kafka_json_df

from pyspark.sql.functions import expr

kafka_json_df = kafka_df.withColumn("value",expr("cast(value as string)"))

In [13]:
kafka_json_df.show()

+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|                    |device-data|        0|     0|2026-02-03 12:21:...|            0|
|null|{"eventId": "ba2e...|device-data|        0|     1|2026-02-03 12:21:...|            0|
+----+--------------------+-----------+---------+------+--------------------+-------------+



In [15]:

from pyspark.sql.types import StringType, StructField, StructType, ArrayType,  LongType

json_schema = (
    StructType(
    [StructField('customerId',StringType(), True),
    StructField('data', StructType(
        [StructField('devices',
                    ArrayType(StructType([
                    StructField('deviceId', StringType(), True),
                    StructField('measure', StringType(), True),
                    StructField('status', StringType(), True),
                    StructField('temperature',LongType(), True)
                ]), True), True)
        ]), True),
StructField('eventId', StringType(), True),
StructField('eventoffset', LongType(), True),
StructField('eventPublisher', StringType(), True),
StructField('eventTime', StringType(), True)
])
)



In [17]:

# Apply the schema to payload to read the data

from pyspark.sql.functions import from_json, col

streaming_df = kafka_json_df.withColumn("values_json", from_json(col("value"), json_schema))

In [18]:
# To the schema of the data, place a sample json file and change readStream to read 

streaming_df.printSchema()
streaming_df.show(truncate=False)

root
 |-- key: binary (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- values_json: struct (nullable = true)
 |    |-- customerId: string (nullable = true)
 |    |-- data: struct (nullable = true)
 |    |    |-- devices: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |    |-- measure: string (nullable = true)
 |    |    |    |    |-- status: string (nullable = true)
 |    |    |    |    |-- temperature: long (nullable = true)
 |    |-- eventId: string (nullable = true)
 |    |-- eventoffset: long (nullable = true)
 |    |-- eventPublisher: string (nullable = true)
 |    |-- eventTime: string (nullable = true)

+----+-------------------------------------------------

In [19]:
streaming_df = kafka_json_df.withColumn("values_json", from_json(col("value"), json_schema)).selectExpr("values_json.*")

In [20]:
streaming_df.printSchema()
streaming_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventoffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)

+----------+----+------------------------------------+-----------+--------------+--------------------------+
|customerId|data|eventId                             |eventoffset|eventPublisher|eventTime                 |
+----------+----+------------------------------------+-----------+--------------+--------------------------+
|null      |null|null                                |null       |null          |null                      |

In [27]:
# Lets explode the data as devices contains List/array of device reading

from pyspark.sql.functions import explode

explode_df = streaming_df.withColumn("data_devices", explode("data.devices"))

In [28]:
# check the schema of the explode_df, place a sample json file and change readStream to read

explode_df.printSchema()


root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventoffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- data_devices: struct (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- measure: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- temperature: long (nullable = true)



In [29]:
# flatten the explode df
from pyspark.sql.functions import col
 
flattened_df = (
    explode_df
    .drop("data")
    .withColumn("deviceId", col("data_devices.deviceId"))
    .withColumn("measure", col("data_devices.measure"))
    .withColumn("status", col("data_devices.status"))
    .withColumn("temperature", col("data_devices.temperature"))
    .drop("data_devices")

)

In [30]:
# check the schema of the flattened_df, place a sample json file and change readStream to read

flattened_df.printSchema()
flattened_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventoffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: long (nullable = true)

+----------+-------+-----------+--------------+---------+--------+-------+------+-----------+
|customerId|eventId|eventoffset|eventPublisher|eventTime|deviceId|measure|status|temperature|
+----------+-------+-----------+--------------+---------+--------+-------+------+-----------+
+----------+-------+-----------+--------------+---------+--------+-------+------+-----------+



In [31]:
# create the kafka_df to read form kafka stream

kafka_df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers","ed-kafka:29092")
    .option("subscribe","device-data")
    .option("startingoffsets","earliest")
    .load()

)

In [32]:
kafka_df.printSchema()


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [33]:
# parse value from binary to string into kafka_json_df

from pyspark.sql.functions import expr

kafka_json_df = kafka_df.withColumn("value",expr("cast(value as string)"))

In [34]:

from pyspark.sql.types import StringType, StructField, StructType, ArrayType,  LongType

json_schema = (
    StructType(
    [StructField('customerId',StringType(), True),
    StructField('data', StructType(
        [StructField('devices',
                    ArrayType(StructType([
                    StructField('deviceId', StringType(), True),
                    StructField('measure', StringType(), True),
                    StructField('status', StringType(), True),
                    StructField('temperature',LongType(), True)
                ]), True), True)
        ]), True),
StructField('eventId', StringType(), True),
StructField('eventoffset', LongType(), True),
StructField('eventPublisher', StringType(), True),
StructField('eventTime', StringType(), True)
])
)



In [35]:

# Apply the schema to payload to read the data

from pyspark.sql.functions import from_json, col

streaming_df = kafka_json_df.withColumn("values_json", from_json(col("value"), json_schema))

In [36]:
# To the schema of the data, place a sample json file and change readStream to read 

streaming_df.printSchema()


root
 |-- key: binary (nullable = true)
 |-- value: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- values_json: struct (nullable = true)
 |    |-- customerId: string (nullable = true)
 |    |-- data: struct (nullable = true)
 |    |    |-- devices: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |    |-- measure: string (nullable = true)
 |    |    |    |    |-- status: string (nullable = true)
 |    |    |    |    |-- temperature: long (nullable = true)
 |    |-- eventId: string (nullable = true)
 |    |-- eventoffset: long (nullable = true)
 |    |-- eventPublisher: string (nullable = true)
 |    |-- eventTime: string (nullable = true)



In [37]:
streaming_df = kafka_json_df.withColumn("values_json", from_json(col("value"), json_schema)).selectExpr("values_json.*")

In [38]:
streaming_df.printSchema()


root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventoffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)



In [40]:
# Lets explode the data as devices contains List/array of device reading

from pyspark.sql.functions import explode

explode_df = streaming_df.withColumn("data_devices", explode("data.devices"))

explode_df.printSchema()


root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventoffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- data_devices: struct (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- measure: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- temperature: long (nullable = true)



In [42]:
# flatten the explode df
from pyspark.sql.functions import col
 
flattened_df = (
    explode_df
    .drop("data")
    .withColumn("deviceId", col("data_devices.deviceId"))
    .withColumn("measure", col("data_devices.measure"))
    .withColumn("status", col("data_devices.status"))
    .withColumn("temperature", col("data_devices.temperature"))
    .drop("data_devices")

)

flattened_df.printSchema()


root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventoffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: long (nullable = true)



In [None]:
# # write the output to console to check the output

(flattened_df
.writeStream
.format("console")
.outputMode("append")
.option("checkpointlocation","checkpoint_dir_kafka")
.start()
.awaitTermination())


