In [0]:
import requests
import json
import logging
from typing import Any, Dict, List, Optional
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import col, to_timestamp, current_timestamp, date_sub

INFO:py4j.clientserver:Received command c on object id p0


In [0]:
# Pipeline-wide logger; root logging is configured to emit INFO and above.
logger = logging.getLogger("WeatherDataPipeline")
logging.basicConfig(level="INFO")

INFO:py4j.clientserver:Received command c on object id p0


In [0]:
# --- API configuration ---
# Identifying User-Agent sent with every api.weather.gov request below.
HEADERS = {"User-Agent": "(myweatherapp.com, contact@myweatherapp.com)"}
BASE_URL = "https://api.weather.gov"
STATION_ID = "0128W"
STATION_ENDPOINT = f"{BASE_URL}/stations/{STATION_ID}"
OBSERVATIONS_ENDPOINT = f"{STATION_ENDPOINT}/observations"

# --- Unity Catalog destination (catalog.schema.table) ---
CATALOG_NAME, SCHEMA_NAME, TABLE_NAME = "company_data", "weather", "station_observations"
FULL_TABLE_NAME = ".".join((CATALOG_NAME, SCHEMA_NAME, TABLE_NAME))

INFO:py4j.clientserver:Received command c on object id p0


In [0]:
def fetch_station_metadata(station_id: str, timeout: float = 30.0) -> Optional[Dict[str, Any]]:
    """
    Fetch metadata for a given station, including timezone.

    Args:
        station_id (str): The station ID to fetch metadata for.
        timeout (float): Seconds to wait for the HTTP request before giving up.
            Without a timeout, ``requests`` can block indefinitely on a stalled
            connection and hang the whole pipeline.

    Returns:
        Optional[Dict[str, Any]]: A dict with ``station_id``, ``station_name`` and
        ``timezone`` (the latter two may be None if absent from the response),
        or None if the request failed or returned a non-200 status.
    """
    station_url = f"{BASE_URL}/stations/{station_id}"
    try:
        response = requests.get(station_url, headers=HEADERS, timeout=timeout)
        if response.status_code == 200:
            data = response.json()
            # Tolerate both a missing and an explicit null "properties" object.
            properties = data.get("properties") or {}
            return {
                "station_id": station_id,
                "station_name": properties.get("name"),
                "timezone": properties.get("timeZone"),
            }
        else:
            logger.warning(f"Failed to fetch metadata for station {station_id}. Status code: {response.status_code}")
            return None
    except Exception as e:
        # Best-effort: network errors, timeouts and malformed JSON are logged and
        # reported to the caller as "no metadata" (None).
        logger.error(f"Error fetching metadata for station {station_id}: {e}")
        return None

INFO:py4j.clientserver:Received command c on object id p0


In [0]:
def fetch_weather_data(station_id: str, timeout: float = 30.0) -> List[Dict[str, Any]]:
    """
    Fetch weather observation data for a given station.

    Args:
        station_id (str): The station ID to fetch data for. The request URL is
            built from this argument (previously the module-level
            OBSERVATIONS_ENDPOINT was always used, so other stations silently
            received STATION_ID's data).
        timeout (float): Seconds to wait for the HTTP request before giving up.

    Returns:
        List[Dict[str, Any]]: The ``features`` list from the response, or an
        empty list on any failure or when the response has no features.
    """
    observations_url = f"{BASE_URL}/stations/{station_id}/observations"
    try:
        response = requests.get(observations_url, headers=HEADERS, timeout=timeout)
        if response.status_code == 200:
            data = response.json()
            # A null "features" value is normalised to an empty list so the
            # return type is always a list.
            return data.get("features") or []
        else:
            logger.warning(f"Failed to fetch observations for station {station_id}. Status code: {response.status_code}")
            return []
    except Exception as e:
        # Best-effort: network errors, timeouts and malformed JSON are logged and
        # reported to the caller as "no observations".
        logger.error(f"Error fetching observations for station {station_id}: {e}")
        return []

In [0]:
def transform_data_to_dataframe(
    observations: List[Dict[str, Any]],
    metadata: Dict[str, Any],
    lookback_days: Optional[int] = 7,
) -> DataFrame:
    """
    Transform observation data and metadata into a PySpark DataFrame.

    Each observation is read as a feature whose ``geometry.coordinates`` is
    ``[longitude, latitude]`` and whose ``properties`` hold a ``timestamp``
    string plus ``temperature`` / ``windSpeed`` / ``relativeHumidity`` objects
    carrying a ``value`` key. Missing or null pieces become NULL columns rather
    than raising.

    Args:
        observations (List[Dict[str, Any]]): A list of observation dictionaries.
        metadata (Dict[str, Any]): Metadata containing timezone and station details.
        lookback_days (Optional[int]): Keep only rows whose timestamp is on or
            after midnight ``lookback_days`` days ago. ``None`` disables the filter.

    Returns:
        DataFrame: A PySpark DataFrame with station/location columns, rounded
        measurements and a ``timestamp`` column of TimestampType.
    """
    schema = StructType(
        [
            StructField("station_id", StringType(), True),
            StructField("station_name", StringType(), True),
            StructField("timezone", StringType(), True),
            StructField("latitude", DoubleType(), True),
            StructField("longitude", DoubleType(), True),
            StructField("timestamp_str", StringType(), True),  # Store as String initially
            StructField("temperature", DoubleType(), True),
            StructField("wind_speed", DoubleType(), True),
            StructField("humidity", DoubleType(), True),
        ]
    )

    def to_float(value: Any) -> Optional[float]:
        # DoubleType rejects Python ints (e.g. a JSON coordinate of -105),
        # so numeric values are coerced explicitly.
        return float(value) if value is not None else None

    def safe_round(value: Any, decimals: int = 2) -> Optional[float]:
        return round(float(value), decimals) if value is not None else None

    def measurement(props: Dict[str, Any], key: str) -> Any:
        # The API may send the measurement object as null, not just omit it.
        entry = props.get(key) or {}
        return entry.get("value") if isinstance(entry, dict) else None

    def to_record(observation: Dict[str, Any]) -> Dict[str, Any]:
        props = observation.get("properties") or {}
        geometry = observation.get("geometry") or {}
        coords = geometry.get("coordinates") or []
        return {
            "station_id": metadata.get("station_id"),
            "station_name": metadata.get("station_name"),
            "timezone": metadata.get("timezone"),
            "latitude": to_float(coords[1]) if len(coords) > 1 else None,
            "longitude": to_float(coords[0]) if len(coords) > 0 else None,
            "timestamp_str": props.get("timestamp"),  # Keep as raw ISO-8601 string
            "temperature": safe_round(measurement(props, "temperature")),
            "wind_speed": safe_round(measurement(props, "windSpeed")),
            "humidity": safe_round(measurement(props, "relativeHumidity")),
        }

    records = [to_record(observation) for observation in observations]

    df = spark.createDataFrame(records, schema=schema)
    # Convert ISO-8601 string to TimestampType, and drop the original
    df = df.withColumn("timestamp", to_timestamp(col("timestamp_str")))
    df = df.drop("timestamp_str")
    if lookback_days is not None:
        # date_sub yields a DATE, so the cutoff is midnight `lookback_days` ago.
        # Rows with a NULL timestamp are also dropped by this comparison.
        df = df.filter(col("timestamp") >= date_sub(current_timestamp(), lookback_days))

    return df

INFO:py4j.clientserver:Received command c on object id p0


In [0]:
def upsert_to_table(dataframe: DataFrame, table_name: str) -> None:
    """
    Upsert data into a Unity Catalog table using MERGE.

    Creates the Delta table if it does not exist, then merges rows keyed on
    (station_id, timestamp): matching rows are updated, new rows inserted.

    Args:
        dataframe (DataFrame): The DataFrame to upsert.
        table_name (str): The table name.
    """
    merge_keys = ["station_id", "timestamp"]
    # Delta MERGE fails when more than one source row matches the same target
    # row, and NULL keys never satisfy the ON clause (such rows would be
    # re-inserted on every run). Clean the source on the merge keys first.
    source = dataframe.dropna(subset=merge_keys).dropDuplicates(merge_keys)
    source.createOrReplaceTempView("temp_observations")
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            station_id STRING,
            station_name STRING,
            timezone STRING,
            latitude DOUBLE,
            longitude DOUBLE,
            timestamp TIMESTAMP,
            temperature DOUBLE,
            wind_speed DOUBLE,
            humidity DOUBLE
        )
        USING delta
    """)
    spark.sql(f"""
        MERGE INTO {table_name} target
        USING temp_observations source
        ON target.station_id = source.station_id AND target.timestamp = source.timestamp
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

INFO:py4j.clientserver:Received command c on object id p0


In [0]:
if __name__ == "__main__":
    # Run the pipeline end to end: metadata -> observations -> DataFrame -> MERGE.
    # Each stage only runs if the previous one produced something.
    metadata = fetch_station_metadata(STATION_ID)
    if not metadata:
        logger.warning("Failed to retrieve station metadata.")
    else:
        observations = fetch_weather_data(STATION_ID)
        if not observations:
            logger.warning("No observations found.")
        else:
            df = transform_data_to_dataframe(observations, metadata)
            upsert_to_table(df, FULL_TABLE_NAME)
            logger.info("Pipeline executed successfully.")

INFO:py4j.clientserver:Received command c on object id p0
INFO:WeatherDataPipeline:Pipeline executed successfully.
