# EVT — GEV (MLE) Batch Fitting Across All Cities (Ecuador)

**Project:** Extreme Value Analysis — Ecuador Climate Data  
**Method:** Block Maxima + GEV via Maximum Likelihood (scipy)  
**Author:** Jefferson Conza  
**Updated:** 2026-02-23

This notebook fits a **GEV distribution** to **block maxima** for **multiple cities** and **multiple variables**, and saves:
- Per (city, variable): parameters, return levels, and (optionally) diagnostic plots
- Global: `gev_mle_summary.csv/.parquet` and `gev_mle_batch_errors.csv`

It is designed for your current repository layout:

```
/content/drive/MyDrive/extreme-climate-forecasting/
└── data/_tmp_evt_city_sorted_final/<CITY>.parquet
```

> Notes  
> - Uses **tqdm progress bars**.  
> - Handles dtype edge-cases (e.g., categorical columns) safely.  
> - Produces robust summaries even when some/all fits fail.


In [11]:
# Mount Google Drive (Colab)
# The project paths configured later in this notebook all live under this
# mount point, so this cell has to run before any data is read or written.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## 1. Configuration

In [12]:
from __future__ import annotations

from pathlib import Path
import os
import warnings

import numpy as np
import pandas as pd

from pandas.api.types import is_datetime64_any_dtype, is_numeric_dtype

from scipy.stats import genextreme

import matplotlib.pyplot as plt

from tqdm.auto import tqdm

# NOTE(review): this silences every warning for the rest of the session, which
# also hides any warnings raised while fitting — consider narrowing the filter.
warnings.filterwarnings("ignore")

# -----------------------------
# Paths
# -----------------------------
# Project root on the mounted Drive; every other path is derived from it.
PROJECT_DIR = Path("/content/drive/MyDrive/extreme-climate-forecasting")
DATA_DIR = PROJECT_DIR / "data"
# Folder holding one parquet file per city (<CITY>.parquet).
CITY_PARQUET_DIR = DATA_DIR / "_tmp_evt_city_sorted_final"
# Output folder for summaries and plots; created here (with parents) if missing.
RESULTS_DIR = PROJECT_DIR / "results"
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

# -----------------------------
# Variables to fit (edit here)
# -----------------------------
EVT_VARS = [
    "temp",
    # "temp_max",
    # "rain_1h",
    # "wind_speed",
    # "humidity",
]

# -----------------------------
# Block rule for maxima
# -----------------------------
# "YE" annual (year-end) | "ME" monthly (month-end) | "D" daily
BLOCK_RULE = "ME"

# Return periods (years)
RETURN_PERIODS = [10, 20, 50, 100]


def default_min_blocks(block_rule: str) -> int:
    """Return the minimum number of blocks required before a fit is attempted.

    Annual blocks need 40 maxima, every other rule needs 50. The rule is
    compared case-insensitively and the annual aliases "Y" and "A" are
    accepted, matching what normalize_block_rule() accepts further down, so
    an annual run configured through an alias gets the annual threshold.
    """
    rule = str(block_rule).upper().strip()
    return 40 if rule in {"YE", "Y", "A"} else 50


# Minimum number of blocks (EVT safety)
MIN_BLOCKS = default_min_blocks(BLOCK_RULE)

# Whether to save per-city plots (can be heavy)
SAVE_PLOTS = True

# Optional: cap number of cities for quick testing (None = all)
MAX_CITIES = None

# Echo the effective configuration so the run is self-describing in the output.
for label, value in (
    ("PROJECT_DIR", PROJECT_DIR),
    ("CITY_PARQUET_DIR", CITY_PARQUET_DIR),
    ("RESULTS_DIR", RESULTS_DIR),
):
    print(f"{label}: {value}")
print(f"BLOCK_RULE: {BLOCK_RULE}  | MIN_BLOCKS: {MIN_BLOCKS}")
print(f"VARS: {EVT_VARS}")


PROJECT_DIR: /content/drive/MyDrive/extreme-climate-forecasting
CITY_PARQUET_DIR: /content/drive/MyDrive/extreme-climate-forecasting/data/_tmp_evt_city_sorted_final
RESULTS_DIR: /content/drive/MyDrive/extreme-climate-forecasting/results
BLOCK_RULE: ME  | MIN_BLOCKS: 50
VARS: ['temp']


## 2. Helpers

In [13]:
def list_cities_from_folder(city_parquet_dir: Path) -> list[str]:
    """Return the sorted city names found in a folder of ``<CITY>.parquet`` files.

    A city name is the file stem of each ``*.parquet`` file directly inside
    ``city_parquet_dir``.

    Raises:
        FileNotFoundError: if the folder does not exist.
        ValueError: if the folder contains no parquet files.
    """
    if not city_parquet_dir.exists():
        raise FileNotFoundError(f"City parquet folder not found: {city_parquet_dir}")

    names = sorted(parquet_file.stem for parquet_file in city_parquet_dir.glob("*.parquet"))
    if not names:
        raise ValueError(f"No parquet files found in: {city_parquet_dir}")
    return names


def normalize_block_rule(block_rule: str) -> tuple[str, str, int]:
    """Canonicalise a block rule and look up its resampling settings.

    The input is upper-cased and stripped, then the short aliases "A"/"Y"
    (annual) and "M" (monthly) are mapped to "YE" and "ME".

    Returns:
        (canonical_rule, pandas_rule, blocks_per_year)

    Raises:
        ValueError: if the rule is not one of the supported ones.
    """
    canonical = str(block_rule).upper().strip()
    canonical = {"A": "YE", "Y": "YE", "M": "ME"}.get(canonical, canonical)

    # canonical rule -> (pandas resample rule, number of blocks in one year)
    table = {
        "YE": ("YE", 1),
        "ME": ("ME", 12),
        "D": ("D", 365),
    }
    entry = table.get(canonical)
    if entry is None:
        raise ValueError(f"Invalid BLOCK_RULE='{block_rule}'. Use one of {sorted(table)}")

    return (canonical, *entry)


def detect_datetime_column(df: pd.DataFrame) -> str:
    """Pick the column of ``df`` that should be used as the time axis.

    Well-known column names are tried first, in priority order and without
    checking their dtype. If none is present, the first column whose dtype is
    a datetime64 dtype is used instead.

    Raises:
        ValueError: if neither strategy finds a column.
    """
    preferred_names = (
        "fecha_local", "datetime_local", "timestamp_local", "date_local",
        "datetime", "timestamp", "date", "time",
    )
    by_name = next((name for name in preferred_names if name in df.columns), None)
    if by_name is not None:
        return by_name

    for col in df.columns:
        try:
            typed_as_datetime = is_datetime64_any_dtype(df[col])
        except Exception:
            # Best effort: a column that cannot be inspected is simply skipped.
            continue
        if typed_as_datetime:
            return col

    raise ValueError(
        "No datetime column found. Add it to candidates or pass dt_col explicitly."
    )


def ensure_datetime(df: pd.DataFrame, dt_col: str) -> pd.DataFrame:
    """Return ``df`` with ``dt_col`` guaranteed to have a datetime dtype.

    If the column is already datetime-typed the input frame is returned
    untouched. Otherwise a copy is returned whose column has been parsed with
    ``pd.to_datetime``; values that cannot be parsed become ``NaT``. The
    caller's frame is never modified.
    """
    if is_datetime64_any_dtype(df[dt_col]):
        return df

    converted = df.copy()
    converted[dt_col] = pd.to_datetime(converted[dt_col], errors="coerce")
    return converted


def extract_block_maxima(df: pd.DataFrame, dt_col: str, var: str, pandas_rule: str) -> pd.Series:
    """Compute the per-block maximum of ``var``.

    Rows with a missing timestamp or a missing value are discarded, the
    remaining values are resampled on ``dt_col`` with ``pandas_rule``, and the
    maximum of each block is taken. Blocks that end up without any value are
    removed from the result.
    """
    complete_rows = df.dropna(subset=[dt_col, var])
    values_by_time = complete_rows.set_index(dt_col)[var]
    maxima = values_by_time.resample(pandas_rule).max()
    return maxima.dropna()


def fit_gev_mle(x: np.ndarray) -> dict:
    """Fit a GEV to ``x`` by maximum likelihood with ``scipy.stats.genextreme``.

    scipy's shape parameter ``c`` has the opposite sign of the usual EVT
    shape ``xi`` (xi = -c). The tail is labelled from ``xi`` with a dead band
    of +/-0.05 around zero:

    * xi < -0.05 -> "Weibull", with endpoint ``loc - scale / xi``
    * xi >  0.05 -> "Frechet", no endpoint
    * otherwise  -> "Gumbel", no endpoint

    Returns:
        dict with keys "c", "xi", "loc", "scale" (floats), "tail_type" (str)
        and "endpoint" (float or None).
    """
    shape_c, location, spread = genextreme.fit(x)
    xi = -shape_c

    if xi < -0.05:
        tail_label = "Weibull"
        bound = float(location - spread / xi)
    elif xi > 0.05:
        tail_label = "Frechet"
        bound = None
    else:
        tail_label = "Gumbel"
        bound = None

    return {
        "c": float(shape_c),
        "xi": float(xi),
        "loc": float(location),
        "scale": float(spread),
        "tail_type": tail_label,
        "endpoint": bound,
    }


def return_levels(params: dict, blocks_per_year: int, return_periods_years: list[int]) -> dict:
    """Compute GEV return levels for the requested return periods.

    A return period of ``T`` years corresponds to ``blocks_per_year * T``
    blocks, so the return level is the GEV quantile at non-exceedance
    probability ``1 - 1 / (blocks_per_year * T)``.

    Args:
        params: fitted parameters; reads the keys "c", "loc" and "scale".
        blocks_per_year: number of blocks in one year.
        return_periods_years: return periods, in years.

    Returns:
        dict mapping ``"RL<T>"`` (T truncated to int) to the level as a float.
    """
    shape, location, spread = params["c"], params["loc"], params["scale"]

    def level_for(years) -> float:
        n_blocks = blocks_per_year * float(years)
        return float(genextreme.ppf(1 - 1 / n_blocks, shape, loc=location, scale=spread))

    return {f"RL{int(years)}": level_for(years) for years in return_periods_years}


def save_diagnostics_plots(out_dir: Path, city: str, var: str, block_max: pd.Series, params: dict, blocks_per_year: int) -> None:
    """Write three diagnostic PNGs for one fitted (city, variable) pair.

    Files written into ``out_dir`` (created if missing), where ``<slug>`` is
    the lower-cased city name with spaces replaced by underscores:

    * ``block_maxima_<slug>_<var>.png`` - time series of the block maxima
    * ``qq_<slug>_<var>.png``           - Q-Q plot against the fitted GEV
    * ``return_level_<slug>_<var>.png`` - return level vs. return period

    Args:
        out_dir: destination folder.
        city: city name, used in titles and file names.
        var: variable name, used in titles, axis labels and file names.
        block_max: block maxima; its index is plotted on the x axis.
        params: fitted parameters; reads "c", "loc", "scale" and "xi".
        blocks_per_year: number of blocks per year, used to convert return
            periods in years into non-exceedance probabilities.

    Every figure is closed in a ``finally`` clause so that a failure while
    drawing or saving does not leave open figures behind across a batch run.
    The first plot's title reads the module-level ``BLOCK_RULE``.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    c = params["c"]
    mu = params["loc"]
    sigma = params["scale"]
    xi = params["xi"]
    # Shared by all three file names.
    slug = city.lower().replace(' ', '_')

    # 1) Time series of block maxima
    fig = plt.figure(figsize=(10, 4))
    try:
        plt.plot(block_max.index, block_max.values, linewidth=1)
        plt.title(f"Block maxima ({BLOCK_RULE}) — {city} — {var}")
        plt.xlabel("Date")
        plt.ylabel(var)
        plt.tight_layout()
        plt.savefig(out_dir / f"block_maxima_{slug}_{var}.png", dpi=200)
    finally:
        plt.close(fig)

    # 2) QQ plot: sorted observations against GEV quantiles evaluated at the
    #    plotting positions (i - 0.44) / (n + 0.12).
    emp_q = np.sort(block_max.values)
    n = len(emp_q)
    p = (np.arange(1, n + 1) - 0.44) / (n + 0.12)
    theo_q = genextreme.ppf(p, c, loc=mu, scale=sigma)

    fig = plt.figure(figsize=(5.5, 5.5))
    try:
        plt.scatter(theo_q, emp_q, s=18, alpha=0.8)
        mn = min(theo_q.min(), emp_q.min())
        mx = max(theo_q.max(), emp_q.max())
        # 1:1 reference line spanning both axes.
        plt.plot([mn, mx], [mn, mx], linestyle="--")
        plt.title(f"Q-Q — {city} — {var}\nxi={xi:.3f}, mu={mu:.2f}, sigma={sigma:.2f}")
        plt.xlabel("Theoretical quantiles (GEV)")
        plt.ylabel("Observed quantiles")
        plt.tight_layout()
        plt.savefig(out_dir / f"qq_{slug}_{var}.png", dpi=200)
    finally:
        plt.close(fig)

    # 3) Return level curve (1..200 years)
    T_curve = np.logspace(0, 2.3, 250)  # 1..~200
    # A return period of one block or less maps to a non-exceedance
    # probability <= 0, which has no meaningful quantile. This only removes
    # the T=1 point when blocks_per_year == 1; other rules are unaffected.
    T_curve = T_curve[blocks_per_year * T_curve > 1]
    RL_curve = genextreme.ppf(1 - 1/(blocks_per_year*T_curve), c, loc=mu, scale=sigma)

    fig = plt.figure(figsize=(7, 5))
    try:
        plt.plot(T_curve, RL_curve, linewidth=2)
        plt.xscale("log")
        plt.title(f"Return level — {city} — {var}")
        plt.xlabel("Return period (years, log)")
        plt.ylabel("Return level")
        plt.tight_layout()
        plt.savefig(out_dir / f"return_level_{slug}_{var}.png", dpi=200)
    finally:
        plt.close(fig)


## 3. Load city list

In [14]:
# Canonicalise the configured rule and derive the resampling settings from it.
BLOCK_RULE, PANDAS_RULE, BLOCKS_PER_YEAR = normalize_block_rule(BLOCK_RULE)

# Discover the cities, then optionally keep only the first MAX_CITIES of them.
CITIES = list_cities_from_folder(CITY_PARQUET_DIR)
city_limit = None if MAX_CITIES is None else int(MAX_CITIES)
CITIES = CITIES[:city_limit]

print(f"Cities found: {len(CITIES)}")
print(f"First 10: {CITIES[:10]}")
print(f"Pandas rule: {PANDAS_RULE} | Blocks/year: {BLOCKS_PER_YEAR}")


Cities found: 16
First 10: ['Ambato', 'Cuenca', 'Esmeraldas', 'Guayaquil', 'Ibarra', 'Lago Agrio', 'Loja', 'Machala', 'Manta', 'Puerto Morona']
Pandas rule: ME | Blocks/year: 12


## 4. Batch run (GEV MLE)

In [15]:
# One summary row per (city, variable) goes into `results`; every failure is
# additionally described in `errors` with the stage ("where") it happened in.
results = []
errors = []

outer = tqdm(CITIES, desc="Cities", leave=True)
for city in outer:
    city_path = CITY_PARQUET_DIR / f"{city}.parquet"

    # City-level stages: a failure here skips the whole city (var=None).
    try:
        df_city = pd.read_parquet(city_path)
    except Exception as e:
        errors.append({"city": city, "var": None, "where": "read_parquet", "error": str(e)})
        continue

    # Detect datetime column robustly
    try:
        dt_col = detect_datetime_column(df_city)
    except Exception as e:
        errors.append({"city": city, "var": None, "where": "detect_datetime_column", "error": str(e)})
        continue

    # Guarded like the other city-level stages so that one city whose
    # timestamps cannot be converted does not abort the whole batch.
    try:
        df_city = ensure_datetime(df_city, dt_col)
    except Exception as e:
        errors.append({"city": city, "var": None, "where": "ensure_datetime", "error": str(e)})
        continue

    # per-variable loop
    inner = tqdm(EVT_VARS, desc=f"{city}: vars", leave=False)
    for var in inner:
        row = {
            "status": None,
            "city": city,
            "var": var,
            "block_rule": BLOCK_RULE,
            "n_rows": int(len(df_city)),
            "start_dt": None,
            "end_dt": None,
            "n_blocks": None,
            "dt_col": dt_col,
        }
        try:
            if var not in df_city.columns:
                row["status"] = "skip_missing_var"
                results.append(row)
                continue

            if not is_numeric_dtype(df_city[var]):
                row["status"] = "skip_non_numeric_var"
                results.append(row)
                continue

            bm = extract_block_maxima(df_city, dt_col, var, PANDAS_RULE)

            row["start_dt"] = str(bm.index.min().date()) if len(bm) else None
            row["end_dt"] = str(bm.index.max().date()) if len(bm) else None
            row["n_blocks"] = int(len(bm))

            if len(bm) < MIN_BLOCKS:
                row["status"] = f"skip_insufficient_blocks_{len(bm)}"
                results.append(row)
                continue

            params = fit_gev_mle(bm.values.astype(float))
            rls = return_levels(params, BLOCKS_PER_YEAR, RETURN_PERIODS)
        except Exception as e:
            row["status"] = "error"
            results.append(row)
            errors.append({"city": city, "var": var, "where": "fit_loop", "error": str(e)})
            continue

        row.update({
            "status": "ok",
            "c": params["c"],
            "xi": params["xi"],
            "loc": params["loc"],
            "scale": params["scale"],
            "tail_type": params["tail_type"],
            "endpoint": params["endpoint"],
        })
        row.update(rls)
        results.append(row)

        # Plotting is handled separately from fitting: the row is already in
        # `results`, so a plotting failure must neither flip a successful fit
        # to "error" nor append the same row a second time.
        if SAVE_PLOTS:
            try:
                out_dir = RESULTS_DIR / f"mle_{city.lower().replace(' ','_')}_{var}_{BLOCK_RULE.lower()}"
                save_diagnostics_plots(out_dir, city, var, bm, params, BLOCKS_PER_YEAR)
            except Exception as e:
                errors.append({"city": city, "var": var, "where": "save_plots", "error": str(e)})

summary_df = pd.DataFrame(results)
errors_df = pd.DataFrame(errors)

print("Done.")
print("Summary rows:", len(summary_df))
print("Errors rows:", len(errors_df))


Cities:   0%|          | 0/16 [00:00<?, ?it/s]

Ambato: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Cuenca: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Esmeraldas: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Guayaquil: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Ibarra: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Lago Agrio: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Loja: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Machala: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Manta: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Puerto Morona: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Puyo: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Quevedo: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Quito: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Santa Cruz Island: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Santo Domingo: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Zamora: vars:   0%|          | 0/1 [00:00<?, ?it/s]

Done.
Summary rows: 16
Errors rows: 0


## 5. Save outputs (robust schema + safe sorting)

In [16]:
# Robust schema even if results is empty
EXPECTED_COLS = (
    ["status", "city", "var", "block_rule", "dt_col",
     "n_rows", "start_dt", "end_dt", "n_blocks",
     "c", "xi", "loc", "scale", "tail_type", "endpoint"]
    + [f"RL{T}" for T in RETURN_PERIODS]
)

# Rows that were skipped or failed never received the fit columns; add any
# missing expected column as NaN so the output schema is always the same.
for col in EXPECTED_COLS:
    if col not in summary_df.columns:
        summary_df[col] = np.nan

# Put expected columns first
summary_df = summary_df[EXPECTED_COLS + [c for c in summary_df.columns if c not in EXPECTED_COLS]]

# Safe sort (only by columns that exist)
sort_cols = [c for c in ["status", "city", "var"] if c in summary_df.columns]
if sort_cols:
    summary_df = summary_df.sort_values(sort_cols).reset_index(drop=True)

OUT_SUMMARY_CSV = RESULTS_DIR / "gev_mle_summary.csv"
OUT_SUMMARY_PARQUET = RESULTS_DIR / "gev_mle_summary.parquet"
OUT_ERRORS = RESULTS_DIR / "gev_mle_batch_errors.csv"

summary_df.to_csv(OUT_SUMMARY_CSV, index=False)
summary_df.to_parquet(OUT_SUMMARY_PARQUET, index=False)

# The errors file is rewritten on every run, header-only when nothing failed.
# Writing it only when errors exist would leave a file from an earlier run
# sitting next to a fresh summary, where it reads as the current run's errors.
ERROR_COLS = ["city", "var", "where", "error"]
errors_out = errors_df.reindex(
    columns=ERROR_COLS + [c for c in errors_df.columns if c not in ERROR_COLS]
)
errors_out.to_csv(OUT_ERRORS, index=False)

print("Saved:")
print(" -", OUT_SUMMARY_CSV)
print(" -", OUT_SUMMARY_PARQUET)
print(" -", OUT_ERRORS)

summary_df.tail()

Saved:
 - /content/drive/MyDrive/extreme-climate-forecasting/results/gev_mle_summary.csv
 - /content/drive/MyDrive/extreme-climate-forecasting/results/gev_mle_summary.parquet


Unnamed: 0,status,city,var,block_rule,dt_col,n_rows,start_dt,end_dt,n_blocks,c,xi,loc,scale,tail_type,endpoint,RL10,RL20,RL50,RL100
11,ok,Quevedo,temp,ME,fecha_local,402121,1978-12-31,2024-10-31,551,0.176831,-0.176831,28.496177,1.630409,Weibull,37.71634,33.759063,34.216854,34.740977,35.084397
12,ok,Quito,temp,ME,fecha_local,411112,1978-12-31,2024-10-31,551,0.137318,-0.137318,19.629468,1.673382,Weibull,31.815637,25.497234,26.072548,26.752421,27.212385
13,ok,Santa Cruz Island,temp,ME,fecha_local,401496,1978-12-31,2024-10-31,551,0.084459,-0.084459,21.147642,1.793176,Weibull,42.378975,28.203905,29.012289,30.009018,30.712807
14,ok,Santo Domingo,temp,ME,fecha_local,403049,1978-12-31,2024-10-31,551,0.317828,-0.317828,27.166247,1.232858,Weibull,31.045263,30.197103,30.365256,30.537263,30.63776
15,ok,Zamora,temp,ME,fecha_local,401496,1978-12-31,2024-10-31,551,0.380479,-0.380479,31.818877,1.541278,Weibull,35.869769,35.213386,35.365949,35.514415,35.596836


## 6. Quick diagnostics

In [17]:
# Status as plain strings (missing -> "") so the prefix test below is safe.
status_s = summary_df["status"].fillna("").astype(str)

# Split the summary by outcome.
ok = summary_df.loc[summary_df["status"] == "ok"]
skipped = summary_df.loc[status_s.str.startswith("skip_")]
err_rows = summary_df.loc[summary_df["status"] == "error"]

for label, subset in (
    ("OK fits", ok),
    ("Skipped", skipped),
    ("Error rows in summary", err_rows),
):
    print(f"{label}: {len(subset)}")

if len(ok) == 0:
    print("No OK fits yet. Check gev_mle_batch_errors.csv for root causes.")
else:
    # Number of successful fits per city, largest first (top 10).
    fits_per_city = ok.groupby("city").size().sort_values(ascending=False)
    display(fits_per_city.head(10))


OK fits: 16
Skipped: 0
Error rows in summary: 0


Unnamed: 0_level_0,0
city,Unnamed: 1_level_1
Ambato,1
Cuenca,1
Esmeraldas,1
Guayaquil,1
Ibarra,1
Lago Agrio,1
Loja,1
Machala,1
Manta,1
Puerto Morona,1


## 7. Inspect errors (top 20)

In [18]:
# Show the first 20 recorded errors, or say so when the run was clean.
if len(errors_df) == 0:
    print("No errors recorded.")
else:
    display(errors_df.head(20))

No errors recorded.
