
#### Sending a test event to the Event Hub

In [0]:
from azure.eventhub import EventHubProducerClient, EventData
import json

# Get secret value from Key Vault
eventhub_connection_string = dbutils.secrets.get(scope="key-vault-scope", key="eventhub-connection-string")

EVENT_HUB_NAME = "weatherstreamingeventhub"

# Initialize the eventhub producer
producer = EventHubProducerClient.from_connection_string(eventhub_connection_string, eventhub_name=EVENT_HUB_NAME)

# Function to send events to Event Hub
def send_event(event):
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(json.dumps(event)))
    producer.send_batch(event_data_batch)

# Sample JSON envent
event = {
    "event_id": 2222,
    "event_name": "Key Vault Test"
}

# Send the event
send_event(event)

# Close the producer
producer.close()

#### API Testing

In [0]:
import requests
import json

# Get secret value from Key Vault
weatherapikey = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")
location = "Burgas"

base_url = "https://api.weatherapi.com/v1/"

current_weather_url = f"{base_url}/current.json"

params = {
    'key': weatherapikey,
    'q': location,
}

response = requests.get(current_weather_url, params=params)
if response.status_code == 200:
    current_weather = response.json()
    print("Current weather:")
    print(json.dumps(current_weather, indent=3))
else:
    print(f"Error: {response.status_code}, {response.text}")

Current weather:
{
   "location": {
      "name": "Burgas",
      "region": "Burgas",
      "country": "Bulgaria",
      "lat": 42.5,
      "lon": 27.4667,
      "tz_id": "Europe/Sofia",
      "localtime_epoch": 1736780657,
      "localtime": "2025-01-13 17:04"
   },
   "current": {
      "last_updated_epoch": 1736780400,
      "last_updated": "2025-01-13 17:00",
      "temp_c": 4.1,
      "temp_f": 39.4,
      "is_day": 0,
      "condition": {
         "text": "Clear",
         "icon": "//cdn.weatherapi.com/weather/64x64/night/113.png",
         "code": 1000
      },
      "wind_mph": 16.6,
      "wind_kph": 26.6,
      "wind_degree": 39,
      "wind_dir": "NE",
      "pressure_mb": 1031.0,
      "pressure_in": 30.45,
      "precip_mm": 0.01,
      "precip_in": 0.0,
      "humidity": 65,
      "cloud": 0,
      "feelslike_c": -0.8,
      "feelslike_f": 30.5,
      "windchill_c": -0.6,
      "windchill_f": 30.9,
      "heatindex_c": 4.3,
      "heatindex_f": 39.7,
      "dewpoint_c": -


#### Complete code for getting weather data

In [0]:
import requests
import json

# Function to handle the API response
def handle_response(response):
  if response.status_code == 200:
    return response.json()
  else:
    return f"Error: {response.status_code}, {response.text}"

# Function to get current weather and air quality data
def get_current_weather(base_url, api_key, location):
  current_weather_url = f"{base_url}/current.json"
  params = {
    'key': api_key,
    'q': location,
    'aqi': 'yes'
  }
  response = requests.get(current_weather_url, params=params)
  return handle_response(response)

# Function to get forecast data
def get_forecast_weather(base_url, api_key, location, days):
  forecast_url = f"{base_url}/forecast.json"
  params = {
    'key': api_key,
    'q': location,
    'days': days,
  }
  response = requests.get(forecast_url, params=params)
  return handle_response(response)


# Function to get alerts
def get_alerts(base_url, api_key, location):
  alerts_url = f"{base_url}/alerts.json"
  params = {
    'key': api_key,
    'q': location,
    'alerts': 'yes'
  }
  response = requests.get(alerts_url, params=params)
  return handle_response(response)


# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
  location_data = current_weather.get("location", {})
  current = current_weather.get("current", {})
  condition = current.get("condition", {})
  air_quality = current.get("air_quality", {})
  forecast = forecast_weather.get("forecast", {}).get("forecastday", [])
  alert_list = alerts.get("alerts", {}).get("alert", [])

  flattened_data = {
    'name': location_data.get("name"),
    'region': location_data.get("region"),
    'country': location_data.get("country"),
    'lat': location_data.get("lat"),
    'lon': location_data.get("lon"),
    'localtime': location_data.get("localtime"),
    'temp_c': current.get('temp_c'), 
    'is_day': current.get('is_day'),
    'condition_text': condition.get('text'),
    'condition_icon': condition.get('icon'),
    'wind_kph': current.get('wind_kph'),
    'wind_degree': current.get('wind_degree'),
    'wind_dir': current.get('wind_dir'),
    'pressure_mb': current.get('pressure_mb'),
    'precip_mm': current.get('precip_mm'),
    'humidity': current.get('humidity'),
    'cloud': current.get('cloud'),
    'feelslike_c': current.get('feelslike_c'),
    'uv': current.get('uv'),

    'air_quality':{
      'co': air_quality.get('co'),
      'no2': air_quality.get('no2'),
      'o3': air_quality.get('o3'),
      'so2': air_quality.get('so2'),
      'pm2_5': air_quality.get('pm2_5'),
      'pm10': air_quality.get('pm10'),
      'us-epa-index': air_quality.get('us-epa-index'), 
      'gb-defra-index': air_quality.get('gb-defra-index'),
    }, 

    'alerts': [
      {
        'headline': alert.get('headline'),
        'severity': alert.get('severity'),
        'description': alert.get('description'),
        'instruction': alert.get('instruction')
      }
      for alert in alert_list
    ],

    'forecast': [
      {
        'date': day.get('date'),
        'maxtemp_c': day.get('day', {}).get('maxtemp_c'),
        'mintemp_c': day.get('day', {}).get('mintemp_c'),
        'condition': day.get('day', {}).get('condition', {}).get('text')
      }
      for day in forecast
    ]
  }

  return flattened_data


# Main function
def fetch_weather_data():
  base_url = "https://api.weatherapi.com/v1/"
  location = "Burgas"
  weatherapikey = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")

  # Get data from API
  current_weather = get_current_weather(base_url, weatherapikey, location)
  forecast_weather = get_forecast_weather(base_url, weatherapikey, location, 3)
  alerts = get_alerts(base_url, weatherapikey, location)

  # Flatten and merge data
  merged_data = flatten_data(current_weather, forecast_weather, alerts)
  print("Weather data:", json.dumps(merged_data, indent=3))

# Call the main function
fetch_weather_data()

Weather data: {
   "name": "Burgas",
   "region": "Burgas",
   "country": "Bulgaria",
   "lat": 42.5,
   "lon": 27.4667,
   "localtime": "2025-01-13 21:33",
   "temp_c": 2.0,
   "is_day": 0,
   "condition_text": "Clear",
   "condition_icon": "//cdn.weatherapi.com/weather/64x64/night/113.png",
   "wind_kph": 27.0,
   "wind_degree": 24,
   "wind_dir": "NNE",
   "pressure_mb": 1033.0,
   "precip_mm": 0.02,
   "humidity": 75,
   "cloud": 0,
   "feelslike_c": -3.6,
   "uv": 0.0,
   "air_quality": {
      "co": 403.3,
      "no2": 9.805,
      "o3": 43.0,
      "so2": 2.96,
      "pm2_5": 15.725,
      "pm10": 17.02,
      "us-epa-index": 2,
      "gb-defra-index": 2
   },
   "alerts": [
      {
         "severity": "Severe",
         "description": null,
         "instruction": ""
      },
      {
         "severity": "Severe",
         "description": null,
         "instruction": ""
      }
   ],
   "forecast": [
      {
         "date": "2025-01-13",
         "maxtemp_c": 4.4,
         "m

#### Sending the complete weather data to the Event Hub

In [0]:
import requests
import json
from azure.eventhub import EventHubProducerClient, EventData

# Get secret value from Key Vault
eventhub_connection_string = dbutils.secrets.get(scope="key-vault-scope", key="eventhub-connection-string")

EVENT_HUB_NAME = "weatherstreamingeventhub"

# Initialize the eventhub producer
producer = EventHubProducerClient.from_connection_string(eventhub_connection_string, eventhub_name=EVENT_HUB_NAME)

# Function to send events to Event Hub
def send_event(event):
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(json.dumps(event)))
    producer.send_batch(event_data_batch)

# Function to handle the API response
def handle_response(response):
  if response.status_code == 200:
    return response.json()
  else:
    return f"Error: {response.status_code}, {response.text}"

# Function to get current weather and air quality data
def get_current_weather(base_url, api_key, location):
  current_weather_url = f"{base_url}/current.json"
  params = {
    'key': api_key,
    'q': location,
    'aqi': 'yes'
  }
  response = requests.get(current_weather_url, params=params)
  return handle_response(response)

# Function to get forecast data
def get_forecast_weather(base_url, api_key, location, days):
  forecast_url = f"{base_url}/forecast.json"
  params = {
    'key': api_key,
    'q': location,
    'days': days,
  }
  response = requests.get(forecast_url, params=params)
  return handle_response(response)

# Function to get alerts
def get_alerts(base_url, api_key, location):
  alerts_url = f"{base_url}/alerts.json"
  params = {
    'key': api_key,
    'q': location,
    'alerts': 'yes'
  }
  response = requests.get(alerts_url, params=params)
  return handle_response(response)


# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
  location_data = current_weather.get("location", {})
  current = current_weather.get("current", {})
  condition = current.get("condition", {})
  air_quality = current.get("air_quality", {})
  forecast = forecast_weather.get("forecast", {}).get("forecastday", [])
  alert_list = alerts.get("alerts", {}).get("alert", [])

  flattened_data = {
    'name': location_data.get("name"),
    'region': location_data.get("region"),
    'country': location_data.get("country"),
    'lat': location_data.get("lat"),
    'lon': location_data.get("lon"),
    'localtime': location_data.get("localtime"),
    'temp_c': current.get('temp_c'), 
    'is_day': current.get('is_day'),
    'condition_text': condition.get('text'),
    'condition_icon': condition.get('icon'),
    'wind_kph': current.get('wind_kph'),
    'wind_degree': current.get('wind_degree'),
    'wind_dir': current.get('wind_dir'),
    'pressure_mb': current.get('pressure_mb'),
    'precip_mm': current.get('precip_mm'),
    'humidity': current.get('humidity'),
    'cloud': current.get('cloud'),
    'feelslike_c': current.get('feelslike_c'),
    'uv': current.get('uv'),

    'air_quality':{
      'co': air_quality.get('co'),
      'no2': air_quality.get('no2'),
      'o3': air_quality.get('o3'),
      'so2': air_quality.get('so2'),
      'pm2_5': air_quality.get('pm2_5'),
      'pm10': air_quality.get('pm10'),
      'us-epa-index': air_quality.get('us-epa-index'), 
      'gb-defra-index': air_quality.get('gb-defra-index'),
    }, 

    'alerts': [
      {
        'headline': alert.get('headline'),
        'severity': alert.get('severity'),
        'description': alert.get('description'),
        'instruction': alert.get('instruction')
      }
      for alert in alert_list
    ],

    'forecast': [
      {
        'date': day.get('date'),
        'maxtemp_c': day.get('day', {}).get('maxtemp_c'),
        'mintemp_c': day.get('day', {}).get('mintemp_c'),
        'condition': day.get('day', {}).get('condition', {}).get('text')
      }
      for day in forecast
    ]
  }

  return flattened_data


# Main function
def fetch_weather_data():
  base_url = "https://api.weatherapi.com/v1/"
  location = "Burgas"
  weatherapikey = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")

  # Get data from API
  current_weather = get_current_weather(base_url, weatherapikey, location)
  forecast_weather = get_forecast_weather(base_url, weatherapikey, location, 3)
  alerts = get_alerts(base_url, weatherapikey, location)

  # Flatten and merge data
  merged_data = flatten_data(current_weather, forecast_weather, alerts)
  # Send the weather data to the Event Hub
  send_event(merged_data)

# Call the main function
fetch_weather_data()

#### Sending the weather data as a continuous stream

In [0]:
import requests
import json
from azure.eventhub import EventHubProducerClient, EventData
from datetime import datetime, timedelta


# Get secret value from Key Vault
eventhub_connection_string = dbutils.secrets.get(scope="key-vault-scope", key="eventhub-connection-string")

EVENT_HUB_NAME = "weatherstreamingeventhub"

# Initialize the eventhub producer
producer = EventHubProducerClient.from_connection_string(eventhub_connection_string, eventhub_name=EVENT_HUB_NAME)

# Function to send events to Event Hub
def send_event(event):
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(json.dumps(event)))
    producer.send_batch(event_data_batch)

# Function to handle the API response
def handle_response(response):
  if response.status_code == 200:
    return response.json()
  else:
    return f"Error: {response.status_code}, {response.text}"

# Function to get current weather and air quality data
def get_current_weather(base_url, api_key, location):
  current_weather_url = f"{base_url}/current.json"
  params = {
    'key': api_key,
    'q': location,
    'aqi': 'yes'
  }
  response = requests.get(current_weather_url, params=params)
  return handle_response(response)

# Function to get forecast data
def get_forecast_weather(base_url, api_key, location, days):
  forecast_url = f"{base_url}/forecast.json"
  params = {
    'key': api_key,
    'q': location,
    'days': days,
  }
  response = requests.get(forecast_url, params=params)
  return handle_response(response)

# Function to get alerts
def get_alerts(base_url, api_key, location):
  alerts_url = f"{base_url}/alerts.json"
  params = {
    'key': api_key,
    'q': location,
    'alerts': 'yes'
  }
  response = requests.get(alerts_url, params=params)
  return handle_response(response)


# Flatten and merge the data
def flatten_data(current_weather, forecast_weather, alerts):
  location_data = current_weather.get("location", {})
  current = current_weather.get("current", {})
  condition = current.get("condition", {})
  air_quality = current.get("air_quality", {})
  forecast = forecast_weather.get("forecast", {}).get("forecastday", [])
  alert_list = alerts.get("alerts", {}).get("alert", [])

  flattened_data = {
    'name': location_data.get("name"),
    'region': location_data.get("region"),
    'country': location_data.get("country"),
    'lat': location_data.get("lat"),
    'lon': location_data.get("lon"),
    'localtime': location_data.get("localtime"),
    'temp_c': current.get('temp_c'), 
    'is_day': current.get('is_day'),
    'condition_text': condition.get('text'),
    'condition_icon': condition.get('icon'),
    'wind_kph': current.get('wind_kph'),
    'wind_degree': current.get('wind_degree'),
    'wind_dir': current.get('wind_dir'),
    'pressure_mb': current.get('pressure_mb'),
    'precip_mm': current.get('precip_mm'),
    'humidity': current.get('humidity'),
    'cloud': current.get('cloud'),
    'feelslike_c': current.get('feelslike_c'),
    'uv': current.get('uv'),

    'air_quality':{
      'co': air_quality.get('co'),
      'no2': air_quality.get('no2'),
      'o3': air_quality.get('o3'),
      'so2': air_quality.get('so2'),
      'pm2_5': air_quality.get('pm2_5'),
      'pm10': air_quality.get('pm10'),
      'us-epa-index': air_quality.get('us-epa-index'), 
      'gb-defra-index': air_quality.get('gb-defra-index'),
    }, 

    'alerts': [
      {
        'headline': alert.get('headline'),
        'severity': alert.get('severity'),
        'description': alert.get('description'),
        'instruction': alert.get('instruction')
      }
      for alert in alert_list
    ],

    'forecast': [
      {
        'date': day.get('date'),
        'maxtemp_c': day.get('day', {}).get('maxtemp_c'),
        'mintemp_c': day.get('day', {}).get('mintemp_c'),
        'condition': day.get('day', {}).get('condition', {}).get('text')
      }
      for day in forecast
    ]
  }

  return flattened_data

def fetch_weather_data():
  base_url = "https://api.weatherapi.com/v1/"
  location = "Burgas"
  weatherapikey = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")

  # Get data from API
  current_weather = get_current_weather(base_url, weatherapikey, location)
  forecast_weather = get_forecast_weather(base_url, weatherapikey, location, 3)
  alerts = get_alerts(base_url, weatherapikey, location)

  # Flatten and merge data
  merged_data = flatten_data(current_weather, forecast_weather, alerts)
  
  return merged_data

# Initialize last sent time
# Update data every 30 seconds
last_sent_time = datetime.now() - timedelta(seconds=30)

def process_batch(batch_df, batch_id):
  global last_sent_time
  try:
      # Get current time
      current_time = datetime.now()
      
      # Check if X seconds have passed since last event was sent
      if (current_time - last_sent_time).total_seconds() >= 30:
        # Fetch weather data
        weather_data = fetch_weather_data()

        # Send the weather data 
        send_event(weather_data)

        # Update last sent time
        last_sent_time = current_time
        print(f"Event sent at {last_sent_time}")

  except Exception as e:
      print(f"Error sending events {batch_id}: {e}")
      raise e 

# Set up a streaming source
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Write the streaming data using foreachBatch to send weather data to Event Hub
query = streaming_df.writeStream.foreachBatch(process_batch).start()

query.awaitTermination()

# Call the main function
producer.close()

Event sent at 2025-01-14 15:33:43.085430
Event sent at 2025-01-14 15:34:13.878504


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can