In [1]:
spark.stop()

25/05/22 13:32:11 INFO  SparkContext:60 SparkContext is stopping with exitCode 0.
25/05/22 13:32:11 INFO  SparkUI:60 Stopped Spark web UI at http://6af08edc54fe:4040
25/05/22 13:32:07 INFO  StandaloneSchedulerBackend:60 Shutting down all executors
25/05/22 13:32:07 INFO  StandaloneSchedulerBackend$StandaloneDriverEndpoint:60 Asking each executor to shut down
25/05/22 13:32:07 INFO  MapOutputTrackerMasterEndpoint:60 MapOutputTrackerMasterEndpoint stopped!
25/05/22 13:32:07 INFO  MemoryStore:60 MemoryStore cleared
25/05/22 13:32:07 INFO  BlockManager:60 BlockManager stopped
25/05/22 13:32:07 INFO  BlockManagerMaster:60 BlockManagerMaster stopped
25/05/22 13:32:07 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:60 OutputCommitCoordinator stopped!
25/05/22 13:32:07 INFO  SparkContext:60 Successfully stopped SparkContext


In [2]:
# Create the Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession 
    .builder 
    .appName("Streaming Process Files") 
    .config("spark.streaming.stopGracefullyOnShutdown", True) 
    .getOrCreate()
)

spark

25/05/22 13:32:07 INFO  SparkContext:60 Running Spark version 3.5.5
25/05/22 13:32:07 INFO  SparkContext:60 OS info Linux, 5.15.167.4-microsoft-standard-WSL2, amd64
25/05/22 13:32:07 INFO  SparkContext:60 Java version 17.0.14
Setting Spark log level to "INFO".
25/05/22 13:32:07 INFO  ResourceUtils:60 No custom resources configured for spark.driver.
25/05/22 13:32:07 INFO  SparkContext:60 Submitted application: Streaming Process Files
25/05/22 13:32:07 INFO  ResourceProfile:60 Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 4, script: , vendor: , memory -> name: memory, amount: 2048, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
25/05/22 13:32:07 INFO  ResourceProfile:60 Limiting resource is cpus at 4 tasks per executor
25/05/22 13:32:07 INFO  ResourceProfileManager:60 Added ResourceProfile id: 0
25/05/22 13:32:07 INFO  SecurityManager:60 Changing view acls to: roo

In [3]:
# To allow automatic schemaInference while reading
spark.conf.set("spark.sql.streaming.schemaInference", True)

# Create the streaming_df to read from input directory
streaming_df = (
    spark
    .readStream
    .option("cleanSource", "archive")
    .option("sourceArchiveDir", "/opt/spark/spark-archive")
    .option("maxFilesPerTrigger", 1)
    .format("json")
    .load("/opt/spark/datasets/devices/*.json")
)

25/05/22 13:32:09 INFO  InMemoryFileIndex:60 It took 52 ms to list leaf files for 1 paths.
25/05/22 13:32:09 INFO  InMemoryFileIndex:60 It took 10 ms to list leaf files for 1 paths.
25/05/22 13:32:10 INFO  StandaloneSchedulerBackend$StandaloneDriverEndpoint:60 Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.18.0.11:41004) with ID 0,  ResourceProfileId 0
25/05/22 13:32:10 INFO  StandaloneSchedulerBackend$StandaloneDriverEndpoint:60 Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.18.0.13:54124) with ID 1,  ResourceProfileId 0
25/05/22 13:32:10 INFO  StandaloneSchedulerBackend$StandaloneDriverEndpoint:60 Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.18.0.12:46236) with ID 2,  ResourceProfileId 0
25/05/22 13:32:10 INFO  BlockManagerMasterEndpoint:60 Registering block manager 172.18.0.11:33849 with 1048.8 MiB RAM, BlockManagerId(0, 172.18.0.11, 33849, None)
25/05/22 13:32:10 INFO  BlockManagerMasterEndpoint:60 Registering b

In [4]:
# To the schema of the data, place a sample json file and change readStream to read 
streaming_df.printSchema()
# streaming_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)



In [5]:
# Lets explode the data as devices contains list/array of device reading
from pyspark.sql.functions import explode

exploded_df = streaming_df.withColumn("data_devices", explode("data.devices"))


In [6]:
# Check the schema of the exploded_df, place a sample json file and change readStream to read 
exploded_df.printSchema()
#exploded_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- data_devices: struct (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- measure: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- temperature: long (nullable = true)



In [7]:
# Flatten the exploded df
from pyspark.sql.functions import col

flattened_df = (
    exploded_df
    .drop("data")
    .withColumn("deviceId", col("data_devices.deviceId"))
    .withColumn("measure", col("data_devices.measure"))
    .withColumn("status", col("data_devices.status"))
    .withColumn("temperature", col("data_devices.temperature"))
    .drop("data_devices")
)


In [8]:
# Check the schema of the flattened_df, place a sample json file and change readStream to read 
flattened_df.printSchema()
#flattened_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: long (nullable = true)



In [9]:
# Write the output to console sink to check the output

(flattened_df
 .writeStream
 .format("csv")
 .outputMode("append")
 .option("path", "/opt/spark/datasets/output/device_data.csv")
 .option("checkpointLocation", "/opt/spark/spark-checkpoint")
 .start()
 .awaitTermination())

25/05/22 13:32:14 INFO  BlockManagerInfo:60 Removed broadcast_0_piece0 on 6af08edc54fe:42885 in memory (size: 35.2 KiB, free: 434.4 MiB)
25/05/22 13:32:14 INFO  BlockManagerInfo:60 Removed broadcast_0_piece0 on 172.18.0.12:40385 in memory (size: 35.2 KiB, free: 1048.8 MiB)
25/05/22 13:32:14 INFO  BlockManagerInfo:60 Removed broadcast_1_piece0 on 6af08edc54fe:42885 in memory (size: 8.0 KiB, free: 434.4 MiB)
25/05/22 13:32:14 INFO  BlockManagerInfo:60 Removed broadcast_1_piece0 on 172.18.0.12:40385 in memory (size: 8.0 KiB, free: 1048.8 MiB)
25/05/22 13:32:14 INFO  StateStoreCoordinatorRef:60 Registered StateStoreCoordinator endpoint
25/05/22 13:32:14 INFO  ResolveWriteToStream:60 Checkpoint root /opt/spark/spark-checkpoint resolved to file:/opt/spark/spark-checkpoint.
25/05/22 13:32:14 WARN  ResolveWriteToStream:72 spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/05/22 13:32:14 INFO  CheckpointFileManager:60 Writing atomically to file

KeyboardInterrupt: 

25/05/22 13:34:58 INFO  InMemoryFileIndex:60 It took 0 ms to list leaf files for 0 paths.
25/05/22 13:34:58 INFO  InMemoryFileIndex:60 It took 0 ms to list leaf files for 0 paths.
25/05/22 13:34:58 INFO  InMemoryFileIndex:60 It took 0 ms to list leaf files for 0 paths.


In [None]:
# Check the data at the output location



25/05/22 13:34:58 INFO  InMemoryFileIndex:60 It took 0 ms to list leaf files for 0 paths.
25/05/22 13:34:58 INFO  InMemoryFileIndex:60 It took 0 ms to list leaf files for 0 paths.
25/05/22 13:34:58 INFO  InMemoryFileIndex:60 It took 0 ms to list leaf files for 0 paths.
25/05/22 13:34:58 INFO  InMemoryFileIndex:60 It took 0 ms to list leaf files for 0 paths.
25/05/22 13:34:58 INFO  InMemoryFileIndex:60 It took 0 ms to list leaf files for 0 paths.
25/05/22 13:34:58 INFO  InMemoryFileIndex:60 It took 0 ms to list leaf files for 0 paths.
25/05/22 13:34:58 INFO  InMemoryFileIndex:60 It took 0 ms to list leaf files for 0 paths.
25/05/22 13:34:58 INFO  InMemoryFileIndex:60 It took 0 ms to list leaf files for 0 paths.
25/05/22 13:34:58 INFO  InMemoryFileIndex:60 It took 0 ms to list leaf files for 0 paths.
25/05/22 13:34:58 INFO  InMemoryFileIndex:60 It took 0 ms to list leaf files for 0 paths.
25/05/22 13:34:58 INFO  InMemoryFileIndex:60 It took 0 ms to list leaf files for 0 paths.
25/05/22 1