In [0]:
from pyspark.sql.functions import from_json, col, to_timestamp , to_date, avg
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, unbase64, from_json


In [None]:
# Mount the "bronze" ADLS Gen2 container at /mnt/iotdata using OAuth
# client-credentials (service principal) authentication.
#
# The client id and secret are read from a Databricks secret scope rather than
# being typed into the notebook. Replace the <...> placeholders (and the
# <tenant-id> in the endpoint URL) with the values for your workspace.
mount_point = "/mnt/iotdata"

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": dbutils.secrets.get(scope="<secret-scope>", key="<client-id-key>"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="<secret-scope>", key="<client-secret-key>"),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant-id>/oauth2/token",
}

# dbutils.fs.mount raises if the mount point is already in use, so only mount
# when it is absent. This keeps the cell safe to re-run (e.g. Run All).
already_mounted = any(m.mountPoint == mount_point for m in dbutils.fs.mounts())
if not already_mounted:
    dbutils.fs.mount(
        source="abfss://bronze@iotstorageanalytics.dfs.core.windows.net",
        mount_point=mount_point,
        extra_configs=configs,
    )

In [0]:
# List the JSON files captured under one folder of the mounted container.
# NOTE(review): the folder layout looks like <partition>/<yyyy>/<MM>/<dd>/<HH> — confirm.
landing_dir = "/mnt/iotdata/IoT-event-hub/03/2025/01/03/20/"
dbutils.fs.ls(landing_dir)

[FileInfo(path='dbfs:/mnt/iotdata/IoT-event-hub/03/2025/01/03/20/12.json', name='12.json', size=4255, modificationTime=1735935221000),
 FileInfo(path='dbfs:/mnt/iotdata/IoT-event-hub/03/2025/01/03/20/21.json', name='21.json', size=6844, modificationTime=1735935761000),
 FileInfo(path='dbfs:/mnt/iotdata/IoT-event-hub/03/2025/01/03/20/23.json', name='23.json', size=5377, modificationTime=1735935881000),
 FileInfo(path='dbfs:/mnt/iotdata/IoT-event-hub/03/2025/01/03/20/25.json', name='25.json', size=7822, modificationTime=1735936001000),
 FileInfo(path='dbfs:/mnt/iotdata/IoT-event-hub/03/2025/01/03/20/27.json', name='27.json', size=3421, modificationTime=1735936121000)]

In [0]:
# Load every JSON file in the folder into the raw (bronze) DataFrame.
# No schema is supplied, so Spark infers it from the files.
bronzeDF = spark.read.format("json").load("/mnt/iotdata/IoT-event-hub/03/2025/01/03/20/*.json")

In [0]:

# Schema applied to the decoded message body when it is parsed with from_json.
# All four fields are declared nullable; "timestamp" is kept as a string here
# and converted to a real timestamp in a later cell.
schema_of_payload = (
    StructType()
    .add("deviceId", StringType(), True)
    .add("temperature", DoubleType(), True)
    .add("humidity", DoubleType(), True)
    .add("timestamp", StringType(), True)
)


In [0]:
# Decode and flatten the raw messages.
#
# This cell reuses the bronzeDF loaded in the earlier read cell instead of
# reading the same JSON files a second time (each un-schema'd JSON read runs
# another schema-inference pass over the files).

# "Body" is base64-encoded; decode it to bytes and cast to a UTF-8 string.
decodedDF = bronzeDF.withColumn("decodedBody", unbase64(col("Body")).cast("string"))

# Parse the decoded JSON text into a struct column using the payload schema.
payloadDF = decodedDF.withColumn("payload", from_json(col("decodedBody"), schema_of_payload))

# Keep the envelope columns and promote the payload fields to top-level columns.
parsedDF = payloadDF.select(
    col("EnqueuedTimeUtc"),
    col("SystemProperties"),
    col("payload.deviceId"),
    col("payload.temperature"),
    col("payload.humidity"),
    col("payload.timestamp"),
)

In [0]:
# Build the cleaned (silver) DataFrame:
#   1. convert the payload's string "timestamp" into a timestamp column;
#   2. keep only rows where both temperature and humidity are present.
#
# The pattern uses the zone-offset letter X instead of a quoted literal 'Z'.
# A quoted 'Z' only matches the character and throws the UTC marker away, so
# the value would be interpreted in the Spark session time zone. X parses the
# trailing "Z" as a zero offset, so the result is UTC whatever the session
# time zone is (and is identical to the old result when the session is UTC).
silverDF = (
    parsedDF
    .withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ssX"))
    .filter(col("temperature").isNotNull() & col("humidity").isNotNull())
)

In [0]:

# Persist the cleaned readings in Delta format, replacing whatever is
# currently stored at the silver path.
silverDF.write.save("/mnt/iotdata/silver/", format="delta", mode="overwrite")

In [0]:
# Reload the silver data from its Delta location (rebinds silverDF to the stored copy).
silverDF = spark.read.load("/mnt/iotdata/silver/", format="delta")

In [0]:
# Build and store the gold data set.

# Remove the two columns that are not carried into gold.
silverDF = silverDF.drop("SystemProperties", "timestamp")

# Keep only readings whose temperature is strictly below 60.
filteredDF = silverDF.where(col("temperature") < 60)

# Write the result in Delta format, replacing any existing data at the gold path.
filteredDF.write.save("/mnt/iotdata/gold/", format="delta", mode="overwrite")

In [0]:
# Read the gold Delta data back and print the first 20 rows without
# truncating long cell values.
goldDF = spark.read.load("/mnt/iotdata/gold/", format="delta")

goldDF.show(n=20, truncate=False)

+----------------------------+-----------------+-----------+--------+
|EnqueuedTimeUtc             |deviceId         |temperature|humidity|
+----------------------------+-----------------+-----------+--------+
|2025-01-03T20:11:19.4390000Z|mySimulatedDevice|23.5       |56.0    |
|2025-01-03T20:11:21.2520000Z|mySimulatedDevice|23.5       |56.0    |
|2025-01-03T20:11:22.7520000Z|mySimulatedDevice|23.5       |56.0    |
|2025-01-03T20:11:24.1110000Z|mySimulatedDevice|23.5       |56.0    |
|2025-01-03T20:11:25.5020000Z|mySimulatedDevice|23.5       |56.0    |
|2025-01-03T20:11:26.7990000Z|mySimulatedDevice|23.5       |56.0    |
|2025-01-03T20:12:37.2050000Z|mySimulatedDevice|23.5       |56.0    |
|2025-01-03T20:12:38.5020000Z|mySimulatedDevice|23.5       |56.0    |
|2025-01-03T20:12:39.6270000Z|mySimulatedDevice|23.5       |56.0    |
|2025-01-03T20:20:49.9680000Z|mySimulatedDevice|27.18      |45.67   |
|2025-01-03T20:20:55.0930000Z|mySimulatedDevice|24.56      |58.51   |
|2025-01-03T20:21:00