In [18]:
import sys
import json
import math
import pandas as pd

from pathlib import Path
from datetime import date

# The project root is taken to be the parent of the current working directory;
# placing it first on sys.path lets the `src` package import below resolve.
PROJECT_ROOT = Path.cwd().parent
sys.path.insert(0, str(PROJECT_ROOT))

from src.schema.observations import DailyObservation, SCHEMA_VERSION

In [19]:
# Input and output locations, relative to the notebook's working directory.
RAW_JSON_DIR = Path("../data/raw/arpa")
OUT_DIR = Path("../data/processed")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Directory listings come back in arbitrary, filesystem-dependent order.
# Sorting keeps the order in which files are read (and therefore the row order
# of everything built from them) reproducible across machines and re-runs.
json_files = sorted(RAW_JSON_DIR.glob("*.json"))
len(json_files)

8

In [20]:
def to_float(x):
    """Convert a raw cell value to ``float``, or ``None`` when it is missing.

    ``None``, the empty string, ``"-"`` and anything that parses to NaN
    (a float NaN as well as text such as ``"nan"``) are treated as missing.
    Values that cannot be converted to a number also yield ``None``.
    """
    if x is None:
        return None
    # Only strings can be one of the textual "missing" markers; restricting the
    # comparison to strings avoids calling == on arbitrary objects.
    if isinstance(x, str) and x.strip() in ("-", ""):
        return None
    try:
        value = float(x)
    except (TypeError, ValueError, OverflowError):
        return None
    # Check the *result* so that NaN is rejected however it was spelled.
    return None if math.isnan(value) else value


def to_int(x):
    """Convert a raw cell value to ``int``, or ``None`` when it is missing.

    ``None``, the empty string, ``"-"`` and NaN are treated as missing.
    Anything ``int()`` accepts is converted as before (floats are truncated).
    Text that ``int()`` rejects but that denotes a whole number, such as
    ``"180.0"``, is also accepted; any other unparseable value yields ``None``.
    """
    if x is None:
        return None
    # Only strings can be one of the textual "missing" markers.
    if isinstance(x, str) and x.strip() in ("-", ""):
        return None
    if isinstance(x, float) and math.isnan(x):
        return None
    try:
        return int(x)
    except (TypeError, ValueError, OverflowError):
        pass
    # int() refuses decimal text; fall back to float parsing and keep the value
    # only when it is exactly integral, so "180.0" is not silently dropped.
    try:
        value = float(x)
    except (TypeError, ValueError, OverflowError):
        return None
    return int(value) if value.is_integer() else None


def build_date(year, month, day):
    """Assemble a ``datetime.date`` from its three components.

    Returns ``None`` when any component is ``None``, cannot be converted to an
    integer, or when the combination is not a valid calendar date.
    """
    parts = (year, month, day)
    if any(part is None for part in parts):
        return None
    try:
        y, m, d = (int(part) for part in parts)
        return date(y, m, d)
    except Exception:
        return None

In [21]:
# Maps the column names used in the raw records (read from data/raw/arpa) to
# the field names handed to DailyObservation.
# NOTE(review): the four keys containing "째" look like a mis-decoded degree
# sign ("°C" / "°N"). If the raw JSON files carry the real "°" character these
# lookups never match and the corresponding fields are silently all None.
# Confirm against the raw files before changing the keys.
FIELD_MAP = {
    "Pioggia mm": "precipitation",
    "Temp. min 째C": "temperature_min",
    "Temp. med 째C": "temperature_mean",
    "Temp. max 째C": "temperature_max",
    "Umidita' min %": "humidity_min",
    "Umidita' med %": "humidity_mean",
    "Umidita' max %": "humidity_max",
    "Vento med km/h": "wind_speed_mean",
    "Vento max km/h": "wind_speed_max",
    "Dir. V. max 째N": "wind_direction_max",
    "Radiaz. KJ/m2": "solar_radiation",
    "Press. med hPa": "pressure_mean",
}

In [22]:
def record_to_daily_observation(raw: dict):
    """Turn one raw record into a ``DailyObservation``.

    The date is read from the ``anno`` / ``mese`` / ``giorno*`` keys and the
    station from ``stazione``; every entry of ``FIELD_MAP`` is copied across
    as a float, except ``wind_direction_max`` which is converted to an int.

    Returns ``None`` when no valid date can be built or when constructing the
    ``DailyObservation`` raises.
    """
    year, month, day = raw.get("anno"), raw.get("mese"), raw.get("giorno*")

    obs_date = build_date(year, month, day)
    if obs_date is None:
        return None

    data = dict(
        date=obs_date,
        year=int(year),
        month=int(month),
        day=int(day),
        station_name=raw.get("stazione"),
    )

    # Wind direction is the only integer-valued measurement.
    data.update(
        (field, (to_int if field == "wind_direction_max" else to_float)(raw.get(raw_key)))
        for raw_key, field in FIELD_MAP.items()
    )

    try:
        observation = DailyObservation(**data)
    except Exception:
        return None
    return observation

In [23]:
def load_all_raw_records(paths=None):
    """Yield every raw record stored in the station JSON files.

    Each file is read as UTF-8 JSON and iterated as a mapping; values that are
    lists are flattened into the output stream, any other value is skipped.

    Parameters
    ----------
    paths : iterable of path-like, optional
        Files to read. When omitted, the notebook-level ``json_files`` list is
        used, so existing zero-argument calls behave as before.

    Yields
    ------
    The elements of each list, in file order and then list order.
    """
    if paths is None:
        paths = json_files

    for path in paths:
        with open(path, "r", encoding="utf-8") as f:
            station_data = json.load(f)

        # station_data: { "1999": [ {...}, {...} ], "2000": [...] }
        # The year keys are not needed here, only the record lists.
        for records in station_data.values():
            if isinstance(records, list):
                yield from records

In [24]:
# Convert every raw record; the converter returns None for records it rejects
# (no valid date, or the observation could not be constructed).
rows = []
n_skipped = 0  # rejected records, counted so that data loss is visible

for raw in load_all_raw_records():
    obs = record_to_daily_observation(raw)
    if obs is None:
        n_skipped += 1
        continue
    rows.append(obs.model_dump())

print(f"kept {len(rows)} records, skipped {n_skipped}")

# An empty list would produce a frame with no columns and only fail later
# with a KeyError; stop here with a message that names the real problem.
assert rows, "no valid observations were parsed from the raw files"

# NOTE(review): wind_direction_max is parsed as int but shows up as float64 in
# df.dtypes, presumably because missing values force an upcast. Cast it to the
# nullable "Int64" dtype if an integer column is required downstream.
df = pd.DataFrame(rows)
df.shape

(65929, 17)

In [25]:
# Preview the first rows of the assembled frame.
display(df.head())

Unnamed: 0,date,year,month,day,station_name,precipitation,temperature_min,temperature_mean,temperature_max,humidity_min,humidity_mean,humidity_max,wind_speed_mean,wind_speed_max,wind_direction_max,solar_radiation,pressure_mean
0,2004-01-01,2004,1,1,Piancavallo,,,,,,,,,,,,
1,2004-01-02,2004,1,2,Piancavallo,,,,,,,,,,,,
2,2004-01-03,2004,1,3,Piancavallo,,,,,,,,,,,,
3,2004-01-04,2004,1,4,Piancavallo,,,,,,,,,,,,
4,2004-01-05,2004,1,5,Piancavallo,,,,,,,,,,,,


In [26]:
# Invariants of the assembled frame: every row has a date and a station name,
# and no year predates 1900.
assert df["date"].notna().all()
assert df["station_name"].notna().all()
assert df["year"].min() >= 1900

df.dtypes

date                   object
year                    int64
month                   int64
day                     int64
station_name           object
precipitation         float64
temperature_min       float64
temperature_mean      float64
temperature_max       float64
humidity_min          float64
humidity_mean         float64
humidity_max          float64
wind_speed_mean       float64
wind_speed_max        float64
wind_direction_max    float64
solar_radiation       float64
pressure_mean         float64
dtype: object

In [27]:
# Write the frame to Parquet in the output directory, without the index.
out_path = OUT_DIR / "daily.parquet"
df.to_parquet(out_path, index=False)

out_path

PosixPath('../data/processed/daily.parquet')

In [28]:
# Read the file back and show its shape as a quick round-trip check.
pd.read_parquet(out_path).shape

(65929, 17)