##### Client
----
fetch weather data from API, capture ingestion + API metadata, handle errors, support concurrent requests for multiple locations


In [231]:
import asyncio
from datetime import datetime, timezone
from typing import List, Optional

import httpx

In [232]:
async def fetch_weather_single(
        latitude: float,
        longitude: float,
        hourly_variables: list[str] | None = None,
        daily_variables: list[str] | None = None,
        forecast_days: int = 7,
        timezone_str: str = "UTC",
        base_url: str = "https://api.open-meteo.com/v1/forecast",
        timeout: int = 30,
        location: dict | None = None,
    ) -> dict:

    """ Fetch weather data from a single location
    
    Returns dict with:
    -  data: API response
    -  ingestion_metadata: client-side metadata
    -  api_metadata: API-side metadata from response
    """

    params = {
        "latitude": latitude,
        "longitude": longitude,
        "forecast_days": forecast_days,
        "timezone": timezone_str,
    }

    if hourly_variables:
        params["hourly"] = ",".join(hourly_variables)
    if daily_variables:
        params["daily"] = ",".join(daily_variables)

    # record timestamp before request
    ingestion_timestamp = datetime.now(timezone.utc)

    # make async request
    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.get(base_url, params=params)
    
    raw_data = response.json()

    # Error checks
    if raw_data.get("error"):
        raise ValueError(f"API Error: {raw_data['reason']}")
    
    # Build return dict with metadata
    return {
        "data": raw_data,
        "location": location or {"latitude": latitude, "longitude": longitude},
        "ingestion_metadata": {
            "timestamp_utc": ingestion_timestamp.isoformat(),
            "request_url": str(response.url),
            "elapsed_ms": response.elapsed.total_seconds() * 1000,
            "status_code": response.status_code,
        },

        "api_metadata": {
            "latitude": raw_data["latitude"],
            "longitude": raw_data["longitude"],
            "elevation": raw_data["elevation"],
            "generationtime_ms": raw_data["generationtime_ms"],
            "timezone": raw_data["timezone"],
            "utc_offset_seconds": raw_data["utc_offset_seconds"],
        },
    }

# define async function to fetch multiple locations
async def fetch_weather_multiple(
        locations: list[dict],
        hourly_variables: list[str] | None = None,
        daily_variables: list[str] | None = None,
        max_concurrent_requests: int = 5,
    ) -> list[dict] | Exception:

    """ Fetch weather data for multiple locations concurrently 

        Returns list of reslts or raises Exception (one per location).
    
    """

    semaphore = asyncio.Semaphore(max_concurrent_requests)

    async def fetch_with_limit(location: dict) -> dict | Exception:
        async with semaphore:
            try:
                result = await fetch_weather_single(
                    latitude=location["latitude"],
                    longitude=location["longitude"],
                    hourly_variables=hourly_variables,
                    daily_variables=daily_variables,
                    location=location,  # Pass location for tracing
                )
                # Add mapping info so data can be traced back to location
                result["location"] = location 
                return result
            except Exception as e:
                # If one fails, return error and location with issue
                return {"error": str(e), "location": location, "status": "failed"}

    tasks = [fetch_with_limit(location) for location in locations]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

In [233]:
# single location 
result = await fetch_weather_single(latitude=51.5074, longitude=-0.1278, hourly_variables=["temperature_2m"])
print(result)

{'data': {'latitude': 51.5, 'longitude': -0.120000124, 'generationtime_ms': 0.05614757537841797, 'utc_offset_seconds': 0, 'timezone': 'GMT', 'timezone_abbreviation': 'GMT', 'elevation': 16.0, 'hourly_units': {'time': 'iso8601', 'temperature_2m': '°C'}, 'hourly': {'time': ['2025-12-24T00:00', '2025-12-24T01:00', '2025-12-24T02:00', '2025-12-24T03:00', '2025-12-24T04:00', '2025-12-24T05:00', '2025-12-24T06:00', '2025-12-24T07:00', '2025-12-24T08:00', '2025-12-24T09:00', '2025-12-24T10:00', '2025-12-24T11:00', '2025-12-24T12:00', '2025-12-24T13:00', '2025-12-24T14:00', '2025-12-24T15:00', '2025-12-24T16:00', '2025-12-24T17:00', '2025-12-24T18:00', '2025-12-24T19:00', '2025-12-24T20:00', '2025-12-24T21:00', '2025-12-24T22:00', '2025-12-24T23:00', '2025-12-25T00:00', '2025-12-25T01:00', '2025-12-25T02:00', '2025-12-25T03:00', '2025-12-25T04:00', '2025-12-25T05:00', '2025-12-25T06:00', '2025-12-25T07:00', '2025-12-25T08:00', '2025-12-25T09:00', '2025-12-25T10:00', '2025-12-25T11:00', '2025-1

In [234]:
# multiple locations
locations = [
    {"name": "London", "latitude": 51.5074, "longitude": -0.1278},
    {"name": "New York", "latitude": 40.7128, "longitude": -74.0060},
    {"name": "Tokyo", "latitude": 35.6895, "longitude": 139.6917},
]
results = await fetch_weather_multiple(
    locations,
    hourly_variables=["temperature_2m", "precipitation"],
)
print(results)

[{'data': {'latitude': 51.5, 'longitude': -0.120000124, 'generationtime_ms': 0.11277198791503906, 'utc_offset_seconds': 0, 'timezone': 'GMT', 'timezone_abbreviation': 'GMT', 'elevation': 16.0, 'hourly_units': {'time': 'iso8601', 'temperature_2m': '°C', 'precipitation': 'mm'}, 'hourly': {'time': ['2025-12-24T00:00', '2025-12-24T01:00', '2025-12-24T02:00', '2025-12-24T03:00', '2025-12-24T04:00', '2025-12-24T05:00', '2025-12-24T06:00', '2025-12-24T07:00', '2025-12-24T08:00', '2025-12-24T09:00', '2025-12-24T10:00', '2025-12-24T11:00', '2025-12-24T12:00', '2025-12-24T13:00', '2025-12-24T14:00', '2025-12-24T15:00', '2025-12-24T16:00', '2025-12-24T17:00', '2025-12-24T18:00', '2025-12-24T19:00', '2025-12-24T20:00', '2025-12-24T21:00', '2025-12-24T22:00', '2025-12-24T23:00', '2025-12-25T00:00', '2025-12-25T01:00', '2025-12-25T02:00', '2025-12-25T03:00', '2025-12-25T04:00', '2025-12-25T05:00', '2025-12-25T06:00', '2025-12-25T07:00', '2025-12-25T08:00', '2025-12-25T09:00', '2025-12-25T10:00', '20

In [235]:
for result in results:
        print(result["ingestion_metadata"])
        print(result["api_metadata"])

{'timestamp_utc': '2025-12-24T01:33:57.598555+00:00', 'request_url': 'https://api.open-meteo.com/v1/forecast?latitude=51.5074&longitude=-0.1278&forecast_days=7&timezone=UTC&hourly=temperature_2m%2Cprecipitation', 'elapsed_ms': 565.3580000000001, 'status_code': 200}
{'latitude': 51.5, 'longitude': -0.120000124, 'elevation': 16.0, 'generationtime_ms': 0.11277198791503906, 'timezone': 'GMT', 'utc_offset_seconds': 0}
{'timestamp_utc': '2025-12-24T01:33:57.835610+00:00', 'request_url': 'https://api.open-meteo.com/v1/forecast?latitude=40.7128&longitude=-74.006&forecast_days=7&timezone=UTC&hourly=temperature_2m%2Cprecipitation', 'elapsed_ms': 341.808, 'status_code': 200}
{'latitude': 40.710335, 'longitude': -73.99309, 'elevation': 32.0, 'generationtime_ms': 0.08416175842285156, 'timezone': 'GMT', 'utc_offset_seconds': 0}
{'timestamp_utc': '2025-12-24T01:33:58.055271+00:00', 'request_url': 'https://api.open-meteo.com/v1/forecast?latitude=35.6895&longitude=139.6917&forecast_days=7&timezone=UTC&

In [236]:
import polars as pl

df = pl.from_dicts(results)

df_full = (
    df
    .unnest("data")
    .unnest("hourly")
    .explode(["temperature_2m", "precipitation", "time"])
    
    .with_columns([
        pl.col("api_metadata").struct.rename_fields([f"api_{f}" for f in df.schema["api_metadata"].fields]),
        pl.col("ingestion_metadata").struct.rename_fields([f"ingest_{f}" for f in df.schema["ingestion_metadata"].fields]),
        pl.col("location").struct.rename_fields([f"loc_{f}" for f in df.schema["location"].fields]),
    ])
    .unnest("api_metadata", "ingestion_metadata", "location")
)

df_full.tail(3)

latitude,longitude,generationtime_ms,utc_offset_seconds,timezone,timezone_abbreviation,elevation,hourly_units,time,temperature_2m,precipitation,"loc_Field('name', String)","loc_Field('latitude', Float64)","loc_Field('longitude', Float64)","ingest_Field('timestamp_utc', String)","ingest_Field('request_url', String)","ingest_Field('elapsed_ms', Float64)","ingest_Field('status_code', Int64)","api_Field('latitude', Float64)","api_Field('longitude', Float64)","api_Field('elevation', Float64)","api_Field('generationtime_ms', Float64)","api_Field('timezone', String)","api_Field('utc_offset_seconds', Int64)"
f64,f64,f64,i64,str,str,f64,struct[3],str,f64,f64,str,f64,f64,str,str,f64,i64,f64,f64,f64,f64,str,i64
35.7,139.6875,0.091553,0,"""GMT""","""GMT""",40.0,"{""iso8601"",""°C"",""mm""}","""2025-12-30T21:00""",3.8,0.0,"""Tokyo""",35.6895,139.6917,"""2025-12-24T01:33:58.055271+00:…","""https://api.open-meteo.com/v1/…",125.254,200,35.7,139.6875,40.0,0.091553,"""GMT""",0
35.7,139.6875,0.091553,0,"""GMT""","""GMT""",40.0,"{""iso8601"",""°C"",""mm""}","""2025-12-30T22:00""",4.4,0.0,"""Tokyo""",35.6895,139.6917,"""2025-12-24T01:33:58.055271+00:…","""https://api.open-meteo.com/v1/…",125.254,200,35.7,139.6875,40.0,0.091553,"""GMT""",0
35.7,139.6875,0.091553,0,"""GMT""","""GMT""",40.0,"{""iso8601"",""°C"",""mm""}","""2025-12-30T23:00""",5.5,0.0,"""Tokyo""",35.6895,139.6917,"""2025-12-24T01:33:58.055271+00:…","""https://api.open-meteo.com/v1/…",125.254,200,35.7,139.6875,40.0,0.091553,"""GMT""",0


In [237]:
df_full.schema

Schema([('latitude', Float64),
        ('longitude', Float64),
        ('generationtime_ms', Float64),
        ('utc_offset_seconds', Int64),
        ('timezone', String),
        ('timezone_abbreviation', String),
        ('elevation', Float64),
        ('hourly_units',
         Struct({'time': String, 'temperature_2m': String, 'precipitation': String})),
        ('time', String),
        ('temperature_2m', Float64),
        ('precipitation', Float64),
        ("loc_Field('name', String)", String),
        ("loc_Field('latitude', Float64)", Float64),
        ("loc_Field('longitude', Float64)", Float64),
        ("ingest_Field('timestamp_utc', String)", String),
        ("ingest_Field('request_url', String)", String),
        ("ingest_Field('elapsed_ms', Float64)", Float64),
        ("ingest_Field('status_code', Int64)", Int64),
        ("api_Field('latitude', Float64)", Float64),
        ("api_Field('longitude', Float64)", Float64),
        ("api_Field('elevation', Float64)", Float64)

In [238]:
tokyo_df = df_full.filter(pl.col("loc_Field('name', String)") == "Tokyo")
tokyo_df.head(1)

latitude,longitude,generationtime_ms,utc_offset_seconds,timezone,timezone_abbreviation,elevation,hourly_units,time,temperature_2m,precipitation,"loc_Field('name', String)","loc_Field('latitude', Float64)","loc_Field('longitude', Float64)","ingest_Field('timestamp_utc', String)","ingest_Field('request_url', String)","ingest_Field('elapsed_ms', Float64)","ingest_Field('status_code', Int64)","api_Field('latitude', Float64)","api_Field('longitude', Float64)","api_Field('elevation', Float64)","api_Field('generationtime_ms', Float64)","api_Field('timezone', String)","api_Field('utc_offset_seconds', Int64)"
f64,f64,f64,i64,str,str,f64,struct[3],str,f64,f64,str,f64,f64,str,str,f64,i64,f64,f64,f64,f64,str,i64
35.7,139.6875,0.091553,0,"""GMT""","""GMT""",40.0,"{""iso8601"",""°C"",""mm""}","""2025-12-24T00:00""",3.8,0.4,"""Tokyo""",35.6895,139.6917,"""2025-12-24T01:33:58.055271+00:…","""https://api.open-meteo.com/v1/…",125.254,200,35.7,139.6875,40.0,0.091553,"""GMT""",0


##### Transform
----

Convert nested API response into flat rows to be stored later in pipeline.


In [239]:
import polars as pl

In [241]:
def transform_hourly(result: dict) -> pl.DataFrame:
    """ Transform hourly API response to flat DataFrame """

    data = result["data"]
    hourly = data["hourly"]
    ingestion_metadata = result["ingestion_metadata"]
    api_metadata = result["api_metadata"]
    location = result.get("location", {})

    # Build Dataframe from hourly data - start with time
    data = {"time": hourly["time"]}

    # Add each variable (except time)
    for key, values in hourly.items():
        if key != "time":
            data[key] = values
    
    df = pl.DataFrame(data)

    # Parse time column to datetime
    df = df.with_columns(pl.col("time").str.to_datetime().alias("time"))

    # Add location cols - use coords if location name is not provided
    df = df.with_columns(
        pl.lit(location.get("name") or f"{location.get('latitude')},{location.get('longitude')}").alias("location_name"),
        pl.lit(location.get("latitude")).alias("requested_latitude"),
        pl.lit(location.get("longitude")).alias("requested_longitude"),
    )

    # Add API metadata cols
    df = df.with_columns(
        pl.lit(api_metadata["latitude"]).alias("api_latitude"),
        pl.lit(api_metadata["longitude"]).alias("api_longitude"),
        pl.lit(api_metadata["elevation"]).alias("api_elevation"),
        pl.lit(api_metadata["timezone"]).alias("api_timezone"),
        pl.lit(api_metadata["generationtime_ms"]).alias("api_generationtime_ms"),
    )

    # Add ingestion metadata cols
    df = df.with_columns(
        pl.lit(ingestion_metadata["timestamp_utc"]).alias("ingestion_timestamp_utc"),
        pl.lit(ingestion_metadata["elapsed_ms"]).alias("ingest_elapsed_ms"),
        pl.lit(ingestion_metadata["request_url"]).alias("request_url"),
    )

    return df


def transform_daily(result: dict) -> pl.DataFrame:
    """ Transform daily API response to flat DataFrame """
   
    data = result["data"]
    daily = data["daily"]
    ingestion = result["ingestion_metadata"]
    api_meta = result["api_metadata"]
    location = result.get("location", {})
    
    # Build Dataframe from daily data - start with time
    data = {"date": daily["time"]}
    
    # Add each variable (except time)
    for key, values in daily.items():
        if key != "time":
            data[key] = values
    
    df = pl.DataFrame(data)
    
    # Parse date column to datetime
    df = df.with_columns(pl.col("date").str.to_date().alias("date"))
    
    # Add location cols - use coords if location name is not provided
    location_name = location.get("name") or f"{location.get('latitude')},{location.get('longitude')}"

    df = df.with_columns(
        pl.lit(location_name).alias("location_name"),
        pl.lit(location.get("latitude")).alias("requested_latitude"),
        pl.lit(location.get("longitude")).alias("requested_longitude"),
    )
    
    # Add API metadata cols
    df = df.with_columns(
        pl.lit(api_meta["latitude"]).alias("api_latitude"),
        pl.lit(api_meta["longitude"]).alias("api_longitude"),
        pl.lit(api_meta["elevation"]).alias("api_elevation"),
        pl.lit(api_meta["timezone"]).alias("api_timezone"),
        pl.lit(api_meta["generationtime_ms"]).alias("api_generationtime_ms"),
    )
    
    # Add ingestion metadata cols
    df = df.with_columns(
        pl.lit(ingestion["timestamp_utc"]).alias("ingestion_timestamp_utc"),
        pl.lit(ingestion["elapsed_ms"]).alias("request_elapsed_ms"),
        pl.lit(ingestion["request_url"]).alias("request_url"),
    )
    
    return df


def get_partition_path_single(location_name: str, interval: str, ingestion_timestamp: str) -> str:
    """ Generate partition path for storage.
    
    Format: {interval}/location={name}/date={YYYY-MM-DD}/
    """
    dt = datetime.fromisoformat(ingestion_timestamp.replace("Z", "+00:00"))
    date_str = dt.strftime("%Y-%m-%d")
    return f"{interval}/location={location_name}/{date_str}"    

def get_partition_paths_multiple(df: pl.DataFrame, interval: str) -> list[str]:
    """ Generate partition paths for multiple locations in a DataFrame.
    
    Returns unique paths for each location/date combination.
    Format: {interval}/location={name}/date={YYYY-MM-DD}/
    """
    partition_data = df.select(["location_name", "ingestion_timestamp_utc"]).unique()
    
    paths = []
    for row in partition_data.iter_rows(named=True):
        path = get_partition_path_single(
            location_name=row["location_name"],
            interval=interval,
            ingestion_timestamp=row["ingestion_timestamp_utc"],
        )
        paths.append(path)
    
    return paths

In [242]:
# single location 
result_single_hourly = await fetch_weather_single(51.5074, -0.1278, hourly_variables=["temperature_2m"])
result_single_daily = await fetch_weather_single(51.5074, -0.1278, daily_variables=["temperature_2m_max", "temperature_2m_min"])

# multiple locations - locations already defined above
results_multiple_hourly = await fetch_weather_multiple(
    locations,
    hourly_variables=["temperature_2m", "precipitation"],
)

results_multiple_daily = await fetch_weather_multiple(
    locations,
    daily_variables=["temperature_2m_max", "temperature_2m_min", "precipitation_sum"],
)

In [243]:
# Single locations dfs
df_single_hourly = transform_hourly(result_single_hourly)
df_single_daily = transform_daily(result_single_daily)

# Multiple locations dfs
df_multiple_hourly = pl.concat([transform_hourly(res) for res in results_multiple_hourly if "data" in res])
df_multiple_daily = pl.concat([transform_daily(res) for res in results_multiple_daily if "data" in res])

In [244]:
df_multiple_daily.head(1)

date,temperature_2m_max,temperature_2m_min,precipitation_sum,location_name,requested_latitude,requested_longitude,api_latitude,api_longitude,api_elevation,api_timezone,api_generationtime_ms,ingestion_timestamp_utc,request_elapsed_ms,request_url
date,f64,f64,f64,str,f64,f64,f64,f64,f64,str,f64,str,f64,str
2025-12-24,6.2,4.2,0.0,"""London""",51.5074,-0.1278,51.5,-0.12,16.0,"""GMT""",0.177383,"""2025-12-24T01:34:34.939476+00:…",533.743,"""https://api.open-meteo.com/v1/…"


In [245]:
df_multiple_daily.null_count()

date,temperature_2m_max,temperature_2m_min,precipitation_sum,location_name,requested_latitude,requested_longitude,api_latitude,api_longitude,api_elevation,api_timezone,api_generationtime_ms,ingestion_timestamp_utc,request_elapsed_ms,request_url
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [246]:
df_multiple_hourly.head(1)

time,temperature_2m,precipitation,location_name,requested_latitude,requested_longitude,api_latitude,api_longitude,api_elevation,api_timezone,api_generationtime_ms,ingestion_timestamp_utc,ingest_elapsed_ms,request_url
datetime[μs],f64,f64,str,f64,f64,f64,f64,f64,str,f64,str,f64,str
2025-12-24 00:00:00,6.1,0.0,"""London""",51.5074,-0.1278,51.5,-0.12,16.0,"""GMT""",0.117779,"""2025-12-24T01:34:34.156405+00:…",540.088,"""https://api.open-meteo.com/v1/…"


In [247]:
df_multiple_hourly.null_count()

time,temperature_2m,precipitation,location_name,requested_latitude,requested_longitude,api_latitude,api_longitude,api_elevation,api_timezone,api_generationtime_ms,ingestion_timestamp_utc,ingest_elapsed_ms,request_url
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [248]:
df_single_hourly.null_count()

time,temperature_2m,location_name,requested_latitude,requested_longitude,api_latitude,api_longitude,api_elevation,api_timezone,api_generationtime_ms,ingestion_timestamp_utc,ingest_elapsed_ms,request_url
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,0,0,0,0,0,0,0,0,0,0


In [249]:
df_single_daily.head(1)

date,temperature_2m_max,temperature_2m_min,location_name,requested_latitude,requested_longitude,api_latitude,api_longitude,api_elevation,api_timezone,api_generationtime_ms,ingestion_timestamp_utc,request_elapsed_ms,request_url
date,f64,f64,str,f64,f64,f64,f64,f64,str,f64,str,f64,str
2025-12-24,6.2,4.2,"""51.5074,-0.1278""",51.5074,-0.1278,51.5,-0.12,16.0,"""GMT""",0.080347,"""2025-12-24T01:34:33.838101+00:…",105.686,"""https://api.open-meteo.com/v1/…"


In [251]:
# Partition paths
partition_path_single_hourly = get_partition_paths_multiple(
    df = df_single_hourly,
    interval="hourly",
)

partition_path_single_daily = get_partition_path_single(
    location_name=df_single_daily[0, "location_name"],
    interval="daily",
    ingestion_timestamp=df_single_daily[0, "ingestion_timestamp_utc"],
)

partition_path_multiple_daily = get_partition_paths_multiple(
    df=df_multiple_daily,
    interval="daily",
)

print(partition_path_single_hourly)
# print(partition_path_single_daily)
# multiple locations, currently only retruns a single location name/value
print(partition_path_multiple_daily) 

['hourly/location=51.5074,-0.1278/2025-12-24']
['daily/location=Tokyo/2025-12-24', 'daily/location=London/2025-12-24', 'daily/location=New York/2025-12-24']


##### Writing
----

Take dataframe, write to disk - checks (is it there, formatting?)

In [223]:
from pathlib import Path

In [261]:
def write_dataframe_to_parquet(
        df: pl.DataFrame,
        base_path: str,
        partition_path: str,
        compression: str = "snappy",
    ) -> dict:
    """ Write DataFrame to Parquet file at specified partition path.
    
    Returns dict with path and records written.
    """

    # Build full directory path
    full_dir = Path(base_path) / partition_path
    full_dir.mkdir(parents=True, exist_ok=True)

    # Generate unique filename
    timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
    filename = f"data_{timestamp}.parquet"
    # Join full file path - combine dir and filename using / where pathlib replaces os.path.join()
    full_path = full_dir / filename

    # Write with compression
    df.write_parquet(full_path, compression=compression)

    return {
        "file_path": str(full_path),
        "records_written": len(df),
        "format": "parquet",
        "compression": compression,
    }

def write_parquet_partitioned(
        df: pl.DataFrame,
        base_path: str,
        interval: str,
) -> List[dict]:
    """ Write DataFrame partitioned by location and date.
    
    Returns list of write results per partition.
    """
    results = []

    # Group by partition keys - location_name and date from ingestion timestamp
    for (location_name,), group_df in df.group_by(["location_name"]):
        # Get unique ingestion timestamp from fist row
        ingestion_timestamp = group_df["ingestion_timestamp_utc"][0]
       
        # Generate partition path
        partition_path = get_partition_path_single(
            location_name=location_name,
            interval="daily" if "date" in group_df.columns else "hourly",
            ingestion_timestamp=ingestion_timestamp,
        )

        # Write each group to its own file
        result = write_dataframe_to_parquet(group_df, base_path, partition_path)
        results.append(result)

    return results

    
       

In [None]:
# Test write single daily df
write_results = write_parquet_partitioned(df_single_hourly, "./data/weather", "hourly")