In [0]:
# Fetch every secret scope visible to this workspace
scopes = dbutils.secrets.listScopes()

# Show each scope's name on its own line
for secret_scope in scopes:
    print(secret_scope.name)
    

key-vault-scope


In [0]:
## test api 
import json
import requests

# Read the weather API key from the Databricks secret scope so it never
# appears in the notebook source
api_key = dbutils.secrets.get(
    scope="key-vault-scope",
    key="weather-api-key",
)

def check_request_status(response):
    """
    Check the status of the request.

    Prints whether the request succeeded and, on HTTP 200, returns the
    decoded JSON body.

    Args:
        response: Response object exposing ``status_code`` and ``json()``.

    Returns:
        The parsed JSON payload when the status code is 200, otherwise None.
    """
    if response.status_code == 200:
        print('Request successful')
        return response.json()

    print(f'Request failed: {response.status_code}')
    # Explicit so callers can see that a failed request yields None
    return None

def get_current_weather_data(api_key, location):
    """
    Get the current weather data for a given location.

    Args:
        api_key: Weather API key sent as the ``key`` query parameter.
        location: Location query sent as the ``q`` query parameter.

    Returns:
        The parsed JSON response, or None when the request did not return
        HTTP 200 (see ``check_request_status``).

    Raises:
        requests.exceptions.RequestException: on connection errors or when
            the server does not answer within the timeout.
    """
    # HTTPS keeps the API key out of plaintext traffic; `params` lets
    # requests URL-encode values such as locations containing spaces.
    response = requests.get(
        "https://api.weatherapi.com/v1/current.json",
        params={"key": api_key, "q": location, "aqi": "no"},
        timeout=10,  # requests waits forever without an explicit timeout
    )

    return check_request_status(response)


def forcast_weather_data(
                        city,
                        api_key,
                        days = 1):
    """
    Get future weather data for a given location.

    Args:
        city: Location query sent as the ``q`` query parameter.
        api_key: Weather API key sent as the ``key`` query parameter.
        days: Number of forecast days to request (default 1).

    Returns:
        The parsed JSON response, or None when the request did not return
        HTTP 200 (see ``check_request_status``).

    Raises:
        requests.exceptions.RequestException: on connection errors or when
            the server does not answer within the timeout.
    """
    # HTTPS keeps the API key out of plaintext traffic; `params` lets
    # requests URL-encode values such as city names containing spaces.
    response = requests.get(
        "https://api.weatherapi.com/v1/forecast.json",
        params={"key": api_key, "q": city, "days": days},
        timeout=10,  # requests waits forever without an explicit timeout
    )
    return check_request_status(response)


def get_alerts(
                        city,
                        api_key,
                        alerts = "no",):
    """
    Get weather alerts for a given location.

    Calls the forecast endpoint with the ``alerts`` flag; alerts are only
    requested when the flag is "yes".

    Args:
        city: Location query sent as the ``q`` query parameter.
        api_key: Weather API key sent as the ``key`` query parameter.
        alerts: "yes" or "no" flag forwarded as the ``alerts`` query
            parameter (default "no").

    Returns:
        The parsed JSON response, or None when the request did not return
        HTTP 200 (see ``check_request_status``).

    Raises:
        requests.exceptions.RequestException: on connection errors or when
            the server does not answer within the timeout.
    """
    # HTTPS keeps the API key out of plaintext traffic; `params` lets
    # requests URL-encode values such as city names containing spaces.
    response = requests.get(
        "https://api.weatherapi.com/v1/forecast.json",
        params={"key": api_key, "q": city, "alerts": alerts},
        timeout=10,  # requests waits forever without an explicit timeout
    )
    return check_request_status(response)


def _first_present(key, *containers):
    """Return the first non-None value stored under `key` among the given dicts."""
    for container in containers:
        if isinstance(container, dict) and container.get(key) is not None:
            return container[key]
    return None


def flatten_and_merge_data(current_weather, forcast_weather, alerts):
    """
    Merge the three API responses into a single nested report dictionary.

    Args:
        current_weather: Current-weather payload with "location" and
            "current" sections, or None if that request failed.
        forcast_weather: Forecast payload holding forecast -> forecastday,
            or None if that request failed.
        alerts: Alerts payload holding alerts -> alert, or None if that
            request failed.

    Returns:
        A dict with three keys:
        - 'CURRENT_DATA': location fields plus current conditions.
        - 'FORECAST_DATA': one list per metric, one element per forecast day.
        - 'ALERTS': a list with one dict per alert, or {} when there are none.
        Fields that are absent from the inputs are None (temperature keeps
        its historical {} placeholder).
    """
    # A failed request yields None upstream; treat it as an empty payload
    # so the remaining sections can still be reported.
    current_weather = current_weather or {}
    forcast_weather = forcast_weather or {}
    alerts = alerts or {}

    location = current_weather.get("location") or {}
    current = current_weather.get("current") or {}

    # Condition and temperature sit inside the "current" section next to
    # the other current metrics; the top level is kept as a fallback so a
    # flat payload behaves as before.
    condition = _first_present("condition", current, current_weather) or {}
    temperature = _first_present("temp_c", current, current_weather)
    if temperature is None:
        temperature = {}  # placeholder the previous implementation emitted

    forecast_days = (forcast_weather.get("forecast") or {}).get("forecastday") or []
    alert_data = (alerts.get("alerts") or {}).get("alert") or []

    def daily_metric(key):
        # Daily aggregates are nested under each entry's "day" dict; fall
        # back to the entry itself for payloads that are already flat.
        return [
            _first_present(key, entry.get("day"), entry)
            for entry in forecast_days
        ]

    flattened_data = {
        'CURRENT_DATA': {
            'name': location.get("name"),
            'region': location.get("region"),
            'country': location.get("country"),
            'lat': location.get("lat"),
            'lon': location.get("lon"),
            'localtime': location.get("localtime"),
            'cloud': current.get('cloud'),
            'humidity': current.get('humidity'),
            'pressure_mb': current.get('pressure_mb'),
            'precip_mm': current.get('precip_mm'),
            'wind_dir': current.get('wind_dir'),
            'condition': condition.get('text'),
            'temperature': temperature
        },
        'FORECAST_DATA': {
            # 'forecastday' and 'day' both carry the per-day summary dicts;
            # both keys are kept so existing consumers are not broken.
            'forecastday': [entry.get('day') for entry in forecast_days],
            'date': [entry.get('date') for entry in forecast_days],
            'day': [entry.get('day') for entry in forecast_days],
            'uv': daily_metric('uv'),
            'feelslike': daily_metric('feelslike_c'),
            'avgtemp': daily_metric('avgtemp_c'),
            'maxtemp': daily_metric('maxtemp_c'),
            'mintemp': daily_metric('mintemp_c'),
            'totalprecip_mm': daily_metric('totalprecip_mm')
        },
        'ALERTS': {}
    }

    # Replace the empty placeholder only when at least one alert exists
    if alert_data:
        flattened_data['ALERTS'] = [
            {
                '--------': '----------------------------------------',
                'headline': alert.get('headline'),
                'event': alert.get('event'),
                'severity': alert.get('severity'),
                'urgency': alert.get('urgency'),
                'certainty': alert.get('certainty'),
                'description': alert.get('desc'),
                'effective': alert.get('effective'),
                'expires': alert.get('expires'),
            }
            for alert in alert_data
        ]

    return flattened_data




    

    


In [0]:
def get_full_weather_data(api_key, location):
    """
    Build the complete weather report for a location.

    Fetches the current conditions, a 5-day forecast and the active alerts
    for ``location``, merges them with ``flatten_and_merge_data``, writes the
    result to ``weather_report.json`` in the working directory and returns it.

    Args:
        api_key: Weather API key forwarded to each request helper.
        location: Location to query; used for all three requests.

    Returns:
        The merged report dictionary produced by ``flatten_and_merge_data``.
    """
    # Use the caller's location for every request instead of a fixed city
    current_weather = get_current_weather_data(api_key=api_key, location=location)

    forcast_weather = forcast_weather_data(api_key=api_key, city=location, days=5)

    alerts = get_alerts(api_key=api_key, city=location, alerts="yes")

    flattened_data = flatten_and_merge_data(current_weather, forcast_weather, alerts)

    # Keep a copy of the latest report on disk (overwritten on every call)
    with open('weather_report.json', 'w') as f:
        json.dump(flattened_data, f, indent=4)

    return flattened_data







Request successful
Request successful
Request successful


In [0]:
##send complete report to event hub
import json 
import requests
from azure.eventhub import EventHubProducerClient,EventData

def push_to_eventhub(event_hub_connection_string, event_hub_name,
               data):
    """
    Send one JSON-serialised payload to an Azure Event Hub.

    Args:
        event_hub_connection_string: Connection string passed to
            ``EventHubProducerClient.from_connection_string``.
        event_hub_name: Name of the target event hub.
        data: JSON-serialisable object; sent as a single event.

    Raises:
        ValueError: if the serialised event does not fit into a batch.
    """
    producer = EventHubProducerClient.from_connection_string(
        conn_str=event_hub_connection_string,
        eventhub_name=event_hub_name,
    )

    # The context manager closes the client even when sending fails, so
    # repeated calls do not accumulate open connections.
    with producer:
        event_data_batch = producer.create_batch()

        # Only one event is added to a fresh batch, so a "batch full"
        # retry cannot help: an oversize event raises ValueError here.
        event_data_batch.add(EventData(json.dumps(data)))

        producer.send_batch(event_data_batch)





In [0]:
#push to event hub 

from datetime import datetime
import time
from datetime import timedelta
def stream_to_eventhub(df, epoch_id):
    """
    foreachBatch callback: publish one weather report to Event Hub.

    Each invocation fetches the full report for 'New Haven', pushes it to the
    'weather-event-hub' event hub, then sleeps for the pacing interval before
    returning. Returning lets the micro-batch complete, so the streaming
    query keeps making progress and can be stopped normally; the callback is
    invoked again for the next micro-batch.

    Args:
        df: Micro-batch DataFrame supplied by foreachBatch (not read here;
            the stream is only used as a trigger).
        epoch_id: Micro-batch id supplied by foreachBatch (unused).
    """
    time_interval = 30  # seconds to wait before the next report

    current_time = datetime.now()

    # Get weather data and push to Event Hub
    weather_report = get_full_weather_data(api_key=api_key, location='New Haven')

    event_hub_connection_string = dbutils.secrets.get(scope='key-vault-scope', key='eventhub-connection-string')
    event_hub_name = 'weather-event-hub'

    push_to_eventhub(event_hub_connection_string, event_hub_name, weather_report)

    print(f'event sent at {current_time} ')

    # Pace the stream: hold this micro-batch open for the interval so the
    # next invocation happens roughly `time_interval` seconds later.
    time.sleep(time_interval)
        



In [0]:
#weather data streaming 
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook (created if none exists yet)
spark = SparkSession.builder.appName("Weather Streaming").getOrCreate()

# The built-in "rate" source only serves as a ticking trigger for the stream
stream_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
)

# Hand every micro-batch to the Event Hub publisher and block until the
# query stops
query = (
    stream_df.writeStream
    .foreachBatch(stream_to_eventhub)
    .start()
)
query.awaitTermination()

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can