# Survival Analysis for Transformer Failure Prediction

From the [Sisyphean Gridworks ML Playground](https://sgridworks.com/ml-playground/guides/12-advanced-predictive-maintenance.html)

## Setup

Clone the repository and install dependencies. Run this cell first.

In [None]:
!git clone https://github.com/SGridworks/Dynamic-Network-Model.git 2>/dev/null || echo 'Already cloned'
%cd Dynamic-Network-Model
!pip install -q pandas numpy matplotlib seaborn scikit-learn xgboost lightgbm pyarrow

## Load Transformer and Event Data

We start by loading the same datasets from Guide 04, plus weather data that we will use later to build environmental covariates for the Cox model.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from lifelines import KaplanMeierFitter, CoxPHFitter
from lifelines.statistics import logrank_test
from lifelines.utils import median_survival_times
from demo_data.load_demo_data import load_transformers, load_outage_history, load_weather_data

# Load SP&L datasets
transformers = load_transformers().reset_index()
outages      = load_outage_history().reset_index()
weather      = load_weather_data()

print(f"Transformers:    {len(transformers):,}")
print(f"Outage events:   {len(outages)}")
print(f"Weather records: {len(weather):,}")

## Survival Analysis vs Classification

Before we write more code, let's understand why survival analysis is different from the binary classifier you built in Guide 04.

In [None]:
# Identify feeders that experienced equipment-failure outages
# Outages are at the feeder level; we use equipment_involved == "transformer"
# to flag feeders where a transformer failure was recorded.
equip_failures = outages[
    (outages["cause_code"] == "equipment failure") &
    (outages["equipment_involved"] == "transformer")
]

# Get the first transformer-failure date per feeder
first_failure = equip_failures.groupby("feeder_id").agg(
    first_failure_date=("fault_detected", "min")
).reset_index()

# Merge with transformer inventory (feeder-level join)
df = transformers.merge(first_failure, on="feeder_id", how="left")

# Event indicator: 1 = feeder had a transformer failure, 0 = censored
df["event_observed"] = df["first_failure_date"].notna().astype(int)

# Duration: use age_years directly (already in the dataset)
# For failed transformers, approximate time-to-failure from age at event
df["duration_years"] = df["age_years"].astype(float)

# Ensure positive durations
df["duration_years"] = df["duration_years"].clip(lower=0.5)

print(f"Total transformers:  {len(df):,}")
print(f"Failed (event=1):   {df['event_observed'].sum():,}")
print(f"Censored (event=0): {(df['event_observed'] == 0).sum():,}")
print(f"\nDuration range: {df['duration_years'].min():.1f} – {df['duration_years'].max():.1f} years")

## Prepare Survival Data

Survival analysis requires two columns for each subject: the duration (time from installation to failure or current date) and an event indicator (1 = failure observed, 0 = still running / censored).

In [None]:
# Fit overall Kaplan-Meier curve
kmf = KaplanMeierFitter()
kmf.fit(
    durations=df["duration_years"],
    event_observed=df["event_observed"],
    label="All Transformers"
)

fig, ax = plt.subplots(figsize=(10, 6))
kmf.plot_survival_function(ax=ax, color="#2D6A7A", linewidth=2)
ax.set_title("Kaplan–Meier Survival Curve: SP&L Transformer Fleet")
ax.set_xlabel("Years Since Installation")
ax.set_ylabel("Survival Probability")
ax.set_ylim(0, 1.05)
ax.axhline(y=0.5, color="gray", linestyle="--", alpha=0.5, label="50% survival")
ax.legend()
plt.tight_layout()
plt.show()

# Median survival time (50% of transformers have failed by this age)
median = kmf.median_survival_time_
print(f"Median survival time: {median:.1f} years")

## Kaplan–Meier Survival Curves

The Kaplan–Meier estimator is the foundation of survival analysis. It produces a non-parametric estimate of the survival function: the probability of surviving beyond a given time. No assumptions about the shape of the curve are needed.

In [None]:
# Split by manufacturer and plot separate KM curves
fig, ax = plt.subplots(figsize=(10, 6))
colors = ["#2D6A7A", "#5FCCDB", "#D69E2E", "#E53E3E"]

for i, (mfr, group) in enumerate(df.groupby("manufacturer")):
    kmf_mfr = KaplanMeierFitter()
    kmf_mfr.fit(
        durations=group["duration_years"],
        event_observed=group["event_observed"],
        label=mfr
    )
    kmf_mfr.plot_survival_function(
        ax=ax, color=colors[i % len(colors)], linewidth=2
    )

ax.set_title("Survival Curves by Manufacturer")
ax.set_xlabel("Years Since Installation")
ax.set_ylabel("Survival Probability")
ax.set_ylim(0, 1.05)
ax.legend(title="Manufacturer")
plt.tight_layout()
plt.show()

## Log-Rank Test: Are Differences Significant?

Eyeballing curves is useful, but the log-rank test gives a formal statistical answer to the question: "Do these two groups have significantly different survival experiences?"

In [None]:
# Create kVA rating groups
df["kva_group"] = pd.cut(
    df["kva_rating"],
    bins=[0, 50, 100, 500, 5000],
    labels=["Small (≤50)", "Medium (51-100)",
            "Large (101-500)", "Very Large (500+)"]
)

fig, ax = plt.subplots(figsize=(10, 6))

for i, (group_name, group) in enumerate(df.groupby("kva_group")):
    if len(group) 3:
        continue
    kmf_kva = KaplanMeierFitter()
    kmf_kva.fit(
        durations=group["duration_years"],
        event_observed=group["event_observed"],
        label=group_name
    )
    kmf_kva.plot_survival_function(
        ax=ax, color=colors[i % len(colors)], linewidth=2
    )

ax.set_title("Survival Curves by kVA Rating Group")
ax.set_xlabel("Years Since Installation")
ax.set_ylabel("Survival Probability")
ax.set_ylim(0, 1.05)
ax.legend(title="kVA Group")
plt.tight_layout()
plt.show()

## Cox Proportional Hazards Model

Kaplan–Meier curves compare groups, but they don't handle multiple continuous covariates simultaneously. The Cox Proportional Hazards (PH) model is the regression equivalent for survival data. It estimates how each covariate affects the hazard (failure rate) while controlling for the others.

In [None]:
# Compare two manufacturers statistically
manufacturers = df["manufacturer"].unique()
mfr_a = df[df["manufacturer"] == manufacturers[0]]
mfr_b = df[df["manufacturer"] == manufacturers[1]]

result = logrank_test(
    mfr_a["duration_years"], mfr_b["duration_years"],
    event_observed_A=mfr_a["event_observed"],
    event_observed_B=mfr_b["event_observed"]
)

print(f"Log-rank test: {manufacturers[0]} vs {manufacturers[1]}")
print(f"  Test statistic: {result.test_statistic:.3f}")
print(f"  p-value:        {result.p_value:.4f}")

if result.p_value 0.05:
    print("  → Statistically significant difference (p )
else:
    print("  → No statistically significant difference (p ≥ 0.05)")

## Interpret Hazard Ratios

The Cox model output includes hazard ratios for each covariate. A hazard ratio greater than 1 means the factor increases failure risk; less than 1 means it is protective.

In [None]:
# Run pairwise log-rank tests across all manufacturers
from itertools import combinations

print("Pairwise log-rank tests (all manufacturers):")
print(f"{'Group A':10} {'Significant?':>14}")
print("-" * 66)

for a, b in combinations(manufacturers, 2):
    grp_a = df[df["manufacturer"] == a]
    grp_b = df[df["manufacturer"] == b]
    res = logrank_test(
        grp_a["duration_years"], grp_b["duration_years"],
        event_observed_A=grp_a["event_observed"],
        event_observed_B=grp_b["event_observed"]
    )
    sig = "Yes" if res.p_value 0.05 else "No"
    print(f"{a:10.4f} {sig:>14}")

## Predict Individual Survival Curves

The Cox model can produce a personalized survival curve for each transformer, conditioned on its specific covariates. This gives you remaining useful life (RUL) estimates for every asset in the fleet.

In [None]:
# Engineer covariates for the Cox model

# Outage-derived features: count of equipment-failure outages per feeder
# This serves as a maintenance-need proxy (feeders with more failures
# likely have assets in worse condition).
equip_outage_counts = outages[outages["cause_code"] == "equipment failure"].groupby(
    "feeder_id"
).agg(
    feeder_failure_count=("cause_code", "count"),
    avg_outage_hours=("duration_hours", "mean")
).reset_index()
df = df.merge(equip_outage_counts, on="feeder_id", how="left")
df["feeder_failure_count"] = df["feeder_failure_count"].fillna(0)
df["avg_outage_hours"] = df["avg_outage_hours"].fillna(0)

# Weather exposure: extreme weather hours in the dataset year
# Extreme defined as: wind > 35 mph, temperature > 100 °F, or temperature < 50 °F
weather["extreme_hour"] = (
    (weather["wind_speed"] > 35) |
    (weather["temperature"] > 100) |
    (weather["temperature"] 50)
).astype(int)

# With a single year of weather data we compute one annual extreme-hour
# total and scale by each transformer's age to approximate lifetime exposure.
annual_extreme_hours = weather["extreme_hour"].sum()
# Cumulative extreme-weather exposure scales with asset age (in thousands of hours
# for numerical stability). Older transformers accumulate more weather stress.
df["avg_extreme_weather_hrs"] = annual_extreme_hours * df["age_years"] / 1000.0

# Encode phase as numeric (A=0, B=1, C=2, ABC=3)
phase_map = {p: i for i, p in enumerate(df["phase"].unique())}
df["phase_code"] = df["phase"].map(phase_map).fillna(0)

print("Covariates prepared.")
print(df[["transformer_id", "age_years", "kva_rating",
          "feeder_failure_count", "avg_outage_hours",
          "duration_years", "event_observed"]].head())

## Build a Replacement Priority Ranking

Survival analysis tells you when a transformer is likely to fail. But not all failures have equal consequences. A transformer serving 200 customers matters more than one serving 10. Combining failure risk with consequence produces an actionable replacement schedule.

In [None]:
# Fit the Cox Proportional Hazards model
cox_cols = [
    "duration_years", "event_observed",
    "age_years", "kva_rating", "phase_code",
    "feeder_failure_count", "avg_outage_hours",
    "avg_extreme_weather_hrs"
]

cox_df = df[cox_cols].dropna()

# Standardize large-range columns for numerical stability
cox_df["kva_rating_scaled"] = cox_df["kva_rating"] / 100
cox_df = cox_df.drop(columns=["kva_rating"])

cph = CoxPHFitter(penalizer=0.1)
cph.fit(
    cox_df,
    duration_col="duration_years",
    event_col="event_observed"
)

cph.print_summary()

# Concordance index: the model's ability to correctly rank transformers
# by failure time. 0.5 = random guessing, 1.0 = perfect discrimination
print(f"\nConcordance index: {cph.concordance_index_:.3f}")

## Weather-Adjusted Seasonal Risk

Transformer failure risk is not constant throughout the year. Extreme heat accelerates oil degradation, and storm seasons increase mechanical stress. Let's create a seasonal risk adjustment using the weather data.

In [None]:
# Extract and visualize hazard ratios
summary = cph.summary

fig, ax = plt.subplots(figsize=(9, 5))

# Plot hazard ratios with confidence intervals
hr = summary["exp(coef)"]
hr_lower = summary["exp(coef) lower 95%"]
hr_upper = summary["exp(coef) upper 95%"]

y_pos = range(len(hr))
ax.barh(y_pos, hr.values, color="#5FCCDB", height=0.6, alpha=0.8)
ax.errorbar(hr.values, y_pos,
           xerr=[hr.values - hr_lower.values, hr_upper.values - hr.values],
           fmt="none", color="#2D6A7A", capsize=4)
ax.axvline(x=1.0, color="red", linestyle="--", alpha=0.7, label="HR = 1 (no effect)")
ax.set_yticks(y_pos)
ax.set_yticklabels(hr.index)
ax.set_xlabel("Hazard Ratio (95% CI)")
ax.set_title("Cox PH Hazard Ratios: What Accelerates Transformer Failure?")
ax.legend()
plt.tight_layout()
plt.show()

# Print interpretation
print("\nHazard ratio interpretation:")
for covariate in hr.index:
    ratio = hr[covariate]
    p_val = summary.loc[covariate, "p"]
    if ratio > 1:
        direction = "increases"
        pct = (ratio - 1) * 100
    else:
        direction = "decreases"
        pct = (1 - ratio) * 100
    sig = "*" if p_val 0.05 else ""
    print(f"  {covariate}: HR={ratio:.3f} → {direction} hazard by {pct:.1f}% {sig}")

## What You Built and Next Steps

In [None]:
# Predict survival curves for all transformers
surv_funcs = cph.predict_survival_function(cox_df)

# Plot survival curves for 5 specific transformers
sample_ids = df["transformer_id"].iloc[:5].values
fig, ax = plt.subplots(figsize=(10, 6))

for i, tid in enumerate(sample_ids):
    idx = df[df["transformer_id"] == tid].index[0]
    if idx in surv_funcs.columns:
        ax.plot(surv_funcs.index, surv_funcs[idx],
               label=f"{tid} (age={df.loc[idx, 'duration_years']:.0f}yr, {df.loc[idx, 'kva_rating']}kVA)",
               linewidth=2)

ax.set_title("Individual Predicted Survival Curves")
ax.set_xlabel("Years Since Installation")
ax.set_ylabel("Survival Probability")
ax.set_ylim(0, 1.05)
ax.legend(title="Transformer", fontsize=9)
plt.tight_layout()
plt.show()

In [None]:
# Estimate remaining useful life (RUL) for each transformer
# RUL = median predicted survival time - current age

median_survival = cph.predict_median(cox_df)

df["predicted_median_life"] = median_survival.values
df["remaining_useful_life"] = df["predicted_median_life"] - df["duration_years"]
df["remaining_useful_life"] = df["remaining_useful_life"].clip(lower=0)

print("Remaining Useful Life Estimates (top 15 most urgent):")
rul_table = df.sort_values("remaining_useful_life")[[
    "transformer_id", "duration_years", "kva_rating",
    "predicted_median_life", "remaining_useful_life"
]].head(15)
print(rul_table.to_string(index=False, float_format="%.1f"))

In [None]:
# Current failure hazard (predicted partial hazard from Cox model)
df["hazard_score"] = cph.predict_partial_hazard(cox_df).values

# Consequence score: kva_rating as a proxy for customers affected
# Normalize both to 0-1 scale
df["hazard_norm"] = (
    (df["hazard_score"] - df["hazard_score"].min()) /
    (df["hazard_score"].max() - df["hazard_score"].min())
)
df["consequence_norm"] = (
    (df["kva_rating"] - df["kva_rating"].min()) /
    (df["kva_rating"].max() - df["kva_rating"].min())
)

# Risk priority score = failure likelihood × consequence
df["priority_score"] = df["hazard_norm"] * 0.6 + df["consequence_norm"] * 0.4

# Rank by priority (highest score = replace first)
priority_list = df.sort_values("priority_score", ascending=False)

print("Top 10 Replacement Priorities:")
print(priority_list[[
    "transformer_id", "duration_years", "age_years",
    "kva_rating", "remaining_useful_life", "priority_score"
]].head(10).to_string(index=False, float_format="%.2f"))

In [None]:
# Visualize: risk vs consequence scatter plot
fig, ax = plt.subplots(figsize=(10, 7))

scatter = ax.scatter(
    df["hazard_norm"], df["consequence_norm"],
    c=df["priority_score"], cmap="YlOrRd",
    s=80, edgecolors="white", linewidth=0.5, alpha=0.9
)
plt.colorbar(scatter, label="Priority Score")

# Label the top 5 highest-priority transformers
top5 = priority_list.head(5)
for _, row in top5.iterrows():
    ax.annotate(row["transformer_id"],
                (row["hazard_norm"], row["consequence_norm"]),
                fontsize=8, fontweight="bold",
                xytext=(5, 5), textcoords="offset points")

ax.set_xlabel("Failure Likelihood (normalized hazard)")
ax.set_ylabel("Consequence (normalized kva_rating)")
ax.set_title("Replacement Priority Matrix: Risk × Consequence")
ax.axvline(x=0.5, color="gray", linestyle="--", alpha=0.3)
ax.axhline(y=0.5, color="gray", linestyle="--", alpha=0.3)

# Quadrant labels
ax.text(0.75, 0.75, "REPLACE NOW", ha="center", fontsize=11,
       fontweight="bold", color="#E53E3E", alpha=0.7)
ax.text(0.25, 0.75, "MONITOR", ha="center", fontsize=11,
       fontweight="bold", color="#D69E2E", alpha=0.7)
ax.text(0.75, 0.25, "SCHEDULE", ha="center", fontsize=11,
       fontweight="bold", color="#D69E2E", alpha=0.7)
ax.text(0.25, 0.25, "LOW PRIORITY", ha="center", fontsize=11,
       fontweight="bold", color="#718096", alpha=0.7)

plt.tight_layout()
plt.show()

In [None]:
# Monthly extreme weather frequency
weather["month"] = weather["timestamp"].dt.month
monthly_extreme = weather.groupby("month")["extreme_hour"].mean()

# Create seasonal risk multiplier (normalized around 1.0)
seasonal_multiplier = monthly_extreme / monthly_extreme.mean()

fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Plot 1: Seasonal risk multiplier
months = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
bar_colors = ["#E53E3E" if v > 1.2 else "#D69E2E" if v > 1.0
              else "#5FCCDB" for v in seasonal_multiplier.values]

axes[0].bar(months, seasonal_multiplier.values, color=bar_colors)
axes[0].axhline(y=1.0, color="gray", linestyle="--", alpha=0.5)
axes[0].set_title("Seasonal Risk Multiplier")
axes[0].set_ylabel("Risk Multiplier (1.0 = average)")
axes[0].tick_params(axis="x", rotation=45)

# Plot 2: Adjusted priority scores for top 10 in peak month
peak_month = seasonal_multiplier.idxmax()
peak_mult = seasonal_multiplier.max()

top10 = priority_list.head(10).copy()
top10["adjusted_score"] = top10["priority_score"] * peak_mult

y_pos = range(len(top10))
axes[1].barh(y_pos, top10["priority_score"], height=0.4,
            color="#5FCCDB", label="Base Priority")
axes[1].barh([y + 0.4 for y in y_pos], top10["adjusted_score"],
            height=0.4, color="#E53E3E", label=f"Peak Month (×{peak_mult:.2f})")
axes[1].set_yticks([y + 0.2 for y in y_pos])
axes[1].set_yticklabels(top10["transformer_id"].values)
axes[1].set_xlabel("Priority Score")
axes[1].set_title("Top 10: Base vs Peak-Season Priority")
axes[1].legend()

plt.tight_layout()
plt.show()

print(f"\nPeak risk month: {months[peak_month - 1]} (multiplier: {peak_mult:.2f}×)")
print("Use seasonal multipliers to time capital replacement projects")
print("before high-risk seasons, not after.")

In [None]:
# Save the fitted Cox model (lifelines models are picklable)
import pickle
with open("cox_model.pkl", "wb") as f:
    pickle.dump(cph, f)

# Load it back
with open("cox_model.pkl", "rb") as f:
    cph = pickle.load(f)