## Sending a test event to Event Hub

In [0]:
from azure.eventhub import EventHubProducerClient, EventData

# Connection details.
# SECURITY: never paste the connection string into the notebook — it embeds the
# SharedAccessKey. Read it from the Key Vault-backed secret scope instead, and
# rotate any key that was previously committed in plain text.
EVENT_HUB_CONNECTION_STRING = dbutils.secrets.get(scope="key-vault-scope", key="eventhub-connection-string")
EVENT_HUB_NAME = "weatherstreamingeventhub"

# Create an Event Hub producer client
producer = EventHubProducerClient.from_connection_string(
    conn_str=EVENT_HUB_CONNECTION_STRING, eventhub_name=EVENT_HUB_NAME
)

# Send a single test event; using the client as a context manager closes it afterwards.
with producer:
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData("Test message from Databricks!"))
    producer.send_batch(event_data_batch)

print("Event sent successfully!")

Event sent successfully!


## Sending a test event with the connection string from Key Vault

In [0]:
from azure.eventhub import EventHubProducerClient, EventData

# Event Hub settings. The connection string is pulled from the Key Vault-backed
# secret scope so no credential appears in the notebook.
EVENT_HUB_NAME = "weatherstreamingeventhub"
EVENT_HUB_CONNECTION_STRING = dbutils.secrets.get(
    scope="key-vault-scope", key="eventhub-connection-string"
)

producer = EventHubProducerClient.from_connection_string(
    conn_str=EVENT_HUB_CONNECTION_STRING,
    eventhub_name=EVENT_HUB_NAME,
)

# Publish one test message; the client is used as a context manager so it is
# closed when the block exits.
with producer:
    test_batch = producer.create_batch()
    test_batch.add(EventData("Using Dynamic using keyvalut "))
    producer.send_batch(test_batch)

print("Event sent successfully!")

Event sent successfully!


## Testing the Weather API

In [0]:
import requests
import json 

# Getting secret value from key vault
weatherapikey = dbutils.secrets.get(scope="key-vault-scope",key="weatherapikey")
location = "Chennai"

# HTTPS: the API key travels as a query parameter, so plain HTTP would expose it.
base_url = "https://api.weatherapi.com/v1"

current_weather_url = f"{base_url}/current.json"

params = {
    'key': weatherapikey,
    'q': location,
}

# Bound the wait so a stalled connection cannot hang the cell indefinitely.
response = requests.get(current_weather_url, params=params, timeout=10)

if response.status_code == 200:
    current_weather = response.json()
    print("Current Weather:")
    print(json.dumps(current_weather, indent=4))
else:
    print(f"Error: {response.status_code} - {response.text}")


Current Weather:
{
    "location": {
        "name": "Chennai",
        "region": "Tamil Nadu",
        "country": "India",
        "lat": 13.0833,
        "lon": 80.2833,
        "tz_id": "Asia/Kolkata",
        "localtime_epoch": 1748169648,
        "localtime": "2025-05-25 16:10"
    },
    "current": {
        "last_updated_epoch": 1748169000,
        "last_updated": "2025-05-25 16:00",
        "temp_c": 33.2,
        "temp_f": 91.8,
        "is_day": 1,
        "condition": {
            "text": "Patchy light rain",
            "icon": "//cdn.weatherapi.com/weather/64x64/day/293.png",
            "code": 1180
        },
        "wind_mph": 8.1,
        "wind_kph": 13.0,
        "wind_degree": 219,
        "wind_dir": "SW",
        "pressure_mb": 1002.0,
        "pressure_in": 29.59,
        "precip_mm": 0.0,
        "precip_in": 0.0,
        "humidity": 63,
        "cloud": 75,
        "feelslike_c": 40.7,
        "feelslike_f": 105.3,
        "windchill_c": 31.3,
        "windchi

## Complete code for getting weather data

In [0]:
import requests
import json

# Function to handle the API response
def handle_response(response):
    """Return the parsed JSON body of a successful weatherapi.com response.

    Args:
        response: a ``requests`` response object (anything exposing
            ``status_code``, ``text`` and ``json()``).

    Returns:
        The decoded JSON payload when the status code is 200.

    Raises:
        RuntimeError: for any other status code. Raising (rather than returning
            the error text) keeps callers such as ``flatten_data`` from receiving
            a str where a dict is expected.
    """
    if response.status_code != 200:
        raise RuntimeError(f"Error: {response.status_code} - {response.text}")
    return response.json()

# Function to get Current Weather and air quality data
def get_current_weather(base_url, api_key, location, timeout=10):
    """Fetch current conditions, including air-quality data, for ``location``.

    Args:
        base_url: weatherapi.com API root.
        api_key: weatherapi.com API key.
        location: value for the API's ``q`` query parameter (a city name here).
        timeout: seconds to wait for the HTTP response. Without a timeout
            ``requests.get`` can block indefinitely and stall the caller.

    Returns:
        The result of ``handle_response(response)`` (the parsed JSON body on HTTP 200).
    """
    params = {
        'key': api_key,
        'q': location,
        "aqi": "yes",  # request air-quality data; flatten_data reads current["air_quality"]
    }
    response = requests.get(f"{base_url}/current.json", params=params, timeout=timeout)
    return handle_response(response)

# Function to get Forecast Data
def get_forecast_weather(base_url, api_key, location, days, timeout=10):
    """Fetch a ``days``-day forecast for ``location``.

    Args:
        base_url: weatherapi.com API root.
        api_key: weatherapi.com API key.
        location: value for the API's ``q`` query parameter.
        days: number of forecast days to request.
        timeout: seconds to wait for the HTTP response, so a stalled request
            cannot block the caller forever.

    Returns:
        The result of ``handle_response(response)`` (the parsed JSON body on HTTP 200).
    """
    params = {
        'key': api_key,
        'q': location,
        "days": days,
    }
    response = requests.get(f"{base_url}/forecast.json", params=params, timeout=timeout)
    return handle_response(response)

# Function to get Alerts
def get_alerts(base_url, api_key, location, timeout=10):
    """Fetch weather alerts for ``location``.

    Args:
        base_url: weatherapi.com API root.
        api_key: weatherapi.com API key.
        location: value for the API's ``q`` query parameter.
        timeout: seconds to wait for the HTTP response, so a stalled request
            cannot block the caller forever.

    Returns:
        The result of ``handle_response(response)`` (the parsed JSON body on HTTP 200).
    """
    params = {
        'key': api_key,
        'q': location,
        "alerts": "yes",
    }
    response = requests.get(f"{base_url}/alerts.json", params=params, timeout=timeout)
    return handle_response(response)

# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
    """Combine the three weatherapi.com payloads into a single record.

    Args:
        current_weather: parsed current.json response; its "location" and
            "current" sections are read and missing keys become None.
        forecast_weather: parsed forecast.json response; only
            forecast.forecastday is read.
        alerts: parsed alerts.json response; only alerts.alert is read.

    Returns:
        dict of top-level location/current fields, followed by a nested
        "air_quality" dict and "alerts" / "forecast" lists.
    """
    loc = current_weather.get("location", {})
    now = current_weather.get("current", {})
    cond = now.get("condition", {})
    aq = now.get("air_quality", {})
    forecast_days = forecast_weather.get("forecast", {}).get("forecastday", [])
    raw_alerts = alerts.get("alerts", {}).get("alert", [])

    def summarise_alert(item):
        # The alert text is read from "desc" and exposed as "description".
        return {
            "headline": item.get("headline"),
            "severity": item.get("severity"),
            "description": item.get("desc"),
            "instruction": item.get("instruction"),
        }

    def summarise_day(entry):
        stats = entry.get("day", {})
        return {
            "date": entry.get("date"),
            "maxtemp_c": stats.get("maxtemp_c"),
            "mintemp_c": stats.get("mintemp_c"),
            "condition": stats.get("condition", {}).get("text"),
        }

    # Insertion order below matches the published event's key order.
    record = {key: loc.get(key) for key in ("name", "region", "country", "lat", "lon", "localtime")}
    record["temp_c"] = now.get("temp_c")
    record["is_day"] = now.get("is_day")
    record["condition_text"] = cond.get("text")
    record["condition_icon"] = cond.get("icon")
    for key in ("wind_kph", "wind_degree", "wind_dir", "pressure_in",
                "precip_in", "humidity", "cloud", "feelslike_c", "uv"):
        record[key] = now.get(key)
    record["air_quality"] = {
        key: aq.get(key)
        for key in ("co", "no2", "o3", "so2", "pm2_5", "pm10",
                    "us-epa-index", "gb-defra-index")
    }
    record["alerts"] = [summarise_alert(item) for item in raw_alerts]
    record["forecast"] = [summarise_day(entry) for entry in forecast_days]
    return record

# Main Program
def fetch_weather_data(location="Chennai", days=3):
    """Fetch current weather, forecast and alerts for ``location`` and print the merged record.

    Args:
        location: place to query (defaults to the previously hard-coded "Chennai").
        days: number of forecast days to request.
    """
    # HTTPS: the API key is sent as a query parameter, so plain HTTP would expose it.
    base_url = "https://api.weatherapi.com/v1"
    api_key = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")

    # Get data from API
    current_weather = get_current_weather(base_url, api_key, location)
    forecast_weather = get_forecast_weather(base_url, api_key, location, days)
    alerts = get_alerts(base_url, api_key, location)

    # Flatten and merge data
    merged_data = flatten_data(current_weather, forecast_weather, alerts)
    print("Weather Data:", json.dumps(merged_data, indent=3))

# Fetch and print the merged record once to inspect its shape.
fetch_weather_data()

Weather Data: {
   "name": "Chennai",
   "region": "Tamil Nadu",
   "country": "India",
   "lat": 13.0833,
   "lon": 80.2833,
   "localtime": "2025-05-25 16:49",
   "temp_c": 32.3,
   "is_day": 1,
   "condition_text": "Light rain",
   "condition_icon": "//cdn.weatherapi.com/weather/64x64/day/296.png",
   "wind_kph": 13.0,
   "wind_degree": 219,
   "wind_dir": "SW",
   "pressure_in": 29.56,
   "precip_in": 0.0,
   "humidity": 67,
   "cloud": 75,
   "feelslike_c": 38.5,
   "uv": 0.4,
   "air_quality": {
      "co": 388.5,
      "no2": 10.545,
      "o3": 111.0,
      "so2": 24.605,
      "pm2_5": 20.535,
      "pm10": 23.865,
      "us-epa-index": 2,
      "gb-defra-index": 2
   },
   "alerts": [
      {
         "headline": "Heavy to very heavy rainfall with extremely heavy rainfall",
         "severity": "Severe",
         "description": "Heavy to Very Heavy Rainfall with isolated Extremely Heavy Rainfall very likely at isolated places over Costal Karnataka, Kerala, Mahe, South Interio


## Streaming the complete weather data to Event Hub

In [0]:
import requests
import json
from azure.eventhub import EventHubProducerClient, EventData

# Event Hub settings, with the connection string read from the Key Vault-backed
# secret scope.
EVENT_HUB_CONNECTION_STRING = dbutils.secrets.get(
    scope="key-vault-scope", key="eventhub-connection-string"
)
EVENT_HUB_NAME = "weatherstreamingeventhub"

# A single producer client, shared by every save_event call in this cell.
producer = EventHubProducerClient.from_connection_string(
    conn_str=EVENT_HUB_CONNECTION_STRING,
    eventhub_name=EVENT_HUB_NAME,
)


def save_event(event):
    """Publish ``event`` to Event Hub as one JSON-encoded message.

    Args:
        event: a JSON-serialisable object (here, the record from fetch_weather_data).
    """
    batch = producer.create_batch()
    batch.add(EventData(json.dumps(event)))
    producer.send_batch(batch)

# Function to handle the API response
def handle_response(response):
    """Return the parsed JSON body of a successful weatherapi.com response.

    Args:
        response: a ``requests`` response object (anything exposing
            ``status_code``, ``text`` and ``json()``).

    Returns:
        The decoded JSON payload when the status code is 200.

    Raises:
        RuntimeError: for any other status code. Raising (rather than returning
            the error text) keeps callers such as ``flatten_data`` from receiving
            a str where a dict is expected.
    """
    if response.status_code != 200:
        raise RuntimeError(f"Error: {response.status_code} - {response.text}")
    return response.json()

# Function to get Current Weather and air quality data
def get_current_weather(base_url, api_key, location, timeout=10):
    """Fetch current conditions, including air-quality data, for ``location``.

    Args:
        base_url: weatherapi.com API root.
        api_key: weatherapi.com API key.
        location: value for the API's ``q`` query parameter (a city name here).
        timeout: seconds to wait for the HTTP response. Without a timeout
            ``requests.get`` can block indefinitely and stall the stream.

    Returns:
        The result of ``handle_response(response)`` (the parsed JSON body on HTTP 200).
    """
    params = {
        'key': api_key,
        'q': location,
        "aqi": "yes",  # request air-quality data; flatten_data reads current["air_quality"]
    }
    response = requests.get(f"{base_url}/current.json", params=params, timeout=timeout)
    return handle_response(response)

# Function to get Forecast Data
def get_forecast_weather(base_url, api_key, location, days, timeout=10):
    """Fetch a ``days``-day forecast for ``location``.

    Args:
        base_url: weatherapi.com API root.
        api_key: weatherapi.com API key.
        location: value for the API's ``q`` query parameter.
        days: number of forecast days to request.
        timeout: seconds to wait for the HTTP response, so a stalled request
            cannot block the stream forever.

    Returns:
        The result of ``handle_response(response)`` (the parsed JSON body on HTTP 200).
    """
    params = {
        'key': api_key,
        'q': location,
        "days": days,
    }
    response = requests.get(f"{base_url}/forecast.json", params=params, timeout=timeout)
    return handle_response(response)

# Function to get Alerts
def get_alerts(base_url, api_key, location, timeout=10):
    """Fetch weather alerts for ``location``.

    Args:
        base_url: weatherapi.com API root.
        api_key: weatherapi.com API key.
        location: value for the API's ``q`` query parameter.
        timeout: seconds to wait for the HTTP response, so a stalled request
            cannot block the stream forever.

    Returns:
        The result of ``handle_response(response)`` (the parsed JSON body on HTTP 200).
    """
    params = {
        'key': api_key,
        'q': location,
        "alerts": "yes",
    }
    response = requests.get(f"{base_url}/alerts.json", params=params, timeout=timeout)
    return handle_response(response)

# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
    """Combine the three weatherapi.com payloads into a single record.

    Args:
        current_weather: parsed current.json response; its "location" and
            "current" sections are read and missing keys become None.
        forecast_weather: parsed forecast.json response; only
            forecast.forecastday is read.
        alerts: parsed alerts.json response; only alerts.alert is read.

    Returns:
        dict of top-level location/current fields, followed by a nested
        "air_quality" dict and "alerts" / "forecast" lists.
    """
    loc = current_weather.get("location", {})
    now = current_weather.get("current", {})
    cond = now.get("condition", {})
    aq = now.get("air_quality", {})
    forecast_days = forecast_weather.get("forecast", {}).get("forecastday", [])
    raw_alerts = alerts.get("alerts", {}).get("alert", [])

    def summarise_alert(item):
        # The alert text is read from "desc" and exposed as "description".
        return {
            "headline": item.get("headline"),
            "severity": item.get("severity"),
            "description": item.get("desc"),
            "instruction": item.get("instruction"),
        }

    def summarise_day(entry):
        stats = entry.get("day", {})
        return {
            "date": entry.get("date"),
            "maxtemp_c": stats.get("maxtemp_c"),
            "mintemp_c": stats.get("mintemp_c"),
            "condition": stats.get("condition", {}).get("text"),
        }

    # Insertion order below matches the published event's key order.
    record = {key: loc.get(key) for key in ("name", "region", "country", "lat", "lon", "localtime")}
    record["temp_c"] = now.get("temp_c")
    record["is_day"] = now.get("is_day")
    record["condition_text"] = cond.get("text")
    record["condition_icon"] = cond.get("icon")
    for key in ("wind_kph", "wind_degree", "wind_dir", "pressure_in",
                "precip_in", "humidity", "cloud", "feelslike_c", "uv"):
        record[key] = now.get(key)
    record["air_quality"] = {
        key: aq.get(key)
        for key in ("co", "no2", "o3", "so2", "pm2_5", "pm10",
                    "us-epa-index", "gb-defra-index")
    }
    record["alerts"] = [summarise_alert(item) for item in raw_alerts]
    record["forecast"] = [summarise_day(entry) for entry in forecast_days]
    return record

# Main Program
def fetch_weather_data(location="Chennai", days=3):
    """Fetch current weather, forecast and alerts for ``location`` and merge them.

    Args:
        location: place to query (defaults to the previously hard-coded "Chennai").
        days: number of forecast days to request.

    Returns:
        The merged record produced by ``flatten_data``. Publishing it to
        Event Hub is left to the caller (``process_batch``).
    """
    # HTTPS: the API key is sent as a query parameter, so plain HTTP would expose it.
    base_url = "https://api.weatherapi.com/v1"
    api_key = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")

    current_weather = get_current_weather(base_url, api_key, location)
    forecast_weather = get_forecast_weather(base_url, api_key, location, days)
    alerts = get_alerts(base_url, api_key, location)

    return flatten_data(current_weather, forecast_weather, alerts)

def process_batch(batch_df, batch_id):
    """foreachBatch callback: fetch the latest weather and publish it to Event Hub.

    The rows in ``batch_df`` are not read; each micro-batch only acts as a
    tick, and one event is sent per call.

    Args:
        batch_df: the micro-batch DataFrame supplied by Structured Streaming (unused).
        batch_id: the micro-batch id, used in the error message.

    Raises:
        Re-raises any fetch/send error unchanged so the streaming query fails visibly.
    """
    try:
        weather_data = fetch_weather_data()
        save_event(weather_data)
    except Exception as e:
        # Report the batch id; formatting batch_df would only print the DataFrame's schema.
        print(f"Error sending events in batch {batch_id}: {str(e)}")
        raise

# Rate source used purely as a clock: every micro-batch it produces makes
# process_batch run once. The option name is "rowsPerSecond".
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# foreachBatch hands each micro-batch to process_batch, which sends the weather data to Event Hub.
query = streaming_df.writeStream.foreachBatch(process_batch).start()

try:
    query.awaitTermination()
finally:
    # Clean up when awaitTermination returns or raises (e.g. the query fails or
    # the cell is interrupted), so neither the stream nor the producer is left open.
    try:
        query.stop()
    finally:
        producer.close()

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

## Sending the weather data to Event Hub every 30 seconds

In [0]:
import requests
import json
from azure.eventhub import EventHubProducerClient, EventData
from datetime import datetime, timedelta

# Event Hub settings, with the connection string read from the Key Vault-backed
# secret scope.
EVENT_HUB_CONNECTION_STRING = dbutils.secrets.get(
    scope="key-vault-scope", key="eventhub-connection-string"
)
EVENT_HUB_NAME = "weatherstreamingeventhub"

# A single producer client, shared by every save_event call in this cell.
producer = EventHubProducerClient.from_connection_string(
    conn_str=EVENT_HUB_CONNECTION_STRING,
    eventhub_name=EVENT_HUB_NAME,
)


def save_event(event):
    """Publish ``event`` to Event Hub as one JSON-encoded message.

    Args:
        event: a JSON-serialisable object (here, the record from fetch_weather_data).
    """
    batch = producer.create_batch()
    batch.add(EventData(json.dumps(event)))
    producer.send_batch(batch)

# Function to handle the API response
def handle_response(response):
    """Return the parsed JSON body of a successful weatherapi.com response.

    Args:
        response: a ``requests`` response object (anything exposing
            ``status_code``, ``text`` and ``json()``).

    Returns:
        The decoded JSON payload when the status code is 200.

    Raises:
        RuntimeError: for any other status code. Raising (rather than returning
            the error text) keeps callers such as ``flatten_data`` from receiving
            a str where a dict is expected.
    """
    if response.status_code != 200:
        raise RuntimeError(f"Error: {response.status_code} - {response.text}")
    return response.json()

# Function to get Current Weather and air quality data
def get_current_weather(base_url, api_key, location, timeout=10):
    """Fetch current conditions, including air-quality data, for ``location``.

    Args:
        base_url: weatherapi.com API root.
        api_key: weatherapi.com API key.
        location: value for the API's ``q`` query parameter (a city name here).
        timeout: seconds to wait for the HTTP response. Without a timeout
            ``requests.get`` can block indefinitely and stall the stream.

    Returns:
        The result of ``handle_response(response)`` (the parsed JSON body on HTTP 200).
    """
    params = {
        'key': api_key,
        'q': location,
        "aqi": "yes",  # request air-quality data; flatten_data reads current["air_quality"]
    }
    response = requests.get(f"{base_url}/current.json", params=params, timeout=timeout)
    return handle_response(response)

# Function to get Forecast Data
def get_forecast_weather(base_url, api_key, location, days, timeout=10):
    """Fetch a ``days``-day forecast for ``location``.

    Args:
        base_url: weatherapi.com API root.
        api_key: weatherapi.com API key.
        location: value for the API's ``q`` query parameter.
        days: number of forecast days to request.
        timeout: seconds to wait for the HTTP response, so a stalled request
            cannot block the stream forever.

    Returns:
        The result of ``handle_response(response)`` (the parsed JSON body on HTTP 200).
    """
    params = {
        'key': api_key,
        'q': location,
        "days": days,
    }
    response = requests.get(f"{base_url}/forecast.json", params=params, timeout=timeout)
    return handle_response(response)

# Function to get Alerts
def get_alerts(base_url, api_key, location, timeout=10):
    """Fetch weather alerts for ``location``.

    Args:
        base_url: weatherapi.com API root.
        api_key: weatherapi.com API key.
        location: value for the API's ``q`` query parameter.
        timeout: seconds to wait for the HTTP response, so a stalled request
            cannot block the stream forever.

    Returns:
        The result of ``handle_response(response)`` (the parsed JSON body on HTTP 200).
    """
    params = {
        'key': api_key,
        'q': location,
        "alerts": "yes",
    }
    response = requests.get(f"{base_url}/alerts.json", params=params, timeout=timeout)
    return handle_response(response)

# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
    """Combine the three weatherapi.com payloads into a single record.

    Args:
        current_weather: parsed current.json response; its "location" and
            "current" sections are read and missing keys become None.
        forecast_weather: parsed forecast.json response; only
            forecast.forecastday is read.
        alerts: parsed alerts.json response; only alerts.alert is read.

    Returns:
        dict of top-level location/current fields, followed by a nested
        "air_quality" dict and "alerts" / "forecast" lists.
    """
    loc = current_weather.get("location", {})
    now = current_weather.get("current", {})
    cond = now.get("condition", {})
    aq = now.get("air_quality", {})
    forecast_days = forecast_weather.get("forecast", {}).get("forecastday", [])
    raw_alerts = alerts.get("alerts", {}).get("alert", [])

    def summarise_alert(item):
        # The alert text is read from "desc" and exposed as "description".
        return {
            "headline": item.get("headline"),
            "severity": item.get("severity"),
            "description": item.get("desc"),
            "instruction": item.get("instruction"),
        }

    def summarise_day(entry):
        stats = entry.get("day", {})
        return {
            "date": entry.get("date"),
            "maxtemp_c": stats.get("maxtemp_c"),
            "mintemp_c": stats.get("mintemp_c"),
            "condition": stats.get("condition", {}).get("text"),
        }

    # Insertion order below matches the published event's key order.
    record = {key: loc.get(key) for key in ("name", "region", "country", "lat", "lon", "localtime")}
    record["temp_c"] = now.get("temp_c")
    record["is_day"] = now.get("is_day")
    record["condition_text"] = cond.get("text")
    record["condition_icon"] = cond.get("icon")
    for key in ("wind_kph", "wind_degree", "wind_dir", "pressure_in",
                "precip_in", "humidity", "cloud", "feelslike_c", "uv"):
        record[key] = now.get(key)
    record["air_quality"] = {
        key: aq.get(key)
        for key in ("co", "no2", "o3", "so2", "pm2_5", "pm10",
                    "us-epa-index", "gb-defra-index")
    }
    record["alerts"] = [summarise_alert(item) for item in raw_alerts]
    record["forecast"] = [summarise_day(entry) for entry in forecast_days]
    return record

# Main Program
def fetch_weather_data(location="Chennai", days=3):
    """Fetch current weather, forecast and alerts for ``location`` and merge them.

    Args:
        location: place to query (defaults to the previously hard-coded "Chennai").
        days: number of forecast days to request.

    Returns:
        The merged record produced by ``flatten_data``. Publishing it to
        Event Hub is left to the caller (``process_batch``).
    """
    # HTTPS: the API key is sent as a query parameter, so plain HTTP would expose it.
    base_url = "https://api.weatherapi.com/v1"
    api_key = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")

    current_weather = get_current_weather(base_url, api_key, location)
    forecast_weather = get_forecast_weather(base_url, api_key, location, days)
    alerts = get_alerts(base_url, api_key, location)

    return flatten_data(current_weather, forecast_weather, alerts)

# Minimum number of seconds between two events sent to Event Hub.
SEND_INTERVAL_SECONDS = 30

# Start one full interval in the past so the very first micro-batch sends immediately.
last_sent_time = datetime.now() - timedelta(seconds=SEND_INTERVAL_SECONDS)

def process_batch(batch_df, batch_id):
    """foreachBatch callback that sends weather data at most once per interval.

    Micro-batches arrive far more often than events should be sent, so each
    call compares the wall clock against ``last_sent_time`` and only fetches
    and publishes when ``SEND_INTERVAL_SECONDS`` have elapsed. The rows in
    ``batch_df`` are not read.

    Args:
        batch_df: the micro-batch DataFrame supplied by Structured Streaming (unused).
        batch_id: the micro-batch id, used in the error message.

    Raises:
        Re-raises any fetch/send error unchanged so the streaming query fails visibly.
    """
    global last_sent_time

    try:
        current_time = datetime.now()

        if (current_time - last_sent_time).total_seconds() >= SEND_INTERVAL_SECONDS:
            weather_data = fetch_weather_data()
            save_event(weather_data)

            # Only advance the clock after a successful send, so a failure is retried next batch.
            last_sent_time = current_time
            print(f"Event Sent at {last_sent_time}")
    except Exception as e:
        # Report the batch id; formatting batch_df would only print the DataFrame's schema.
        print(f"Error sending events in batch {batch_id}: {str(e)}")
        raise

# Rate source used purely as a clock: every micro-batch it produces makes
# process_batch run once, and process_batch itself throttles the sends.
# The option name is "rowsPerSecond".
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# foreachBatch hands each micro-batch to process_batch, which sends the weather data to Event Hub.
query = streaming_df.writeStream.foreachBatch(process_batch).start()

try:
    query.awaitTermination()
finally:
    # Clean up when awaitTermination returns or raises (e.g. the query fails or
    # the cell is interrupted), so neither the stream nor the producer is left open.
    try:
        query.stop()
    finally:
        producer.close()

Event Sent at 2025-05-25 11:49:39.598251
Event Sent at 2025-05-25 11:50:10.794526
Event Sent at 2025-05-25 11:50:41.433303


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can