In [0]:
# from azure.eventhub import EventHubProducerClient, EventData
# import json

# #Event Hub Configuration
# EVENT_HUB_CONNECTION_STRING = "Endpoint=sb://wstreamingnamespace.servicebus.windows.net/;SharedAccessKeyName=fordatabricks;SharedAccessKey==;EntityPath=weatherstreamingeventhub"
# EVENT_HUB_NAME = "weatherstreamingeventhub"

# #Initialize Event Hub Producer
# producer = EventHubProducerClient.from_connection_string(conn_str=EVENT_HUB_CONNECTION_STRING, eventhub_name=EVENT_HUB_NAME)

# # Function to send Event to Event Hub
# def send_event(event):
#   event_data_batch = producer.create_batch()
#   event_data_batch.add(EventData(json.dumps(event)))
#   producer.send_batch(event_data_batch)

# # sample JSON event
# event = {
#   "event_id": 11,
#   "event_name": "Test Event",
# }
# #send the event
# send_event(event)

# #close the producer
# producer.close()

#Sending a Test Event to the Event Hub

In [0]:
from azure.eventhub import EventHubProducerClient, EventData
import json

#Event Hub Configuration

# Event Hub settings: the connection string is read from the secret scope
# (Azure Key Vault) rather than being written into the notebook.
EVENT_HUB_NAME = "weatherstreamingeventhub"
EVENT_HUB_CONNECTION_STRING = dbutils.secrets.get(
    scope="key-vault-scope",
    key="eventhub-connection-string",
)

# Initialize Event Hub Producer
producer = EventHubProducerClient.from_connection_string(
    conn_str=EVENT_HUB_CONNECTION_STRING,
    eventhub_name=EVENT_HUB_NAME,
)

# Function to send Event to Event Hub
def send_event(event):
    """Send `event` to the Event Hub as a single-event batch.

    The event is serialized with json.dumps and sent through the
    module-level `producer` client.
    """
    batch = producer.create_batch()
    message = EventData(json.dumps(event))
    batch.add(message)
    producer.send_batch(batch)

# Sample JSON event
event = {
    "event_id": 1100,
    "event_name": "Test Key vault",
}

# Send the event. The finally clause closes the producer even when the send
# raises, so a failed send does not leak the Event Hub connection.
try:
    send_event(event)
finally:
    producer.close()

#API TESTING


In [0]:
import requests
import json

# Getting secret value from Azure Key Vault
weatherapikey = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")
location = "Boston"

# HTTPS keeps the API key (sent as a query parameter) out of clear text.
# No leading space and no trailing slash, so the endpoint below is a clean
# ".../v1/current.json" instead of " .../v1//current.json".
base_url = "https://api.weatherapi.com/v1"
current_weather_url = f"{base_url}/current.json"

parameters = {
    "key": weatherapikey,
    "q": location,
}

# A timeout stops a stalled connection from hanging the cell indefinitely.
response = requests.get(current_weather_url, params=parameters, timeout=10)

if response.status_code == 200:
    current_weather = response.json()
    print("current_weather")
    print(json.dumps(current_weather, indent=3))
else:
    print(f"Error: {response.status_code}, {response.text}")

current_weather
{
   "location": {
      "name": "Boston",
      "region": "Massachusetts",
      "country": "United States of America",
      "lat": 42.3583,
      "lon": -71.0603,
      "tz_id": "America/New_York",
      "localtime_epoch": 1754419947,
      "localtime": "2025-08-05 14:52"
   },
   "current": {
      "last_updated_epoch": 1754419500,
      "last_updated": "2025-08-05 14:45",
      "temp_c": 21.7,
      "temp_f": 71.1,
      "is_day": 1,
      "condition": {
         "text": "Partly cloudy",
         "icon": "//cdn.weatherapi.com/weather/64x64/day/116.png",
         "code": 1003
      },
      "wind_mph": 8.9,
      "wind_kph": 14.4,
      "wind_degree": 71,
      "wind_dir": "ENE",
      "pressure_mb": 1028.0,
      "pressure_in": 30.37,
      "precip_mm": 0.0,
      "precip_in": 0.0,
      "humidity": 71,
      "cloud": 75,
      "feelslike_c": 21.7,
      "feelslike_f": 71.1,
      "windchill_c": 22.7,
      "windchill_f": 72.8,
      "heatindex_c": 24.8,
      "hea

#Complete code for getting weather data from API

In [0]:
import requests
import json 

# Function to handle the API response
def handle_response(response):
    """Turn an HTTP response into parsed JSON or an error string.

    Returns the decoded JSON body when the status code is 200. For any other
    status it returns a string starting with "Error:" that contains the
    status code and the response text, so callers must check the type.
    """
    if response.status_code != 200:
        return f"Error: {response.status_code}, {response.text}"
    return response.json()

# Function to get current weather and air quality data
def get_current_weather(base_url, api_key, location):
    """Request current weather and air-quality data for a location.

    Args:
        base_url: Root URL of the weather API; a trailing slash is tolerated.
        api_key: API key, sent as the "key" query parameter.
        location: Location query, sent as the "q" query parameter.

    Returns:
        Whatever handle_response returns for the HTTP response.
    """
    # Drop any trailing slash so the endpoint is ".../current.json" and not
    # ".../​/current.json" when base_url already ends with "/".
    current_weather_url = f"{base_url.rstrip('/')}/current.json"
    params = {
        "key": api_key,
        "q": location,
        "aqi": "yes"
    }
    # A timeout stops a stalled connection from blocking the caller forever.
    response = requests.get(current_weather_url, params=params, timeout=10)
    return handle_response(response)

# Function to get forecast weather data
def get_forecast_weather(base_url, api_key, location, days):
    """Request forecast data for a location.

    Args:
        base_url: Root URL of the weather API; a trailing slash is tolerated.
        api_key: API key, sent as the "key" query parameter.
        location: Location query, sent as the "q" query parameter.
        days: Value sent as the "days" query parameter.

    Returns:
        Whatever handle_response returns for the HTTP response.
    """
    # Drop any trailing slash so base_url values ending in "/" do not yield a
    # doubled slash before "forecast.json".
    forecast_weather_url = f"{base_url.rstrip('/')}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days,
    }
    # A timeout stops a stalled connection from blocking the caller forever.
    response = requests.get(forecast_weather_url, params=params, timeout=10)
    return handle_response(response)

# Function to get alerts data
def get_alerts(base_url, api_key, location):
    """Request weather alerts for a location.

    Args:
        base_url: Root URL of the weather API; a trailing slash is tolerated.
        api_key: API key, sent as the "key" query parameter.
        location: Location query, sent as the "q" query parameter.

    Returns:
        Whatever handle_response returns for the HTTP response.
    """
    # Drop any trailing slash so base_url values ending in "/" do not yield a
    # doubled slash before "alerts.json".
    alerts_url = f"{base_url.rstrip('/')}/alerts.json"
    params = {
        "key": api_key,
        "q": location,
        "alerts": 'yes'
    }
    # A timeout stops a stalled connection from blocking the caller forever.
    response = requests.get(alerts_url, params=params, timeout=10)
    return handle_response(response)

# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
    """Merge the current, forecast and alerts payloads into one record.

    Args:
        current_weather: Parsed JSON dict from the current-weather request.
        forecast_weather: Parsed JSON dict from the forecast request.
        alerts: Parsed JSON dict from the alerts request.

    Returns:
        A dict with top-level location/current fields plus nested
        'air_quality', 'alerts' and 'forecast' entries. Fields absent from
        the payloads come back as None (or as an empty list).

    Raises:
        ValueError: If any argument is not a dict, e.g. the "Error: ..."
            string that handle_response returns for a non-200 response.
    """
    # Fail with a clear message instead of an opaque AttributeError on `.get`
    # when an upstream request failed and produced an error string.
    for label, payload in (
        ("current_weather", current_weather),
        ("forecast_weather", forecast_weather),
        ("alerts", alerts),
    ):
        if not isinstance(payload, dict):
            raise ValueError(f"{label} is not a valid API payload: {payload!r}")

    # `or {}` / `or []` also covers keys that are present but null.
    location_data = current_weather.get("location") or {}
    current = current_weather.get("current") or {}
    condition = current.get("condition") or {}
    air_quality = current.get("air_quality") or {}
    forecast = (forecast_weather.get("forecast") or {}).get("forecastday") or []
    alerts_list = (alerts.get("alerts") or {}).get("alert") or []

    flattened_data = {
        'name': location_data.get('name'),
        'region': location_data.get('region'),
        'country': location_data.get('country'),
        'lat': location_data.get('lat'),
        'lon': location_data.get('lon'),
        'localtime': location_data.get('localtime'),
        'temp_c': current.get('temp_c'),
        'is_day': current.get('is_day'),
        'condition_text': condition.get('text'),
        'condition_icon': condition.get('icon'),
        'wind_mph': current.get('wind_mph'),
        'wind_kph': current.get('wind_kph'),
        'wind_degree': current.get('wind_degree'),
        'wind_dir': current.get('wind_dir'),
        'pressure_in': current.get('pressure_in'),
        'precip_in': current.get('precip_in'),
        'humidity': current.get('humidity'),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality': {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us-epa-index'),
            'gb-defra-index': air_quality.get('gb-defra-index')
        },
        'alerts': [
            {
                'headline': alert.get('headline'),
                'severity': alert.get('severity'),
                'description': alert.get('description'),
                'instruction': alert.get('instruction'),
            }
            for alert in alerts_list
        ],
        'forecast': [
            {
                'date': day.get('date'),
                'maxtemp_c': (day.get('day') or {}).get('maxtemp_c'),
                'mintemp_c': (day.get('day') or {}).get('mintemp_c'),
                'condition': ((day.get('day') or {}).get('condition') or {}).get('text'),
            }
            for day in forecast
        ]
    }
    return flattened_data

# Main program
def fetch_weather_data():
    """Fetch current, forecast and alert data for Boston and print the merged record.

    The API key is read from the "key-vault-scope" secret scope. The three
    payloads are combined with flatten_data and printed as indented JSON.
    """
    # HTTPS keeps the API key (sent as a query parameter) out of clear text.
    # No trailing slash: the endpoint helpers append "/<endpoint>.json".
    base_url = "https://api.weatherapi.com/v1"
    location = "Boston"
    weatherapikey = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")
    # Get data from API
    current_weather = get_current_weather(base_url, weatherapikey, location)
    forecast_weather = get_forecast_weather(base_url, weatherapikey, location, 3)
    alerts = get_alerts(base_url, weatherapikey, location)
    # Flatten and merge the data
    merged_data = flatten_data(current_weather, forecast_weather, alerts)
    print('weather data:', json.dumps(merged_data, indent=3))

# Calling the main program
fetch_weather_data()

weather data: {
   "name": "Boston",
   "region": "Massachusetts",
   "country": "United States of America",
   "lat": 42.3583,
   "lon": -71.0603,
   "localtime": "2025-08-07 11:10",
   "temp_c": 20.6,
   "is_day": 1,
   "condition_text": "Partly cloudy",
   "condition_icon": "//cdn.weatherapi.com/weather/64x64/day/116.png",
   "wind_mph": 5.4,
   "wind_kph": 8.6,
   "wind_degree": 96,
   "wind_dir": "E",
   "pressure_in": 30.4,
   "precip_in": 0.0,
   "humidity": 81,
   "cloud": 75,
   "feelslike_c": 20.6,
   "uv": 5.2,
   "air_quality": {
      "co": 314.5,
      "no2": 7.4,
      "o3": 136.0,
      "pm2_5": 22.755,
      "pm10": 23.125,
      "us-epa-index": 2,
      "gb-defra-index": 2
   },
   "alerts": [],
   "forecast": [
      {
         "date": "2025-08-07",
         "maxtemp_c": 23.2,
         "mintemp_c": 15.9,
         "condition": "Mist"
      },
      {
         "date": "2025-08-08",
         "maxtemp_c": 24.8,
         "mintemp_c": 15.9,
         "condition": "Partly Cl

#Sending weather data to event hub

In [0]:
import requests
from azure.eventhub import EventHubProducerClient, EventData
import json 

#Event Hub Configuration

# Event Hub settings: the connection string is read from the secret scope
# (Azure Key Vault) rather than being written into the notebook.
EVENT_HUB_NAME = "weatherstreamingeventhub"
EVENT_HUB_CONNECTION_STRING = dbutils.secrets.get(
    scope="key-vault-scope",
    key="eventhub-connection-string",
)

# Initialize Event Hub Producer
producer = EventHubProducerClient.from_connection_string(
    conn_str=EVENT_HUB_CONNECTION_STRING,
    eventhub_name=EVENT_HUB_NAME,
)

# Function to send Event to Event Hub
def send_event(event):
    """Send `event` to the Event Hub as a single-event batch.

    The event is serialized with json.dumps and sent through the
    module-level `producer` client.
    """
    batch = producer.create_batch()
    message = EventData(json.dumps(event))
    batch.add(message)
    producer.send_batch(batch)

# Function to handle the API response
def handle_response(response):
    """Turn an HTTP response into parsed JSON or an error string.

    Returns the decoded JSON body when the status code is 200. For any other
    status it returns a string starting with "Error:" that contains the
    status code and the response text, so callers must check the type.
    """
    if response.status_code != 200:
        return f"Error: {response.status_code}, {response.text}"
    return response.json()

# Function to get current weather and air quality data
def get_current_weather(base_url, api_key, location):
    """Request current weather and air-quality data for a location.

    Args:
        base_url: Root URL of the weather API; a trailing slash is tolerated.
        api_key: API key, sent as the "key" query parameter.
        location: Location query, sent as the "q" query parameter.

    Returns:
        Whatever handle_response returns for the HTTP response.
    """
    # Drop any trailing slash so base_url values ending in "/" do not yield a
    # doubled slash before "current.json".
    current_weather_url = f"{base_url.rstrip('/')}/current.json"
    params = {
        "key": api_key,
        "q": location,
        "aqi": "yes"
    }
    # A timeout stops a stalled connection from blocking the caller forever.
    response = requests.get(current_weather_url, params=params, timeout=10)
    return handle_response(response)

# Function to get forecast weather data
def get_forecast_weather(base_url, api_key, location, days):
    """Request forecast data for a location.

    Args:
        base_url: Root URL of the weather API; a trailing slash is tolerated.
        api_key: API key, sent as the "key" query parameter.
        location: Location query, sent as the "q" query parameter.
        days: Value sent as the "days" query parameter.

    Returns:
        Whatever handle_response returns for the HTTP response.
    """
    # Drop any trailing slash so base_url values ending in "/" do not yield a
    # doubled slash before "forecast.json".
    forecast_weather_url = f"{base_url.rstrip('/')}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days,
    }
    # A timeout stops a stalled connection from blocking the caller forever.
    response = requests.get(forecast_weather_url, params=params, timeout=10)
    return handle_response(response)

# Function to get alerts data
def get_alerts(base_url, api_key, location):
    """Request weather alerts for a location.

    Args:
        base_url: Root URL of the weather API; a trailing slash is tolerated.
        api_key: API key, sent as the "key" query parameter.
        location: Location query, sent as the "q" query parameter.

    Returns:
        Whatever handle_response returns for the HTTP response.
    """
    # Drop any trailing slash so base_url values ending in "/" do not yield a
    # doubled slash before "alerts.json".
    alerts_url = f"{base_url.rstrip('/')}/alerts.json"
    params = {
        "key": api_key,
        "q": location,
        "alerts": 'yes'
    }
    # A timeout stops a stalled connection from blocking the caller forever.
    response = requests.get(alerts_url, params=params, timeout=10)
    return handle_response(response)

# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
    """Merge the current, forecast and alerts payloads into one record.

    Args:
        current_weather: Parsed JSON dict from the current-weather request.
        forecast_weather: Parsed JSON dict from the forecast request.
        alerts: Parsed JSON dict from the alerts request.

    Returns:
        A dict with top-level location/current fields plus nested
        'air_quality', 'alerts' and 'forecast' entries. Fields absent from
        the payloads come back as None (or as an empty list).

    Raises:
        ValueError: If any argument is not a dict, e.g. the "Error: ..."
            string that handle_response returns for a non-200 response.
    """
    # Fail with a clear message instead of an opaque AttributeError on `.get`
    # when an upstream request failed and produced an error string.
    for label, payload in (
        ("current_weather", current_weather),
        ("forecast_weather", forecast_weather),
        ("alerts", alerts),
    ):
        if not isinstance(payload, dict):
            raise ValueError(f"{label} is not a valid API payload: {payload!r}")

    # `or {}` / `or []` also covers keys that are present but null.
    location_data = current_weather.get("location") or {}
    current = current_weather.get("current") or {}
    condition = current.get("condition") or {}
    air_quality = current.get("air_quality") or {}
    forecast = (forecast_weather.get("forecast") or {}).get("forecastday") or []
    alerts_list = (alerts.get("alerts") or {}).get("alert") or []

    flattened_data = {
        'name': location_data.get('name'),
        'region': location_data.get('region'),
        'country': location_data.get('country'),
        'lat': location_data.get('lat'),
        'lon': location_data.get('lon'),
        'localtime': location_data.get('localtime'),
        'temp_c': current.get('temp_c'),
        'is_day': current.get('is_day'),
        'condition_text': condition.get('text'),
        'condition_icon': condition.get('icon'),
        'wind_mph': current.get('wind_mph'),
        'wind_kph': current.get('wind_kph'),
        'wind_degree': current.get('wind_degree'),
        'wind_dir': current.get('wind_dir'),
        'pressure_in': current.get('pressure_in'),
        'precip_in': current.get('precip_in'),
        'humidity': current.get('humidity'),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality': {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us-epa-index'),
            'gb-defra-index': air_quality.get('gb-defra-index')
        },
        'alerts': [
            {
                'headline': alert.get('headline'),
                'severity': alert.get('severity'),
                'description': alert.get('description'),
                'instruction': alert.get('instruction'),
            }
            for alert in alerts_list
        ],
        'forecast': [
            {
                'date': day.get('date'),
                'maxtemp_c': (day.get('day') or {}).get('maxtemp_c'),
                'mintemp_c': (day.get('day') or {}).get('mintemp_c'),
                'condition': ((day.get('day') or {}).get('condition') or {}).get('text'),
            }
            for day in forecast
        ]
    }
    return flattened_data

# Main program
def fetch_weather_data():
    """Fetch current, forecast and alert data for Boston and send it to the Event Hub.

    The API key is read from the "key-vault-scope" secret scope. The three
    payloads are combined with flatten_data and passed to send_event.
    """
    # HTTPS keeps the API key (sent as a query parameter) out of clear text.
    # No trailing slash: the endpoint helpers append "/<endpoint>.json".
    base_url = "https://api.weatherapi.com/v1"
    location = "Boston"
    weatherapikey = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")
    # Get data from API
    current_weather = get_current_weather(base_url, weatherapikey, location)
    forecast_weather = get_forecast_weather(base_url, weatherapikey, location, 3)
    alerts = get_alerts(base_url, weatherapikey, location)
    # Flatten and merge the data
    merged_data = flatten_data(current_weather, forecast_weather, alerts)

    # Sending the weather data to the event hub
    send_event(merged_data)

# Calling the main program. The producer created above is closed afterwards,
# whether or not the fetch/send succeeded, so the connection is not leaked.
try:
    fetch_weather_data()
finally:
    producer.close()

#sending the weather data in streaming fashion

In [0]:
# Weather API key, read from the secret scope instead of being hard-coded.
weatherapikey = dbutils.secrets.get(
    scope="key-vault-scope",
    key="weatherapikey",
)

In [0]:
import requests
from azure.eventhub import EventHubProducerClient, EventData
import json 

#Event Hub Configuration

# Event Hub settings and the weather API key; both secrets are read from the
# secret scope (Azure Key Vault) rather than being written into the notebook.
EVENT_HUB_NAME = "weatherstreamingeventhub"
EVENT_HUB_CONNECTION_STRING = dbutils.secrets.get(
    scope="key-vault-scope",
    key="eventhub-connection-string",
)
weatherapikey = dbutils.secrets.get(
    scope="key-vault-scope",
    key="weatherapikey",
)

#Initialize Event Hub Producer


# Function to send Event to Event Hub
def send_event(event):
    """Send `event` to the Event Hub as a single-event batch.

    A new producer client is created for each call from
    EVENT_HUB_CONNECTION_STRING and EVENT_HUB_NAME, and the event is
    serialized with json.dumps.
    """
    producer = EventHubProducerClient.from_connection_string(
        conn_str=EVENT_HUB_CONNECTION_STRING,
        eventhub_name=EVENT_HUB_NAME,
    )
    # The context manager closes the client even when batching or sending
    # raises, so a failed send no longer leaks the connection.
    with producer:
        event_data_batch = producer.create_batch()
        event_data_batch.add(EventData(json.dumps(event)))
        producer.send_batch(event_data_batch)

# Function to handle the API response
def handle_response(response):
    """Turn an HTTP response into parsed JSON or an error string.

    Returns the decoded JSON body when the status code is 200. For any other
    status it returns a string starting with "Error:" that contains the
    status code and the response text, so callers must check the type.
    """
    if response.status_code != 200:
        return f"Error: {response.status_code}, {response.text}"
    return response.json()

# Function to get current weather and air quality data
def get_current_weather(base_url, api_key, location):
    """Request current weather and air-quality data for a location.

    Args:
        base_url: Root URL of the weather API; a trailing slash is tolerated.
        api_key: API key, sent as the "key" query parameter.
        location: Location query, sent as the "q" query parameter.

    Returns:
        Whatever handle_response returns for the HTTP response.
    """
    # Drop any trailing slash so base_url values ending in "/" do not yield a
    # doubled slash before "current.json".
    current_weather_url = f"{base_url.rstrip('/')}/current.json"
    params = {
        "key": api_key,
        "q": location,
        "aqi": "yes"
    }
    # A timeout stops a stalled connection from blocking the stream forever.
    response = requests.get(current_weather_url, params=params, timeout=10)
    return handle_response(response)

# Function to get forecast weather data
def get_forecast_weather(base_url, api_key, location, days):
    """Request forecast data for a location.

    Args:
        base_url: Root URL of the weather API; a trailing slash is tolerated.
        api_key: API key, sent as the "key" query parameter.
        location: Location query, sent as the "q" query parameter.
        days: Value sent as the "days" query parameter.

    Returns:
        Whatever handle_response returns for the HTTP response.
    """
    # Drop any trailing slash so base_url values ending in "/" do not yield a
    # doubled slash before "forecast.json".
    forecast_weather_url = f"{base_url.rstrip('/')}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days,
    }
    # A timeout stops a stalled connection from blocking the stream forever.
    response = requests.get(forecast_weather_url, params=params, timeout=10)
    return handle_response(response)

# Function to get alerts data
def get_alerts(base_url, api_key, location):
    """Request weather alerts for a location.

    Args:
        base_url: Root URL of the weather API; a trailing slash is tolerated.
        api_key: API key, sent as the "key" query parameter.
        location: Location query, sent as the "q" query parameter.

    Returns:
        Whatever handle_response returns for the HTTP response.
    """
    # Drop any trailing slash so base_url values ending in "/" do not yield a
    # doubled slash before "alerts.json".
    alerts_url = f"{base_url.rstrip('/')}/alerts.json"
    params = {
        "key": api_key,
        "q": location,
        "alerts": 'yes'
    }
    # A timeout stops a stalled connection from blocking the stream forever.
    response = requests.get(alerts_url, params=params, timeout=10)
    return handle_response(response)

# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
    """Merge the current, forecast and alerts payloads into one record.

    Args:
        current_weather: Parsed JSON dict from the current-weather request.
        forecast_weather: Parsed JSON dict from the forecast request.
        alerts: Parsed JSON dict from the alerts request.

    Returns:
        A dict with top-level location/current fields plus nested
        'air_quality', 'alerts' and 'forecast' entries. Fields absent from
        the payloads come back as None (or as an empty list).

    Raises:
        ValueError: If any argument is not a dict, e.g. the "Error: ..."
            string that handle_response returns for a non-200 response.
    """
    # Fail with a clear message instead of an opaque AttributeError on `.get`
    # when an upstream request failed and produced an error string.
    for label, payload in (
        ("current_weather", current_weather),
        ("forecast_weather", forecast_weather),
        ("alerts", alerts),
    ):
        if not isinstance(payload, dict):
            raise ValueError(f"{label} is not a valid API payload: {payload!r}")

    # `or {}` / `or []` also covers keys that are present but null.
    location_data = current_weather.get("location") or {}
    current = current_weather.get("current") or {}
    condition = current.get("condition") or {}
    air_quality = current.get("air_quality") or {}
    forecast = (forecast_weather.get("forecast") or {}).get("forecastday") or []
    alerts_list = (alerts.get("alerts") or {}).get("alert") or []

    flattened_data = {
        'name': location_data.get('name'),
        'region': location_data.get('region'),
        'country': location_data.get('country'),
        'lat': location_data.get('lat'),
        'lon': location_data.get('lon'),
        'localtime': location_data.get('localtime'),
        'temp_c': current.get('temp_c'),
        'is_day': current.get('is_day'),
        'condition_text': condition.get('text'),
        'condition_icon': condition.get('icon'),
        'wind_mph': current.get('wind_mph'),
        'wind_kph': current.get('wind_kph'),
        'wind_degree': current.get('wind_degree'),
        'wind_dir': current.get('wind_dir'),
        'pressure_in': current.get('pressure_in'),
        'precip_in': current.get('precip_in'),
        'humidity': current.get('humidity'),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality': {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us-epa-index'),
            'gb-defra-index': air_quality.get('gb-defra-index')
        },
        'alerts': [
            {
                'headline': alert.get('headline'),
                'severity': alert.get('severity'),
                'description': alert.get('description'),
                'instruction': alert.get('instruction'),
            }
            for alert in alerts_list
        ],
        'forecast': [
            {
                'date': day.get('date'),
                'maxtemp_c': (day.get('day') or {}).get('maxtemp_c'),
                'mintemp_c': (day.get('day') or {}).get('mintemp_c'),
                'condition': ((day.get('day') or {}).get('condition') or {}).get('text'),
            }
            for day in forecast
        ]
    }
    return flattened_data


def fetch_weather_data():
    """Fetch current, forecast and alert data for Boston and return the merged record.

    Uses the module-level `weatherapikey` and combines the three payloads
    with flatten_data.
    """
    # HTTPS keeps the API key (sent as a query parameter) out of clear text.
    # No trailing slash: the endpoint helpers append "/<endpoint>.json".
    base_url = "https://api.weatherapi.com/v1"
    location = "Boston"

    # Get data from API
    current_weather = get_current_weather(base_url, weatherapikey, location)
    forecast_weather = get_forecast_weather(base_url, weatherapikey, location, 3)
    alerts = get_alerts(base_url, weatherapikey, location)
    # Flatten and merge the data
    merged_data = flatten_data(current_weather, forecast_weather, alerts)
    return merged_data
#main program
def process_batch(batch_df, batch_id):
    """foreachBatch callback: fetch the latest weather record and send it to the Event Hub.

    `batch_df` is not read; `batch_id` is only used in the error message.
    Any exception is printed together with the batch id and then re-raised.
    """
    try:
        latest_record = fetch_weather_data()
        send_event(latest_record)
    except Exception as exc:
        print(f"Error sending events in batch {batch_id}: {exc}")
        raise exc

# Set up a streaming source (the "rate" source, used here for testing purposes)
rate_reader = spark.readStream.format("rate").option("rowsPerSecond", 1)
streaming_df = rate_reader.load()

# Write the stream with foreachBatch so every micro-batch calls process_batch,
# then block until the query terminates.
batch_writer = streaming_df.writeStream.foreachBatch(process_batch)
query = batch_writer.start()
query.awaitTermination()



com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:132)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:132)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

#sending the weather data to event hub every 30 seconds

In [0]:
import requests
import json
from azure.eventhub import EventHubProducerClient, EventData
from datetime import datetime, timedelta
 

#Event Hub Configuration

# Event Hub settings and the weather API key; both secrets are read from the
# secret scope (Azure Key Vault) rather than being written into the notebook.
EVENT_HUB_NAME = "weatherstreamingeventhub"
EVENT_HUB_CONNECTION_STRING = dbutils.secrets.get(
    scope="key-vault-scope",
    key="eventhub-connection-string",
)
weatherapikey = dbutils.secrets.get(
    scope="key-vault-scope",
    key="weatherapikey",
)

#Initialize Event Hub Producer


# Function to send Event to Event Hub
def send_event(event):
    """Send `event` to the Event Hub as a single-event batch.

    A new producer client is created for each call from
    EVENT_HUB_CONNECTION_STRING and EVENT_HUB_NAME, and the event is
    serialized with json.dumps.
    """
    producer = EventHubProducerClient.from_connection_string(
        conn_str=EVENT_HUB_CONNECTION_STRING,
        eventhub_name=EVENT_HUB_NAME,
    )
    # The context manager closes the client even when batching or sending
    # raises, so a failed send no longer leaks the connection.
    with producer:
        event_data_batch = producer.create_batch()
        event_data_batch.add(EventData(json.dumps(event)))
        producer.send_batch(event_data_batch)

# Function to handle the API response
def handle_response(response):
    """Turn an HTTP response into parsed JSON or an error string.

    Returns the decoded JSON body when the status code is 200. For any other
    status it returns a string starting with "Error:" that contains the
    status code and the response text, so callers must check the type.
    """
    if response.status_code != 200:
        return f"Error: {response.status_code}, {response.text}"
    return response.json()

# Function to get current weather and air quality data
def get_current_weather(base_url, api_key, location):
    """Request current weather and air-quality data for a location.

    Args:
        base_url: Root URL of the weather API; a trailing slash is tolerated.
        api_key: API key, sent as the "key" query parameter.
        location: Location query, sent as the "q" query parameter.

    Returns:
        Whatever handle_response returns for the HTTP response.
    """
    # Drop any trailing slash so base_url values ending in "/" do not yield a
    # doubled slash before "current.json".
    current_weather_url = f"{base_url.rstrip('/')}/current.json"
    params = {
        "key": api_key,
        "q": location,
        "aqi": "yes"
    }
    # A timeout stops a stalled connection from blocking the stream forever.
    response = requests.get(current_weather_url, params=params, timeout=10)
    return handle_response(response)

# Function to get forecast weather data
def get_forecast_weather(base_url, api_key, location, days):
    """Request forecast data for a location.

    Args:
        base_url: Root URL of the weather API; a trailing slash is tolerated.
        api_key: API key, sent as the "key" query parameter.
        location: Location query, sent as the "q" query parameter.
        days: Value sent as the "days" query parameter.

    Returns:
        Whatever handle_response returns for the HTTP response.
    """
    # Drop any trailing slash so base_url values ending in "/" do not yield a
    # doubled slash before "forecast.json".
    forecast_weather_url = f"{base_url.rstrip('/')}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days,
    }
    # A timeout stops a stalled connection from blocking the stream forever.
    response = requests.get(forecast_weather_url, params=params, timeout=10)
    return handle_response(response)

# Function to get alerts data
def get_alerts(base_url, api_key, location):
    """Request weather alerts for a location.

    Args:
        base_url: Root URL of the weather API; a trailing slash is tolerated.
        api_key: API key, sent as the "key" query parameter.
        location: Location query, sent as the "q" query parameter.

    Returns:
        Whatever handle_response returns for the HTTP response.
    """
    # Drop any trailing slash so base_url values ending in "/" do not yield a
    # doubled slash before "alerts.json".
    alerts_url = f"{base_url.rstrip('/')}/alerts.json"
    params = {
        "key": api_key,
        "q": location,
        "alerts": 'yes'
    }
    # A timeout stops a stalled connection from blocking the stream forever.
    response = requests.get(alerts_url, params=params, timeout=10)
    return handle_response(response)

# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
    """Merge the current, forecast and alerts payloads into one record.

    Args:
        current_weather: Parsed JSON dict from the current-weather request.
        forecast_weather: Parsed JSON dict from the forecast request.
        alerts: Parsed JSON dict from the alerts request.

    Returns:
        A dict with top-level location/current fields plus nested
        'air_quality', 'alerts' and 'forecast' entries. Fields absent from
        the payloads come back as None (or as an empty list).

    Raises:
        ValueError: If any argument is not a dict, e.g. the "Error: ..."
            string that handle_response returns for a non-200 response.
    """
    # Fail with a clear message instead of an opaque AttributeError on `.get`
    # when an upstream request failed and produced an error string.
    for label, payload in (
        ("current_weather", current_weather),
        ("forecast_weather", forecast_weather),
        ("alerts", alerts),
    ):
        if not isinstance(payload, dict):
            raise ValueError(f"{label} is not a valid API payload: {payload!r}")

    # `or {}` / `or []` also covers keys that are present but null.
    location_data = current_weather.get("location") or {}
    current = current_weather.get("current") or {}
    condition = current.get("condition") or {}
    air_quality = current.get("air_quality") or {}
    forecast = (forecast_weather.get("forecast") or {}).get("forecastday") or []
    alerts_list = (alerts.get("alerts") or {}).get("alert") or []

    flattened_data = {
        'name': location_data.get('name'),
        'region': location_data.get('region'),
        'country': location_data.get('country'),
        'lat': location_data.get('lat'),
        'lon': location_data.get('lon'),
        'localtime': location_data.get('localtime'),
        'temp_c': current.get('temp_c'),
        'is_day': current.get('is_day'),
        'condition_text': condition.get('text'),
        'condition_icon': condition.get('icon'),
        'wind_mph': current.get('wind_mph'),
        'wind_kph': current.get('wind_kph'),
        'wind_degree': current.get('wind_degree'),
        'wind_dir': current.get('wind_dir'),
        'pressure_in': current.get('pressure_in'),
        'precip_in': current.get('precip_in'),
        'humidity': current.get('humidity'),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality': {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us-epa-index'),
            'gb-defra-index': air_quality.get('gb-defra-index')
        },
        'alerts': [
            {
                'headline': alert.get('headline'),
                'severity': alert.get('severity'),
                'description': alert.get('description'),
                'instruction': alert.get('instruction'),
            }
            for alert in alerts_list
        ],
        'forecast': [
            {
                'date': day.get('date'),
                'maxtemp_c': (day.get('day') or {}).get('maxtemp_c'),
                'mintemp_c': (day.get('day') or {}).get('mintemp_c'),
                'condition': ((day.get('day') or {}).get('condition') or {}).get('text'),
            }
            for day in forecast
        ]
    }
    return flattened_data


def fetch_weather_data():
    """Fetch current, forecast and alert data for Boston and return the merged record.

    Uses the module-level `weatherapikey` and combines the three payloads
    with flatten_data.
    """
    # HTTPS keeps the API key (sent as a query parameter) out of clear text.
    # No trailing slash: the endpoint helpers append "/<endpoint>.json".
    base_url = "https://api.weatherapi.com/v1"
    location = "Boston"

    # Get data from API
    current_weather = get_current_weather(base_url, weatherapikey, location)
    forecast_weather = get_forecast_weather(base_url, weatherapikey, location, 3)
    alerts = get_alerts(base_url, weatherapikey, location)
    # Flatten and merge the data
    merged_data = flatten_data(current_weather, forecast_weather, alerts)
    return merged_data

#function to process each batch of streaming data
# Back-dated by 30 seconds so the very first batch already counts as due.
last_sent_time = datetime.now() - timedelta(seconds=30)

# Main program
def process_batch(batch_df, batch_id):
    """foreachBatch callback that sends a weather record at most every 30 seconds.

    `batch_df` is not read; `batch_id` is only used in the error message.
    When at least 30 seconds have passed since `last_sent_time`, the latest
    weather record is fetched and sent, and `last_sent_time` is updated;
    otherwise the batch is a no-op. Any exception is printed together with
    the batch id and then re-raised.
    """
    global last_sent_time
    try:
        now = datetime.now()
        is_due = (now - last_sent_time).total_seconds() >= 30
        if not is_due:
            return

        # Fetch the current weather data and send it to the Event Hub.
        send_event(fetch_weather_data())

        # Remember when this event went out.
        last_sent_time = now
        print(f'Event sent at {last_sent_time}')
    except Exception as exc:
        print(f"Error sending events in batch {batch_id}: {exc}")
        raise exc

# Set up a streaming source (the "rate" source, used here for testing purposes)
rate_reader = spark.readStream.format("rate").option("rowsPerSecond", 1)
streaming_df = rate_reader.load()

# Write the stream with foreachBatch so every micro-batch calls process_batch,
# then block until the query terminates.
batch_writer = streaming_df.writeStream.foreachBatch(process_batch)
query = batch_writer.start()
query.awaitTermination()

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:132)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:132)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can