# Ingestion from the Open-Meteo weather API

In [0]:
from datetime import datetime, timedelta, UTC
import pandas as pd
import json
from requests import exceptions as requests_exceptions
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp, to_date
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType, TimestampType
from utils.fetch_openmeteo import fetch_weather_city_date

In [0]:
# Date widget for the day to ingest. Defaults to yesterday (computed in UTC)
# when no value is supplied.
default_date = (datetime.now(UTC) - timedelta(days=1)).strftime('%Y-%m-%d')
dbutils.widgets.text("processing_date", default_date, "Date (YYYY-MM-DD)")

# Read the parameter. A blank value falls back to the default. The value is
# passed to the API fetch and interpolated into the INSERT OVERWRITE ... PARTITION
# SQL, so it is parsed strictly and normalized to zero-padded YYYY-MM-DD here;
# anything that is not a real calendar date is rejected before any work starts.
_raw_processing_date = dbutils.widgets.get("processing_date").strip() or default_date
try:
    processing_date = (
        datetime.strptime(_raw_processing_date, '%Y-%m-%d').strftime('%Y-%m-%d')
    )
except ValueError as err:
    raise ValueError(
        f"Invalid processing_date {_raw_processing_date!r}: expected YYYY-MM-DD"
    ) from err

print(f"Starting ingestion for date: {processing_date}")

In [0]:
# Destination table, addressed as <catalog>.<schema>.<table> by the write cells.
DESTINATION_CATALOG = "workspace"
DESTINATION_SCHEMA = "bronze"
DESTINATION_TABLE = "openmeteo_hourly_historical"

In [0]:
# Inputs: the cities to fetch (name plus latitude/longitude) and the hourly
# variables requested for each of them.
# TODO: parameterize these instead of hard-coding them in the notebook.

CITIES = [
    dict(city="New York", lat=40.7128, lon=-74.0060),
    dict(city="Chicago", lat=41.8781, lon=-87.6298),
    dict(city="Houston", lat=29.7604, lon=-95.3698),
    dict(city="Phoenix", lat=33.4484, lon=-112.0740),
    dict(city="Philadelphia", lat=39.9526, lon=-75.1652),
    dict(city="San Francisco", lat=37.7749, lon=-122.4194),
]

HOURLY_VARIABLES = [
    "temperature_2m",
    "relativehumidity_2m",
    "surface_pressure",
    "precipitation",
    "windspeed_10m",
    "winddirection_10m",
    "cloudcover",
]

In [0]:
# Fetch one day (processing_date) of hourly data for every city.
# Best-effort: a city whose request raises a requests exception is reported and
# skipped so the remaining cities still load, rather than holding up the whole
# pipeline for a single error. Cities that fail or return None are collected in
# failed_cities and summarized after the loop.
data = []
failed_cities = []
for loc in CITIES:
    print(f"Fetching data for {loc['city']}")
    try:
        out = fetch_weather_city_date(
            loc['city'],
            loc['lat'],
            loc['lon'],
            processing_date,
            processing_date,
            HOURLY_VARIABLES,
        )
    except requests_exceptions.RequestException as e:
        print(f"Error fetching data for {loc['city']}: {e}")
        failed_cities.append(loc['city'])
        continue

    if out is None:
        print(f"No data returned for {loc['city']}")
        failed_cities.append(loc['city'])
    else:
        data.append(out)

if failed_cities:
    # The write step overwrites the entire processing_date partition, so these
    # cities will have no rows for this date after the write.
    print(
        f"WARNING: {len(failed_cities)} of {len(CITIES)} cities have no data for "
        f"{processing_date}: {', '.join(failed_cities)}"
    )

In [0]:
# Stop the notebook run early, with an explanatory exit value, when no city
# produced any data, so the remaining cells (including the write) do not run.
nothing_fetched = not data
if nothing_fetched:
    dbutils.notebook.exit("No data fetched. Check API or Parameters.")

In [0]:
# Stack the per-city results row-wise into one frame; ignore_index gives the
# combined frame a fresh 0..n-1 index instead of repeating each city's index.
full_df = pd.concat(objs=data, axis=0, ignore_index=True)

In [0]:
# Convert the combined pandas frame to a Spark DataFrame; no schema is given,
# so Spark infers column types from the pandas dtypes.
spark_df = spark.createDataFrame(data=full_df)

In [0]:
# Append two derived columns: the time of this ingestion run, and the calendar
# date of each row's "time" value (used as the partition column on initial load).
final_df = (
    spark_df
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("observation_date", to_date(col("time")))
)

In [0]:
# Expose the enriched frame to SQL under the name "final_df" (temp view scoped
# to this Spark session) so the INSERT OVERWRITE cell can select from it.
final_df.createOrReplaceTempView(name="final_df")

In [0]:
# Only use for the initial load: creates the Delta table, partitioned by observation_date.
# final_df.write\
#     .format("delta")\
#     .mode("overwrite")\
#     .option("mergeSchema", "true")\
#     .partitionBy("observation_date")\
#     .saveAsTable(f"{DESTINATION_CATALOG}.{DESTINATION_SCHEMA}.{DESTINATION_TABLE}")

In [0]:
# Replace the processing_date partition of the destination table with this
# run's rows, so re-running a date replaces its rows instead of duplicating them.
# The partition value originates from a notebook widget and is spliced into the
# SQL text, so it is re-parsed strictly as YYYY-MM-DD (raises ValueError
# otherwise) and the normalized form is what gets interpolated.
# NOTE(review): the overwrite removes every existing row for this date that is
# not in final_df — e.g. rows for any city whose fetch failed in this run.
_partition_date = (
    datetime.strptime(processing_date.strip(), '%Y-%m-%d').strftime('%Y-%m-%d')
)
d = spark.sql(
    f'''
    INSERT OVERWRITE TABLE {DESTINATION_CATALOG}.{DESTINATION_SCHEMA}.{DESTINATION_TABLE}
    PARTITION (observation_date = '{_partition_date}')
    SELECT * except(observation_date) FROM final_df
    ''')