## **Sending Weather Data from the Weather API to Azure Event Hub every 30 seconds**

In [0]:
from azure.eventhub import EventHubProducerClient, EventData
import json
import requests
from datetime import datetime, timedelta

#EventHub configuration:
eventhub_connection_string = dbutils.secrets.get(scope="key-vault-scope",key="eventhub-connection-string")
eventhub_name = "weatherstreameventhub"
weatherapikey = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")


#Initialising the EventHub Producer (a producer is created per call inside send_event below)

#Functions to send Events to EventHub
def send_event(event):
    """Serialize ``event`` to JSON and publish it to the Event Hub as a single message.

    A fresh producer is created for every call from the module-level
    ``eventhub_connection_string`` and ``eventhub_name`` settings.

    Args:
        event: A JSON-serializable object (here, the flattened weather record).

    Raises:
        Whatever ``json.dumps`` or the Event Hub client raises; nothing is
        swallowed here.
    """
    producer = EventHubProducerClient.from_connection_string(
        conn_str=eventhub_connection_string,
        eventhub_name=eventhub_name,
    )
    # Use the client as a context manager so the connection is closed even
    # when creating, filling or sending the batch raises. The previous
    # explicit close() was skipped on any failure, leaking the connection.
    with producer:
        event_data_batch = producer.create_batch()
        event_data_batch.add(EventData(json.dumps(event)))
        producer.send_batch(event_data_batch)

#function to handle the API response:
def handle_response(response):
    """Return the decoded JSON body of a successful (HTTP 200) response.

    For any other status code the status and response text are printed and
    ``None`` is returned, so callers must be prepared for a ``None`` result.
    """
    if response.status_code != 200:
        print(f"Error: {response.status_code},{response.text}")
        return None
    return response.json()
  
#Function to get current weather and air quality data:
def get_current_weather(base_url, weatherapikey, location, timeout=10):
    """Request current weather (with air-quality data) for ``location``.

    Args:
        base_url: Root URL of the weather API; ``/current.json`` is appended.
        weatherapikey: API key, sent as the ``key`` query parameter.
        location: Location query, sent as the ``q`` query parameter.
        timeout: Seconds to wait for the server before ``requests`` raises a
            timeout error. Defaults to 10.

    Returns:
        The result of ``handle_response`` for the HTTP response.
    """
    current_weather_url = base_url + "/current.json"
    params = {
        'key': weatherapikey,
        'q': location,
        'aqi': 'yes',
    }
    # requests has no default timeout: without one a stalled connection would
    # block the calling micro-batch indefinitely.
    response = requests.get(current_weather_url, params=params, timeout=timeout)
    return handle_response(response)
# Function to get forecast data
def get_forecast_weather(base_url, weatherapikey, location, days, timeout=10):
    """Request the weather forecast for ``location``.

    Args:
        base_url: Root URL of the weather API; ``/forecast.json`` is appended.
        weatherapikey: API key, sent as the ``key`` query parameter.
        location: Location query, sent as the ``q`` query parameter.
        days: Value sent as the ``days`` query parameter.
        timeout: Seconds to wait for the server before ``requests`` raises a
            timeout error. Defaults to 10.

    Returns:
        The result of ``handle_response`` for the HTTP response.
    """
    forecast_weather_url = base_url + "/forecast.json"
    params = {
        'key': weatherapikey,
        'q': location,
        'days': days,
    }
    # requests has no default timeout: without one a stalled connection would
    # block the calling micro-batch indefinitely.
    response = requests.get(forecast_weather_url, params=params, timeout=timeout)
    return handle_response(response)

#Functions to get alerts:
def get_alerts(base_url, weatherapikey, location, timeout=10):
    """Request weather alerts for ``location``.

    Args:
        base_url: Root URL of the weather API; ``/alerts.json`` is appended.
        weatherapikey: API key, sent as the ``key`` query parameter.
        location: Location query, sent as the ``q`` query parameter.
        timeout: Seconds to wait for the server before ``requests`` raises a
            timeout error. Defaults to 10.

    Returns:
        The result of ``handle_response`` for the HTTP response.
    """
    alerts_url = f"{base_url}/alerts.json"
    params = {
        'key': weatherapikey,
        'q': location,
        'alerts': 'yes',
    }
    # requests has no default timeout: without one a stalled connection would
    # block the calling micro-batch indefinitely.
    response = requests.get(alerts_url, params=params, timeout=timeout)
    return handle_response(response)

#Merge and Flatten Data:
def flatten_data(current_weather, forecast_weather, alerts):
    """Merge the three API payloads into one flat record for publishing.

    Missing data is tolerated: any argument may be ``None`` and any nested
    object may be absent or ``null``. Scalar fields then come out as ``None``
    and the ``alerts`` / ``forecast`` lists come out empty, instead of the
    function raising ``AttributeError``.

    Args:
        current_weather: Parsed current-weather payload (dict) or ``None``.
        forecast_weather: Parsed forecast payload (dict) or ``None``.
        alerts: Parsed alerts payload (dict) or ``None``.

    Returns:
        dict with the location and current-condition fields at the top level,
        a nested ``air_quality`` dict, an ``alerts`` list and a ``forecast``
        list (one entry per forecast day).
    """
    # `x or {}` covers both a missing key and an explicit None/null value;
    # `.get(key, {})` alone only covers the missing-key case.
    current_payload = current_weather or {}
    location_data = current_payload.get("location") or {}
    current = current_payload.get("current") or {}
    condition = current.get("condition") or {}
    air_quality = current.get("air_quality") or {}
    forecast = ((forecast_weather or {}).get("forecast") or {}).get("forecastday") or []
    alert_list = ((alerts or {}).get("alerts") or {}).get("alert") or []

    alert_rows = [
        {
            'headline': alert.get('headline'),
            'severity': alert.get('severity'),
            # The source field is named 'desc'; it is published as 'description'.
            'description': alert.get('desc'),
            'instruction': alert.get('instruction'),
        }
        for alert in alert_list
    ]

    forecast_rows = []
    for entry in forecast:
        day = entry.get('day') or {}
        forecast_rows.append({
            'date': entry.get('date'),
            'maxtemp_c': day.get('maxtemp_c'),
            'mintemp_c': day.get('mintemp_c'),
            'condition': (day.get('condition') or {}).get('text'),
        })

    flattened_data = {
        'name': location_data.get('name'),
        'region': location_data.get('region'),
        'country': location_data.get('country'),
        'lat': location_data.get('lat'),
        'lon': location_data.get('lon'),
        'localtime': location_data.get('localtime'),
        'temp_c': current.get('temp_c'),
        'is_day': current.get('is_day'),
        'condition_text': condition.get('text'),
        'condition_icon': condition.get('icon'),
        'wind_kph': current.get('wind_kph'),
        'wind_degree': current.get('wind_degree'),
        'wind_dir': current.get('wind_dir'),
        'pressure_in': current.get('pressure_in'),
        'precip_in': current.get('precip_in'),
        'humidity': current.get('humidity'),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality': {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'so2': air_quality.get('so2'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us-epa-index'),
            'gb-defra-index': air_quality.get('gb-defra-index'),
        },
        'alerts': alert_rows,
        'forecast': forecast_rows,
    }

    return flattened_data

# Main Program
def fetch_weather_data():
    """Fetch alerts, current weather and a 3-day forecast, merged into one record.

    Returns:
        The flattened record produced by ``flatten_data``.

    Raises:
        RuntimeError: If the current-weather request did not return data.
            Previously this surfaced as an ``AttributeError`` from inside
            ``flatten_data``.
    """
    # HTTPS, because the API key is sent as a query parameter and would
    # otherwise cross the network in clear text.
    base_url = "https://api.weatherapi.com/v1"
    location = "chennai"

    #get data from API
    alerts = get_alerts(base_url, weatherapikey, location)
    current_weather = get_current_weather(base_url, weatherapikey, location)
    forecast_weather = get_forecast_weather(base_url, weatherapikey, location, 3)

    # The helpers return None when the API answers with a non-200 status.
    # Without current conditions there is nothing meaningful to publish.
    if current_weather is None:
        raise RuntimeError(f"Current weather request failed for location '{location}'")

    #Flatten and Merge Data:
    # A failed forecast or alerts call degrades to an empty section instead of
    # aborting the whole record.
    return flatten_data(current_weather, forecast_weather or {}, alerts or {})

# Module-level state read and updated by process_batch to space out sends.
# NOTE(review): `now` is not referenced in the visible code — confirm before removing.
now= datetime.now()
# Start 30 seconds in the past so the first elapsed-time check (>= 30 s) passes immediately.
last_sent_time = datetime.now() - timedelta(seconds=30) # time of the last successful send

# foreachBatch callback: publishes one weather record at most every 30 seconds
def process_batch(batch_df, batch_id):
    """Fetch and publish weather data if 30 seconds have passed since the last send.

    The micro-batch contents (``batch_df``) are not used; the batch only acts
    as a trigger. ``last_sent_time`` is advanced only after a successful send.

    Args:
        batch_df: Micro-batch DataFrame supplied by foreachBatch (unused).
        batch_id: Micro-batch identifier, used in the error message.

    Raises:
        Any exception from fetching or sending is printed and re-raised.
    """
    global last_sent_time
    try:
        started_at = datetime.now()
        elapsed_seconds = (started_at - last_sent_time).total_seconds()
        if elapsed_seconds < 30:
            # Too soon since the previous send: nothing to do for this batch.
            return

        payload = fetch_weather_data()
        send_event(payload)
        # Record the time captured before the fetch, not the completion time.
        last_sent_time = started_at

    except Exception as err:
        print(f"Error sending events in batch {batch_id}: {str(err)}")
        raise err

#Set up a Streaming Source
# The rate source only serves as a heartbeat that drives process_batch.
# Its option key is "rowsPerSecond"; the earlier spelling "rowPerSecond" was
# not a recognised key, so the setting had no effect.
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

#Write the streaming data using foreachBatch to send weather data to EventHub
query = stream_df.writeStream.foreachBatch(process_batch).start()

# Block this cell until the streaming query stops or fails.
query.awaitTermination()


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

### Importing the Necessary Libraries

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import avg, count, round, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType

In [0]:
# Event Hub connection settings: the connection string is read from the secret
# scope and the hub name is fixed.
CONNECTION_STR = dbutils.secrets.get(
    scope="key-vault-scope",
    key="eventhub-connection-string",
)
EVENTHUB_NAME = "weatherstreameventhub"

# Options for the "eventhubs" stream reader. The connection string is passed
# through EventHubsUtils.encrypt before being handed to the reader.
connectionConf = {
    "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
        CONNECTION_STR
    ),
    "eventhubs.name": EVENTHUB_NAME,
}

In [0]:
# Create the `weather` catalog (if it is not there yet) with its managed
# storage rooted at the ADLS location below.
storage_location = "abfss://unity-catalog-storage@dbstoragefpzslu7ufrq22.dfs.core.windows.net/1912776267571840"
spark.sql(
    f"CREATE CATALOG IF NOT EXISTS weather MANAGED LOCATION '{storage_location}'"
)


DataFrame[]

In [0]:
# One schema per medallion layer inside the `weather` catalog; each statement
# is a no-op when the schema already exists.
for layer in ("bronze", "silver", "gold"):
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS weather.{layer}")


DataFrame[]

In [0]:
# Creating Volume for each Schema
# IF NOT EXISTS keeps this cell re-runnable, matching the catalog and schema
# cells; a plain CREATE VOLUME fails once the volume already exists.
spark.sql("CREATE VOLUME IF NOT EXISTS weather.gold.gold_volume")
spark.sql("CREATE VOLUME IF NOT EXISTS weather.silver.silver_volume")
spark.sql("CREATE VOLUME IF NOT EXISTS weather.bronze.bronze_volume")

DataFrame[]

In [0]:
# Streaming read of the raw Event Hub messages into the `df` DataFrame
df = (
    spark.readStream
    .format("eventhubs")
    .options(**connectionConf)
    .load()
)

# Live preview of the incoming stream in the notebook
df.display()

# Append the raw messages, unchanged, to the bronze Delta table; streaming
# progress is tracked under the bronze volume.
(
    df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/weather/bronze/bronze_volume")
    .toTable("weather.bronze.bronze_data")
)

body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties
eyJuYW1lIjogIkNoZW5uYWkiLCAicmVnaW9uIjogIlRhbWlsIE5hZHUiLCAiY291bnRyeSI6ICJJbmRpYSIsICJsYXQiOiAxMy4wODMzLCAibG9uIjogODAuMjgzMywgImxvY2FsdGltZSI6ICIyMDI1LTA4LTE1IDIwOjA3Iiw= (truncated),0,4295011144,171,2025-08-15T14:39:59.995Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJuYW1lIjogIkNoZW5uYWkiLCAicmVnaW9uIjogIlRhbWlsIE5hZHUiLCAiY291bnRyeSI6ICJJbmRpYSIsICJsYXQiOiAxMy4wODMzLCAibG9uIjogODAuMjgzMywgImxvY2FsdGltZSI6ICIyMDI1LTA4LTE1IDIwOjA3Iiw= (truncated),0,4295012104,172,2025-08-15T14:40:30.042Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJuYW1lIjogIkNoZW5uYWkiLCAicmVnaW9uIjogIlRhbWlsIE5hZHUiLCAiY291bnRyeSI6ICJJbmRpYSIsICJsYXQiOiAxMy4wODMzLCAibG9uIjogODAuMjgzMywgImxvY2FsdGltZSI6ICIyMDI1LTA4LTE1IDIwOjE1Iiw= (truncated),0,4295013064,173,2025-08-15T14:41:01.543Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJuYW1lIjogIkNoZW5uYWkiLCAicmVnaW9uIjogIlRhbWlsIE5hZHUiLCAiY291bnRyeSI6ICJJbmRpYSIsICJsYXQiOiAxMy4wODMzLCAibG9uIjogODAuMjgzMywgImxvY2FsdGltZSI6ICIyMDI1LTA4LTE1IDIwOjE1Iiw= (truncated),0,4295014024,174,2025-08-15T14:41:31.386Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)


In [0]:
%sql
select * from weather.bronze.bronze_data;

body,partition,offset,sequenceNumber,enqueuedTime,publisher,partitionKey,properties,systemProperties
eyJuYW1lIjogIkNoZW5uYWkiLCAicmVnaW9uIjogIlRhbWlsIE5hZHUiLCAiY291bnRyeSI6ICJJbmRpYSIsICJsYXQiOiAxMy4wODMzLCAibG9uIjogODAuMjgzMywgImxvY2FsdGltZSI6ICIyMDI1LTA4LTEyIDE1OjUwIiw= (truncated),0,4294993000,152,2025-08-12T10:22:40.054Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJuYW1lIjogIkNoZW5uYWkiLCAicmVnaW9uIjogIlRhbWlsIE5hZHUiLCAiY291bnRyeSI6ICJJbmRpYSIsICJsYXQiOiAxMy4wODMzLCAibG9uIjogODAuMjgzMywgImxvY2FsdGltZSI6ICIyMDI1LTA4LTEyIDE1OjUwIiw= (truncated),0,4294993952,153,2025-08-12T10:23:10.196Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJuYW1lIjogIkNoZW5uYWkiLCAicmVnaW9uIjogIlRhbWlsIE5hZHUiLCAiY291bnRyeSI6ICJJbmRpYSIsICJsYXQiOiAxMy4wODMzLCAibG9uIjogODAuMjgzMywgImxvY2FsdGltZSI6ICIyMDI1LTA4LTEyIDE1OjUwIiw= (truncated),0,4294994904,154,2025-08-12T10:23:40.181Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJuYW1lIjogIkNoZW5uYWkiLCAicmVnaW9uIjogIlRhbWlsIE5hZHUiLCAiY291bnRyeSI6ICJJbmRpYSIsICJsYXQiOiAxMy4wODMzLCAibG9uIjogODAuMjgzMywgImxvY2FsdGltZSI6ICIyMDI1LTA4LTEyIDE1OjUwIiw= (truncated),0,4294995856,155,2025-08-12T10:24:11.166Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJuYW1lIjogIkNoZW5uYWkiLCAicmVnaW9uIjogIlRhbWlsIE5hZHUiLCAiY291bnRyeSI6ICJJbmRpYSIsICJsYXQiOiAxMy4wODMzLCAibG9uIjogODAuMjgzMywgImxvY2FsdGltZSI6ICIyMDI1LTA4LTEyIDE1OjUwIiw= (truncated),0,4294996808,156,2025-08-12T10:24:42.182Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJuYW1lIjogIkNoZW5uYWkiLCAicmVnaW9uIjogIlRhbWlsIE5hZHUiLCAiY291bnRyeSI6ICJJbmRpYSIsICJsYXQiOiAxMy4wODMzLCAibG9uIjogODAuMjgzMywgImxvY2FsdGltZSI6ICIyMDI1LTA4LTEyIDE1OjUwIiw= (truncated),0,4294997760,157,2025-08-12T10:25:12.948Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJuYW1lIjogIkNoZW5uYWkiLCAicmVnaW9uIjogIlRhbWlsIE5hZHUiLCAiY291bnRyeSI6ICJJbmRpYSIsICJsYXQiOiAxMy4wODMzLCAibG9uIjogODAuMjgzMywgImxvY2FsdGltZSI6ICIyMDI1LTA4LTEyIDE1OjU3Iiw= (truncated),0,4294998712,158,2025-08-12T10:25:44.48Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJuYW1lIjogIkNoZW5uYWkiLCAicmVnaW9uIjogIlRhbWlsIE5hZHUiLCAiY291bnRyeSI6ICJJbmRpYSIsICJsYXQiOiAxMy4wODMzLCAibG9uIjogODAuMjgzMywgImxvY2FsdGltZSI6ICIyMDI1LTA4LTEyIDE1OjU3Iiw= (truncated),0,4294999664,159,2025-08-12T10:26:14.121Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJuYW1lIjogIkNoZW5uYWkiLCAicmVnaW9uIjogIlRhbWlsIE5hZHUiLCAiY291bnRyeSI6ICJJbmRpYSIsICJsYXQiOiAxMy4wODMzLCAibG9uIjogODAuMjgzMywgImxvY2FsdGltZSI6ICIyMDI1LTA4LTEyIDE1OjU3Iiw= (truncated),0,4295000616,160,2025-08-12T10:26:45.684Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)
eyJuYW1lIjogIkNoZW5uYWkiLCAicmVnaW9uIjogIlRhbWlsIE5hZHUiLCAiY291bnRyeSI6ICJJbmRpYSIsICJsYXQiOiAxMy4wODMzLCAibG9uIjogODAuMjgzMywgImxvY2FsdGltZSI6ICIyMDI1LTA4LTE1IDIwOjA4Iiw= (truncated),0,4295004424,164,2025-08-15T14:34:34.255Z,,,Map(),Map(x-opt-sequence-number-epoch -> -1)


In the Bronze layer the event body is stored in binary format, so we define a schema and use it to parse and transform the data in the Silver layer.

In [0]:
#Define the Json Schema
# Mirrors the record built by flatten_data on the producer side. Every field
# is nullable.

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType
json_schema = StructType([
    StructField('name',StringType(),True),
    StructField("region", StringType(), True),
    StructField("country", StringType(), True),
    StructField("lat", DoubleType(), True),
    StructField("lon", DoubleType(), True),
    # Kept as text here; it is converted to a timestamp when the silver columns are selected.
    StructField("localtime", StringType(), True),
    StructField("temp_c", DoubleType(), True),
    StructField("is_day", IntegerType(), True),
    StructField("condition_text", StringType(), True),
    StructField("condition_icon", StringType(), True),
    StructField("wind_kph", DoubleType(), True),
    StructField("wind_degree", IntegerType(), True),
    StructField("wind_dir", StringType(), True),
    StructField("pressure_in", DoubleType(), True),
    StructField("precip_in", DoubleType(), True),
    StructField("humidity", IntegerType(), True),
    StructField("cloud", IntegerType(), True),
    StructField("feelslike_c", DoubleType(), True),
    StructField("uv", DoubleType(), True),
    StructField("air_quality", StructType([
        StructField("co", DoubleType(), True),
        StructField("no2", DoubleType(), True),
        StructField("o3", DoubleType(), True),
        StructField("so2", DoubleType(), True),
        StructField("pm2_5", DoubleType(), True),
        StructField("pm10", DoubleType(), True),
        StructField("us-epa-index", IntegerType(), True),
        StructField("gb-defra-index", IntegerType(), True)
    ])),
    # The producer emits each alert as an object with these four keys, so the
    # element type is a struct rather than a plain string.
    StructField("alerts", ArrayType(StructType([
        StructField("headline", StringType(), True),
        StructField("severity", StringType(), True),
        StructField("description", StringType(), True),
        StructField("instruction", StringType(), True)
    ])), True),
    StructField("forecast", ArrayType(StructType([
        StructField("date", StringType(), True),
        StructField("maxtemp_c", DoubleType(), True),
        StructField("mintemp_c", DoubleType(), True),
        StructField("condition", StringType(), True)
    ])), True)
])


In [0]:
# Stream the bronze table, decode the binary `body` to text and parse it into
# a struct using json_schema.
bronze_raw = spark.readStream.format("delta").table("weather.bronze.bronze_data")
bronze_text = bronze_raw.withColumn("body", col("body").cast("STRING"))
df = bronze_text.withColumn("body", from_json(col("body"), json_schema))


def _body_as_string(field, alias):
    """Return body.<field> cast to string and renamed to <alias>."""
    return col(f"body.{field}").cast("string").alias(alias)


# (field inside body, silver column name), in output order. `localtime` sits
# between the two groups and `forecast` comes last.
_fields_before_localtime = [
    ("name", "name"),
    ("region", "region"),
    ("country", "country"),
    ("lat", "latitude"),
    ("lon", "longitude"),
]
_fields_after_localtime = [
    ("temp_c", "temperature_c"),
    ("is_day", "is_day"),
    ("condition_text", "condition_text"),
    ("condition_icon", "condition_icon"),
    ("wind_kph", "wind_kph"),
    ("wind_degree", "wind_degree"),
    ("wind_dir", "wind_direction"),
    ("pressure_in", "pressure_in"),
    ("precip_in", "precip_in"),
    ("humidity", "humidity"),
    ("cloud", "cloud"),
    ("feelslike_c", "feelslike_c"),
    ("uv", "uv"),
    ("air_quality.co", "air_quality_co"),
    ("air_quality.no2", "air_quality_no2"),
    ("air_quality.o3", "air_quality_o3"),
    ("air_quality.so2", "air_quality_so2"),
    ("air_quality.pm2_5", "air_quality_pm2_5"),
    ("air_quality.pm10", "air_quality_pm10"),
    ("air_quality.us-epa-index", "us_epa_index"),
    ("air_quality.gb-defra-index", "gb_defra_index"),
]

# Scalar fields are cast to string; `localtime` becomes a timestamp and
# `forecast` stays an array of structs.
df_string = df.select(
    *[_body_as_string(field, alias) for field, alias in _fields_before_localtime],
    to_timestamp(col("body.localtime"), "yyyy-MM-dd HH:mm").alias("localtime"),
    *[_body_as_string(field, alias) for field, alias in _fields_after_localtime],
    col("body.forecast").alias("forecast"),
)

display(df_string)

# Append the parsed records to the silver Delta table; streaming progress is
# tracked under the silver volume.
(
    df_string.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/weather/silver/silver_volume")
    .toTable("weather.silver.silver_data")
)

name,region,country,latitude,longitude,localtime,temperature_c,is_day,condition_text,condition_icon,wind_kph,wind_degree,wind_direction,pressure_in,precip_in,humidity,cloud,feelslike_c,uv,air_quality_co,air_quality_no2,air_quality_o3,air_quality_so2,air_quality_pm2_5,air_quality_pm10,us_epa_index,gb_defra_index,forecast
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:57:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,22.3,305,NW,29.65,0.0,66,75,33.2,2.9,625.3,32.375,43.0,25.16,23.495,30.34,2,2,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:57:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,22.3,305,NW,29.65,0.0,66,75,33.2,2.9,625.3,32.375,43.0,25.16,23.495,30.34,2,2,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:57:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,22.3,305,NW,29.65,0.0,66,75,33.2,2.9,625.3,32.375,43.0,25.16,23.495,30.34,2,2,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-15T20:08:00Z,31.4,0,Partly cloudy,//cdn.weatherapi.com/weather/64x64/night/116.png,23.0,181,S,29.59,0.02,71,50,39.0,0.0,495.8,12.025,108.0,16.095,19.98,30.895,2,2,"List(List(2025-08-15, 33.8, 27.6, Patchy rain nearby), List(2025-08-16, 32.2, 27.6, Patchy rain nearby), List(2025-08-17, 29.2, 25.6, Patchy rain nearby))"


In [0]:
%sql
select * from weather.silver.silver_data

name,region,country,latitude,longitude,localtime,temperature_c,is_day,condition_text,condition_icon,wind_kph,wind_degree,wind_direction,pressure_in,precip_in,humidity,cloud,feelslike_c,uv,air_quality_co,air_quality_no2,air_quality_o3,air_quality_so2,air_quality_pm2_5,air_quality_pm10,us_epa_index,gb_defra_index,forecast
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:57:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,22.3,305,NW,29.65,0.0,66,75,33.2,2.9,625.3,32.375,43.0,25.16,23.495,30.34,2,2,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:57:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,22.3,305,NW,29.65,0.0,66,75,33.2,2.9,625.3,32.375,43.0,25.16,23.495,30.34,2,2,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:57:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,22.3,305,NW,29.65,0.0,66,75,33.2,2.9,625.3,32.375,43.0,25.16,23.495,30.34,2,2,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T16:01:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,22.0,309,NW,29.65,0.0,66,75,33.2,1.0,419.95,26.825,52.0,29.045,20.72,29.97,2,2,"List(List(2025-08-12, 33.2, 27.5, Patchy rain nearby), List(2025-08-13, 26.7, 25.3, Moderate rain), List(2025-08-14, 30.6, 26.0, Patchy rain nearby))"


Flatten the forecast data using explode() and write the final data to the Gold layer

In [0]:
# Stream the silver table and fan each record out into one row per forecast day.
silver_df = spark.readStream.format("delta").table("weather.silver.silver_data")

# After explode, `forecast` holds a single struct per row instead of an array.
exploded_df = silver_df.withColumn("forecast", explode(col("forecast")))

# Silver columns carried over unchanged, in output order.
_passthrough_columns = [
    "name", "region", "country", "latitude", "longitude", "localtime",
    "temperature_c", "is_day", "condition_text", "condition_icon",
    "wind_kph", "wind_degree", "wind_direction", "pressure_in", "precip_in",
    "humidity", "cloud", "feelslike_c", "uv",
    "air_quality_co", "air_quality_no2", "air_quality_o3", "air_quality_so2",
    "air_quality_pm2_5", "air_quality_pm10", "us_epa_index", "gb_defra_index",
]
# Members of the forecast struct, each promoted to a `forecast_`-prefixed column.
_forecast_members = ["date", "maxtemp_c", "mintemp_c", "condition"]

gold_df = exploded_df.select(
    *[col(column_name) for column_name in _passthrough_columns],
    *[col(f"forecast.{member}").alias(f"forecast_{member}") for member in _forecast_members],
)
display(gold_df)

# Append the flattened rows to the gold Delta table; streaming progress is
# tracked under the gold volume.
(
    gold_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/weather/gold/gold_volume")
    .toTable("weather.gold.gold_data")
)

name,region,country,latitude,longitude,localtime,temperature_c,is_day,condition_text,condition_icon,wind_kph,wind_degree,wind_direction,pressure_in,precip_in,humidity,cloud,feelslike_c,uv,air_quality_co,air_quality_no2,air_quality_o3,air_quality_so2,air_quality_pm2_5,air_quality_pm10,us_epa_index,gb_defra_index,forecast_date,forecast_maxtemp_c,forecast_mintemp_c,forecast_condition
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-12,33.2,27.5,Patchy rain nearby
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-13,26.7,25.3,Moderate rain
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-14,30.6,26.0,Patchy rain nearby
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-12,33.2,27.5,Patchy rain nearby
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-13,26.7,25.3,Moderate rain
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-14,30.6,26.0,Patchy rain nearby
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-12,33.2,27.5,Patchy rain nearby
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-13,26.7,25.3,Moderate rain
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-14,30.6,26.0,Patchy rain nearby
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-12,33.2,27.5,Patchy rain nearby


In [0]:
%sql
select * from weather.gold.gold_data 

name,region,country,latitude,longitude,localtime,temperature_c,is_day,condition_text,condition_icon,wind_kph,wind_degree,wind_direction,pressure_in,precip_in,humidity,cloud,feelslike_c,uv,air_quality_co,air_quality_no2,air_quality_o3,air_quality_so2,air_quality_pm2_5,air_quality_pm10,us_epa_index,gb_defra_index,forecast_date,forecast_maxtemp_c,forecast_mintemp_c,forecast_condition
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-12,33.2,27.5,Patchy rain nearby
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-13,26.7,25.3,Moderate rain
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-14,30.6,26.0,Patchy rain nearby
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-12,33.2,27.5,Patchy rain nearby
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-13,26.7,25.3,Moderate rain
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-14,30.6,26.0,Patchy rain nearby
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-12,33.2,27.5,Patchy rain nearby
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-13,26.7,25.3,Moderate rain
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-14,30.6,26.0,Patchy rain nearby
Chennai,Tamil Nadu,India,13.0833,80.2833,2025-08-12T15:50:00Z,31.0,1,Partly cloudy,//cdn.weatherapi.com/weather/64x64/day/116.png,23.4,285,WNW,29.65,0.0,66,75,33.8,7.8,608.65,36.075,34.0,14.06,37.37,42.735,2,4,2025-08-12,33.2,27.5,Patchy rain nearby


In [0]:
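#  Access Token:
#    <redacted> — do not keep tokens in the notebook. Store the token in a secret scope
#    (as done for the Event Hub connection string) and revoke/rotate the value that was exposed here.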
