# In [0]:
import logging  # For logging errors or information
import requests  # For making HTTP requests to the weather API
import json  # For handling JSON data serialization/deserialization
from azure.eventhub import *  # For sending data to Azure Event Hub
from datetime import *  # For working with dates and times

# Name of the Event Hub that receives the weather events
EVENT_HUB_NAME = "weather-event-hub"

# The connection string is read from a Databricks secret scope so that it
# never appears in the notebook source
EVENT_HUB_CONNECTION_STRING = dbutils.secrets.get(scope="key-vault-scope", key="event-hub-key")

# Module-level producer client; send_event() publishes through this instance
producer = EventHubProducerClient.from_connection_string(
    conn_str=EVENT_HUB_CONNECTION_STRING,
    eventhub_name=EVENT_HUB_NAME,
)

# Function to send a single event (as a dictionary) to Azure Event Hub
def send_event(event):
    """Publish one event to the Event Hub as a single-message batch.

    Args:
        event: JSON-serializable object (the caller passes a dict); it is
            encoded with ``json.dumps`` before being wrapped in ``EventData``.

    Uses the module-level ``producer`` and prints the event once it has been
    handed to ``send_batch``.
    """
    batch = producer.create_batch()
    payload = json.dumps(event)
    batch.add(EventData(payload))
    producer.send_batch(batch)
    print(f"sent the event to EVENT HUB: {event}")

# Function to handle the API response and return JSON or error message
def handle_response(response):
    """Turn an HTTP response into parsed JSON or an error description.

    Args:
        response: Object exposing ``status_code``, ``text`` and ``json()``
            (the callers pass the result of ``requests.get``).

    Returns:
        The value of ``response.json()`` when the status code is exactly 200;
        otherwise the string ``"Error: <status_code>, <text>"``.
    """
    # Anything other than 200 is reported as an error string, not raised.
    if response.status_code != 200:
        return f"Error: {response.status_code}, {response.text}"
    return response.json()

# Function to get current weather and air quality data from the API
def get_current_weather(base_url, api_key, location, timeout=10):
    """Request current conditions, including air-quality data, for a location.

    Args:
        base_url: Root URL of the weather API; a trailing slash is tolerated.
        api_key: API key, sent as the ``key`` query parameter.
        location: Location query, sent as the ``q`` query parameter.
        timeout: Seconds ``requests`` waits on the server before giving up.

    Returns:
        Whatever ``handle_response`` produces for the HTTP response.

    Raises:
        requests.exceptions.RequestException: Propagated from ``requests.get``
            (connection failure, timeout, ...).
    """
    # Normalise the base so ".../v1/" does not become ".../v1//current.json".
    current_weather_url = f"{base_url.rstrip('/')}/current.json"
    params = {
        "key": api_key,
        "q": location,
        "aqi": "yes",  # ask the API to include the air_quality section
    }
    # Without a timeout a stalled connection would block the caller forever.
    response = requests.get(current_weather_url, params=params, timeout=timeout)
    return handle_response(response)

# Function to get weather forecast data from the API
def get_forecast_weather(base_url, api_key, location, days, timeout=10):
    """Request a multi-day forecast for a location.

    Args:
        base_url: Root URL of the weather API; a trailing slash is tolerated.
        api_key: API key, sent as the ``key`` query parameter.
        location: Location query, sent as the ``q`` query parameter.
        days: Value of the ``days`` query parameter.
        timeout: Seconds ``requests`` waits on the server before giving up.

    Returns:
        Whatever ``handle_response`` produces for the HTTP response.

    Raises:
        requests.exceptions.RequestException: Propagated from ``requests.get``
            (connection failure, timeout, ...).
    """
    # Normalise the base so ".../v1/" does not become ".../v1//forecast.json".
    forecast_url = f"{base_url.rstrip('/')}/forecast.json"
    params = {
        "key": api_key,
        "q": location,
        "days": days,
    }
    # Without a timeout a stalled connection would block the caller forever.
    response = requests.get(forecast_url, params=params, timeout=timeout)
    return handle_response(response)

# Function to get weather alerts from the API
def get_alerts(base_url, api_key, location, timeout=10):
    """Request weather alerts for a location.

    Args:
        base_url: Root URL of the weather API; a trailing slash is tolerated.
        api_key: API key, sent as the ``key`` query parameter.
        location: Location query, sent as the ``q`` query parameter.
        timeout: Seconds ``requests`` waits on the server before giving up.

    Returns:
        Whatever ``handle_response`` produces for the HTTP response.

    Raises:
        requests.exceptions.RequestException: Propagated from ``requests.get``
            (connection failure, timeout, ...).
    """
    # Normalise the base so ".../v1/" does not become ".../v1//alerts.json".
    alerts_url = f"{base_url.rstrip('/')}/alerts.json"
    params = {
        "key": api_key,
        "q": location,
        "alerts": "yes",
    }
    # Without a timeout a stalled connection would block the caller forever.
    response = requests.get(alerts_url, params=params, timeout=timeout)
    return handle_response(response)

# Function to flatten and merge current, forecast, and alert data into a single dictionary
def flatten_data(current_weather, forecast_weather, alerts):
    """Merge the three API payloads into one event dictionary.

    Args:
        current_weather: Mapping read for its ``location`` and ``current``
            sections.
        forecast_weather: Mapping read for ``forecast`` -> ``forecastday``.
        alerts: Mapping read for ``alerts`` -> ``alert``.

    Returns:
        A dict holding the selected location and current-condition fields at
        the top level, a nested ``air_quality`` dict, and ``alerts`` and
        ``forecast`` lists with one trimmed dict per input entry. Any field
        that is absent from the input comes back as ``None``; absent lists
        come back empty.
    """
    place = current_weather.get("location", {})
    now = current_weather.get("current", {})
    sky = now.get("condition", {})
    aqi = now.get("air_quality", {})
    days = forecast_weather.get("forecast", {}).get("forecastday", [])
    warnings = alerts.get("alerts", {}).get("alert", [])

    # Field groups are listed in the order they appear in the output dict.
    location_fields = ("name", "region", "country", "lat", "lon")
    reading_fields = (
        "wind_kph",
        "wind_degree",
        "wind_dir",
        "pressure_in",
        "precip_in",
        "humidity",
        "cloud",
        "feelslike_c",
        "uv",
    )
    pollutant_fields = (
        "co",
        "no2",
        "o3",
        "so2",
        "pm2_5",
        "pm10",
        "us-epa-index",
        "gb-defra-index",
    )
    # Output key -> key read from each alert entry ("desc" is renamed).
    alert_fields = {
        "headline": "headline",
        "severity": "severity",
        "description": "desc",
        "instruction": "instruction",
    }

    record = {field: place.get(field) for field in location_fields}
    record["temp_c"] = now.get("temp_c")
    record["is_day"] = now.get("is_day")
    record["condition_text"] = sky.get("text")
    record["condition_icon"] = sky.get("icon")
    record.update((field, now.get(field)) for field in reading_fields)
    record["air_quality"] = {name: aqi.get(name) for name in pollutant_fields}
    record["alerts"] = [
        {out_key: warning.get(src_key) for out_key, src_key in alert_fields.items()}
        for warning in warnings
    ]

    forecast_rows = []
    for entry in days:
        daily = entry.get("day", {})
        forecast_rows.append(
            {
                "date": entry.get("date"),
                "maxtemp_c": daily.get("maxtemp_c"),
                "mintemp_c": daily.get("mintemp_c"),
                "condition": daily.get("condition", {}).get("text"),
            }
        )
    record["forecast"] = forecast_rows
    return record

# Main function to fetch and process weather data from the API
def fetch_weather_data():
    """Fetch current weather, forecast and alerts and merge them into one dict.

    The API key is read from the ``key-vault-scope`` Databricks secret scope.
    The merged result is printed as indented JSON before being returned.

    Returns:
        The dictionary produced by ``flatten_data``.

    Raises:
        RuntimeError: If any of the three API calls did not yield a JSON
            object; the message carries the error text reported for each
            failed call.
    """
    # HTTPS keeps the API key (sent as a query parameter) out of clear text.
    # No trailing slash: the getters append "/<endpoint>.json" themselves.
    base_url = "https://api.weatherapi.com/v1"
    location = "Queretaro"  # City name (can be changed as needed)
    weatherapikey = dbutils.secrets.get(
        scope="key-vault-scope", key="weather-api-key"
    )

    current_weather = get_current_weather(base_url, weatherapikey, location)
    forecast_weather = get_forecast_weather(base_url, weatherapikey, location, 3)
    alerts = get_alerts(base_url, weatherapikey, location)

    # handle_response returns an error *string* for non-200 replies. Passing
    # that on to flatten_data would fail with an opaque AttributeError and
    # lose the API's error text, so surface it here instead.
    failures = [
        f"{label}: {payload}"
        for label, payload in (
            ("current", current_weather),
            ("forecast", forecast_weather),
            ("alerts", alerts),
        )
        if not isinstance(payload, dict)
    ]
    if failures:
        raise RuntimeError("Weather API request failed - " + "; ".join(failures))

    merged_data = flatten_data(current_weather, forecast_weather, alerts)
    print(json.dumps(merged_data, indent=3))  # Print merged data for inspection
    return merged_data

from datetime import datetime, timedelta  # Only import what's needed

# Minimum number of seconds between two consecutive sends
_SEND_INTERVAL_SECONDS = 30

# Backdate the marker by one full interval so the first batch may send at once
last_sent_time = datetime.now() - timedelta(seconds=_SEND_INTERVAL_SECONDS)


def process_batch(batch_df, batch_id):
    """Send fresh weather data to the Event Hub at most once per interval.

    Passed to ``foreachBatch``; the batch contents are not read, each call
    only serves as a trigger.

    Args:
        batch_df: The micro-batch DataFrame (unused).
        batch_id: Batch identifier, used only in the error message.

    Raises:
        Exception: Any error raised while fetching or sending is printed and
            then re-raised.
    """
    global last_sent_time  # throttle state shared across invocations
    try:
        now = datetime.now()
        elapsed = (now - last_sent_time).total_seconds()
        # Still inside the throttle window: nothing to do for this batch.
        if elapsed < _SEND_INTERVAL_SECONDS:
            return
        payload = fetch_weather_data()
        send_event(payload)
        # The marker is the time taken *before* the fetch, not after the send.
        last_sent_time = now
        print(f"Event Sent at {last_sent_time}")
    except Exception as exc:
        print(f"Error sending events in batch {batch_id}: {str(exc)}")
        raise exc

# Set up a streaming source (rate source for testing, emits rows per second)
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

try:
    # Write the streaming data using foreachBatch to send weather data to Event Hub
    query = streaming_df.writeStream.foreachBatch(process_batch).start()

    # Blocks until the query stops; a failed query (process_batch re-raises
    # its errors) surfaces here as an exception.
    query.awaitTermination()
finally:
    # Close the producer on every exit path, including a failed start or a
    # failed query, so the Event Hub connection is not leaked.
    producer.close()