In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import os

In [None]:
# Connection settings are read from the process environment so that no
# credential is written into the notebook. Each name is None when the
# corresponding variable is not set.
_env = os.environ
kafka_server = _env.get("kafka_server")  # passed as kafka.bootstrap.servers
kafka_topic = _env.get("kafka_topic")    # topic the stream subscribes to
kafka_hub = _env.get("kafka_hub")        # host prefix of the sb:// endpoint
sasl_key = _env.get("sasl_key")          # shared access key used for SASL auth

In [0]:
# Connection string that is sent as the SASL password.
# It is assembled with real f-strings so that kafka_hub and sasl_key are
# substituted. Previously the `f"..."` prefix sat inside an ordinary string
# literal, so the braces (and the letter f) were sent to the broker verbatim.
# NOTE(review): the trailing "=" after {sasl_key} is carried over from the
# original text -- confirm the stored key really omits its final "=" character,
# otherwise drop it here.
kafka_connection_string = (
    f"Endpoint=sb://{kafka_hub}.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    f"SharedAccessKey={sasl_key}="
)

# JAAS entry for SASL/PLAIN: the username is the literal text
# "$ConnectionString" and the password is the connection string built above.
kafka_jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    'username="$ConnectionString" '
    f'password="{kafka_connection_string}";'
)

# Streaming DataFrame over the configured topic, read from the earliest
# available offsets over SASL_SSL.
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_server)
    .option("subscribe", kafka_topic)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", kafka_jaas_config)
    .option("startingOffsets", "earliest")
    .load()
)


In [0]:
from pyspark.sql.types import *

def _text(name):
    """Return a nullable string field called `name`."""
    return StructField(name, StringType(), True)


def _reading(name):
    """Return a nullable struct field `name` holding one string member `v`."""
    return StructField(name, StructType([_text("v")]), True)


def _daily_series(name):
    """Return a nullable array field `name` of avg/day/max/min string structs."""
    entry = StructType([_text(key) for key in ("avg", "day", "max", "min")])
    return StructField(name, ArrayType(entry), True)


# Schema used to parse the JSON payload. Every leaf is declared as a nullable
# string; no numeric or timestamp typing is applied at this stage.
schema = StructType([
    _text("status"),
    StructField("data", StructType([
        _text("aqi"),
        _text("idx"),
        StructField("attributions", ArrayType(
            StructType([_text("url"), _text("name"), _text("logo")])
        ), True),
        StructField("city", StructType([
            StructField("geo", ArrayType(StringType()), True),
            _text("name"),
            _text("url"),
            _text("location"),
        ]), True),
        _text("dominentpol"),
        # One {"v": ...} wrapper per measurement code.
        StructField("iaqi", StructType([
            _reading(code)
            for code in (
                "co", "dew", "h", "no2", "o3", "p",
                "pm10", "pm25", "so2", "t", "w", "wg",
            )
        ]), True),
        StructField("time", StructType([
            _text(key) for key in ("s", "tz", "v", "iso")
        ]), True),
        StructField("forecast", StructType([
            StructField("daily", StructType([
                _daily_series(code) for code in ("pm10", "pm25", "uvi")
            ]), True)
        ]), True),
        StructField("debug", StructType([_text("sync")]), True),
    ]), True),
])


In [0]:
# Cast the Kafka `value` column to a string and parse it with `schema`;
# the parsed struct is exposed as a single column named "df_json".
payload_text = col("value").cast("string")
df_json = df.select(from_json(payload_text, schema).alias("df_json"))

In [0]:
# Measurement code under data.iaqi -> column name in the flattened frame.
# Insertion order is the output column order.
iaqi_columns = {
    "co": "co",
    "dew": "dew",
    "h": "humidity",
    "no2": "no2",
    "o3": "o3",
    "p": "pressure",
    "pm10": "pm10",
    "pm25": "pm25",
    "so2": "so2",
    "t": "temperature",
    "w": "wind_speed",
    "wg": "wind_gust",
}

# Flatten the parsed struct into one column per value of interest.
data_path = "df_json.data"
df_flat = df_json.select(
    col(f"{data_path}.aqi"),
    col(f"{data_path}.idx"),
    col(f"{data_path}.city.name").alias("city_name"),
    col(f"{data_path}.dominentpol").alias("dominant_pollutant"),
    *[
        col(f"{data_path}.iaqi.{code}.v").alias(label)
        for code, label in iaqi_columns.items()
    ],
    col(f"{data_path}.time.s").alias("time_s"),
)

In [0]:
# Discard every row that has a null in any of the selected columns,
# then render the remaining rows in the notebook.
df_clean = df_flat.dropna(how="any")
df_clean.display()

Stopping: 4739c0fe-8bb3-4385-94a5-b6050c2cca20


In [0]:
# Append the cleaned stream to the bronze Delta table.
# trigger(availableNow=True) replaces the deprecated trigger(once=True): it
# still processes the data available at start and then stops, but may do so in
# several micro-batches.
bronze_write_query = (
    df_clean.writeStream.format("delta")
    .option("checkpointLocation", "/Volumes/kafka_azure_databricks/bronze/checkpoint")
    .trigger(availableNow=True)
    .outputMode("append")
    .toTable('kafka_azure_databricks.bronze.weatherTable')
)

# Block until the run has finished so that the cells below, which query the
# table, do not execute before the write has completed.
bronze_write_query.awaitTermination()


In [0]:
# List the tables in the bronze schema, keeping only the name column.
bronze_tables = spark.sql("SHOW TABLES IN kafka_azure_databricks.bronze")
bronze_tables.select("tableName").display()


tableName
weathertable
display_query_1


In [0]:
%sql
-- Show every row and column currently stored in the bronze weather table.
SELECT
  *
FROM
  kafka_azure_databricks.bronze.weathertable;


aqi,idx,city_name,dominant_pollutant,co,dew,humidity,no2,o3,pressure,pm10,pm25,so2,temperature,wind_speed,wind_gust,time_s
51,11847,"SIDCO Kurichi, Coimbatore, India",pm10,2.7,20,52.47,2.6,2.6,800.3,51,50,9.1,34.0,5.41,12.3,2025-07-30 15:00:00
68,10111,"Major Dhyan Chand National Stadium, Delhi, Delhi, India",pm25,3.2,28,62.75,10.4,4.0,973.05,37,68,5.2,31.45,1.0,10.2,2025-07-30 15:00:00
70,11313,"Bhopal Chauraha, Dewas, India",pm25,2.9,23,36.0,13.9,10.1,1004.0,66,70,26.3,30.3,1.0,9.7,2025-07-30 15:00:00
41,14152,"Mendonsa Colony, Dindigul, India",pm25,0.5,20,46.44,0.1,3.3,969.02,23,41,8.6,38.0,3.25,11.8,2025-07-30 15:00:00
94,13731,"IIPHG Lekawada, Gandhinagar, India",pm25,4.4,23,87.24,3.0,14.2,1002.7,70,94,4.9,29.0,0.58,7.7,2025-07-30 15:00:00
35,14697,"Zero Point GICI, Gangtok, India",pm25,1.9,17,97.16,1.1,18.2,616.48,2,35,0.3,20.0,0.33,7.2,2025-07-30 15:00:00
55,11899,"Sanjay Nagar, Ghaziabad, India",pm25,5.7,28,57.0,11.4,14.4,738.0,46,55,7.7,29.1,2.1,10.2,2025-07-30 15:00:00
96,11843,"GIDC, Ankleshwar, India",pm25,3.8,25,79.79,10.1,28.5,1000.0,52,96,20.3,31.84,3.33,8.7,2025-07-30 15:00:00
70,13874,"New DM Office, Arrah, India",pm25,3.1,27,88.0,2.5,3.2,997.0,41,70,2.9,26.675,0.55,9.2,2025-07-30 14:00:00
32,14700,"Gurdeo Nagar, Aurangabad, India",pm25,0.6,27,79.0,2.9,3.8,997.0,16,32,3.5,27.4,1.5,9.2,2025-07-30 15:00:00


In [0]:
%sql
-- Manually append one reading to the bronze weather table.
-- The streaming job writes every column of this table from string-typed
-- fields, so each value is supplied as a string literal instead of relying on
-- implicit numeric-to-string casts. time_s is written in the same
-- 'yyyy-MM-dd HH:mm:ss' layout as the existing rows rather than through
-- TIMESTAMP(...), whose string form depends on the session time zone.
INSERT INTO kafka_azure_databricks.bronze.weathertable (
  aqi, idx, city_name, dominant_pollutant, co, dew, humidity, no2, o3, pressure,
  pm10, pm25, so2, temperature, wind_speed, wind_gust, time_s
)
VALUES (
  '59', '8192', 'Maninagar, Ahmedabad, India', 'pm25', '5.6', '23', '95.000', '6.7', '7.8', '1001',
  '56', '59', '13.3', '28', '0.54', '7.7',
  '2025-08-01 15:00:00'
);


num_affected_rows,num_inserted_rows
1,1


In [0]:
%sql
-- Read the bronze weather table again, after the manual INSERT in the cell above.
SELECT
  *
FROM
  kafka_azure_databricks.bronze.weathertable

aqi,idx,city_name,dominant_pollutant,co,dew,humidity,no2,o3,pressure,pm10,pm25,so2,temperature,wind_speed,wind_gust,time_s
51,11847,"SIDCO Kurichi, Coimbatore, India",pm10,2.7,20,52.47,2.6,2.6,800.3,51,50,9.1,34.0,5.41,12.3,2025-07-30 15:00:00
68,10111,"Major Dhyan Chand National Stadium, Delhi, Delhi, India",pm25,3.2,28,62.75,10.4,4.0,973.05,37,68,5.2,31.45,1.0,10.2,2025-07-30 15:00:00
70,11313,"Bhopal Chauraha, Dewas, India",pm25,2.9,23,36.0,13.9,10.1,1004.0,66,70,26.3,30.3,1.0,9.7,2025-07-30 15:00:00
41,14152,"Mendonsa Colony, Dindigul, India",pm25,0.5,20,46.44,0.1,3.3,969.02,23,41,8.6,38.0,3.25,11.8,2025-07-30 15:00:00
94,13731,"IIPHG Lekawada, Gandhinagar, India",pm25,4.4,23,87.24,3.0,14.2,1002.7,70,94,4.9,29.0,0.58,7.7,2025-07-30 15:00:00
35,14697,"Zero Point GICI, Gangtok, India",pm25,1.9,17,97.16,1.1,18.2,616.48,2,35,0.3,20.0,0.33,7.2,2025-07-30 15:00:00
55,11899,"Sanjay Nagar, Ghaziabad, India",pm25,5.7,28,57.0,11.4,14.4,738.0,46,55,7.7,29.1,2.1,10.2,2025-07-30 15:00:00
96,11843,"GIDC, Ankleshwar, India",pm25,3.8,25,79.79,10.1,28.5,1000.0,52,96,20.3,31.84,3.33,8.7,2025-07-30 15:00:00
70,13874,"New DM Office, Arrah, India",pm25,3.1,27,88.0,2.5,3.2,997.0,41,70,2.9,26.675,0.55,9.2,2025-07-30 14:00:00
32,14700,"Gurdeo Nagar, Aurangabad, India",pm25,0.6,27,79.0,2.9,3.8,997.0,16,32,3.5,27.4,1.5,9.2,2025-07-30 15:00:00
