In [None]:
# Notebook parameters (Databricks text widgets). `schema` is also used below to
# namespace the checkpoint and output paths.
dbutils.widgets.text("catalog", "hive_metastore", "Catalog Name")
dbutils.widgets.text("schema", "dev", "Schema Name")

catalog, schema = (dbutils.widgets.get(widget_name) for widget_name in ("catalog", "schema"))

In [0]:
import json
from datetime import datetime, timedelta, timezone

import requests
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, IntegerType

In [0]:
# Calgary, AB in decimal degrees (western longitudes are negative).
CALGARY_LAT, CALGARY_LON = 51.0447, -114.0719

WEATHER_API_URL = "https://api.open-meteo.com/v1/forecast"

# Stream state and Delta output live under a per-schema folder so that runs with
# different `schema` widget values do not share a checkpoint or table.
CHECKPOINT_PATH, OUTPUT_PATH = (
    f"/tmp/{schema}/{leaf}" for leaf in ("weather_stream_checkpoint", "weather_data_output")
)

# Processing-time trigger: one micro-batch is started per interval.
TRIGGER_INTERVAL = "30 seconds"

In [0]:
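# Uncomment to reset the stream from scratch: this deletes the streaming checkpoint
# and every weather record previously written to OUTPUT_PATH.
#dbutils.fs.rm(CHECKPOINT_PATH, recurse=True)
#dbutils.fs.rm(OUTPUT_PATH, recurse=True)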

In [0]:
# Column layout of each appended weather record (all fields nullable).
# fetch_calgary_weather() builds dicts with exactly these keys, in this order.
weather_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("timestamp", TimestampType()),
        ("latitude", DoubleType()),
        ("longitude", DoubleType()),
        ("temperature", DoubleType()),
        ("humidity", IntegerType()),
        ("wind_speed", DoubleType()),
        ("wind_direction", IntegerType()),
        ("weather_code", IntegerType()),
        ("city", StringType()),
        ("fetch_time", TimestampType()),
    )
])

In [0]:
def _to_float(value):
    """Coerce a JSON number to float (Spark DoubleType rejects ints); keep None as None."""
    return None if value is None else float(value)


def _to_int(value):
    """Coerce a JSON number to int (Spark IntegerType rejects floats); keep None as None."""
    return None if value is None else int(round(value))


def _observation_time(time_str, utc_offset_seconds):
    """
    Parse the API's ISO-8601 observation time into a datetime, timezone-aware when possible.

    Open-Meteo reports ``current.time`` as local wall-clock time for the requested
    ``timezone`` with no offset, and the offset separately as ``utc_offset_seconds``.
    PySpark converts naive datetimes using the driver's local timezone, so the offset
    is attached here to make the stored instant correct. Falls back to the current UTC
    time when no observation time is present.
    """
    if not time_str:
        return datetime.now(timezone.utc)
    parsed = datetime.fromisoformat(time_str)
    if parsed.tzinfo is None and utc_offset_seconds is not None:
        parsed = parsed.replace(tzinfo=timezone(timedelta(seconds=utc_offset_seconds)))
    return parsed


def fetch_calgary_weather():
    """
    Fetch current weather data for Calgary from the Open-Meteo API.

    Returns:
        dict | None: one record whose keys match ``weather_schema`` and whose values
        already have the Python types Spark's schema verification expects: floats for
        DoubleType fields, ints for IntegerType fields, datetimes for TimestampType
        fields, and None for any value the API omitted. Returns ``None`` if the
        request or parsing fails. The error is printed rather than raised, so one bad
        fetch does not stop the stream.
    """
    try:
        params = {
            "latitude": CALGARY_LAT,
            "longitude": CALGARY_LON,
            "current": "temperature_2m,relative_humidity_2m,wind_speed_10m,wind_direction_10m,weather_code",
            "timezone": "America/Edmonton",
        }

        response = requests.get(WEATHER_API_URL, params=params, timeout=10)
        response.raise_for_status()

        data = response.json()
        current = data.get("current") or {}

        return {
            "timestamp": _observation_time(current.get("time"), data.get("utc_offset_seconds")),
            "latitude": CALGARY_LAT,
            "longitude": CALGARY_LON,
            "temperature": _to_float(current.get("temperature_2m")),
            "humidity": _to_int(current.get("relative_humidity_2m")),
            "wind_speed": _to_float(current.get("wind_speed_10m")),
            "wind_direction": _to_int(current.get("wind_direction_10m")),
            "weather_code": _to_int(current.get("weather_code")),
            "city": "Calgary",
            # Explicit UTC so the value does not depend on the driver's local timezone.
            "fetch_time": datetime.now(timezone.utc),
        }
    except Exception as e:
        # Deliberately best-effort: the caller treats None as "skip this batch".
        print(f"Error fetching weather data: {e}")
        return None

In [0]:
# Built-in `rate` source producing one row per second. process_batch does not read
# these rows; each micro-batch only acts as a clock tick that triggers a weather fetch.
rate_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
)

In [0]:
def process_batch(batch_df, batch_id):
    """
    foreachBatch handler: fetch one Calgary weather reading and append it to OUTPUT_PATH.

    The micro-batch rows themselves are not read; the batch only serves as a timer,
    so each trigger results in at most one API call and one Delta append.

    Args:
        batch_df: the micro-batch DataFrame; only its ``sparkSession`` is used.
        batch_id: micro-batch id supplied by Structured Streaming (used for logging).

    A failed fetch is logged and the batch is skipped. An exception from the Delta
    write is not caught here, so it propagates to the streaming query.
    """
    print(f"Processing batch {batch_id} at {datetime.now()}")

    weather_data = fetch_calgary_weather()
    if not weather_data:
        print(f"❌ Batch {batch_id} - Failed to fetch weather data")
        return

    # Use the micro-batch's own session instead of the notebook-global `spark`.
    # foreachBatch may execute this function out-of-process (e.g. Spark Connect /
    # shared compute), where the outer-scope session is not guaranteed to be usable.
    # DataFrame.sparkSession requires PySpark 3.3+.
    session = batch_df.sparkSession
    weather_df = session.createDataFrame([weather_data], schema=weather_schema)

    weather_df.write.format("delta").mode("append").save(OUTPUT_PATH)

    print(f"✅ Batch {batch_id} - Weather data written: {weather_data['temperature']}°C, {weather_data['humidity']}% humidity")

In [0]:
# Start the stream: every TRIGGER_INTERVAL a micro-batch of rate rows is handed to
# process_batch, which does one weather fetch and Delta append. Query progress is
# tracked in CHECKPOINT_PATH so the query can resume after a restart.
streaming_query = (
    rate_stream.writeStream
    .foreachBatch(process_batch)
    .outputMode("append")
    .option("checkpointLocation", CHECKPOINT_PATH)
    .trigger(processingTime=TRIGGER_INTERVAL)
    .start()
)