
# Business Analyses: Reliability, Load, and Impact

Statistical playbook for the EV reliability dataset. Set `DATABASE_URL` in your env (Streamlit Cloud secret) to query the hosted DB (Neon/Timescale). Each section is runnable on its own; queries are scoped to recent windows to stay lightweight.


In [None]:

import os
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text
from statsmodels.stats.proportion import proportions_ztest
import statsmodels.api as sm

DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "postgresql://neondb_owner:npg_98OQEAezCaKH@ep-round-moon-ah39km2x-pooler.c-3.us-east-1.aws.neon.tech/neondb?sslmode=require&channel_binding=require",
)
engine = create_engine(DATABASE_URL)

def fetch_df(sql, params=None):
    return pd.read_sql(text(sql), engine, params=params or {})

FAULT_STATES = ("FAULTED", "OFFLINE")



## Reliability uplift (before/after or A/B)
Compare fault rates across two windows and compute confidence intervals and a two-proportion z-test. Swap `window_before`/`window_after` to represent pre/post maintenance or A/B cohorts.


In [None]:

from datetime import timedelta, datetime, timezone

def window_counts(days: int, end=None):
    end = end or datetime.now(timezone.utc)
    start = end - timedelta(days=days)
    sql = """
    SELECT
        SUM(CASE WHEN status IN :faults THEN 1 ELSE 0 END) AS faults,
        COUNT(*) AS samples
    FROM charger_status
    WHERE time BETWEEN :start AND :end
    """
    row = fetch_df(sql, {"faults": FAULT_STATES, "start": start, "end": end}).iloc[0]
    return row.faults, row.samples

def compare_windows(window_before=7, window_after=7):
    now = datetime.now(timezone.utc)
    faults_after, n_after = window_counts(window_after, end=now)
    faults_before, n_before = window_counts(window_before, end=now - timedelta(seconds=1))
    rates = {
        "before": faults_before / n_before if n_before else np.nan,
        "after": faults_after / n_after if n_after else np.nan,
    }
    stat, p = proportions_ztest([faults_after, faults_before], [n_after, n_before])
    return {
        "counts": {
            "before_faults": faults_before,
            "before_samples": n_before,
            "after_faults": faults_after,
            "after_samples": n_after,
        },
        "rates": rates,
        "z_stat": float(stat),
        "p_value": float(p),
    }

compare_windows()



## Load vs faults (regression)
Model whether higher utilization increases fault odds. We aggregate hourly by charger, join session counts, and fit a logistic regression on a fault flag.


In [None]:

# Hourly aggregation by charger (status + session load)
hourly_sql = """
WITH status_hour AS (
    SELECT date_trunc('hour', cs.time) AS hour,
           cs.charger_id,
           SUM(CASE WHEN cs.status IN :faults THEN 1 ELSE 0 END) AS fault_samples,
           COUNT(*) AS samples
    FROM charger_status cs
    WHERE cs.time >= now() - interval '14 days'
    GROUP BY 1,2
), sessions_hour AS (
    SELECT date_trunc('hour', start_time) AS hour,
           charger_id,
           COUNT(*) AS sessions,
           SUM(CASE WHEN success = false THEN 1 ELSE 0 END) AS failed_sessions
    FROM charging_sessions
    WHERE start_time >= now() - interval '14 days'
    GROUP BY 1,2
)
SELECT sh.hour, sh.charger_id,
       COALESCE(s.sessions, 0) AS sessions,
       COALESCE(s.failed_sessions, 0) AS failed_sessions,
       sh.fault_samples,
       sh.samples
FROM status_hour sh
LEFT JOIN sessions_hour s ON sh.hour = s.hour AND sh.charger_id = s.charger_id
"""
load_df = fetch_df(hourly_sql, {"faults": FAULT_STATES})
load_df["fault_flag"] = (load_df["fault_samples"] > 0).astype(int)
load_df["log_sessions"] = np.log1p(load_df["sessions"])
load_df.head()

# Logistic regression: fault_flag ~ log_sessions
X = sm.add_constant(load_df[["log_sessions"]])
model = sm.Logit(load_df["fault_flag"], X).fit(disp=False)
print(model.summary())
print("Odds ratio for sessions:", np.exp(model.params["log_sessions"]))



## Hotspot detection (control chart style)
Identify chargers whose fault rate over 14 days is statistically high (z-score vs portfolio average).


In [None]:

hot_sql = """
SELECT cs.charger_id,
       c.external_id,
       s.name AS site_name,
       SUM(CASE WHEN cs.status IN :faults THEN 1 ELSE 0 END)::float / COUNT(*) AS fault_rate,
       COUNT(*) AS samples
FROM charger_status cs
JOIN chargers c ON cs.charger_id = c.charger_id
JOIN sites s ON c.site_id = s.site_id
WHERE cs.time >= now() - interval '14 days'
GROUP BY cs.charger_id, c.external_id, s.name
HAVING COUNT(*) > 100
"""
hot = fetch_df(hot_sql, {"faults": FAULT_STATES})
global_mean = hot.fault_rate.mean()
global_std = hot.fault_rate.std(ddof=1)
hot["z"] = (hot.fault_rate - global_mean) / (global_std if global_std else 1)
hotspots = hot[hot["z"] > 2].sort_values("z", ascending=False)
hotspots.head(10)



## MTBF / MTTR (survival-style)
Estimate mean time between failures (MTBF) and mean time to repair (MTTR) using status transitions for a subset of chargers.


In [None]:

status_sql = """
SELECT time, charger_id, status
FROM charger_status
WHERE time >= now() - interval '30 days'
ORDER BY charger_id, time
"""
status_df = fetch_df(status_sql)

results = []
for cid, group in status_df.groupby("charger_id"):
    g = group.copy()
    g["is_fault"] = g.status.isin(FAULT_STATES)
    g["prev_is_fault"] = g["is_fault"].shift(fill_value=False)
    fault_starts = g[(~g["prev_is_fault"]) & (g["is_fault"])]
    fault_ends = g[(g["prev_is_fault"]) & (~g["is_fault"])]
    mtbf_hours = None
    if not fault_starts.empty:
        mtbf_deltas = fault_starts["time"].diff().dropna().dt.total_seconds() / 3600
        if not mtbf_deltas.empty:
            mtbf_hours = mtbf_deltas.mean()
    mttr_hours = None
    if not fault_ends.empty:
        durations = (fault_ends["time"] - fault_starts["time"].reindex(fault_ends.index)).dt.total_seconds() / 3600
        durations = durations.dropna()
        if not durations.empty:
            mttr_hours = durations.mean()
    if mtbf_hours is not None or mttr_hours is not None:
        results.append({"charger_id": cid, "mtbf_hours": mtbf_hours, "mttr_hours": mttr_hours})
mtbf_mttr = pd.DataFrame(results).dropna(how="all", subset=["mtbf_hours", "mttr_hours"])
mtbf_mttr.describe()[["mtbf_hours", "mttr_hours"]]



## Capacity planning
Estimate how adding capacity reduces load per charger and faults. Simple heuristic: scale fault odds by the regression coefficient from load vs faults.


In [None]:

site_load_sql = """
SELECT s.site_id,
       s.name AS site_name,
       SUM(CASE WHEN cs.status IN :faults THEN 1 ELSE 0 END) AS fault_samples,
       COUNT(*) AS status_samples,
       COUNT(DISTINCT cs.charger_id) AS chargers,
       COALESCE(SUM(sess.sessions_per_day), 0) AS sessions_per_day
FROM charger_status cs
JOIN chargers c ON cs.charger_id = c.charger_id
JOIN sites s ON c.site_id = s.site_id
LEFT JOIN (
    SELECT site_id, date_trunc('day', start_time) AS day, COUNT(*) AS sessions_per_day
    FROM charging_sessions
    WHERE start_time >= now() - interval '14 days'
    GROUP BY 1,2
) sess ON sess.site_id = s.site_id
WHERE cs.time >= now() - interval '14 days'
GROUP BY s.site_id, s.name
"""
site_load = fetch_df(site_load_sql, {"faults": FAULT_STATES})
site_load["fault_rate"] = site_load["fault_samples"] / site_load["status_samples"]
coef = model.params.get("log_sessions", 0) if 'model' in globals() else 0
site_load["sessions_per_charger"] = site_load["sessions_per_day"] / site_load["chargers"].clip(lower=1)
site_load["sessions_per_charger_plus1"] = site_load["sessions_per_day"] / (site_load["chargers"] + 1)
site_load["delta_log_sessions"] = np.log1p(site_load["sessions_per_charger_plus1"]) - np.log1p(site_load["sessions_per_charger"])
site_load["expected_fault_odds_change_pct"] = np.expm1(coef * site_load["delta_log_sessions"]) * 100
site_load.sort_values("expected_fault_odds_change_pct").head(10)



## Financial impact (lost minutes) with bootstrap CI
Estimate lost revenue using failed session minutes. Adjust `revenue_per_kwh` to match business assumptions.


In [None]:

impact_sql = """
SELECT
    COUNT(*) FILTER (WHERE success = false) AS failed_sessions,
    SUM(duration_minutes) FILTER (WHERE success = false) AS lost_minutes,
    SUM(energy_kwh) FILTER (WHERE success = false) AS lost_kwh
FROM charging_sessions
WHERE start_time >= now() - interval '30 days'
"""
impact = fetch_df(impact_sql).iloc[0]
revenue_per_kwh = 0.35
samples = 5000
lost_kwh = impact.lost_kwh or 0
boot = np.random.gamma(shape=lost_kwh + 1, scale=revenue_per_kwh, size=samples)
ci = np.percentile(boot, [2.5, 50, 97.5])
{
    "failed_sessions": int(impact.failed_sessions or 0),
    "lost_minutes": float(impact.lost_minutes or 0),
    "lost_revenue_ci": ci,
}



## Seasonality / forecasting
Get daily fault counts and fit a quick seasonal ARIMA. Set `run_fit=True` to execute.


In [None]:

from statsmodels.tsa.statespace.sarimax import SARIMAX

daily_sql = """
SELECT date_trunc('day', time) AS day,
       SUM(CASE WHEN status IN :faults THEN 1 ELSE 0 END) AS faults,
       COUNT(*) AS samples
FROM charger_status
GROUP BY 1
ORDER BY 1
"""
daily = fetch_df(daily_sql, {"faults": FAULT_STATES}).set_index("day")
run_fit = False
if run_fit and len(daily) > 7:
    model = SARIMAX(daily["faults"], order=(1,0,1), seasonal_order=(1,0,1,7)).fit(disp=False)
    forecast = model.forecast(14)
    print(forecast.head())
else:
    daily.tail()



## Root cause hints (logistic regression)
Model fault probability using charger metadata and time-of-day/temperature.


In [None]:

sample_sql = """
SELECT cs.time,
       cs.charger_id,
       cs.status,
       cs.temperature_celsius,
       c.model,
       c.connector_type
FROM charger_status cs
JOIN chargers c ON cs.charger_id = c.charger_id
WHERE cs.time >= now() - interval '7 days'
LIMIT 50000
"""
sample = fetch_df(sample_sql)
sample["fault_flag"] = sample.status.isin(FAULT_STATES).astype(int)
sample["hour"] = sample.time.dt.hour
sample["temp"] = sample.temperature_celsius.fillna(sample.temperature_celsius.median())
cat = pd.get_dummies(sample[["model", "connector_type", "hour"]].astype(str), drop_first=True)
X = sm.add_constant(pd.concat([cat, sample[["temp"]]], axis=1))
logit = sm.Logit(sample["fault_flag"], X).fit(disp=False)
logit.summary()



## Alert tuning (precision/recall on fault-rate thresholds)
Treat hours with any fault as positives and test a threshold on recent fault rate per charger-hour.


In [None]:

alert_sql = """
WITH status_hour AS (
    SELECT date_trunc('hour', cs.time) AS hour,
           cs.charger_id,
           SUM(CASE WHEN cs.status IN :faults THEN 1 ELSE 0 END)::float / COUNT(*) AS fault_rate,
           SUM(CASE WHEN cs.status IN :faults THEN 1 ELSE 0 END) AS faults
    FROM charger_status cs
    WHERE cs.time >= now() - interval '7 days'
    GROUP BY 1,2
)
SELECT * FROM status_hour
"""
alert_df = fetch_df(alert_sql, {"faults": FAULT_STATES})
threshold = 0.05
alert_df["pred"] = (alert_df["fault_rate"] >= threshold).astype(int)
alert_df["actual"] = (alert_df["faults"] > 0).astype(int)
precision = (alert_df.query("pred == 1 and actual == 1").shape[0] / max(alert_df.query("pred == 1").shape[0], 1))
recall = (alert_df.query("actual == 1 and pred == 1").shape[0] / max(alert_df.query("actual == 1").shape[0], 1))
{
    "threshold": threshold,
    "precision": precision,
    "recall": recall,
}



## Rollout validation (difference-in-differences)
Template: pick treated sites and an intervention date; compare fault rates vs control sites.


In [None]:

treated_sites = [1, 2, 3]  # replace with real site_ids that received a change
intervention = "2024-01-15"

did_sql = """
WITH daily AS (
    SELECT date_trunc('day', cs.time) AS day,
           c.site_id,
           SUM(CASE WHEN cs.status IN :faults THEN 1 ELSE 0 END)::float / COUNT(*) AS fault_rate
    FROM charger_status cs
    JOIN chargers c ON cs.charger_id = c.charger_id
    GROUP BY 1, c.site_id
)
SELECT day,
       site_id,
       fault_rate,
       CASE WHEN site_id = ANY(:treated) THEN 1 ELSE 0 END AS treated,
       CASE WHEN day >= :intervention::date THEN 1 ELSE 0 END AS post
FROM daily
"""
did = fetch_df(did_sql, {"faults": FAULT_STATES, "treated": treated_sites, "intervention": intervention})
did["treated_post"] = did["treated"] * did["post"]
X = sm.add_constant(did[["treated", "post", "treated_post"]])
y = did["fault_rate"]
did_model = sm.OLS(y, X).fit()
did_model.summary()
