In [0]:

# test the connection to the event hub by sending a test event
from azure.eventhub import EventHubProducerClient, EventData
import json

# Event Hub settings (replace the placeholder with your own connection string)
EVENT_HUB_CONNECTION_STRING = "enter the connection string of your event hub"
EVENT_HUB_NAME = "weatherstreamg"

# Build the producer client that send_event uses
producer = EventHubProducerClient.from_connection_string(
    conn_str=EVENT_HUB_CONNECTION_STRING,
    eventhub_name=EVENT_HUB_NAME,
)

def send_event(event):
    """Serialize ``event`` to JSON and send it to the Event Hub as a one-message batch."""
    batch = producer.create_batch()
    message = EventData(json.dumps(event))
    batch.add(message)
    producer.send_batch(batch)

# Minimal payload used only to verify that the Event Hub connection works
event = {
    "event_id": 4500,
    "event_type": "Test Event"
}

# Close the producer even when sending fails so the connection is not leaked
try:
    send_event(event)
finally:
    producer.close()

In [0]:

#test the connection to the event hub by sending a test event using key vault scope in databricks
from azure.eventhub import EventHubProducerClient, EventData
import json

# Event Hub settings: the connection string is read from a Databricks secret scope
event_connection_string = dbutils.secrets.get(
    scope="keyvaultscope-name",
    key="event-connection-string",
)
EVENT_HUB_NAME = "weatherstreamg"

# Build the producer client that send_event uses
producer = EventHubProducerClient.from_connection_string(
    conn_str=event_connection_string,
    eventhub_name=EVENT_HUB_NAME,
)

def send_event(event):
    """Serialize ``event`` to JSON and send it to the Event Hub as a one-message batch."""
    batch = producer.create_batch()
    message = EventData(json.dumps(event))
    batch.add(message)
    producer.send_batch(batch)

# Minimal payload used only to verify the secret-scope based connection
event = {
    "event_id": 9999,
    "event_type": "Key Test Event"
}

# Close the producer even when sending fails so the connection is not leaked
try:
    send_event(event)
finally:
    producer.close()

## API Test

In [0]:
import json
import requests

# The API key is read from a Databricks secret scope
apikey = dbutils.secrets.get(scope="scope name", key="apikey name")
location = "Pune"
# HTTPS keeps the API key (sent as a query parameter) from travelling in clear text;
# no trailing slash here because the endpoint path below starts with one.
baseurl = "https://api.weatherapi.com/v1"
curr_url = f"{baseurl}/current.json"

params = {"key": apikey, "q": location}
# A timeout prevents the cell from hanging forever on a stalled connection
response = requests.get(curr_url, params=params, timeout=10)

if response.status_code == 200:
    curr_weather = response.json()
    print("Current Weather:")
    print(json.dumps(curr_weather, indent=3))
else:
    print(f"Error: {response.status_code}, {response.text}")

## Get weather data


In [0]:
def handle_response(response):
    """Return the decoded JSON body for an HTTP 200 response, else an error string.

    The error string has the form ``"Error: <status_code>, <response text>"``.
    """
    if response.status_code != 200:
        return f"Error: {response.status_code}, {response.text}"
    return response.json()

def get_curr_weather(baseurl, api_key, location, timeout=10):
    """Request current weather, including air-quality data, for a location.

    Args:
        baseurl: Root URL of the weather API; a trailing slash is tolerated.
        api_key: Value sent as the ``key`` query parameter.
        location: Value sent as the ``q`` query parameter.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        Whatever ``handle_response`` returns for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    curr_weather_url = f"{baseurl.rstrip('/')}/current.json"
    params = {
        'key': api_key,
        'q': location,
        "aqi": 'yes'
    }
    # Without a timeout a stalled connection would block the caller indefinitely
    response = requests.get(curr_weather_url, params=params, timeout=timeout)
    return handle_response(response)

def get_forecast_weather(baseurl, api_key, location, days, timeout=10):
    """Request a multi-day forecast for a location.

    Args:
        baseurl: Root URL of the weather API; a trailing slash is tolerated.
        api_key: Value sent as the ``key`` query parameter.
        location: Value sent as the ``q`` query parameter.
        days: Value sent as the ``days`` query parameter.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        Whatever ``handle_response`` returns for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    forecast_url = f"{baseurl.rstrip('/')}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days,
    }
    # Without a timeout a stalled connection would block the caller indefinitely
    response = requests.get(forecast_url, params=params, timeout=timeout)
    return handle_response(response)

def get_alerts(baseurl, api_key, location, timeout=10):
    """Request weather alerts for a location.

    Args:
        baseurl: Root URL of the weather API; a trailing slash is tolerated.
        api_key: Value sent as the ``key`` query parameter.
        location: Value sent as the ``q`` query parameter.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        Whatever ``handle_response`` returns for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    alerts_url = f"{baseurl.rstrip('/')}/alerts.json"
    params = {
        'key': api_key,
        'q': location,
        "alerts": 'yes'
    }
    # Without a timeout a stalled connection would block the caller indefinitely
    response = requests.get(alerts_url, params=params, timeout=timeout)
    return handle_response(response)

def flatten_data(curr_weather, forecast_weather, alerts):
    """Merge the current-weather, forecast and alerts payloads into one dict.

    Args:
        curr_weather: Parsed current-weather payload (reads ``location`` and ``current``).
        forecast_weather: Parsed forecast payload (reads ``forecast.forecastday``).
        alerts: Parsed alerts payload (reads ``alerts.alert``).

    Any argument or nested section that is not a dict -- for example the error
    string ``handle_response`` returns for a failed request -- is treated as
    empty, so the remaining sections are still produced instead of the call
    failing with ``AttributeError``.

    Returns:
        dict with location/current fields at the top level plus nested
        ``air_quality`` (dict), ``alerts`` (list) and ``forecast`` (list).
        Fields missing from the input are ``None``.
    """
    def _section(payload, key):
        """Return ``payload[key]`` when both are dicts, otherwise an empty dict."""
        value = payload.get(key) if isinstance(payload, dict) else None
        return value if isinstance(value, dict) else {}

    location_data = _section(curr_weather, "location")
    current = _section(curr_weather, "current")
    condition = _section(current, "condition")
    air_quality = _section(current, "air_quality")
    # "or []" also covers an explicit null list in the payload
    forecast = _section(forecast_weather, "forecast").get("forecastday") or []
    alert_list = _section(alerts, "alerts").get("alert") or []

    forecast_days = []
    for day in forecast:
        day_summary = _section(day, 'day')
        forecast_days.append({
            'date': day.get('date'),
            'maxtemp_c': day_summary.get('maxtemp_c'),
            'mintemp_c': day_summary.get('mintemp_c'),
            'condition': _section(day_summary, 'condition').get('text')
        })

    flattened_data = {
        'name': location_data.get('name'),
        'region': location_data.get('region'),
        'country': location_data.get('country'),
        'lat': location_data.get('lat'),
        'lon': location_data.get('lon'),
        'localtime': location_data.get('localtime'),
        'temp_c': current.get('temp_c'),
        'is_day': current.get('is_day'),
        'condition_text': condition.get('text'),
        'condition_icon': condition.get('icon'),
        'wind_kph': current.get('wind_kph'),
        'wind_degree': current.get('wind_degree'),
        'wind_dir': current.get('wind_dir'),
        'pressure_in': current.get('pressure_in'),
        'precip_in': current.get('precip_in'),
        'humidity': current.get('humidity'),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality': {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'so2': air_quality.get('so2'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us-epa-index'),
            'gb-defra-index': air_quality.get('gb-defra-index')
        },
        'alerts': [
            {
                'headline': alert.get('headline'),
                'severity': alert.get('severity'),
                # The API field is "desc"; it is exposed here as "description"
                'description': alert.get('desc'),
                'instruction': alert.get('instruction')
            }
            for alert in alert_list
        ],
        'forecast': forecast_days
    }
    return flattened_data
  

def fetch_weather_data():
    """Fetch current weather, a 3-day forecast and alerts for Pune and print them merged."""
    # HTTPS keeps the API key (sent as a query parameter) from travelling in clear text;
    # no trailing slash because the getters append "/<endpoint>.json" themselves.
    baseurl = "https://api.weatherapi.com/v1"
    location = "Pune"
    apikey = dbutils.secrets.get(scope="scope-name", key="apikey")

    # Get data from API
    curr_weather = get_curr_weather(baseurl, apikey, location)
    forecast_weather = get_forecast_weather(baseurl, apikey, location, 3)
    alerts = get_alerts(baseurl, apikey, location)

    # Flatten and merge data
    merged_data = flatten_data(curr_weather, forecast_weather, alerts)
    print("Weather Data:", json.dumps(merged_data, indent=3))

# Calling the Main Program
fetch_weather_data()

Weather Data: {
   "name": "Pune",
   "region": "Maharashtra",
   "country": "India",
   "lat": 18.5333,
   "lon": 73.8667,
   "localtime": "2025-02-01 19:59",
   "temp_c": 24.9,
   "is_day": 0,
   "condition_text": "Clear",
   "condition_icon": "//cdn.weatherapi.com/weather/64x64/night/113.png",
   "wind_kph": 13.0,
   "wind_degree": 298,
   "wind_dir": "WNW",
   "pressure_in": 29.87,
   "precip_in": 0.0,
   "humidity": 26,
   "cloud": 4,
   "feelslike_c": 24.4,
   "uv": 0.0,
   "air_quality": {
      "co": 545.75,
      "no2": 4.07,
      "o3": 185.0,
      "so2": 17.205,
      "pm2_5": 51.985,
      "pm10": 53.65,
      "us-epa-index": 3,
      "gb-defra-index": 6
   },
   "alerts": [],
   "forecast": [
      {
         "date": "2025-02-01",
         "maxtemp_c": 32.5,
         "mintemp_c": 18.1,
         "condition": "Sunny"
      },
      {
         "date": "2025-02-02",
         "maxtemp_c": 32.5,
         "mintemp_c": 19.5,
         "condition": "Sunny"
      },
      {
        

## Send test event to hub

In [0]:
from azure.eventhub import EventHubProducerClient, EventData


# Event Hub settings: the connection string is read from a Databricks secret scope
event_connection_string = dbutils.secrets.get(
    scope="scope-name",
    key="event-connection-string",
)
EVENT_HUB_NAME = "weatherstreamg"

# Build the producer client that send_event uses
producer = EventHubProducerClient.from_connection_string(
    conn_str=event_connection_string,
    eventhub_name=EVENT_HUB_NAME,
)

def send_event(event):
    """Serialize ``event`` to JSON and send it to the Event Hub as a one-message batch."""
    batch = producer.create_batch()
    message = EventData(json.dumps(event))
    batch.add(message)
    producer.send_batch(batch)

def handle_response(response):
    """Return the decoded JSON body for an HTTP 200 response, else an error string.

    The error string has the form ``"Error: <status_code>, <response text>"``.
    """
    if response.status_code != 200:
        return f"Error: {response.status_code}, {response.text}"
    return response.json()

def get_curr_weather(baseurl, api_key, location, timeout=10):
    """Request current weather, including air-quality data, for a location.

    Args:
        baseurl: Root URL of the weather API; a trailing slash is tolerated.
        api_key: Value sent as the ``key`` query parameter.
        location: Value sent as the ``q`` query parameter.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        Whatever ``handle_response`` returns for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    curr_weather_url = f"{baseurl.rstrip('/')}/current.json"
    params = {
        'key': api_key,
        'q': location,
        "aqi": 'yes'
    }
    # Without a timeout a stalled connection would block the caller indefinitely
    response = requests.get(curr_weather_url, params=params, timeout=timeout)
    return handle_response(response)

def get_forecast_weather(baseurl, api_key, location, days, timeout=10):
    """Request a multi-day forecast for a location.

    Args:
        baseurl: Root URL of the weather API; a trailing slash is tolerated.
        api_key: Value sent as the ``key`` query parameter.
        location: Value sent as the ``q`` query parameter.
        days: Value sent as the ``days`` query parameter.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        Whatever ``handle_response`` returns for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    forecast_url = f"{baseurl.rstrip('/')}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days,
    }
    # Without a timeout a stalled connection would block the caller indefinitely
    response = requests.get(forecast_url, params=params, timeout=timeout)
    return handle_response(response)

def get_alerts(baseurl, api_key, location, timeout=10):
    """Request weather alerts for a location.

    Args:
        baseurl: Root URL of the weather API; a trailing slash is tolerated.
        api_key: Value sent as the ``key`` query parameter.
        location: Value sent as the ``q`` query parameter.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        Whatever ``handle_response`` returns for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    alerts_url = f"{baseurl.rstrip('/')}/alerts.json"
    params = {
        'key': api_key,
        'q': location,
        "alerts": 'yes'
    }
    # Without a timeout a stalled connection would block the caller indefinitely
    response = requests.get(alerts_url, params=params, timeout=timeout)
    return handle_response(response)

def flatten_data(curr_weather, forecast_weather, alerts):
    """Merge the current-weather, forecast and alerts payloads into one dict.

    Args:
        curr_weather: Parsed current-weather payload (reads ``location`` and ``current``).
        forecast_weather: Parsed forecast payload (reads ``forecast.forecastday``).
        alerts: Parsed alerts payload (reads ``alerts.alert``).

    Any argument or nested section that is not a dict -- for example the error
    string ``handle_response`` returns for a failed request -- is treated as
    empty, so the remaining sections are still produced instead of the call
    failing with ``AttributeError``.

    Returns:
        dict with location/current fields at the top level plus nested
        ``air_quality`` (dict), ``alerts`` (list) and ``forecast`` (list).
        Fields missing from the input are ``None``.
    """
    def _section(payload, key):
        """Return ``payload[key]`` when both are dicts, otherwise an empty dict."""
        value = payload.get(key) if isinstance(payload, dict) else None
        return value if isinstance(value, dict) else {}

    location_data = _section(curr_weather, "location")
    current = _section(curr_weather, "current")
    condition = _section(current, "condition")
    air_quality = _section(current, "air_quality")
    # "or []" also covers an explicit null list in the payload
    forecast = _section(forecast_weather, "forecast").get("forecastday") or []
    alert_list = _section(alerts, "alerts").get("alert") or []

    forecast_days = []
    for day in forecast:
        day_summary = _section(day, 'day')
        forecast_days.append({
            'date': day.get('date'),
            'maxtemp_c': day_summary.get('maxtemp_c'),
            'mintemp_c': day_summary.get('mintemp_c'),
            'condition': _section(day_summary, 'condition').get('text')
        })

    flattened_data = {
        'name': location_data.get('name'),
        'region': location_data.get('region'),
        'country': location_data.get('country'),
        'lat': location_data.get('lat'),
        'lon': location_data.get('lon'),
        'localtime': location_data.get('localtime'),
        'temp_c': current.get('temp_c'),
        'is_day': current.get('is_day'),
        'condition_text': condition.get('text'),
        'condition_icon': condition.get('icon'),
        # wind_kph / wind_degree keep the event schema identical to the other cells
        'wind_kph': current.get('wind_kph'),
        'wind_degree': current.get('wind_degree'),
        'wind_dir': current.get('wind_dir'),
        'pressure_in': current.get('pressure_in'),
        'precip_in': current.get('precip_in'),
        'humidity': current.get('humidity'),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality': {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'so2': air_quality.get('so2'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us-epa-index'),
            'gb-defra-index': air_quality.get('gb-defra-index')
        },
        'alerts': [
            {
                'headline': alert.get('headline'),
                'severity': alert.get('severity'),
                # The API field is "desc"; it is exposed here as "description"
                'description': alert.get('desc'),
                'instruction': alert.get('instruction')
            }
            for alert in alert_list
        ],
        'forecast': forecast_days
    }
    return flattened_data

def fetch_weather_data():
    """Fetch current weather, a 3-day forecast and alerts, merge them and send one event."""
    # HTTPS keeps the API key (sent as a query parameter) from travelling in clear text;
    # no trailing slash because the getters append "/<endpoint>.json" themselves.
    baseurl = "https://api.weatherapi.com/v1"
    location = "Pune"  # You can replace with any city name based on your preference
    apikey = dbutils.secrets.get(scope="scope-name", key="apikey")

    # Get data from API
    curr_weather = get_curr_weather(baseurl, apikey, location)
    forecast_weather = get_forecast_weather(baseurl, apikey, location, 3)
    alerts = get_alerts(baseurl, apikey, location)

    # Flatten and merge data
    merged_data = flatten_data(curr_weather, forecast_weather, alerts)

    send_event(merged_data)

# Close the producer once the single test event has been sent (or has failed),
# so the Event Hub connection opened by this cell is not leaked
try:
    fetch_weather_data()
finally:
    producer.close()



## Send stream event to hub

In [0]:
from azure.eventhub import EventHubProducerClient, EventData


# Event Hub settings: the connection string is read from a Databricks secret scope
event_connection_string = dbutils.secrets.get(
    scope="scope-name",
    key="event-connection-string",
)
EVENT_HUB_NAME = "weatherstreamg"

# Build the producer client that send_event uses
producer = EventHubProducerClient.from_connection_string(
    conn_str=event_connection_string,
    eventhub_name=EVENT_HUB_NAME,
)

def send_event(event):
    """Serialize ``event`` to JSON and send it to the Event Hub as a one-message batch."""
    batch = producer.create_batch()
    message = EventData(json.dumps(event))
    batch.add(message)
    producer.send_batch(batch)

def handle_response(response):
    """Return the decoded JSON body for an HTTP 200 response, else an error string.

    The error string has the form ``"Error: <status_code>, <response text>"``.
    """
    if response.status_code != 200:
        return f"Error: {response.status_code}, {response.text}"
    return response.json()

def get_curr_weather(baseurl, api_key, location, timeout=10):
    """Request current weather, including air-quality data, for a location.

    Args:
        baseurl: Root URL of the weather API; a trailing slash is tolerated.
        api_key: Value sent as the ``key`` query parameter.
        location: Value sent as the ``q`` query parameter.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        Whatever ``handle_response`` returns for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    curr_weather_url = f"{baseurl.rstrip('/')}/current.json"
    params = {
        'key': api_key,
        'q': location,
        "aqi": 'yes'
    }
    # Without a timeout a stalled connection would block the streaming batch indefinitely
    response = requests.get(curr_weather_url, params=params, timeout=timeout)
    return handle_response(response)

def get_forecast_weather(baseurl, api_key, location, days, timeout=10):
    """Request a multi-day forecast for a location.

    Args:
        baseurl: Root URL of the weather API; a trailing slash is tolerated.
        api_key: Value sent as the ``key`` query parameter.
        location: Value sent as the ``q`` query parameter.
        days: Value sent as the ``days`` query parameter.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        Whatever ``handle_response`` returns for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    forecast_url = f"{baseurl.rstrip('/')}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days,
    }
    # Without a timeout a stalled connection would block the streaming batch indefinitely
    response = requests.get(forecast_url, params=params, timeout=timeout)
    return handle_response(response)

def get_alerts(baseurl, api_key, location, timeout=10):
    """Request weather alerts for a location.

    Args:
        baseurl: Root URL of the weather API; a trailing slash is tolerated.
        api_key: Value sent as the ``key`` query parameter.
        location: Value sent as the ``q`` query parameter.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        Whatever ``handle_response`` returns for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    alerts_url = f"{baseurl.rstrip('/')}/alerts.json"
    params = {
        'key': api_key,
        'q': location,
        "alerts": 'yes'
    }
    # Without a timeout a stalled connection would block the streaming batch indefinitely
    response = requests.get(alerts_url, params=params, timeout=timeout)
    return handle_response(response)

def flatten_data(curr_weather, forecast_weather, alerts):
    """Merge the current-weather, forecast and alerts payloads into one dict.

    Args:
        curr_weather: Parsed current-weather payload (reads ``location`` and ``current``).
        forecast_weather: Parsed forecast payload (reads ``forecast.forecastday``).
        alerts: Parsed alerts payload (reads ``alerts.alert``).

    Any argument or nested section that is not a dict -- for example the error
    string ``handle_response`` returns for a failed request -- is treated as
    empty, so the remaining sections are still produced instead of the call
    failing with ``AttributeError``.

    Returns:
        dict with location/current fields at the top level plus nested
        ``air_quality`` (dict), ``alerts`` (list) and ``forecast`` (list).
        Fields missing from the input are ``None``.
    """
    def _section(payload, key):
        """Return ``payload[key]`` when both are dicts, otherwise an empty dict."""
        value = payload.get(key) if isinstance(payload, dict) else None
        return value if isinstance(value, dict) else {}

    location_data = _section(curr_weather, "location")
    current = _section(curr_weather, "current")
    condition = _section(current, "condition")
    air_quality = _section(current, "air_quality")
    # "or []" also covers an explicit null list in the payload
    forecast = _section(forecast_weather, "forecast").get("forecastday") or []
    alert_list = _section(alerts, "alerts").get("alert") or []

    forecast_days = []
    for day in forecast:
        day_summary = _section(day, 'day')
        forecast_days.append({
            'date': day.get('date'),
            'maxtemp_c': day_summary.get('maxtemp_c'),
            'mintemp_c': day_summary.get('mintemp_c'),
            'condition': _section(day_summary, 'condition').get('text')
        })

    flattened_data = {
        'name': location_data.get('name'),
        'region': location_data.get('region'),
        'country': location_data.get('country'),
        'lat': location_data.get('lat'),
        'lon': location_data.get('lon'),
        'localtime': location_data.get('localtime'),
        'temp_c': current.get('temp_c'),
        'is_day': current.get('is_day'),
        'condition_text': condition.get('text'),
        'condition_icon': condition.get('icon'),
        'wind_kph': current.get('wind_kph'),
        'wind_degree': current.get('wind_degree'),
        'wind_dir': current.get('wind_dir'),
        'pressure_in': current.get('pressure_in'),
        'precip_in': current.get('precip_in'),
        'humidity': current.get('humidity'),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality': {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'so2': air_quality.get('so2'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us-epa-index'),
            'gb-defra-index': air_quality.get('gb-defra-index')
        },
        'alerts': [
            {
                'headline': alert.get('headline'),
                'severity': alert.get('severity'),
                # The API field is "desc"; it is exposed here as "description"
                'description': alert.get('desc'),
                'instruction': alert.get('instruction')
            }
            for alert in alert_list
        ],
        'forecast': forecast_days
    }
    return flattened_data
  

def fetch_weather_data():
    """Fetch current weather, a 3-day forecast and alerts for Pune and return them merged.

    Returns:
        The dict produced by ``flatten_data``. Sending is left to the caller so
        that each micro-batch emits exactly one event.
    """
    # HTTPS keeps the API key (sent as a query parameter) from travelling in clear text;
    # no trailing slash because the getters append "/<endpoint>.json" themselves.
    baseurl = "https://api.weatherapi.com/v1"
    location = "Pune"  # You can replace with any city name based on your preference
    apikey = dbutils.secrets.get(scope="scope-name", key="apikey")

    # Get data from API
    curr_weather = get_curr_weather(baseurl, apikey, location)
    forecast_weather = get_forecast_weather(baseurl, apikey, location, 3)
    alerts = get_alerts(baseurl, apikey, location)

    # Flatten and merge data
    return flatten_data(curr_weather, forecast_weather, alerts)

def process_batch(batch_df, batch_id):
    """foreachBatch callback: fetch the weather once and send it as a single event.

    Args:
        batch_df: Micro-batch DataFrame; unused, the stream only acts as a trigger.
        batch_id: Micro-batch id, used in the error message.

    Raises:
        Exception: Any fetch/send error is logged and re-raised.
    """
    try:
        # Fetch weather data
        weather_data = fetch_weather_data()

        # Previously the fetch step sent the event itself and returned None, so this
        # call published an extra "null" event; now this is the only send per batch.
        send_event(weather_data)

    except Exception as e:
        print(f"Error sending events in batch {batch_id}: {str(e)}")
        # Bare raise keeps the original traceback
        raise

# The "rate" source only acts as a clock that triggers process_batch
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

#foreachBatch to send weather data to Event Hub
query = streaming_df.writeStream.foreachBatch(process_batch).start()

# awaitTermination raises when the query fails or the cell is cancelled;
# the finally block makes sure the producer is closed in that case too
try:
    query.awaitTermination()
finally:
    producer.close()


## Send data to event hub at a fixed interval (every 30 seconds)

In [0]:
from azure.eventhub import EventHubProducerClient, EventData
from datetime import datetime, timedelta


# Event Hub settings: the connection string is read from a Databricks secret scope
event_connection_string = dbutils.secrets.get(
    scope="scope-name",
    key="event-connection-string",
)
EVENT_HUB_NAME = "weatherstreamg"

# Build the producer client that send_event uses
producer = EventHubProducerClient.from_connection_string(
    conn_str=event_connection_string,
    eventhub_name=EVENT_HUB_NAME,
)

def send_event(event):
    """Serialize ``event`` to JSON and send it to the Event Hub as a one-message batch."""
    batch = producer.create_batch()
    message = EventData(json.dumps(event))
    batch.add(message)
    producer.send_batch(batch)

def handle_response(response):
    """Return the decoded JSON body for an HTTP 200 response, else an error string.

    The error string has the form ``"Error: <status_code>, <response text>"``.
    """
    if response.status_code != 200:
        return f"Error: {response.status_code}, {response.text}"
    return response.json()

def get_curr_weather(baseurl, api_key, location, timeout=10):
    """Request current weather, including air-quality data, for a location.

    Args:
        baseurl: Root URL of the weather API; a trailing slash is tolerated.
        api_key: Value sent as the ``key`` query parameter.
        location: Value sent as the ``q`` query parameter.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        Whatever ``handle_response`` returns for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    curr_weather_url = f"{baseurl.rstrip('/')}/current.json"
    params = {
        'key': api_key,
        'q': location,
        "aqi": 'yes'
    }
    # Without a timeout a stalled connection would block the streaming batch indefinitely
    response = requests.get(curr_weather_url, params=params, timeout=timeout)
    return handle_response(response)

def get_forecast_weather(baseurl, api_key, location, days, timeout=10):
    """Request a multi-day forecast for a location.

    Args:
        baseurl: Root URL of the weather API; a trailing slash is tolerated.
        api_key: Value sent as the ``key`` query parameter.
        location: Value sent as the ``q`` query parameter.
        days: Value sent as the ``days`` query parameter.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        Whatever ``handle_response`` returns for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    forecast_url = f"{baseurl.rstrip('/')}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days,
    }
    # Without a timeout a stalled connection would block the streaming batch indefinitely
    response = requests.get(forecast_url, params=params, timeout=timeout)
    return handle_response(response)

def get_alerts(baseurl, api_key, location, timeout=10):
    """Request weather alerts for a location.

    Args:
        baseurl: Root URL of the weather API; a trailing slash is tolerated.
        api_key: Value sent as the ``key`` query parameter.
        location: Value sent as the ``q`` query parameter.
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        Whatever ``handle_response`` returns for the HTTP response.
    """
    # Strip a trailing slash so the joined URL never contains "//"
    alerts_url = f"{baseurl.rstrip('/')}/alerts.json"
    params = {
        'key': api_key,
        'q': location,
        "alerts": 'yes'
    }
    # Without a timeout a stalled connection would block the streaming batch indefinitely
    response = requests.get(alerts_url, params=params, timeout=timeout)
    return handle_response(response)

def flatten_data(curr_weather, forecast_weather, alerts):
    """Merge the current-weather, forecast and alerts payloads into one dict.

    Args:
        curr_weather: Parsed current-weather payload (reads ``location`` and ``current``).
        forecast_weather: Parsed forecast payload (reads ``forecast.forecastday``).
        alerts: Parsed alerts payload (reads ``alerts.alert``).

    Any argument or nested section that is not a dict -- for example the error
    string ``handle_response`` returns for a failed request -- is treated as
    empty, so the remaining sections are still produced instead of the call
    failing with ``AttributeError``.

    Returns:
        dict with location/current fields at the top level plus nested
        ``air_quality`` (dict), ``alerts`` (list) and ``forecast`` (list).
        Fields missing from the input are ``None``.
    """
    def _section(payload, key):
        """Return ``payload[key]`` when both are dicts, otherwise an empty dict."""
        value = payload.get(key) if isinstance(payload, dict) else None
        return value if isinstance(value, dict) else {}

    location_data = _section(curr_weather, "location")
    current = _section(curr_weather, "current")
    condition = _section(current, "condition")
    air_quality = _section(current, "air_quality")
    # "or []" also covers an explicit null list in the payload
    forecast = _section(forecast_weather, "forecast").get("forecastday") or []
    alert_list = _section(alerts, "alerts").get("alert") or []

    forecast_days = []
    for day in forecast:
        day_summary = _section(day, 'day')
        forecast_days.append({
            'date': day.get('date'),
            'maxtemp_c': day_summary.get('maxtemp_c'),
            'mintemp_c': day_summary.get('mintemp_c'),
            'condition': _section(day_summary, 'condition').get('text')
        })

    flattened_data = {
        'name': location_data.get('name'),
        'region': location_data.get('region'),
        'country': location_data.get('country'),
        'lat': location_data.get('lat'),
        'lon': location_data.get('lon'),
        'localtime': location_data.get('localtime'),
        'temp_c': current.get('temp_c'),
        'is_day': current.get('is_day'),
        'condition_text': condition.get('text'),
        'condition_icon': condition.get('icon'),
        'wind_kph': current.get('wind_kph'),
        'wind_degree': current.get('wind_degree'),
        'wind_dir': current.get('wind_dir'),
        'pressure_in': current.get('pressure_in'),
        'precip_in': current.get('precip_in'),
        'humidity': current.get('humidity'),
        'cloud': current.get('cloud'),
        'feelslike_c': current.get('feelslike_c'),
        'uv': current.get('uv'),
        'air_quality': {
            'co': air_quality.get('co'),
            'no2': air_quality.get('no2'),
            'o3': air_quality.get('o3'),
            'so2': air_quality.get('so2'),
            'pm2_5': air_quality.get('pm2_5'),
            'pm10': air_quality.get('pm10'),
            'us-epa-index': air_quality.get('us-epa-index'),
            'gb-defra-index': air_quality.get('gb-defra-index')
        },
        'alerts': [
            {
                'headline': alert.get('headline'),
                'severity': alert.get('severity'),
                # The API field is "desc"; it is exposed here as "description"
                'description': alert.get('desc'),
                'instruction': alert.get('instruction')
            }
            for alert in alert_list
        ],
        'forecast': forecast_days
    }
    return flattened_data
  

def fetch_weather_data():
    """Fetch current weather, a 3-day forecast and alerts for Pune and return them merged.

    Returns:
        The dict produced by ``flatten_data``. Sending is left to the caller so
        that each send cycle emits exactly one event.
    """
    # HTTPS keeps the API key (sent as a query parameter) from travelling in clear text;
    # no trailing slash because the getters append "/<endpoint>.json" themselves.
    baseurl = "https://api.weatherapi.com/v1"
    location = "Pune"  # You can replace with any city name based on your preference
    apikey = dbutils.secrets.get(scope="scope-name", key="apikey")

    # Get data from API
    curr_weather = get_curr_weather(baseurl, apikey, location)
    forecast_weather = get_forecast_weather(baseurl, apikey, location, 3)
    alerts = get_alerts(baseurl, apikey, location)

    # Flatten and merge data
    return flatten_data(curr_weather, forecast_weather, alerts)

# Minimum number of seconds between two events sent to the Event Hub
SEND_INTERVAL_SECONDS = 30

# Start one interval in the past so the very first batch sends immediately.
# (This was misspelled "ast_sent_time", which left last_sent_time undefined and
# made every batch fail with NameError.)
last_sent_time = datetime.now() - timedelta(seconds=SEND_INTERVAL_SECONDS)


# Main program
def process_batch(batch_df, batch_id):
    """foreachBatch callback: send one weather event at most every SEND_INTERVAL_SECONDS.

    Args:
        batch_df: Micro-batch DataFrame; unused, the stream only acts as a trigger.
        batch_id: Micro-batch id, used in the error message.

    Errors are printed and swallowed so a failed fetch/send does not stop the stream.
    """
    global last_sent_time
    try:
        # Get current time
        current_time = datetime.now()

        # Skip this batch when the interval since the last send has not elapsed
        if (current_time - last_sent_time).total_seconds() < SEND_INTERVAL_SECONDS:
            return

        # Fetch once and send once; the fetch step no longer sends on its own,
        # so no extra "null" event is published
        weather_data = fetch_weather_data()
        send_event(weather_data)

        # Update last sent time
        last_sent_time = current_time
        print(f'Event Sent at {last_sent_time}')

    except Exception as e:
        print(f"Error sending events in batch {batch_id}: {str(e)}")
# The "rate" source only acts as a clock that triggers process_batch
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

#foreachBatch to send weather data to Event Hub
query = streaming_df.writeStream.foreachBatch(process_batch).start()

# awaitTermination raises when the query fails or the cell is cancelled;
# the finally block makes sure the producer is closed in that case too
try:
    query.awaitTermination()
finally:
    producer.close()
