# Fetch Weather Data

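This notebook fetches hourly historical weather data for New York City from the
Open-Meteo Historical Weather API and loads it into the `weather_raw` Delta table
for use by the declarative pipeline. The target catalog, schema, and date range
are supplied through notebook widgets.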

In [None]:
# Declare each input widget (name, default value, label) and read its current
# value right away, so every notebook parameter is defined and resolved together.
dbutils.widgets.text("catalog", "hive_metastore", "Catalog")
catalog = dbutils.widgets.get("catalog")

dbutils.widgets.text("schema", "default", "Schema")
schema = dbutils.widgets.get("schema")

dbutils.widgets.text("start_date", "2023-01-01", "Start Date")
start_date = dbutils.widgets.get("start_date")

dbutils.widgets.text("end_date", "2023-12-31", "End Date")
end_date = dbutils.widgets.get("end_date")

# Echo the resolved parameters so they are visible in the run output.
print(
    f"Catalog: {catalog}",
    f"Schema: {schema}",
    f"Date range: {start_date} to {end_date}",
    sep="\n",
)

In [None]:
# Set catalog and schema
def _quote_identifier(name: str) -> str:
    """Return ``name`` as a backtick-delimited SQL identifier.

    Backticks embedded in ``name`` are doubled so that a widget value
    containing a backtick cannot terminate the identifier early and change
    the meaning of the statement.
    """
    return "`" + name.replace("`", "``") + "`"


# Make the widget-selected catalog and schema the session defaults so that
# unqualified table names used later in the notebook resolve inside them.
spark.sql(f"USE CATALOG {_quote_identifier(catalog)}")
spark.sql(f"USE SCHEMA {_quote_identifier(schema)}")

In [None]:
import requests
import pandas as pd
from datetime import datetime, timedelta

# NYC coordinates, sent as the "latitude" / "longitude" query parameters
NYC_LATITUDE = 40.7128
NYC_LONGITUDE = -74.0060

# Open-Meteo Historical Weather API endpoint that every fetch request is sent to
ARCHIVE_API_URL = "https://archive-api.open-meteo.com/v1/archive"

# Weather variables to fetch; the names are comma-joined into the "hourly"
# query parameter of each request
HOURLY_VARIABLES = [
    "temperature_2m",
    "relative_humidity_2m",
    "precipitation",
    "rain",
    "snowfall",
    "wind_speed_10m",
    "weather_code",
]

In [None]:
def fetch_weather_chunk(start: str, end: str) -> pd.DataFrame:
    """Fetch hourly weather data for one date range from the archive API.

    Sends a single GET request to ``ARCHIVE_API_URL`` for the NYC coordinates,
    asking for every variable in ``HOURLY_VARIABLES`` with timestamps in the
    ``America/New_York`` timezone.

    Args:
        start: First day of the range, passed through as the ``start_date``
            query parameter (the notebook supplies ``YYYY-MM-DD`` strings).
        end: Last day of the range, passed through as the ``end_date``
            query parameter.

    Returns:
        A DataFrame with a ``datetime`` column parsed from the response's
        ``hourly.time`` list, followed by one column per entry of
        ``HOURLY_VARIABLES`` in that order. A variable that is absent from
        the response yields a column of ``None``.

    Raises:
        requests.HTTPError: If the API answers with an error status code.
        KeyError: If the response has no ``hourly`` section or no ``time``
            list inside it.
    """
    params = {
        "latitude": NYC_LATITUDE,
        "longitude": NYC_LONGITUDE,
        "start_date": start,
        "end_date": end,
        "hourly": ",".join(HOURLY_VARIABLES),
        "timezone": "America/New_York",
    }

    response = requests.get(ARCHIVE_API_URL, params=params, timeout=30)
    response.raise_for_status()
    hourly = response.json()["hourly"]

    # Build the columns from HOURLY_VARIABLES instead of repeating the names,
    # so the requested variables and the returned columns cannot drift apart.
    columns = {"datetime": pd.to_datetime(hourly["time"])}
    columns.update((name, hourly.get(name)) for name in HOURLY_VARIABLES)
    return pd.DataFrame(columns)

In [None]:
# Fetch weather data in chunks to avoid API timeouts
print(f"Fetching weather data from {start_date} to {end_date}...")

start = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d")

# Fail early with a clear message instead of letting pd.concat raise
# "No objects to concatenate" on an empty chunk list.
if start > end:
    raise ValueError(
        f"start_date ({start_date}) must not be after end_date ({end_date})"
    )

chunk_days = 30  # maximum number of days requested per API call

chunks = []
current_start = start

# Each chunk covers current_start..current_end with both days included, so the
# loop must also run when current_start == end. A strict "<" would skip the
# final day whenever the last chunk starts on end_date, and would fetch nothing
# at all for a single-day range.
while current_start <= end:
    current_end = min(current_start + timedelta(days=chunk_days - 1), end)
    print(f"  Fetching {current_start.date()} to {current_end.date()}...")

    chunk_df = fetch_weather_chunk(
        start=current_start.strftime("%Y-%m-%d"),
        end=current_end.strftime("%Y-%m-%d"),
    )
    chunks.append(chunk_df)
    # The next chunk starts the day after this one ended, so no day is
    # requested twice.
    current_start = current_end + timedelta(days=1)

weather_df = pd.concat(chunks, ignore_index=True)
print(f"\nFetched {len(weather_df)} weather records")

In [None]:
# Hand the pandas result over to Spark so it can be persisted as a table.
spark_df = spark.createDataFrame(weather_df)

# The table name is unqualified; the print below reports it together with the
# catalog and schema read from the widgets.
table_name = "weather_raw"
print(f"Writing to {catalog}.{schema}.{table_name}...")

# Replace any existing contents of the table with this run's data.
(
    spark_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(table_name)
)

print(f"Successfully wrote weather data to {table_name}")

In [None]:
# Show sample of the data: read the table back by name and display its first
# 5 rows as a quick check that the write produced readable data
spark.table(table_name).show(5)