Base config

In [1]:
import os
import time
from datetime import datetime
from typing import Dict, Any, List
import requests
import json

# Endpoint Open-Meteo
BASE_URL_OM = "https://api.open-meteo.com/v1/forecast"

# Variables hourly that we are going to use in the project
HOURLY_VARS = "temperature_2m,precipitation,wind_speed_10m,weather_code,visibility"

# Stations
STATIONS_OM = [
    {"station_code": "S01700", "name": "Milano Centrale", "lat": 45.485, "lon": 9.204},
    {"station_code": "S01645", "name": "Milano Porta Garibaldi", "lat": 45.485, "lon": 9.190},
]

# Directory base to save the data
BASE_DIR = "./data"   # puoi cambiarla


Open-Meteo calling function

In [12]:
def fetch_openmeteo_hourly(
    lat: float,
    lon: float,
    past_hours: int = 2,
    forecast_hours: int = 0,
    timezone: str = "Europe/Rome",
    timeout_sec: int = 30,
    max_retries: int = 3,
    backoff_sec: float = 2.0,
):
    params = {
        "latitude": lat,
        "longitude": lon,
        "hourly": HOURLY_VARS,
        "timezone": timezone,
        "timeformat": "iso8601",
        "past_hours": past_hours,
        "forecast_hours": forecast_hours,
    }

    last_exception = None

    for attempt in range(1, max_retries + 1):
        try:
            start = time.time()
            resp = requests.get(
                BASE_URL_OM,
                params=params,
                timeout=timeout_sec
            )
            latency = time.time() - start

            status = resp.status_code
            url = resp.url
            data = resp.json() if status == 200 else None

            return status, latency, data, url

        except requests.exceptions.ReadTimeout as e:
            last_exception = e
            print(f"[TIMEOUT] tentativo {attempt}/{max_retries} "
                  f"(retry tra {backoff_sec}s)")
            time.sleep(backoff_sec)

        except requests.exceptions.RequestException as e:
            # qualsiasi altro errore di rete
            last_exception = e
            print(f"[HTTP ERROR] tentativo {attempt}/{max_retries}: {e}")
            time.sleep(backoff_sec)

        backoff_sec *= 2  # backoff esponenziale

    print("[FALLIMENTO] Open-Meteo non risponde dopo retries")
    return None, None, None, None


Basic throughput probe/latency test

In [3]:
from collections import Counter

def throughput_probe_openmeteo(
    stations: List[Dict[str, Any]], # list of dicts with code: station_code, lat, lon
    delay_sec: float = 0.5, # pause between one call and the next one
    cycles: int = 3, # n of cycles
):
    logs = []

    for c in range(cycles):
        print(f"\n=== Ciclo {c+1}/{cycles} ===")
        for st in stations:
            code = st["station_code"]
            lat = st["lat"]
            lon = st["lon"]

            status, latency, data, url = fetch_openmeteo_hourly(
                lat=lat,
                lon=lon,
                past_hours=2,
                forecast_hours=0,
            )

            log = {
                "timestamp": datetime.now().isoformat(),
                "station_code": code,
                "lat": lat,
                "lon": lon,
                "status": status,
                "latency": latency,
                "url": url,
            }
            logs.append(log)

            print(f"{log['timestamp']}  {code} → {status} "
                  f"({latency:.3f}s)")

            time.sleep(delay_sec)


    print("Total requests:", len(logs))

    if logs:
        statuses = Counter(l["status"] for l in logs)
        print("Status codes:")
        for s, count in statuses.items():
            print(f"  - {s}: {count}")

        avg_latency = sum(l["latency"] for l in logs) / len(logs)
        print(f"Avg latency: {avg_latency:.3f} s")

    return logs


In [4]:
logs_om = throughput_probe_openmeteo(
    stations=STATIONS_OM,
    delay_sec=0.5,
    cycles=5,  # 5 giri su tutte le stazioni
)



=== Ciclo 1/5 ===
2025-12-06T16:08:18.404715  S01700 → 200 (0.628s)
2025-12-06T16:08:19.514237  S01645 → 200 (0.608s)

=== Ciclo 2/5 ===
2025-12-06T16:08:20.620468  S01700 → 200 (0.605s)
2025-12-06T16:08:21.729908  S01645 → 200 (0.608s)

=== Ciclo 3/5 ===
2025-12-06T16:08:22.838496  S01700 → 200 (0.607s)
2025-12-06T16:08:23.949444  S01645 → 200 (0.609s)

=== Ciclo 4/5 ===
2025-12-06T16:08:25.058646  S01700 → 200 (0.608s)
2025-12-06T16:08:26.167905  S01645 → 200 (0.607s)

=== Ciclo 5/5 ===
2025-12-06T16:08:27.275460  S01700 → 200 (0.606s)
2025-12-06T16:08:28.383627  S01645 → 200 (0.606s)
Total requests: 10
Status codes:
  - 200: 10
Avg latency: 0.609 s


In [5]:
def parse_openmeteo_to_rows(data: Dict[str, Any], station_code: str) -> List[Dict[str, Any]]:
    """
    Converte la risposta Open-Meteo in una lista di righe
    con i campi del tuo schema weather (senza DB).
    """
    hourly = data.get("hourly", {})
    times = hourly.get("time", [])
    temps = hourly.get("temperature_2m", [])
    precs = hourly.get("precipitation", [])
    winds = hourly.get("wind_speed_10m", [])
    wcodes = hourly.get("weather_code", [])
    visibility = hourly.get("visibility", [])

    rows = []
    for i, ts in enumerate(times):
        # es: "2025-12-06T15:00"
        ts_dt = datetime.fromisoformat(ts)  # naive in Europe/Rome

        row = {
            "station_code": station_code,
            "timestamp": ts_dt,
            "temperature": float(temps[i]) if i < len(temps) else None,
            "wind_speed": float(winds[i]) if i < len(winds) else None,
            "precip_mm": float(precs[i]) if i < len(precs) else None,
            "weather_code": int(wcodes[i]) if i < len(wcodes) else None,
            "visibility": float(visibility[i]) if i < len(visibility) else None,
        }
        rows.append(row)

    return rows


RAW data saving in disk

In [9]:
def ensure_dir(path: str):
    os.makedirs(path, exist_ok=True)

def save_raw_openmeteo(data: Dict[str, Any], station_code: str):
    today = datetime.now().strftime("%Y-%m-%d")
    dir_path = os.path.join(BASE_DIR, "raw", "openmeteo", today)
    ensure_dir(dir_path)

    file_path = os.path.join(dir_path, f"{station_code}_raw.json")
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)

    print(f"[RAW] Salvato {file_path}")


In [10]:
def run_openmeteo_pipeline_once():
    all_rows = []

    for st in STATIONS_OM:
        print(f"\n[Open-Meteo] {st['station_code']} - {st['name']}")
        status, latency, data, url = fetch_openmeteo_hourly(
            st["lat"], st["lon"]
        )

        print(f"Status {status} | latency {latency:.3f}s")

        if status != 200:
            print("ERRORE, salto stazione")
            continue

        save_raw_openmeteo(data, st["station_code"])

        rows = parse_openmeteo_to_rows(data, st["station_code"])
        print(f"Righe normalizzate: {len(rows)}")

        all_rows.extend(rows)
        time.sleep(0.3)

    print(f"\nTotale righe generate: {len(all_rows)}")
    return all_rows


In [13]:
weather_rows = run_openmeteo_pipeline_once()


[Open-Meteo] S01700 - Milano Centrale
Status 200 | latency 0.618s
[RAW] Salvato ./data/raw/openmeteo/2025-12-06/S01700_raw.json
Righe normalizzate: 2

[Open-Meteo] S01645 - Milano Porta Garibaldi
Status 200 | latency 0.607s
[RAW] Salvato ./data/raw/openmeteo/2025-12-06/S01645_raw.json
Righe normalizzate: 2

Totale righe generate: 4


Basic data quality stuff

In [21]:
def weather_quality_summary(rows):
    if not rows:
        print("Nessun dato meteo da analizzare.")
        return

    n = len(rows)

    # completeness
    missing_temp  = sum(1 for r in rows if r["temperature"]  is None)
    missing_wind  = sum(1 for r in rows if r["wind_speed"]   is None)
    missing_prec  = sum(1 for r in rows if r["precip_mm"]    is None)
    missing_wcode = sum(1 for r in rows if r["weather_code"] is None)

    print(f"Totale righe: {n}")
    print(f"- temperature mancanti:   {missing_temp}  ({missing_temp/n:.1%})")
    print(f"- wind_speed mancanti:    {missing_wind}  ({missing_wind/n:.1%})")
    print(f"- precip_mm mancanti:     {missing_prec}  ({missing_prec/n:.1%})")
    print(f"- weather_code mancanti:  {missing_wcode} ({missing_wcode/n:.1%})")

    # basic range checks (non correggiamo, solo contiamo)
    out_temp = sum(
        1 for r in rows
        if r["temperature"] is not None and not (-30 <= r["temperature"] <= 50)
    )
    out_wind = sum(
        1 for r in rows
        if r["wind_speed"] is not None and not (0 <= r["wind_speed"] <= 150)
    )
    out_prec = sum(
        1 for r in rows
        if r["precip_mm"] is not None and not (0 <= r["precip_mm"] <= 500)
    )
    out_wcode = sum(
        1 for r in rows
        if r["weather_code"] is not None and not (0 <= r["weather_code"] <= 99)
    )
    out_vis = sum(
        1 for r in rows
        if r["visibility"] is not None
    )

    print("\nRange check (valori sospetti):")
    print(f"- temperature fuori [-30, 50]°C:  {out_temp}")
    print(f"- wind_speed fuori [0, 150]:      {out_wind}")
    print(f"- precip_mm fuori [0, 500]:       {out_prec}")
    print(f"- weather_code fuori [0, 99]:     {out_wcode}")
    print(f"- visibility null:                {out_vis}")


In [22]:
weather_quality_summary(weather_rows)


Totale righe: 4
- temperature mancanti:   0  (0.0%)
- wind_speed mancanti:    0  (0.0%)
- precip_mm mancanti:     0  (0.0%)
- weather_code mancanti:  0 (0.0%)

Range check (valori sospetti):
- temperature fuori [-30, 50]°C:  0
- wind_speed fuori [0, 150]:      0
- precip_mm fuori [0, 500]:       0
- weather_code fuori [0, 99]:     0
- visibility null:            4


In [26]:
import csv

def save_weather_curated_csv(rows, base_dir=BASE_DIR):
    if not rows:
        print("Nessuna riga da salvare.")
        return

    today = datetime.now().strftime("%Y-%m-%d")
    dir_path = os.path.join(base_dir, "curated", "openmeteo")
    os.makedirs(dir_path, exist_ok=True)

    file_path = os.path.join(dir_path, f"weather_{today}.csv")

    fieldnames = ["station_code", "timestamp", "temperature", "wind_speed", "precip_mm", "weather_code", "visibility"]

    with open(file_path, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()

        for r in rows:
            writer.writerow({
                "station_code": r["station_code"],
                "timestamp": r["timestamp"].isoformat(),  # ISO 8601 → Postgres lo legge
                "temperature": r["temperature"],
                "wind_speed": r["wind_speed"],
                "precip_mm": r["precip_mm"],
                "weather_code": r["weather_code"],
                "visibility": r["visibility"]
            })

    print(f"[CURATED CSV] Salvato {file_path}")
    return file_path


In [27]:
weather_rows = run_openmeteo_pipeline_once()
csv_path = save_weather_curated_csv(weather_rows)
csv_path



[Open-Meteo] S01700 - Milano Centrale
Status 200 | latency 0.680s
[RAW] Salvato ./data/raw/openmeteo/2025-12-06/S01700_raw.json
Righe normalizzate: 2

[Open-Meteo] S01645 - Milano Porta Garibaldi
Status 200 | latency 0.612s
[RAW] Salvato ./data/raw/openmeteo/2025-12-06/S01645_raw.json
Righe normalizzate: 2

Totale righe generate: 4
[CURATED CSV] Salvato ./data/curated/openmeteo/weather_2025-12-06.csv


'./data/curated/openmeteo/weather_2025-12-06.csv'