In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession  \
        .builder  \
        .appName('Spark Streaming with File')  \
        .config('spark.streaming.stopGracefullyOnShutdown', True)  \
        .master("local[*]")  \
        .getOrCreate()

spark

In [4]:
spark.conf.set('spark.sql.streaming.schemaInference', True)

# Creating the streaming_df to read from input directory
streaming_df = spark.readStream  \
                    .format("json")  \
                    .option("cleanSource", "archive")  \
                    .option("sourceArchiveDir", "archives\\device_data\\")  \
                    .option("maxFilesPerTrigger", 1)  \
                    .load("data\\input\\")

streaming_df

DataFrame[customerId: string, data: struct<devices:array<struct<deviceId:string,measure:string,status:string,temperature:bigint>>>, eventId: string, eventOffset: bigint, eventPublisher: string, eventTime: string]

In [7]:
streaming_df.printSchema()

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)



In [8]:
streaming_df.show(truncate=False)

AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[data\input\]

In [9]:
# to display the output with show, we should use read instead of readStream

non_streaming_df = spark.read  \
                    .format("json")  \
                    .option("cleanSource", "archive")  \
                    .option("sourceArchiveDir", "archives\\device_data\\")  \
                    .option("maxFilesPerTrigger", 1)  \
                    .load("data\\input\\")

non_streaming_df.show(truncate=False)

+----------+-----------------------------------------------------------------------+------------------------------------+-----------+--------------+--------------------------+
|customerId|data                                                                   |eventId                             |eventOffset|eventPublisher|eventTime                 |
+----------+-----------------------------------------------------------------------+------------------------------------+-----------+--------------+--------------------------+
|CI00119   |[[]]                                                                   |ba2ea9f4-a5d9-434e-8e4d-1c80c2d4b456|10000      |device        |2023-01-05 11:13:53.643364|
|CI00103   |[[[D001, C, ERROR, 15], [D002, C, SUCCESS, 16]]]                       |e3cb26d3-41b2-49a2-84f3-0156ed8d7502|10001      |device        |2023-01-05 11:13:53.643364|
|CI00104   |[[]]                                                                   |8c202190-bc24-4485-89ec-de78e602dd68

In [10]:
non_streaming_df.printSchema()

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)



In [12]:
from pyspark.sql.functions import explode, col

exploded_df = non_streaming_df.select("customerId", "eventId", "eventOffset", "eventPublisher", "eventTime", "data")  \
                                .withColumn("devices", explode('data.devices'))  \
                                .drop("data")

exploded_df.show(truncate=False)

+----------+------------------------------------+-----------+--------------+--------------------------+----------------------+
|customerId|eventId                             |eventOffset|eventPublisher|eventTime                 |devices               |
+----------+------------------------------------+-----------+--------------+--------------------------+----------------------+
|CI00103   |e3cb26d3-41b2-49a2-84f3-0156ed8d7502|10001      |device        |2023-01-05 11:13:53.643364|[D001, C, ERROR, 15]  |
|CI00103   |e3cb26d3-41b2-49a2-84f3-0156ed8d7502|10001      |device        |2023-01-05 11:13:53.643364|[D002, C, SUCCESS, 16]|
|CI00108   |aa90011f-3967-496c-b94b-a0c8de19a3d3|10003      |device        |2023-01-05 11:13:53.643364|[D004, C, SUCCESS, 16]|
|CI00106   |804e8fa3-307b-482e-b629-af880c52e884|10005      |device        |2023-01-05 11:13:53.643364|[D002, C, ERROR, 30]  |
|CI00106   |804e8fa3-307b-482e-b629-af880c52e884|10005      |device        |2023-01-05 11:13:53.643364|[D001, C

In [13]:
exploded_df.printSchema()

root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- devices: struct (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- measure: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- temperature: long (nullable = true)



In [14]:
flattened_df = exploded_df.selectExpr("customerId", "eventId", "eventOffset", "eventPublisher", "eventTime", 
                                     "devices.deviceId as deviceId", "devices.measure as measure",
                                      "devices.status as status", "devices.temperature as temperature"
                                     )
flattened_df.show(truncate=False)

+----------+------------------------------------+-----------+--------------+--------------------------+--------+-------+-------+-----------+
|customerId|eventId                             |eventOffset|eventPublisher|eventTime                 |deviceId|measure|status |temperature|
+----------+------------------------------------+-----------+--------------+--------------------------+--------+-------+-------+-----------+
|CI00103   |e3cb26d3-41b2-49a2-84f3-0156ed8d7502|10001      |device        |2023-01-05 11:13:53.643364|D001    |C      |ERROR  |15         |
|CI00103   |e3cb26d3-41b2-49a2-84f3-0156ed8d7502|10001      |device        |2023-01-05 11:13:53.643364|D002    |C      |SUCCESS|16         |
|CI00108   |aa90011f-3967-496c-b94b-a0c8de19a3d3|10003      |device        |2023-01-05 11:13:53.643364|D004    |C      |SUCCESS|16         |
|CI00106   |804e8fa3-307b-482e-b629-af880c52e884|10005      |device        |2023-01-05 11:13:53.643364|D002    |C      |ERROR  |30         |
|CI00106   |8

In [15]:
flattened_df.printSchema()

root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: long (nullable = true)



Streaming source and target

In [5]:
spark.conf.set('spark.sql.streaming.schemaInference', True)

# Creating the streaming_df to read from input directory
streaming_df = spark.readStream  \
                    .format("json")  \
                    .option("cleanSource", "archive")  \
                    .option("sourceArchiveDir", "archives\\device_data\\")  \
                    .option("maxFilesPerTrigger", 1)  \
                    .load("data\\input\\")

streaming_df

DataFrame[customerId: string, data: struct<devices:array<struct<deviceId:string,measure:string,status:string,temperature:bigint>>>, eventId: string, eventOffset: bigint, eventPublisher: string, eventTime: string]

In [6]:
from pyspark.sql.functions import explode, col

exploded_df = streaming_df.select("customerId", "eventId", "eventOffset", "eventPublisher", "eventTime", "data")  \
                                .withColumn("devices", explode('data.devices'))  \
                                .drop("data")

In [7]:
flattened_df = exploded_df.selectExpr("customerId", "eventId", "eventOffset", "eventPublisher", "eventTime", 
                                     "devices.deviceId as deviceId", "devices.measure as measure",
                                      "devices.status as status", "devices.temperature as temperature"
                                     )

In [None]:
# Write the output to console sink to check the output

writing_df = flattened_df.writeStream  \
                        .format('json')  \
                        .option('path', 'output\\device_data\\')  \
                        .option("checkpointLocation", "checkpoint_dir")  \
                        .outputMode("append")  \
                        .start()

# Start the streaming application to run until the following happens
# 1. Exception in the running program
# 2. Manual Interruption
writing_df.awaitTermination()

In [4]:
# Check the data at the output location

out_df = spark.read.json("output\\device_data\\")
out_df.show(truncate=False)

AnalysisException: Unable to infer schema for JSON at . It must be specified manually;

In [5]:
# Courtesy : https://github.com/subhamkharwal/ease-with-apache-spark/blob/master/32_spark_streaming_read_from_files.ipynb

# https://urlit.me/blog/pyspark-structured-streaming-read-from-files/