# Step 2 · Normalize SIATA feeds

This notebook flattens the historic and current precipitation JSON feeds into a common schema ready for loading into PostgreSQL (NeonDB). It also persists intermediate artifacts for inspection.


## 1. Setup & paths


In [None]:

from __future__ import annotations

import json
import math
import os
from pathlib import Path
from typing import Any, Dict, Iterable, List

import pandas as pd
from dotenv import load_dotenv

# Project root: the parent of the directory the notebook runs from.
BASE_DIR = Path('..').resolve()

# Read the project-level .env; values found there replace any already present
# in the process environment (override=True).
load_dotenv(dotenv_path=Path('..') / '.env', override=True)

# Input and output locations under <root>/data.
RAW_DIR = BASE_DIR.joinpath('data', 'raw')
PROCESSED_DIR = BASE_DIR.joinpath('data', 'processed')
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

# The two raw feed dumps this notebook normalizes.
HISTORIC_PATH = RAW_DIR.joinpath('historic_precipitation.json')
CURRENT_PATH = RAW_DIR.joinpath('current_precipitation.json')

(HISTORIC_PATH, CURRENT_PATH)


## 2. Load raw feeds

In [None]:

# Pin the encoding: Path.open() otherwise falls back to the platform's locale
# encoding, which can mis-decode (or fail on) non-ASCII text in the feeds.
with HISTORIC_PATH.open(encoding='utf-8') as f:
    historic_raw = json.load(f)

with CURRENT_PATH.open(encoding='utf-8') as f:
    current_raw = json.load(f)

# Quick size check: entries in the historic feed, stations in the current feed.
len(historic_raw), len(current_raw['estaciones'])


## 3. Normalize sensor metadata

We create a unified `sensors` table with deterministic IDs. Historic Vaisala stations are prefixed with `vaisala_`, current pluvio stations with `pluvio_`.


In [None]:

def to_float(value: Any) -> float | None:
    try:
        if value is None:
            return None
        if isinstance(value, str) and not value.strip():
            return None
        return float(value)
    except (TypeError, ValueError):
        return None


def normalize_historic_sensors(raw: Iterable[Dict[str, Any]]) -> pd.DataFrame:
    """Turn the historic feed's station entries into rows of the sensors schema.

    Each station becomes one row whose ``id`` is ``vaisala_<codigo>``.
    ``elevation_m`` and ``barrio`` are always ``None`` for this source, and the
    untouched ``latitud``/``longitud``/``subcuenca`` values are kept under
    ``metadata['raw']`` next to the parsed coordinates.

    Parameters
    ----------
    raw:
        Station dicts from the historic feed.

    Returns
    -------
    pd.DataFrame
        One row per input station, in input order.
    """

    def build_row(station: Dict[str, Any]) -> Dict[str, Any]:
        # str() mirrors how the id is derived everywhere else in the notebook.
        provider_code = str(station.get('codigo'))
        latitude_raw = station.get('latitud')
        longitude_raw = station.get('longitud')
        basin = station.get('subcuenca')
        return {
            'id': f"vaisala_{provider_code}",
            'provider_id': provider_code,
            'name': station.get('nombre'),
            'lat': to_float(latitude_raw),
            'lon': to_float(longitude_raw),
            'elevation_m': None,
            'city': station.get('ciudad'),
            'subbasin': basin,
            'barrio': None,
            'metadata': {
                'source': 'historic',
                'raw': {
                    'latitud': latitude_raw,
                    'longitud': longitude_raw,
                    'subcuenca': basin,
                },
            },
        }

    return pd.DataFrame.from_records([build_row(station) for station in raw])


def normalize_current_sensors(raw: Iterable[Dict[str, Any]]) -> pd.DataFrame:
    """Turn the current feed's station entries into rows of the sensors schema.

    Each station becomes one row whose ``id`` is ``pluvio_<codigo>``.
    ``elevation_m`` is always ``None`` for this source; the station's
    ``comuna`` is carried inside the ``metadata`` dict.

    Parameters
    ----------
    raw:
        Station dicts from the current feed.

    Returns
    -------
    pd.DataFrame
        One row per input station, in input order.
    """

    def build_row(station: Dict[str, Any]) -> Dict[str, Any]:
        provider_code = str(station.get('codigo'))
        return {
            'id': f"pluvio_{provider_code}",
            'provider_id': provider_code,
            'name': station.get('nombre'),
            'lat': to_float(station.get('latitud')),
            'lon': to_float(station.get('longitud')),
            'elevation_m': None,
            'city': station.get('ciudad'),
            'subbasin': station.get('subcuenca'),
            'barrio': station.get('barrio'),
            'metadata': {
                'source': 'current',
                'comuna': station.get('comuna'),
            },
        }

    return pd.DataFrame.from_records(list(map(build_row, raw)))

historic_sensors_df = normalize_historic_sensors(historic_raw)
current_sensors_df = normalize_current_sensors(current_raw['estaciones'])

# Stack both sources, then keep the first row seen for each sensor id.
stacked_sensors_df = pd.concat([historic_sensors_df, current_sensors_df], ignore_index=True)
sensors_df = stacked_sensors_df.drop_duplicates(subset=['id'])

# Row counts per source and after de-duplication.
for label, frame in (
    ('Historic sensors:', historic_sensors_df),
    ('Current sensors:', current_sensors_df),
    ('Combined sensors:', sensors_df),
):
    print(label, len(frame))
sensors_df.head()


## 4. Flatten measurement records

The historic feed contains high-frequency (minute-level) data per station. The current feed reports only the latest reading per station, so we tag each of those readings with the time at which this notebook runs.


In [None]:

from datetime import datetime, timezone


def normalize_historic_measurements(raw: Iterable[Dict[str, Any]]) -> pd.DataFrame:
    """Flatten the nested historic feed into one row per precipitation reading.

    The feed is walked as station -> ``datos`` (time slots carrying ``fecha``)
    -> ``datos`` (entries carrying ``valor``, ``calidad``, ``variableConsulta``).

    Parameters
    ----------
    raw:
        Station dicts from the historic feed.

    Returns
    -------
    pd.DataFrame
        Columns ``sensor_id`` (``vaisala_<codigo>``), ``ts`` (UTC; ``NaT`` when
        ``fecha`` is missing or unparseable), ``value_mm``, ``quality``,
        ``variable`` and ``source`` (always ``'historic'``), restricted to rows
        whose variable is ``'precipitacion'``. When the feed yields no entries
        at all, the returned frame is empty and has no columns.
    """
    records: List[Dict[str, Any]] = []
    for item in raw:
        sensor_id = f"vaisala_{item.get('codigo')}"
        # `or []` rather than a .get default: the key can be present with an
        # explicit JSON null, which a default would not cover.
        for slot in item.get('datos') or []:
            try:
                ts = pd.to_datetime(slot.get('fecha'), utc=True)
            except Exception:
                # Best effort: an unparseable timestamp becomes NaT instead of
                # aborting the whole normalization.
                ts = pd.NaT
            for var_entry in slot.get('datos') or []:
                # NOTE(review): negative sentinels (e.g. -999) are nulled for the
                # current feed but kept as-is here; confirm whether the historic
                # feed uses them too.
                records.append({
                    'sensor_id': sensor_id,
                    'ts': ts,
                    'value_mm': to_float(var_entry.get('valor')),
                    'quality': to_float(var_entry.get('calidad')),
                    'variable': var_entry.get('variableConsulta'),
                    'source': 'historic'
                })
    df = pd.DataFrame.from_records(records)
    # Filter to precipitation variable (defensive if other variables appear)
    if not df.empty:
        df = df[df['variable'] == 'precipitacion']
    return df


def normalize_current_measurements(raw: Iterable[Dict[str, Any]], retrieval_ts: datetime) -> pd.DataFrame:
    """Build one measurement row per station of the current feed.

    Parameters
    ----------
    raw:
        Station dicts from the current feed; ``codigo`` and ``valor`` are read.
    retrieval_ts:
        Timestamp assigned to every row, since the feed carries only the
        latest reading per station.

    Returns
    -------
    pd.DataFrame
        Columns ``sensor_id`` (``pluvio_<codigo>``), ``ts``, ``value_mm``
        (``None`` for negative readings), ``quality`` (always ``None``),
        ``variable`` (``'precipitacion'``) and ``source`` (``'current'``).
    """

    def reading_mm(station: Dict[str, Any]) -> float | None:
        amount = to_float(station.get('valor'))
        # Current feed uses -999 for missing values
        return None if (amount is not None and amount < 0) else amount

    rows = [
        {
            'sensor_id': f"pluvio_{station.get('codigo')}",
            'ts': pd.Timestamp(retrieval_ts),
            'value_mm': reading_mm(station),
            'quality': None,
            'variable': 'precipitacion',
            'source': 'current',
        }
        for station in raw
    ]
    return pd.DataFrame.from_records(rows)

historic_measurements_df = normalize_historic_measurements(historic_raw)

# One timestamp shared by every current-feed row: "now" in UTC, truncated to whole seconds.
retrieval_moment = datetime.now(timezone.utc).replace(microsecond=0)
current_measurements_df = normalize_current_measurements(
    current_raw['estaciones'], retrieval_ts=retrieval_moment
)

for label, frame in (
    ('Historic measurements:', historic_measurements_df),
    ('Current measurements:', current_measurements_df),
):
    print(label, len(frame))

# Single frame holding both sources, re-indexed from zero.
combined_measurements_df = pd.concat(
    [historic_measurements_df, current_measurements_df], ignore_index=True
)
combined_measurements_df.head()


### Quick inspection

In [None]:

# Summary statistics for every column of the combined frame, non-numeric ones included.
combined_measurements_df.describe(include='all')


## 5. Persist normalized artifacts

We save the normalized outputs for downstream services and for verifying the transformation.


In [None]:

# Destination files inside the processed-data directory.
sensors_output = PROCESSED_DIR.joinpath('sensors_normalized.parquet')
measurements_output = PROCESSED_DIR.joinpath('measurements_normalized.parquet')

# Write each frame without its positional index.
for frame, destination in (
    (sensors_df, sensors_output),
    (combined_measurements_df, measurements_output),
):
    frame.to_parquet(destination, index=False)

(sensors_output, measurements_output)


## 6. Load into PostgreSQL (NeonDB)

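The following cell performs idempotent upserts into the schema defined in `db/schema.sql`. Configure `DATABASE_URL` in `.env` before running. The cell uses PostgreSQL `ON CONFLICT ... DO UPDATE`, so re-loading a row with the same key (`id` for sensors, `sensor_id` + `ts` for measurements) updates it instead of duplicating it. Note that current-feed readings are stamped with the notebook's run time, so each run adds one new reading per station.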


In [None]:

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import insert

# Ceiling on bind parameters per INSERT statement. A multi-row VALUES clause
# uses rows x columns parameters, and the PostgreSQL wire protocol counts them
# in a 16-bit field, so large frames are sent in slices instead of one statement.
MAX_BIND_PARAMS = 30_000


def _to_db_records(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Convert a frame to a list of row dicts, mapping NaN/NaT to ``None``.

    pandas stores missing floats as NaN and missing timestamps as NaT; passed
    through unchanged they would not become SQL NULL.
    """
    if df.empty:
        return []
    return df.astype(object).where(df.notna(), None).to_dict(orient='records')


def _iter_chunks(records: List[Dict[str, Any]]) -> Iterable[List[Dict[str, Any]]]:
    """Yield consecutive slices of ``records`` that stay under ``MAX_BIND_PARAMS``."""
    if not records:
        return
    rows_per_chunk = max(1, MAX_BIND_PARAMS // max(1, len(records[0])))
    for start in range(0, len(records), rows_per_chunk):
        yield records[start:start + rows_per_chunk]


DATABASE_URL = os.getenv('DATABASE_URL')
if not DATABASE_URL:
    raise RuntimeError('DATABASE_URL not set; update .env before loading data.')

engine = sa.create_engine(DATABASE_URL, pool_pre_ping=True, future=True)

# Rows without a timestamp cannot take part in the (sensor_id, ts) conflict key,
# and PostgreSQL aborts an ON CONFLICT DO UPDATE statement that would touch the
# same key twice. Drop the former and keep the last occurrence of the latter.
if combined_measurements_df.empty:
    loadable_measurements_df = combined_measurements_df
else:
    loadable_measurements_df = (
        combined_measurements_df
        .dropna(subset=['ts'])
        .drop_duplicates(subset=['sensor_id', 'ts'], keep='last')
    )
skipped_rows = len(combined_measurements_df) - len(loadable_measurements_df)
if skipped_rows:
    print(f'Skipping {skipped_rows} measurement rows with a missing or duplicated (sensor_id, ts)')

sensors_records = _to_db_records(sensors_df)
measurements_records = _to_db_records(loadable_measurements_df)

# Reflect the target tables from the live database rather than redefining them here.
metadata = sa.MetaData()
sensors_table = sa.Table('sensors', metadata, autoload_with=engine)
raw_measurements_table = sa.Table('raw_measurements', metadata, autoload_with=engine)

# One transaction for the whole load: either every chunk lands or none does.
with engine.begin() as conn:
    for chunk in _iter_chunks(sensors_records):
        stmt = insert(sensors_table).values(chunk)
        # On conflict, overwrite every column except the key with the incoming row.
        update_cols = {col.name: col for col in stmt.excluded if col.name not in ('id',)}
        conn.execute(stmt.on_conflict_do_update(index_elements=['id'], set_=update_cols))

    for chunk in _iter_chunks(measurements_records):
        stmt = insert(raw_measurements_table).values(chunk)
        conn.execute(
            stmt.on_conflict_do_update(
                index_elements=['sensor_id', 'ts'],
                set_={
                    'value_mm': stmt.excluded.value_mm,
                    'quality': stmt.excluded.quality,
                    'variable': stmt.excluded.variable,
                    'source': stmt.excluded.source,
                    'updated_at': sa.func.now()
                }
            )
        )
print('Load completed')
