### Event Test


### Key Vault Configuration

In [0]:
from azure.eventhub import EventHubProducerClient, EventData
import json

# Event Hub Configuration
event_hub_connection_string = dbutils.secrets.get(scope="key-vault-scope", key="eventhub-connection-string")
EVENT_HUB_NAME = "weatherstreamingeventhub"

# Initiate the event hub producer
producer = EventHubProducerClient.from_connection_string(conn_str=event_hub_connection_string, eventhub_name=EVENT_HUB_NAME)

#Fucntion to send events to event hub
def send_events(event):
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(json.dumps(event)))
    producer.send_batch(event_data_batch)

# Sample Json event
event = {
    "event_id" : 2222,
    "event_name" : "Key vault Test Event"
}

# Send the event
send_events(event)

# Close the producer
producer.close()


### API Testing


In [0]:
import json
import requests

# Getting secret value from Key Vault
weatherapikey = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")
location = "Chennai" 

base_url = "http://api.weatherapi.com/v1/"

current_weather_url = f"{base_url}/current.json"

params = {
    "key": weatherapikey,
    "q": location,
}

response = requests.get(current_weather_url, params=params)

if response.status_code == 200:
    current_weather = response.json()
    print("Current weather:")
    print(json.dumps(response.json(), indent=3))
else:
    print(f"Error: {response.status_code}, {response.text}")

Current weather:
{
   "location": {
      "name": "Chennai",
      "region": "Tamil Nadu",
      "country": "India",
      "lat": 13.0833,
      "lon": 80.2833,
      "tz_id": "Asia/Kolkata",
      "localtime_epoch": 1742118031,
      "localtime": "2025-03-16 15:10"
   },
   "current": {
      "last_updated_epoch": 1742117400,
      "last_updated": "2025-03-16 15:00",
      "temp_c": 32.1,
      "temp_f": 89.8,
      "is_day": 1,
      "condition": {
         "text": "Partly cloudy",
         "icon": "//cdn.weatherapi.com/weather/64x64/day/116.png",
         "code": 1003
      },
      "wind_mph": 11.4,
      "wind_kph": 18.4,
      "wind_degree": 114,
      "wind_dir": "ESE",
      "pressure_mb": 1010.0,
      "pressure_in": 29.83,
      "precip_mm": 0.0,
      "precip_in": 0.0,
      "humidity": 67,
      "cloud": 50,
      "feelslike_c": 38.0,
      "feelslike_f": 100.3,
      "windchill_c": 29.8,
      "windchill_f": 85.6,
      "heatindex_c": 32.9,
      "heatindex_f": 91.2,
     


## Getting Weather Data

In [0]:
import requests
import json

#function to handle the API response
def handle_response(response):
    if response.status_code == 200:
        return response.json()
    else:
        return f"Error: {response.status_code}, {response.text}"
    
# Function to get the current weather and air quality data
def get_current_weathe(base_url,api_key,location):
    current_weather_url = f"{base_url}/current.json"
    params = {
        "key": api_key,
        "q": location,
        "aqi": "yes"
        }
    response = requests.get(current_weather_url, params=params)
    return handle_response(response)

# Function to get the forecast weather data
def get_forecast_weather(base_url,api_key,location, days):
    forecast_weather_url = f"{base_url}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days,
    }
    response = requests.get(forecast_weather_url, params=params)
    return handle_response(response)

# Function to get Alerts
def get_alerts(base_url,api_key,location):
    alerts_url = f"{base_url}/alerts.json"
    params = {
        "key": api_key,
        "q": location,
        "alerts": "yes"
    }
    response = requests.get(alerts_url, params=params)
    return handle_response(response)

# Flatten and merge the data
def flatten_and_merge(current_weather, forecast_weather, alerts):
    location_data = current_weather.get("location", {})
    current = current_weather.get("current", {})
    condition = current.get("condition", {})
    air_quality = current.get("air_quality", {})
    forecast = forecast_weather.get("forecast", {}).get("forecastday", [])
    alerts_list = alerts.get("alerts", {}).get("alert", [])

    flattened_data = {
        'name': location_data.get("name"),
        'region': location_data.get("region"),
        'country': location_data.get("country"),
        'lat': location_data.get("lat"),
        'lon': location_data.get("lon"),
        'localtime': current.get("localtime"),
        'temp_c': current.get("temp_c"),
        'is_day': current.get("is_day"),
        'condition_text': condition.get("text"),
        'condition_icon': condition.get("icon"),
        'wind_kph': current.get("wind_kph"),
        'wind_degree': current.get("wind_degree"),
        'wind_dir': current.get("wind_dir"),
        'pressure_mb': current.get('pressure_mb'),
        'pressure_in': current.get('pressure_in'),
        'precip_mm': current.get('precip_mm'),
        'humidity': current.get('humidity'),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality' : {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'so2': air_quality.get('so2'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us_epa_index'),
            'gb-defra-index': air_quality.get('gb_defra_index'),
        },
        'alerts' : [
            {
            'headline': alert.get('headline'),
            'description': alert.get('description'),
            'severity': alert.get('severity'),
            'instruction': alert.get('instruction'),
            }
                    for alert in alerts_list
        ],
        'forecast' : [
            {
                'date': day.get('date'),
                'maxtemp_c': day.get('day',{}).get('maxtemp_c'),
                'mintemp_c': day.get('day',{}).get('mintemp_c'),
                'condition': day.get('day',{}).get('condition',{}).get('text')
            }
            for day in forecast
        ]
    }
    return flattened_data

# Main Program

def fetch_weather_data():
    base_url = "http://api.weatherapi.com/v1/"
    location = "Manama"
    weatherapikey = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")

    # Get data from API
    current_weather = get_current_weathe(base_url,weatherapikey,location)
    forecast_weather = get_forecast_weather(base_url,weatherapikey,location, 5)
    alerts = get_alerts(base_url,weatherapikey,location)

    # Flatten and merge data
    merged_data =  flatten_and_merge(current_weather, forecast_weather, alerts)
    print("Weather data:", json.dumps(merged_data, indent=3))

# Calling the main program
fetch_weather_data()

Weather data: {
   "name": "Manama",
   "region": "Al Manamah",
   "country": "Bahrain",
   "lat": 26.2361,
   "lon": 50.5831,
   "localtime": null,
   "temp_c": 27.3,
   "is_day": 1,
   "condition_text": "Sunny",
   "condition_icon": "//cdn.weatherapi.com/weather/64x64/day/113.png",
   "wind_kph": 14.0,
   "wind_degree": 5,
   "wind_dir": "N",
   "pressure_mb": 1017.0,
   "pressure_in": 30.03,
   "precip_mm": 0.0,
   "humidity": 30,
   "cloud": 0,
   "feelslike_c": 28.6,
   "uv": 6.0,
   "air_quality": {
      "co": 821.4,
      "no2": 63.64,
      "o3": 76.0,
      "so2": 23.495,
      "pm2_5": 52.17,
      "pm10": 86.21,
      "us-epa-index": null,
      "gb-defra-index": null
   },
   "alerts": [],
   "forecast": [
      {
         "date": "2025-03-16",
         "maxtemp_c": 21.8,
         "mintemp_c": 19.9,
         "condition": "Partly Cloudy "
      },
      {
         "date": "2025-03-17",
         "maxtemp_c": 22.3,
         "mintemp_c": 20.4,
         "condition": "Sunny"
   

In [0]:
import requests
import json
from azure.eventhub import EventHubProducerClient, EventData

# Event Hub Configuration
event_hub_connection_string = dbutils.secrets.get(scope="key-vault-scope", key="eventhub-connection-string")
EVENT_HUB_NAME = "weatherstreamingeventhub"

# Initiate the event hub producer
producer = EventHubProducerClient.from_connection_string(conn_str=event_hub_connection_string, eventhub_name=EVENT_HUB_NAME)

#Fucntion to send events to event hub
def send_events(event):
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(json.dumps(event)))
    producer.send_batch(event_data_batch)

#function to handle the API response
def handle_response(response):
    if response.status_code == 200:
        return response.json()
    else:
        return f"Error: {response.status_code}, {response.text}"
    
# Function to get the current weather and air quality data
def get_current_weathe(base_url,api_key,location):
    current_weather_url = f"{base_url}/current.json"
    params = {
        "key": api_key,
        "q": location,
        "aqi": "yes"
        }
    response = requests.get(current_weather_url, params=params)
    return handle_response(response)

# Function to get the forecast weather data
def get_forecast_weather(base_url,api_key,location, days):
    forecast_weather_url = f"{base_url}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days,
    }
    response = requests.get(forecast_weather_url, params=params)
    return handle_response(response)

# Function to get Alerts
def get_alerts(base_url,api_key,location):
    alerts_url = f"{base_url}/alerts.json"
    params = {
        "key": api_key,
        "q": location,
        "alerts": "yes"
    }
    response = requests.get(alerts_url, params=params)
    return handle_response(response)

# Flatten and merge the data
def flatten_and_merge(current_weather, forecast_weather, alerts):
    location_data = current_weather.get("location", {})
    current = current_weather.get("current", {})
    condition = current.get("condition", {})
    air_quality = current.get("air_quality", {})
    forecast = forecast_weather.get("forecast", {}).get("forecastday", [])
    alerts_list = alerts.get("alerts", {}).get("alert", [])

    flattened_data = {
        'name': location_data.get("name"),
        'region': location_data.get("region"),
        'country': location_data.get("country"),
        'lat': location_data.get("lat"),
        'lon': location_data.get("lon"),
        'localtime': current.get("localtime"),
        'temp_c': current.get("temp_c"),
        'is_day': current.get("is_day"),
        'condition_text': condition.get("text"),
        'condition_icon': condition.get("icon"),
        'wind_kph': current.get("wind_kph"),
        'wind_degree': current.get("wind_degree"),
        'wind_dir': current.get("wind_dir"),
        'pressure_mb': current.get('pressure_mb'),
        'pressure_in': current.get('pressure_in'),
        'precip_mm': current.get('precip_mm'),
        'humidity': current.get('humidity'),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality' : {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'so2': air_quality.get('so2'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us_epa_index'),
            'gb-defra-index': air_quality.get('gb_defra_index'),
        },
        'alerts' : [
            {
            'headline': alert.get('headline'),
            'description': alert.get('description'),
            'severity': alert.get('severity'),
            'instruction': alert.get('instruction'),
            }
                    for alert in alerts_list
        ],
        'forecast' : [
            {
                'date': day.get('date'),
                'maxtemp_c': day.get('day',{}).get('maxtemp_c'),
                'mintemp_c': day.get('day',{}).get('mintemp_c'),
                'condition': day.get('day',{}).get('condition',{}).get('text')
            }
            for day in forecast
        ]
    }
    return flattened_data

# Main Program

def fetch_weather_data():
    base_url = "http://api.weatherapi.com/v1/"
    location = "Manama"
    weatherapikey = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")

    # Get data from API
    current_weather = get_current_weathe(base_url,weatherapikey,location)
    forecast_weather = get_forecast_weather(base_url,weatherapikey,location, 5)
    alerts = get_alerts(base_url,weatherapikey,location)

    # Flatten and merge data
    merged_data =  flatten_and_merge(current_weather, forecast_weather, alerts)
    
    # Sending Weather data to Event Hub
    send_events(merged_data)

# Calling the main program
fetch_weather_data()


## Streamin Weather data

In [0]:
from azure.eventhub import EventHubProducerClient, EventData
from datetime import datetime, timedelta
import requests
import json

# Event Hub Configuration
event_hub_connection_string = dbutils.secrets.get(scope="key-vault-scope", key="eventhub-connection-string")
EVENT_HUB_NAME = "weatherstreamingeventhub"

# Initiate the event hub producer
producer = EventHubProducerClient.from_connection_string(conn_str=event_hub_connection_string, eventhub_name=EVENT_HUB_NAME)

#Fucntion to send events to event hub
def send_events(event):
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(json.dumps(event)))
    producer.send_batch(event_data_batch)

#function to handle the API response
def handle_response(response):
    if response.status_code == 200:
        return response.json()
    else:
        return f"Error: {response.status_code}, {response.text}"
    
# Function to get the current weather and air quality data
def get_current_weathe(base_url,api_key,location):
    current_weather_url = f"{base_url}/current.json"
    params = {
        "key": api_key,
        "q": location,
        "aqi": "yes"
        }
    response = requests.get(current_weather_url, params=params)
    return handle_response(response)

# Function to get the forecast weather data
def get_forecast_weather(base_url,api_key,location, days):
    forecast_weather_url = f"{base_url}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days,
    }
    response = requests.get(forecast_weather_url, params=params)
    return handle_response(response)

# Function to get Alerts
def get_alerts(base_url,api_key,location):
    alerts_url = f"{base_url}/alerts.json"
    params = {
        "key": api_key,
        "q": location,
        "alerts": "yes"
    }
    response = requests.get(alerts_url, params=params)
    return handle_response(response)

# Flatten and merge the data
def flatten_and_merge(current_weather, forecast_weather, alerts):
    location_data = current_weather.get("location", {})
    current = current_weather.get("current", {})
    condition = current.get("condition", {})
    air_quality = current.get("air_quality", {})
    forecast = forecast_weather.get("forecast", {}).get("forecastday", [])
    alerts_list = alerts.get("alerts", {}).get("alert", [])

    flattened_data = {
        'name': location_data.get("name"),
        'region': location_data.get("region"),
        'country': location_data.get("country"),
        'lat': location_data.get("lat"),
        'lon': location_data.get("lon"),
        'localtime': current.get("localtime"),
        'temp_c': current.get("temp_c"),
        'is_day': current.get("is_day"),
        'condition_text': condition.get("text"),
        'condition_icon': condition.get("icon"),
        'wind_kph': current.get("wind_kph"),
        'wind_degree': current.get("wind_degree"),
        'wind_dir': current.get("wind_dir"),
        'pressure_mb': current.get('pressure_mb'),
        'pressure_in': current.get('pressure_in'),
        'precip_mm': current.get('precip_mm'),
        'humidity': current.get('humidity'),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality' : {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'so2': air_quality.get('so2'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us_epa_index'),
            'gb-defra-index': air_quality.get('gb_defra_index'),
        },
        'alerts' : [
            {
            'headline': alert.get('headline'),
            'description': alert.get('description'),
            'severity': alert.get('severity'),
            'instruction': alert.get('instruction'),
            }
                    for alert in alerts_list
        ],
        'forecast' : [
            {
                'date': day.get('date'),
                'maxtemp_c': day.get('day',{}).get('maxtemp_c'),
                'mintemp_c': day.get('day',{}).get('mintemp_c'),
                'condition': day.get('day',{}).get('condition',{}).get('text')
            }
            for day in forecast
        ]
    }
    return flattened_data

# Main Program
def fetch_weather_data():
    base_url = "http://api.weatherapi.com/v1/"
    location = "Manama"
    weatherapikey = dbutils.secrets.get(scope="key-vault-scope", key="weatherapikey")

    # Get data from API
    current_weather = get_current_weathe(base_url,weatherapikey,location)
    forecast_weather = get_forecast_weather(base_url,weatherapikey,location, 5)
    alerts = get_alerts(base_url,weatherapikey,location)

    # Flatten and merge data
    merged_data =  flatten_and_merge(current_weather, forecast_weather, alerts)
    return merged_data

# Function to process each batch of streaming data
last_sent_time  = datetime.now() - timedelta(seconds=30) # Initiate the sent time to 30 seconds 

def process_batch(batch_df, batch_id):
    global last_sent_time
    try:
        current_time = datetime.now()
        if (current_time - last_sent_time).total_seconds() > 30:
            # Fetch weathe data
            weather_data= fetch_weather_data()

            # Send weather data
            send_events(weather_data)
            
            # Update the sent time
            last_sent_time = current_time
            print(f"Sending weather data to event hub at {current_time}")

    except Exception as e:
        print(f"Error processing batch {batch_id}: {str(e)}")
        raise e

# set up streaming source to read data from event hub
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Write the streaming data using foreachBatch to send weather data to event hub
query = streaming_df.writeStream.foreachBatch(process_batch).start()

query.awaitTermination()

# Close the producer after termination
producer.close()