# CNAIM PoF validation – Phase 1

This notebook contains a clean, reproducible workflow:
1) Setup  2) Data loading  3) Asset-years exposure  4) Weibull  5) Kaplan–Meier  6) CNAIM calibration  7) ML risk ranking


In [None]:
import sys
from pathlib import Path
import pandas as pd
import matplotlib.pyplot as plt

# Ensure project root is on sys.path so `import src...` works
sys.path.append(str(Path("..").resolve()))

DATA_RAW = Path("../data/raw")
DATA_PROCESSED = Path("../data/processed")
DATA_PROCESSED.mkdir(parents=True, exist_ok=True)


## 1) Data loading + validation

In [None]:
from src.io import load_assets, load_failures
from src.quality import validate_assets, validate_failures

assets = load_assets(str(DATA_RAW / "assets.csv"))
failures = load_failures(str(DATA_RAW / "failures.csv"))

validate_assets(assets)
validate_failures(failures)

assets.head()


In [None]:
failures.head()

## 2) Asset-years exposure table

In [None]:
from src.exposure import build_asset_years

END_YEAR = 2026
asset_years = build_asset_years(assets, failures, end_year=END_YEAR)

asset_years.head()


In [None]:
asset_years.shape

In [None]:
# Save locally (do not commit processed data)
asset_years.to_csv(DATA_PROCESSED / "asset_years.csv", index=False)


## 3) CNAIM PoF vs observed failure rate (by asset type)

In [None]:
observed = (
    asset_years
    .groupby("asset_type")
    .agg(
        observed_failures=("failures_in_year", "sum"),
        exposure=("exposure_years", "sum"),
        cnaim_pof=("cnaim_pof", "mean"),
    )
)

observed["observed_rate"] = observed["observed_failures"] / observed["exposure"]
observed


In [None]:
# Visual check (ideal = diagonal)
plt.figure()
plt.scatter(observed["cnaim_pof"], observed["observed_rate"])
m = max(observed["cnaim_pof"].max(), observed["observed_rate"].max())
plt.plot([0, m], [0, m])
plt.xlabel("CNAIM PoF")
plt.ylabel("Observed failure rate")
plt.title("CNAIM vs Observed (by asset type)")
plt.show()


## 4) Weibull reliability modelling (failure ages)

In [None]:
import numpy as np
from scipy.stats import weibull_min

# Fit Weibull on ages at which a failure occurred (Phase 1 simplification)
failure_ages = asset_years.loc[asset_years["failures_in_year"] > 0, "age"]

beta, loc, eta = weibull_min.fit(failure_ages, floc=0)
beta, eta


In [None]:
# Hazard function h(t) for interpretation
t = np.linspace(0, max(1, asset_years["age"].max()), 200)
hazard = (beta/eta) * (t/eta)**(beta-1)

plt.figure()
plt.plot(t, hazard)
plt.xlabel("Age (years)")
plt.ylabel("Hazard (relative)")
plt.title("Weibull hazard function")
plt.show()


## 5) Kaplan–Meier survival analysis (censored assets)

In [None]:
from lifelines import KaplanMeierFitter

failures["failure_date"] = pd.to_datetime(failures["failure_date"], errors="coerce")

first_failure = (
    failures.dropna(subset=["failure_date"])
            .sort_values("failure_date")
            .groupby("asset_id", as_index=False)
            .first()[["asset_id", "failure_date"]]
)

survival = assets.merge(first_failure, on="asset_id", how="left")

CURRENT_YEAR = 2026
survival["duration"] = survival["failure_date"].dt.year.fillna(CURRENT_YEAR) - survival["installation_year"]
survival["event"] = survival["failure_date"].notna().astype(int)

survival.head()


In [None]:
kmf = KaplanMeierFitter()
kmf.fit(durations=survival["duration"], event_observed=survival["event"])

plt.figure()
kmf.plot_survival_function()
plt.title("Kaplan–Meier survival curve")
plt.xlabel("Age (years)")
plt.ylabel("Survival probability")
plt.show()

kmf.median_survival_time_


## 6) CNAIM calibration

In [None]:
calib = observed.copy()
calib["calibration_factor"] = calib["observed_rate"] / calib["cnaim_pof"]
calib


In [None]:
# Global calibration factor (simple baseline)
global_factor = (calib["observed_failures"].sum() / calib["exposure"].sum()) / calib["cnaim_pof"].mean()
global_factor


In [None]:
asset_years["calibrated_pof"] = asset_years["cnaim_pof"] * global_factor
asset_years[["asset_id","year","asset_type","cnaim_pof","calibrated_pof"]].head()


## 7) Risk ranking (PoF × consequence × age)

In [None]:
# Consequence proxy (replace with real CoF when available)
consequence_map = {"transformer": 5, "cable": 3, "switch": 2}
asset_years["consequence"] = asset_years["asset_type"].map(consequence_map).fillna(1)

asset_years["risk_score"] = asset_years["calibrated_pof"] * asset_years["consequence"] * asset_years["age"]

latest = (
    asset_years.sort_values(["asset_id", "year"])
              .groupby("asset_id", as_index=False)
              .tail(1)
)

risk_ranking = latest.sort_values("risk_score", ascending=False)
risk_ranking.head(10)


In [None]:
top = risk_ranking.head(10)
plt.figure()
plt.bar(top["asset_id"].astype(str), top["risk_score"])
plt.xticks(rotation=45)
plt.ylabel("Risk score")
plt.title("Top risk assets (latest year)")
plt.show()


## 8) Predictive model (failure next year)

Note: failures are rare → use time-aware splits and imbalance-aware metrics.

In [None]:
# Target: failure next year (binary)
asset_years = asset_years.sort_values(["asset_id", "year"])
asset_years["failure_next_year"] = (
    asset_years.groupby("asset_id")["failures_in_year"]
              .shift(-1)
              .fillna(0)
              .astype(int)
)

asset_years["failure_next_year"].value_counts()


In [None]:
# Find a cutoff year so both train and test contain at least one positive example
def find_cutoff(df, min_pos_train=1, min_pos_test=1):
    years = sorted(df["year"].unique())
    for cut in years[1:]:
        tr = df[df["year"] < cut]
        te = df[df["year"] >= cut]
        if tr["failure_next_year"].sum() >= min_pos_train and te["failure_next_year"].sum() >= min_pos_test:
            return cut
    return None

cutoff = find_cutoff(asset_years, 1, 1)
cutoff


In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score, precision_recall_curve, average_precision_score
import numpy as np

# Features (keep simple + robust)
asset_years["age_squared"] = asset_years["age"]**2

features = ["age", "age_squared", "cnaim_pof", "calibrated_pof", "consequence"]

train = asset_years[asset_years["year"] < cutoff].copy()
test  = asset_years[asset_years["year"] >= cutoff].copy()

X_train, y_train = train[features], train["failure_next_year"]
X_test,  y_test  = test[features],  test["failure_next_year"]

model = LogisticRegression(max_iter=1000, class_weight="balanced")
model.fit(X_train, y_train)

y_prob = model.predict_proba(X_test)[:, 1]

# AUC requires both classes in y_test
auc = roc_auc_score(y_test, y_prob)
ap  = average_precision_score(y_test, y_prob)

auc, ap


In [None]:
# Precision–Recall curve (more informative under heavy class imbalance)
precision, recall, _ = precision_recall_curve(y_test, y_prob)

plt.figure()
plt.plot(recall, precision)
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision–Recall curve (test)")
plt.show()


In [None]:
# ML-based ranking for the test period
test["ml_failure_risk"] = y_prob
test.sort_values("ml_failure_risk", ascending=False).head(10)
