## Sending a test event from the Databricks workspace (dbw) to Event Hub without a secret scope (connection string pasted directly)

In [0]:
# Import the necessary classes from the Azure Event Hubs SDK
from azure.eventhub import EventHubProducerClient, EventData
import json

# ──────────────────────────────────────────────────────────────────────────────
# 1. Event Hub Configuration
# ──────────────────────────────────────────────────────────────────────────────

# Event Hub namespace connection string. In the Azure portal: Event Hub ->
# Shared access policies -> open the Databricks policy -> copy the
# "Connection string-primary key" value.
# SECURITY: the connection string embeds a SharedAccessKey, which is a credential.
# Never commit a real key; paste it here only for a throwaway test and use a
# Databricks secret scope (dbutils.secrets.get) for anything that is saved or shared.
# NOTE(review): a real key was previously committed on this line - rotate it.
EVENT_HUB_CONNECTION_STRING = (
    "Endpoint=sb://eventh-weather-streaming-namespace.servicebus.windows.net/;"
    "SharedAccessKeyName=for-databricks;"
    "SharedAccessKey=<YOUR_SHARED_ACCESS_KEY>;"
    "EntityPath=weather-streaming-event-hub"
)

# Replace with your Event Hub name (the specific hub within the namespace)
EVENT_HUB_NAME = "weather-streaming-event-hub"

# ──────────────────────────────────────────────────────────────────────────────
# 2. Initialize the Event Hub Producer
# ──────────────────────────────────────────────────────────────────────────────

# Producer client responsible for sending events to the Event Hub
producer = EventHubProducerClient.from_connection_string(
    conn_str=EVENT_HUB_CONNECTION_STRING,
    eventhub_name=EVENT_HUB_NAME
)

# ──────────────────────────────────────────────────────────────────────────────
# 3. Define a Function to Send Events
# ──────────────────────────────────────────────────────────────────────────────

def send_event(event: dict):
    """
    Sends a single JSON-serializable event to Azure Event Hub.

    The event is serialized with ``json.dumps``, wrapped in an ``EventData``,
    added to a new batch obtained from the module-level ``producer``, and the
    batch is sent. A confirmation line containing the JSON text is printed.

    Parameters:
        event (dict): The event payload to send.

    Illustration of the intermediate values:
        event = {"event_id": 42, "event_name": "temperature_reading",
                 "value_celsius": 23.7, "timestamp": "2025-05-27T12:34:56Z"}
        json.dumps(event) gives
            {"event_id": 42, "event_name": "temperature_reading", "value_celsius": 23.7, "timestamp": "2025-05-27T12:34:56Z"}
        EventData(event_json) stores that JSON as its body; it can be inspected
        with event_data.body_as_str(encoding='UTF-8'), which prints the same text.
    """
    # Serialize first so a non-serializable payload fails before a batch is created.
    event_json = json.dumps(event)
    event_data = EventData(event_json)

    # Create a new batch and add the single event to it.
    event_data_batch = producer.create_batch()
    event_data_batch.add(event_data)

    # Send the batch to the Event Hub
    producer.send_batch(event_data_batch)
    print(f"Sent event: {event_json}")

# ──────────────────────────────────────────────────────────────────────────────
# 4. Create a Sample Event and Send It
# ──────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # Example payload — can be any JSON-serializable content
    sample_event = {
        "event_id": 1111,
        "event_name": "Test Event"
    }

    try:
        # Send the sample event to validate connectivity
        send_event(sample_event)
    finally:
        # 5. Clean Up: close the producer even when sending failed, so the
        # client is not left open after an error.
        producer.close()
        print("Event Hub producer closed.")

## Sending a test event from the Databricks workspace (dbw) to Event Hub using a secret scope

In [0]:
# Import the necessary classes from the Azure Event Hubs SDK
from azure.eventhub import EventHubProducerClient, EventData
import json

# ──────────────────────────────────────────────────────────────────────────────
# 1. Event Hub Configuration
# ──────────────────────────────────────────────────────────────────────────────


# Read the connection string from a secret scope instead of the notebook source
# ──────────────────────────────────────────────────────────────────────────────
# Prerequisites: a Databricks secret scope named "key-vault-secret-scope" exists and
# holds the Event Hub connection string under "eventhub-connection-string-secret".
eventhub_connection_string = dbutils.secrets.get(scope="key-vault-secret-scope", key="eventhub-connection-string-secret")

# Name of the target Event Hub (the specific hub within the namespace)
EVENT_HUB_NAME = "weather-streaming-event-hub"

# ──────────────────────────────────────────────────────────────────────────────
# 2. Initialize the Event Hub Producer
# ──────────────────────────────────────────────────────────────────────────────

# The producer client is the object that sends events to the Event Hub
producer = EventHubProducerClient.from_connection_string(conn_str=eventhub_connection_string, eventhub_name=EVENT_HUB_NAME)

# ──────────────────────────────────────────────────────────────────────────────
# 3. Define a Function to Send Events
# ──────────────────────────────────────────────────────────────────────────────

def send_event(event: dict):
    """
    Publish one event to Azure Event Hub as a JSON message.

    Parameters:
        event (dict): JSON-serializable payload to publish.
    """
    # One fresh batch per call; the single event is its only member.
    outgoing = producer.create_batch()

    # dict -> JSON text, which becomes the EventData body.
    event_json = json.dumps(event)
    outgoing.add(EventData(event_json))

    # Hand the batch to the hub, then confirm on stdout.
    producer.send_batch(outgoing)
    print(f"Sent event: {event_json}")

# ──────────────────────────────────────────────────────────────────────────────
# 4. Create a Sample Event and Send It
# ──────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # Example payload — can be any JSON-serializable content
    sample_event = {
        "event_id": 2222,
        "event_name": "Secret scope test"
    }

    try:
        # Send the sample event to validate connectivity
        send_event(sample_event)
    finally:
        # 5. Clean Up: close the producer even when sending failed, so the
        # client is not left open after an error.
        producer.close()
        print("Event Hub producer closed.")

## API Testing

In [0]:
# ──────────────────────────────────────────────────────────────────────────────
# 1. Imports
# ──────────────────────────────────────────────────────────────────────────────
import requests    # For making HTTP requests to the Weather API
import json        # For parsing and pretty-printing JSON responses

# ──────────────────────────────────────────────────────────────────────────────
# 2. Retrieve API Key from Azure Key Vault
# ──────────────────────────────────────────────────────────────────────────────
# The API key is read from the Databricks secret scope "key-vault-secret-scope",
# where it is stored under the secret name "weather-api-key".
weather_api_key = dbutils.secrets.get(
    scope="key-vault-secret-scope",
    key="weather-api-key"
)

# ──────────────────────────────────────────────────────────────────────────────
# 3. Define the API Endpoint and Parameters
# ──────────────────────────────────────────────────────────────────────────────
location = "Kolkata"                   # City for which you want the weather
# HTTPS, because the API key travels in the query string and must not be sent
# in clear text.
base_url = "https://api.weatherapi.com/v1"
current_url = f"{base_url}/current.json"

# The Weather API expects two query parameters:
#  - 'key': your API key
#  - 'q':  the location string (city name, coordinates, ZIP, etc.)
params = {
    "key": weather_api_key,
    "q": location
}

# ──────────────────────────────────────────────────────────────────────────────
# 4. Make the HTTP GET Request
# ──────────────────────────────────────────────────────────────────────────────
# The timeout keeps the cell from hanging indefinitely on a stalled connection.
response = requests.get(current_url, params=params, timeout=10)

# ──────────────────────────────────────────────────────────────────────────────
# 5. Handle the Response
# ──────────────────────────────────────────────────────────────────────────────
if response.status_code == 200:
    # 200 means Success: parse the JSON payload
    current_weather = response.json()

    # Pretty-print the weather data
    print("Current Weather:")
    print(json.dumps(current_weather, indent=2))
else:
    # Something went wrong: print status code and error text
    print(f"Error: {response.status_code}, {response.text}")


## Code for getting weather data

In [0]:
# # COMMAND ----------
# ### Complete code for getting weather data

# # COMMAND ----------

# ──────────────────────────────────────────────────────────────────────────────
# 0) Imports for full-featured API functions
# ──────────────────────────────────────────────────────────────────────────────
import requests
import json

# ──────────────────────────────────────────────────────────────────────────────
# 1) Helper: Uniform response handling
# ──────────────────────────────────────────────────────────────────────────────
def handle_response(response):
    """
    Turn an HTTP response into a dict.

    A 200 response yields its decoded JSON body; any other status yields
    ``{"error": "<status code>: <response text>"}``.
    """
    if response.status_code != 200:
        return {"error": f"{response.status_code}: {response.text}"}
    return response.json()

# ──────────────────────────────────────────────────────────────────────────────
# 2) Fetch current weather + AQI
# ──────────────────────────────────────────────────────────────────────────────
def get_current_weather(base_url, api_key, location):
    """
    Request current conditions, including air-quality data, for a location.

    Parameters:
        base_url: Root URL of the Weather API; a trailing slash is optional.
        api_key: API key, sent as the ``key`` query parameter.
        location: Location query, sent as the ``q`` query parameter.

    Returns:
        Whatever ``handle_response`` produces for the HTTP response.
    """
    # Normalise the slash so the URL is well-formed whether or not the caller
    # passed a trailing "/".
    url = f"{base_url.rstrip('/')}/current.json"
    params = {"key": api_key, "q": location, "aqi": "yes"}
    # The timeout keeps a stalled connection from blocking the caller forever.
    response = requests.get(url, params=params, timeout=10)
    return handle_response(response)

# ──────────────────────────────────────────────────────────────────────────────
# 3) Fetch multi-day forecast
# ──────────────────────────────────────────────────────────────────────────────
def get_forecast_weather(base_url, api_key, location, days):
    """
    Request a multi-day forecast for a location.

    Parameters:
        base_url: Root URL of the Weather API; a trailing slash is optional.
        api_key: API key, sent as the ``key`` query parameter.
        location: Location query, sent as the ``q`` query parameter.
        days: Number of forecast days, sent as the ``days`` query parameter.

    Returns:
        Whatever ``handle_response`` produces for the HTTP response.
    """
    # Strip a trailing "/" first; otherwise a base such as ".../v1/" would
    # yield ".../v1//forecast.json".
    url = f"{base_url.rstrip('/')}/forecast.json"
    params = {"key": api_key, "q": location, "days": days}
    # The timeout keeps a stalled connection from blocking the caller forever.
    response = requests.get(url, params=params, timeout=10)
    return handle_response(response)

# ──────────────────────────────────────────────────────────────────────────────
# 4) Fetch weather alerts
# ──────────────────────────────────────────────────────────────────────────────
def get_alerts(base_url, api_key, location):
    """
    Request weather alerts for a location.

    Parameters:
        base_url: Root URL of the Weather API; a trailing slash is optional.
        api_key: API key, sent as the ``key`` query parameter.
        location: Location query, sent as the ``q`` query parameter.

    Returns:
        Whatever ``handle_response`` produces for the HTTP response.
    """
    # Strip a trailing "/" first; otherwise a base such as ".../v1/" would
    # yield ".../v1//alerts.json".
    url = f"{base_url.rstrip('/')}/alerts.json"
    params = {"key": api_key, "q": location, "alerts": "yes"}
    # The timeout keeps a stalled connection from blocking the caller forever.
    response = requests.get(url, params=params, timeout=10)
    return handle_response(response)

# ──────────────────────────────────────────────────────────────────────────────
# 5) Flatten & merge disparate API responses
# ──────────────────────────────────────────────────────────────────────────────
def flatten_data(current_weather, forecast_weather, alerts):
    """
    Combine the three API payloads into one summary record.

    Every lookup uses ``.get`` with an empty default, so a payload that lacks
    a section contributes ``None`` values or empty collections rather than
    raising.

    Returns:
        dict with location fields, current-condition fields, an
        ``air_quality`` dict, an ``alerts`` list and a ``forecast`` list.
    """
    place = current_weather.get("location", {})
    now = current_weather.get("current", {})
    sky = now.get("condition", {})
    air = now.get("air_quality", {})

    daily_entries = forecast_weather.get("forecast", {}).get("forecastday", [])
    raw_alerts = alerts.get("alerts", {}).get("alert", [])

    # Location metadata first, in a fixed key order.
    record = {}
    for field in ("name", "region", "country", "lat", "lon", "localtime"):
        record[field] = place.get(field)

    # Current weather details
    record["temp_c"] = now.get("temp_c")
    record["condition_text"] = sky.get("text")
    record["wind_kph"] = now.get("wind_kph")
    record["humidity"] = now.get("humidity")

    # Air quality: every sub-field present in the payload is copied across.
    record["air_quality"] = {name: air.get(name) for name in air.keys()}

    # Active alerts (if any); the API field "desc" is exposed as "description".
    alert_summaries = []
    for item in raw_alerts:
        alert_summaries.append({
            "headline": item.get("headline"),
            "severity": item.get("severity"),
            "description": item.get("desc"),
        })
    record["alerts"] = alert_summaries

    # Per-day forecast summary
    day_summaries = []
    for entry in daily_entries:
        day = entry.get("day", {})
        day_summaries.append({
            "date": entry.get("date"),
            "maxtemp_c": day.get("maxtemp_c"),
            "mintemp_c": day.get("mintemp_c"),
            "condition": day.get("condition", {}).get("text"),
        })
    record["forecast"] = day_summaries

    return record

# ──────────────────────────────────────────────────────────────────────────────
# 6) Main orchestration function
# ──────────────────────────────────────────────────────────────────────────────
def fetch_weather_data(location="Kolkata", days=3):
    """
    Retrieves current, forecast, and alert data, merges it,
    and prints a single consolidated JSON payload.

    Parameters:
        location: Location query passed to all three endpoints
            (defaults to "Kolkata").
        days: Number of forecast days to request (defaults to 3).
    """
    # HTTPS, because the API key is sent as a query parameter and must not
    # travel in clear text. The base keeps its trailing slash.
    base_url = "https://api.weatherapi.com/v1/"
    api_key = dbutils.secrets.get(scope="key-vault-secret-scope", key="weather-api-key")

    # 1. Pull raw data
    current = get_current_weather(base_url, api_key, location)
    forecast = get_forecast_weather(base_url, api_key, location, days=days)
    alerts = get_alerts(base_url, api_key, location)

    # 2. Normalize & merge into one record
    merged = flatten_data(current, forecast, alerts)

    # 3. Output for inspection or downstream processing
    print("Weather Data:")
    print(json.dumps(merged, indent=3))

# ──────────────────────────────────────────────────────────────────────────────
# 7) Kick off the pipeline
# ──────────────────────────────────────────────────────────────────────────────
fetch_weather_data()



## Sending the complete weather data to Event Hub


#### Combining the two code blocks above: the complete weather-fetching code plus the Event Hub sender, so that the merged weather data is sent to Event Hub as a single event

In [0]:
# Import libraries for HTTP requests, JSON handling, and Event Hub communication
import requests
import json
from azure.eventhub import EventHubProducerClient, EventData

# ────────────────────────────────────────────────────────────────
# 1. Configure Event Hub connection
# ────────────────────────────────────────────────────────────────

# Connection string is pulled from the "key-vault-secret-scope" secret scope
eventhub_connection_string = dbutils.secrets.get(scope="key-vault-secret-scope", key="eventhub-connection-string-secret")

# Event Hub that receives the published event
EVENT_HUB_NAME = "weather-streaming-event-hub"

# Producer client built from the retrieved connection string
producer = EventHubProducerClient.from_connection_string(conn_str=eventhub_connection_string, eventhub_name=EVENT_HUB_NAME)

# ────────────────────────────────────────────────────────────────
# 2. Function to send an event (JSON) to Azure Event Hub
# ────────────────────────────────────────────────────────────────

def send_event(event):
    """Serialize ``event`` to JSON and publish it to the Event Hub as a one-event batch."""
    # New batch for this call
    batch = producer.create_batch()

    # dict -> JSON string -> EventData message
    message = EventData(json.dumps(event))
    batch.add(message)

    # Publish
    producer.send_batch(batch)

# ────────────────────────────────────────────────────────────────
# 3. Helper function to handle HTTP response
# ────────────────────────────────────────────────────────────────

def handle_response(response):
    """
    Return the parsed JSON body on HTTP 200, otherwise an error dict.

    The failure value is a dict (``{"error": "<status>: <text>"}``) rather than
    a plain string, because ``flatten_data`` calls ``.get()`` on every payload
    and a string would raise ``AttributeError`` there.
    """
    if response.status_code == 200:
        # Return parsed JSON if request was successful
        return response.json()
    # Request failed: report status code and body text in a dict
    return {"error": f"{response.status_code}: {response.text}"}

# ────────────────────────────────────────────────────────────────
# 4. Functions to retrieve weather data from the API
# ────────────────────────────────────────────────────────────────

def get_current_weather(base_url, api_key, location):
    """
    Call the current-weather endpoint, including air quality info.

    Parameters:
        base_url: Root URL of the Weather API; a trailing slash is optional.
        api_key: API key, sent as the ``key`` query parameter.
        location: Location query, sent as the ``q`` query parameter.

    Returns:
        Whatever ``handle_response`` produces for the HTTP response.
    """
    # Strip a trailing "/" so a base such as ".../v1/" does not become
    # ".../v1//current.json".
    current_weather_url = f"{base_url.rstrip('/')}/current.json"
    params = {'key': api_key, 'q': location, "aqi": 'yes'}
    # The timeout keeps a stalled connection from blocking the caller forever.
    response = requests.get(current_weather_url, params=params, timeout=10)
    return handle_response(response)

def get_forecast_weather(base_url, api_key, location, days):
    """
    Call the forecast endpoint for the specified number of days.

    Parameters:
        base_url: Root URL of the Weather API; a trailing slash is optional.
        api_key: API key, sent as the ``key`` query parameter.
        location: Location query, sent as the ``q`` query parameter.
        days: Number of forecast days, sent as the ``days`` query parameter.

    Returns:
        Whatever ``handle_response`` produces for the HTTP response.
    """
    # Strip a trailing "/" so a base such as ".../v1/" does not become
    # ".../v1//forecast.json".
    forecast_url = f"{base_url.rstrip('/')}/forecast.json"
    params = {"key": api_key, "q": location, "days": days}
    # The timeout keeps a stalled connection from blocking the caller forever.
    response = requests.get(forecast_url, params=params, timeout=10)
    return handle_response(response)

def get_alerts(base_url, api_key, location):
    """
    Call the alerts endpoint to check for severe weather.

    Parameters:
        base_url: Root URL of the Weather API; a trailing slash is optional.
        api_key: API key, sent as the ``key`` query parameter.
        location: Location query, sent as the ``q`` query parameter.

    Returns:
        Whatever ``handle_response`` produces for the HTTP response.
    """
    # Strip a trailing "/" so a base such as ".../v1/" does not become
    # ".../v1//alerts.json".
    alerts_url = f"{base_url.rstrip('/')}/alerts.json"
    params = {'key': api_key, 'q': location, "alerts": 'yes'}
    # The timeout keeps a stalled connection from blocking the caller forever.
    response = requests.get(alerts_url, params=params, timeout=10)
    return handle_response(response)

# ────────────────────────────────────────────────────────────────
# 5. Merge and flatten all API responses into a single dict
# ────────────────────────────────────────────────────────────────

def flatten_data(current_weather, forecast_weather, alerts):
    """
    Build one structured event record from the three API payloads.

    All lookups go through ``.get`` with empty defaults, so a missing section
    yields ``None`` values or empty lists instead of raising.

    Returns:
        dict with location metadata, current conditions, a fixed set of
        air-quality readings, an ``alerts`` list and a ``forecast`` list.
    """
    place = current_weather.get("location", {})
    now = current_weather.get("current", {})
    sky = now.get("condition", {})
    air = now.get("air_quality", {})
    daily_entries = forecast_weather.get("forecast", {}).get("forecastday", [])
    raw_alerts = alerts.get("alerts", {}).get("alert", [])

    record = {}

    # Location metadata
    for field in ('name', 'region', 'country', 'lat', 'lon', 'localtime'):
        record[field] = place.get(field)

    # Current weather conditions; the two condition_* keys come from the
    # nested "condition" object, everything else from "current" itself.
    record['temp_c'] = now.get('temp_c')
    record['is_day'] = now.get('is_day')
    record['condition_text'] = sky.get('text')
    record['condition_icon'] = sky.get('icon')
    for field in ('wind_kph', 'wind_degree', 'wind_dir', 'pressure_in',
                  'precip_in', 'humidity', 'cloud', 'feelslike_c', 'uv'):
        record[field] = now.get(field)

    # Air quality measurements: a fixed list of readings, None when absent
    pollutants = ('co', 'no2', 'o3', 'so2', 'pm2_5', 'pm10',
                  'us-epa-index', 'gb-defra-index')
    record['air_quality'] = {name: air.get(name) for name in pollutants}

    # Weather alerts; the API field "desc" is exposed as "description"
    alert_summaries = []
    for item in raw_alerts:
        alert_summaries.append({
            'headline': item.get('headline'),
            'severity': item.get('severity'),
            'description': item.get('desc'),
            'instruction': item.get('instruction'),
        })
    record['alerts'] = alert_summaries

    # Per-day forecast summary
    day_summaries = []
    for entry in daily_entries:
        day = entry.get('day', {})
        day_summaries.append({
            'date': entry.get('date'),
            'maxtemp_c': day.get('maxtemp_c'),
            'mintemp_c': day.get('mintemp_c'),
            'condition': day.get('condition', {}).get('text'),
        })
    record['forecast'] = day_summaries

    return record

# ────────────────────────────────────────────────────────────────
# 6. Main function to coordinate all steps
# ────────────────────────────────────────────────────────────────

def fetch_weather_data():
    """
    Fetch current, forecast and alert data for the configured city, merge it
    into one record and send that record to Event Hub.
    """
    # Base URL of the Weather API.
    # - No trailing slash: the fetchers append "/<endpoint>.json", so a
    #   trailing slash here would produce ".../v1//current.json".
    # - HTTPS: the API key is sent as a query parameter and must not travel
    #   in clear text.
    base_url = "https://api.weatherapi.com/v1"

    # Desired city
    location = "Kolkata"

    # Get the Weather API key from the secret scope
    weatherapikey = dbutils.secrets.get(
        scope="key-vault-secret-scope",
        key="weather-api-key"
    )

    # Step 1: Call all three API endpoints
    current_weather = get_current_weather(base_url, weatherapikey, location)
    forecast_weather = get_forecast_weather(base_url, weatherapikey, location, 3)
    alerts = get_alerts(base_url, weatherapikey, location)

    # Step 2: Merge into a flat event structure
    merged_data = flatten_data(current_weather, forecast_weather, alerts)

    # Step 3: Send to Event Hub
    send_event(merged_data)

# ────────────────────────────────────────────────────────────────
# 7. Trigger the process
# ────────────────────────────────────────────────────────────────

# This line initiates the full process: fetch → merge → send
fetch_weather_data()

## The code above sends only one event with the latest weather data, so the next cell wraps it in a Spark Structured Streaming loop (rate source, 1 row per second)


In [0]:
# ──────────────────────────────────────────────────────────────────────────────
# 1) Imports: Event Hub client, JSON, HTTP requests, and Spark
# ──────────────────────────────────────────────────────────────────────────────
from azure.eventhub import EventHubProducerClient, EventData  # Azure SDK for sending events
import json                                                    # JSON serialization
import requests                                                # REST API calls
# Note: Spark is already available in Databricks for streaming

# ──────────────────────────────────────────────────────────────────────────────
# 2) Secure configuration: retrieve secrets from Key Vault
# ──────────────────────────────────────────────────────────────────────────────
# Both secrets live in the "key-vault-secret-scope" secret scope
weatherapikey = dbutils.secrets.get(scope="key-vault-secret-scope", key="weather-api-key")
eventhub_connection_string = dbutils.secrets.get(scope="key-vault-secret-scope", key="eventhub-connection-string-secret")
EVENT_HUB_NAME = "weather-streaming-event-hub"

# ──────────────────────────────────────────────────────────────────────────────
# 3) Initialize the Event Hub producer client once
# ──────────────────────────────────────────────────────────────────────────────
producer = EventHubProducerClient.from_connection_string(conn_str=eventhub_connection_string, eventhub_name=EVENT_HUB_NAME)

# ──────────────────────────────────────────────────────────────────────────────
# 4) Helper: wrap a dict as JSON and send to Event Hub
# ──────────────────────────────────────────────────────────────────────────────
def send_event(event: dict):
    """Publish ``event`` to the Event Hub as a single JSON message."""
    outgoing = producer.create_batch()      # fresh batch for this call
    body = json.dumps(event)                # dict -> JSON text
    outgoing.add(EventData(body))           # JSON text -> EventData in the batch
    producer.send_batch(outgoing)           # push to the hub

# ──────────────────────────────────────────────────────────────────────────────
# 5) Helper: unified API response handling
# ──────────────────────────────────────────────────────────────────────────────
def handle_response(response):
    """Return the decoded JSON body for HTTP 200, else ``{"error": "<status>: <text>"}``."""
    succeeded = response.status_code == 200
    if not succeeded:
        return {"error": f"{response.status_code}: {response.text}"}
    return response.json()

# ──────────────────────────────────────────────────────────────────────────────
# 6) Weather API calls (current, forecast, alerts)
# ──────────────────────────────────────────────────────────────────────────────
def get_current_weather(base_url, api_key, location):
    """
    Request current conditions (with air-quality data) for ``location``.

    Returns whatever ``handle_response`` produces for the HTTP response.
    """
    url = f"{base_url}/current.json"
    params = {'key': api_key, 'q': location, 'aqi': 'yes'}
    # This runs on every micro-batch; without a timeout one stalled
    # connection would block the streaming query indefinitely.
    return handle_response(requests.get(url, params=params, timeout=10))

def get_forecast_weather(base_url, api_key, location, days):
    """
    Request a ``days``-day forecast for ``location``.

    Returns whatever ``handle_response`` produces for the HTTP response.
    """
    url = f"{base_url}/forecast.json"
    params = {'key': api_key, 'q': location, 'days': days}
    # This runs on every micro-batch; without a timeout one stalled
    # connection would block the streaming query indefinitely.
    return handle_response(requests.get(url, params=params, timeout=10))

def get_alerts(base_url, api_key, location):
    """
    Request weather alerts for ``location``.

    Returns whatever ``handle_response`` produces for the HTTP response.
    """
    url = f"{base_url}/alerts.json"
    params = {'key': api_key, 'q': location, 'alerts': 'yes'}
    # This runs on every micro-batch; without a timeout one stalled
    # connection would block the streaming query indefinitely.
    return handle_response(requests.get(url, params=params, timeout=10))

# ──────────────────────────────────────────────────────────────────────────────
# 7) Flatten nested API outputs into one simple dict
# ──────────────────────────────────────────────────────────────────────────────
def flatten_data(current, forecast, alerts):
    """
    Reduce the three API payloads to one compact record.

    Missing sections fall back to empty dicts/lists, so absent data shows up
    as ``None`` values or empty collections rather than an exception.
    """
    place = current.get("location", {})
    now = current.get("current", {})
    sky = now.get("condition", {})
    air = now.get("air_quality", {})
    forecast_days = forecast.get("forecast", {}).get("forecastday", [])
    active_alerts = alerts.get("alerts", {}).get("alert", [])

    def summarize_day(entry):
        # One forecast day -> date plus max/min temperature and condition text
        day = entry.get("day", {})
        return {
            "date": entry.get("date"),
            "max_temp": day.get("maxtemp_c"),
            "min_temp": day.get("mintemp_c"),
            "condition": day.get("condition", {}).get("text"),
        }

    def summarize_alert(item):
        # The API field "desc" is exposed as "description"
        return {
            "headline": item.get("headline"),
            "severity": item.get("severity"),
            "description": item.get("desc"),
        }

    record = {}
    record["name"] = place.get("name")
    record["region"] = place.get("region")
    record["country"] = place.get("country")
    record["temp_c"] = now.get("temp_c")
    record["condition"] = sky.get("text")
    # Copy every air-quality reading present in the payload
    record["air_quality"] = {reading: air.get(reading) for reading in air}
    record["forecast"] = [summarize_day(entry) for entry in forecast_days]
    record["alerts"] = [summarize_alert(item) for item in active_alerts]
    return record

# ──────────────────────────────────────────────────────────────────────────────
# 8) Fetch + flatten all weather data (no sending)
# ──────────────────────────────────────────────────────────────────────────────
def fetch_weather_data():
    """
    Fetch current, forecast (3 days) and alert data for the configured city
    and return the merged record produced by ``flatten_data``.

    Uses the module-level ``weatherapikey``; nothing is sent from here.
    """
    # HTTPS, because the API key is sent as a query parameter and must not
    # travel in clear text.
    base_url = "https://api.weatherapi.com/v1"
    location = "Kolkata"  # or any city
    # Call each endpoint
    curr = get_current_weather(base_url, weatherapikey, location)
    forecast = get_forecast_weather(base_url, weatherapikey, location, days=3)
    alrt = get_alerts(base_url, weatherapikey, location)
    # Merge into one record
    return flatten_data(curr, forecast, alrt)

# ──────────────────────────────────────────────────────────────────────────────
# 9) Batch processor invoked by Spark Structured Streaming
# ──────────────────────────────────────────────────────────────────────────────
def process_batch(batch_df, batch_id):
    """
    foreachBatch callback: publish one fresh weather snapshot per micro-batch.

    ``batch_df`` is never read; the micro-batch only serves as a trigger.
    Any exception is printed together with ``batch_id`` and then re-raised.
    """
    try:
        # Fetch a new snapshot and publish it straight away
        send_event(fetch_weather_data())
    except Exception as e:
        # Print for visibility, then re-raise so the failure is not swallowed
        print(f"Error in batch {batch_id}: {e}")
        raise

# ──────────────────────────────────────────────────────────────────────────────
# 10) Define a dummy streaming source (rate) for pacing
# ──────────────────────────────────────────────────────────────────────────────
# "rate" source configured with rowsPerSecond=1; its rows are not used, the
# stream only drives the foreachBatch callback.
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# ──────────────────────────────────────────────────────────────────────────────
# 11) Hook our processor into the streaming query
# ──────────────────────────────────────────────────────────────────────────────
query = (
    streaming_df.writeStream
               .foreachBatch(process_batch)   # Call process_batch() each micro-batch
               .start()
)

try:
    # Wait forever (or until manually stopped)
    query.awaitTermination()
finally:
    # 12) Cleanup: close the Event Hub producer when done. Placed in `finally`
    # so it also runs when awaitTermination() raises (failed query) or the
    # cell is interrupted; otherwise the producer would never be closed.
    producer.close()


## Sending data from API to Event hub every 30 Seconds (as the weather data doesn't change every second)

In [0]:
# ──────────────────────────────────────────────────────────────────────────────
# 1) Imports: Event Hub client, JSON, HTTP, and scheduling utilities
# ──────────────────────────────────────────────────────────────────────────────
from azure.eventhub import EventHubProducerClient, EventData  # Azure SDK for sending events
import json                                                    # JSON serialization
import requests                                                # REST API calls
from datetime import datetime, timedelta                       # Time comparison for throttling

# ──────────────────────────────────────────────────────────────────────────────
# 2) Secure configuration: retrieve secrets from Azure Key Vault
# ──────────────────────────────────────────────────────────────────────────────
# Both secrets are read from the "key-vault-secret-scope" secret scope
weather_api_key = dbutils.secrets.get(scope="key-vault-secret-scope", key="weather-api-key")  # Weather API key
eventhub_connection_string = dbutils.secrets.get(scope="key-vault-secret-scope", key="eventhub-connection-string-secret")  # Event Hub connection
EVENT_HUB_NAME = "weather-streaming-event-hub"

# ──────────────────────────────────────────────────────────────────────────────
# 3) Initialize a single Event Hub producer client
# ──────────────────────────────────────────────────────────────────────────────
producer = EventHubProducerClient.from_connection_string(conn_str=eventhub_connection_string, eventhub_name=EVENT_HUB_NAME)

# ──────────────────────────────────────────────────────────────────────────────
# 4) Helper: send a Python dict as JSON to Event Hub
# ──────────────────────────────────────────────────────────────────────────────
def send_event(event: dict):
    """Send ``event`` to the Event Hub as one JSON-encoded EventData in its own batch."""
    single_event_batch = producer.create_batch()
    # Serialize dict → JSON, then wrap as EventData
    encoded = json.dumps(event)
    single_event_batch.add(EventData(encoded))
    # Publish the batch to Event Hub
    producer.send_batch(single_event_batch)

# ──────────────────────────────────────────────────────────────────────────────
# 5) Helper: unified API response handling
# ──────────────────────────────────────────────────────────────────────────────
def handle_response(response):
    """
    Map an HTTP response to a dict: the parsed JSON body when the status is
    200, otherwise ``{"error": "<status>: <text>"}``.
    """
    status = response.status_code
    if status == 200:
        return response.json()
    return {"error": f"{status}: {response.text}"}

# ──────────────────────────────────────────────────────────────────────────────
# 6) Weather API wrappers: current, forecast, alerts
# ──────────────────────────────────────────────────────────────────────────────
def get_current_weather(base_url, api_key, location):
    """
    Request current conditions (with air-quality data) for ``location``.

    Returns whatever ``handle_response`` produces for the HTTP response.
    """
    url = f"{base_url}/current.json"
    params = {'key': api_key, 'q': location, 'aqi': 'yes'}
    # The timeout keeps a stalled connection from blocking the caller forever.
    return handle_response(requests.get(url, params=params, timeout=10))

def get_forecast_weather(base_url, api_key, location, days):
    """
    Request a ``days``-day forecast for ``location``.

    Returns whatever ``handle_response`` produces for the HTTP response.
    """
    url = f"{base_url}/forecast.json"
    params = {'key': api_key, 'q': location, 'days': days}
    # The timeout keeps a stalled connection from blocking the caller forever.
    return handle_response(requests.get(url, params=params, timeout=10))

def get_alerts(base_url, api_key, location, timeout=10):
    """
    Fetch active weather alerts for a location.

    Parameters:
        base_url (str): API root; ``/alerts.json`` is appended to it.
        api_key (str): Weather API key, sent as the ``key`` query parameter.
        location (str): Location query, sent as the ``q`` query parameter.
        timeout (float | tuple): Seconds to wait on the server, forwarded to
            ``requests.get``. Without a timeout a stalled connection would
            block the caller indefinitely.

    Returns:
        dict: Whatever ``handle_response`` returns for the HTTP response.

    Raises:
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    url = f"{base_url}/alerts.json"
    params = {'key': api_key, 'q': location, 'alerts': 'yes'}
    return handle_response(requests.get(url, params=params, timeout=timeout))

# ──────────────────────────────────────────────────────────────────────────────
# 7) Flatten and merge API responses into one clean dict
# ──────────────────────────────────────────────────────────────────────────────
def flatten_data(current, forecast, alerts):
    """
    Merge the three API payloads into one flat, JSON-friendly dict.

    Parameters:
        current (dict): Payload read for ``location`` and ``current``.
        forecast (dict): Payload read for ``forecast.forecastday``.
        alerts (dict): Payload read for ``alerts.alert``.

    Returns:
        dict: Keys ``name``, ``region``, ``country``, ``temp_c``,
        ``condition``, ``air_quality``, ``forecast`` and ``alerts``.
        Any field missing from the inputs comes back as ``None`` (scalars),
        ``{}`` (air quality) or ``[]`` (forecast / alerts).
    """
    place = current.get("location", {})
    present = current.get("current", {})
    sky = present.get("condition", {})
    air = present.get("air_quality", {})
    forecast_days = forecast.get("forecast", {}).get("forecastday", [])
    alert_items = alerts.get("alerts", {}).get("alert", [])

    flat = {
        # Basic location info
        "name": place.get("name"),
        "region": place.get("region"),
        "country": place.get("country"),
        # Current conditions
        "temp_c": present.get("temp_c"),
        "condition": sky.get("text"),
        # Air quality readings, copied as-is
        "air_quality": dict(air),
    }

    # One summary per forecast day present in the payload
    daily = []
    for entry in forecast_days:
        day = entry.get("day", {})
        daily.append({
            "date": entry.get("date"),
            "max_temp": day.get("maxtemp_c"),
            "min_temp": day.get("mintemp_c"),
            "condition": day.get("condition", {}).get("text"),
        })
    flat["forecast"] = daily

    # Active weather alerts, trimmed to three fields ("desc" is renamed "description")
    notices = []
    for item in alert_items:
        notices.append({
            "headline": item.get("headline"),
            "severity": item.get("severity"),
            "description": item.get("desc"),
        })
    flat["alerts"] = notices

    return flat

# ──────────────────────────────────────────────────────────────────────────────
# 8) Fetch + flatten all weather data
# ──────────────────────────────────────────────────────────────────────────────
def fetch_weather_data(location="Kolkata", days=3):
    """
    Call the current, forecast and alerts endpoints and merge the results.

    Parameters:
        location (str): Location query passed to all three endpoints.
        days (int): Number of forecast days to request.

    Returns:
        dict: The flattened snapshot produced by ``flatten_data``.

    Note:
        A failed call comes back from ``handle_response`` as an
        ``{"error": ...}`` dict, so its fields end up as ``None`` / empty
        in the flattened result rather than raising here.
    """
    # HTTPS: the API key travels in the query string and must not go out in clear text
    base_url = "https://api.weatherapi.com/v1"
    curr = get_current_weather(base_url, weather_api_key, location)
    forecast = get_forecast_weather(base_url, weather_api_key, location, days=days)
    alrt = get_alerts(base_url, weather_api_key, location)
    return flatten_data(curr, forecast, alrt)

# ──────────────────────────────────────────────────────────────────────────────
# 9) Throttle control: track when last event was sent
# ──────────────────────────────────────────────────────────────────────────────
last_sent_time = datetime.now() - timedelta(seconds=30)  # Backdated by 30 s so the very first batch is allowed to send
# Example: if this runs at 1:00:00 a.m., last_sent_time starts out as 12:59:30 a.m.

# ──────────────────────────────────────────────────────────────────────────────
# 10) Batch processor invoked by Spark Structured Streaming
# ──────────────────────────────────────────────────────────────────────────────
def process_batch(batch_df, batch_id):
    """
    Send a fresh weather snapshot to Event Hub, at most once every 30 seconds.

    Spark Structured Streaming calls this once per micro-batch (it is
    registered through ``foreachBatch``). The batch contents are ignored:
    the arrival of a batch is only used as a signal to check the throttle
    and, if enough time has passed, call the weather API.

    Parameters:
        batch_df: Micro-batch DataFrame of rows from the "rate" source.
            Unused; it only triggers this function.
        batch_id: Batch counter assigned by Spark; used in the error message
            to help troubleshooting.

    Raises:
        Exception: Any error raised while fetching or sending is printed
            with the batch id and then re-raised.
    """
    # `global` is required because this function rebinds the module-level
    # last_sent_time; without it the assignment below would create a local.
    global last_sent_time

    now = datetime.now()                                # Current timestamp
    elapsed = (now - last_sent_time).total_seconds()    # Seconds since last send

    # Throttle: skip this batch unless at least 30 seconds have passed since
    # the last successful send.
    if elapsed < 30:
        return

    try:
        data = fetch_weather_data()  # Retrieve latest weather snapshot
        send_event(data)             # Publish to Event Hub
    except Exception as e:
        print(f"Error in batch {batch_id}: {e}")
        raise

    # Only a successful send moves the throttle timestamp forward.
    last_sent_time = now
    print(f"Event sent at {now.isoformat()}")

# ──────────────────────────────────────────────────────────────────────────────
# 11) Define a dummy streaming source to trigger batches
# ──────────────────────────────────────────────────────────────────────────────
# The weather API is a plain request/response service, not a streaming source,
# so it cannot be streamed from directly. The built-in "rate" source is used
# instead: it emits one dummy row per second with no external dependency, and
# those rows exist purely to drive our logic.
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# ──────────────────────────────────────────────────────────────────────────────
# 12) Attach processor to the stream and start
# ──────────────────────────────────────────────────────────────────────────────
# writeStream begins defining how the stream is output;
# foreachBatch(process_batch) tells Spark to call process_batch(batch_df, batch_id)
# for each micro-batch; start() kicks off the continuous streaming job.
query = streaming_df.writeStream.foreachBatch(process_batch).start()

# Await termination (runs until manually stopped or until the query fails)
try:
    query.awaitTermination()
finally:
    # ──────────────────────────────────────────────────────────────────────────
    # 13) Cleanup: close the Event Hub producer when done
    # ──────────────────────────────────────────────────────────────────────────
    # In a `finally` so the producer is closed even when awaitTermination()
    # raises (failed batch, interrupted run) instead of being leaked.
    producer.close()
