In [0]:
#importing dependencies

from azure.eventhub import EventHubProducerClient, EventData
import json
import requests
import logging
from datetime import datetime, timedelta
import time


In [0]:
# confirming the key vault scope
dbutils.secrets.listScopes()


[SecretScope(name='key-vault-secret-scope')]

In [0]:
#Configuring Event Hub and Key vault

Event_hub_name = "weatherstream-eventhub"
Event_hub_connection_string = dbutils.secrets.get(scope="key-vault-secret-scope", key="eventubdatabricksconnectionstring")
weatherapikey = dbutils.secrets.get(scope="key-vault-secret-scope", key="weatherapikey")

In [0]:
#initializing the event hub producer

producer = EventHubProducerClient.from_connection_string(
        conn_str=Event_hub_connection_string,
        eventhub_name=Event_hub_name
    )

In [0]:
#Setting up function to send data to Azure Event Hub

def send_to_event_hub(data):
    
    event_data_batch = producer.create_batch()
    event_data_batch.add(EventData(json.dumps(data)))
    producer.send_batch(event_data_batch)
    logging.info("Data sent to Event Hub")

In [0]:
#Function to handle the API call

API_URL = "http://api.weatherapi.com/v1"

def fetch_data():
    """Fetch data from the API."""
    response = requests.get(API_URL)
    if response.status_code == 200:
        return response.json()  
    else:
        logging.error(f"API request failed: {response.status_code}")
        return None

In [0]:
#Function to get current weather and air quality 
def get_current_weather_and_air_quality(base_url, api_key, location):
    """Fetch current weather and air quality."""
    url = f"{base_url}/current.json"
    params = {"key": api_key, "q": location, "aqi": "yes"}
    response = requests.get(url, params=params)

    if response.status_code == 200:
        return response.json()
    else:
        logging.error(f"Failed to fetch weather data: {response.status_code} - {response.text}")
        return {}

In [0]:
def get_alerts(base_url, api_key, location):
    """Fetch weather alerts."""
    url = f"{base_url}/alerts.json"
    params = {"key": api_key, "q": location}
    response = requests.get(url, params=params)

    if response.status_code == 200:
        return response.json()
    else:
        logging.error(f"Failed to fetch alerts: {response.status_code} - {response.text}")
        return {"alerts": []}  # Return empty alerts if request fails

In [0]:
def fetch_and_send_data():
    """Fetch weather data and send to Event Hub"""
    base_url = "http://api.weatherapi.com/v1"
    location = "Canterbury"

    current_weather = get_current_weather_and_air_quality(base_url, weatherapikey, location)
    alerts = get_alerts(base_url, weatherapikey, location)
    
    # Add alerts data to weather data
    current_weather["alerts"] = alerts["alerts"]
    
    # Send to Event Hub
    send_to_event_hub(current_weather)


In [0]:
# Call function to fetch & send data every 30 seconds untill compute is stopped

while True:
    try:
        fetch_and_send_data()  # Fetch and send data to Event Hub
        logging.info("Waiting for 30 seconds before next fetch...")
        time.sleep(30)  # timing set at 30 seconds before next fetching
    except KeyboardInterrupt:
        logging.info("Process interrupted by user. Stopping...")
        break
    except Exception as e:
        logging.error(f"An error occurred: {str(e)}")


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
# Close Event Hub producer (at script shutdown)
producer.close()

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:464)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:571)
	at com.data