<a href="https://colab.research.google.com/github/bbanzai88/Data-Science-Repository/blob/main/syndromic_surveillance_colab_autoclamp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Syndromic Surveillance — Colab (Auto-Clamp Weeks/Days, NHSN + NWSS)

This notebook is preconfigured to use **CDC NHSN weekly hospital admissions (COVID)** via Delphi Epidata (by state) and loads **CDC NWSS wastewater** data. It includes a helper that **auto-detects** whether your request is **weekly (epiweeks)** or **daily (YYYYMMDD)** and clamps the date range to an available window based on metadata.

**What it does**
1) Fetches NHSN weekly admissions (COVID) for CA/NY/TX
2) Auto-clamps time range to the last N weeks/days that *exist* for your signal
3) Runs a simple EARS detector with Benjamini–Hochberg FDR control
4) Loads NWSS wastewater data and summarizes to weekly by state
5) Plots trends and alerts; exports CSVs

> *Note:* Alerts are statistical signals, not clinical diagnoses. Validate with multiple sources and tune thresholds for production use.

In [None]:
#@title 0) Environment check (Colab-friendly)
import sys, platform
IN_COLAB = False
try:
    import google.colab  # type: ignore
    IN_COLAB = True
except Exception:
    IN_COLAB = False
print({"python": sys.version.split()[0], "platform": platform.platform(), "IN_COLAB": IN_COLAB})

In [None]:
#@title 1) Install dependencies (avoid changing Colab's pandas/requests)
!pip -q install -U delphi-epidata jupyter-dash duckdb python-dateutil tqdm rpy2 statsmodels plotly scipy epiweeks

In [None]:
#@title 2) Imports & configuration (weekly NHSN defaults)
import pandas as pd, numpy as np
from datetime import datetime
from dateutil import tz
from tqdm import tqdm
import duckdb
from delphi_epidata import Epidata
import plotly.express as px
from jupyter_dash import JupyterDash
from dash import Dash, dcc, html, dash_table

CONFIG = {
    "source": "nhsn",
    "signal": "confirmed_admissions_covid_ew",
    "time_type": "week",
    "geo_type": "state",
    "geo_values": ["ca", "ny", "tx"],
    "start": 20210101,
    "end":   int(datetime.now().strftime('%Y%m%d')),
    "ears_k": 3,
    "ears_baseline": 8,
    "apply_bh_fdr": True,
    "alpha": 0.05,
    "duckdb_path": "syndromic.duckdb",
    "USE_R": False
}
CONFIG

In [None]:
#@title 3) Load Epidata metadata
meta = Epidata.covidcast_meta()
meta_df = pd.DataFrame(meta.get("epidata", []))
print("Total metadata rows:", len(meta_df))
display(meta_df.head(10))

In [None]:
#@title 4) Auto-clamp helper (handles week/day automatically)
from epiweeks import Week

def _epi_to_week(ei: int) -> Week:
    y, w = divmod(int(ei), 100)
    return Week(y, w)

def _week_to_epi(w: Week) -> int:
    return w.year * 100 + w.week

def auto_clamp_time_range(meta_df: pd.DataFrame, CONFIG: dict, last_n_days: int = 180, last_n_weeks: int = 104) -> None:
    s, sg, tt, gt = CONFIG.get("source"), CONFIG.get("signal"), CONFIG.get("time_type"), CONFIG.get("geo_type")
    mask = (
        (meta_df["data_source"] == s) &
        (meta_df["signal"]      == sg) &
        (meta_df["time_type"]   == tt) &
        (meta_df["geo_type"]    == gt)
    )
    subset = meta_df.loc[mask, ["min_time","max_time"]]
    if subset.empty:
        print(f"No metadata rows for {s}/{sg} ({tt}/{gt}). Leaving dates unchanged.")
        return
    min_t = int(subset["min_time"].min())
    max_t = int(subset["max_time"].max())
    if tt == "week":
        min_w = _epi_to_week(min_t)
        max_w = _epi_to_week(max_t)
        start_w = max_w - (last_n_weeks - 1)
        if start_w < min_w:
            start_w = min_w
        CONFIG["start"] = _week_to_epi(start_w)
        CONFIG["end"]   = max_t
        print(f"Auto-clamped (week): {CONFIG['start']} → {CONFIG['end']} (available {min_t} → {max_t})")
    else:
        min_dt = pd.to_datetime(str(min_t), format="%Y%m%d")
        max_dt = pd.to_datetime(str(max_t), format="%Y%m%d")
        start_dt = max(min_dt, max_dt - pd.Timedelta(days=last_n_days))
        CONFIG["start"] = int(start_dt.strftime('%Y%m%d'))
        CONFIG["end"]   = max_t
        print(f"Auto-clamped (day): {CONFIG['start']} → {CONFIG['end']} (available {min_t} → {max_t})")

auto_clamp_time_range(meta_df, CONFIG, last_n_days=180, last_n_weeks=104)
CONFIG

In [None]:
# 5) Robust fetch (legacy client + raw HTTP fallback)
# Put this near your other helpers
from epiweeks import Week
import pandas as pd

def normalize_time_value(df: pd.DataFrame, time_type: str) -> pd.DataFrame:
    """Parse df['time_value'] as datetime depending on time_type."""
    if "time_value" not in df:
        return df

    if time_type == "week":
        # keep the original epiweek for reference
        df["epiweek"] = df["time_value"].astype(int)
        def epi_to_ts(x):
            x = int(x)
            y, w = divmod(x, 100)
            return pd.Timestamp(Week(y, w).startdate())  # start of MMWR week (Monday)
        df["time_value"] = df["time_value"].apply(epi_to_ts)
    else:
        # day: values are YYYYMMDD ints
        df["time_value"] = pd.to_datetime(df["time_value"].astype(str), format="%Y%m%d", errors="coerce")

    return df

import requests
import pandas as pd
from delphi_epidata import Epidata

def fetch_epidata_legacy(source: str, signal: str, time_type: str, geo_type: str,
                         geo_values, start: int, end: int) -> pd.DataFrame:
    if isinstance(geo_values, str):
        geo_values = [geo_values]

    # A) Legacy Python client (positional order)
    try:
        tv = Epidata.range(start, end)
        resp = Epidata.covidcast(source, signal, time_type, geo_type, tv, geo_values)
        if resp.get("result") == 1 and resp.get("epidata"):
            df = pd.DataFrame(resp["epidata"])
            df = normalize_time_value(df, time_type)
            return df
    except Exception:
        pass  # fall through to HTTP

    # B) Raw HTTP fallback
    url = "https://api.delphi.cmu.edu/epidata/covidcast/"
    params = [
        ("data_source", source), ("signal", signal),
        ("time_type", time_type), ("geo_type", geo_type),
        ("time_values", f"{start}-{end}")
    ] + [("geo_value", gv) for gv in geo_values]

    r = requests.get(url, params=params, timeout=60)
    js = r.json()
    if js.get("result") != 1 or not js.get("epidata"):
        raise RuntimeError(f"No data for {source}/{signal} ({time_type}/{geo_type}) "
                           f"{start}-{end}. API said: {js.get('message', js)}")

    df = pd.DataFrame(js["epidata"])
    df = normalize_time_value(df, time_type)
    return df

# re-run the fetch
df = fetch_epidata_legacy(
    CONFIG["source"], CONFIG["signal"], CONFIG["time_type"],
    CONFIG["geo_type"], CONFIG["geo_values"], CONFIG["start"], CONFIG["end"]
)
print(df.shape)
display(df.head())


In [None]:
# 6) EARS detector + BH-FDR
def ears_detect(x: pd.DataFrame, value_col: str = "value", baseline: int = 8, k: float = 3.0) -> pd.DataFrame:
    x = x.sort_values("time_value").reset_index(drop=True).copy()
    x["mu"] = x[value_col].rolling(baseline, min_periods=baseline).mean().shift(1)
    x["sd"] = x[value_col].rolling(baseline, min_periods=baseline).std(ddof=1).shift(1)
    eps = 1e-9
    x["z"] = (x[value_col] - x["mu"]) / (x["sd"].fillna(0) + eps)
    from scipy.stats import norm
    x["p"] = 2 * (1 - norm.cdf(np.abs(x["z"])) )
    x["alert"] = (x["z"] >= k) & x["sd"].notna()
    return x

def benjamini_hochberg(pvals: pd.Series, alpha=0.05) -> pd.Series:
    p = np.asarray(pvals)
    n = (~np.isnan(p)).sum()
    if n == 0:
        return pd.Series([False]*len(pvals), index=pvals.index)
    order = np.argsort(p)
    ranked = np.arange(1, n+1)
    crit = alpha * ranked / n
    p_sorted = p[order]
    passed = p_sorted <= crit
    kmax = ranked[passed].max() if passed.any() else 0
    thresh = crit[kmax-1] if kmax>0 else -1
    return pd.Series(pvals <= thresh, index=pvals.index)

out = (df.groupby("geo_value", group_keys=False)
         .apply(lambda g: ears_detect(g, value_col="value", baseline=CONFIG["ears_baseline"], k=CONFIG["ears_k"]))
         .reset_index(drop=True))

if CONFIG["apply_bh_fdr"]:
    def per_time_bh(g):
        flags = benjamini_hochberg(g["p"], alpha=CONFIG["alpha"])
        g["bh_discovery"] = flags.values
        return g
    out = out.groupby("time_value", group_keys=False).apply(per_time_bh)
else:
    out["bh_discovery"] = out["alert"]

latest_date = out["time_value"].max()
alerts_latest = (out[out["time_value"]==latest_date]
                 .loc[:, ["geo_value","value","mu","sd","z","p","alert","bh_discovery"]]
                 .sort_values("z", ascending=False))
print("Latest period:", latest_date)
alerts_latest

In [None]:
# 7) Visualize trends & alerts (Plotly)
def plot_geo(gv: str):
    g = out.query("geo_value == @gv").copy()
    g["alert_flag"] = np.where(g["bh_discovery"], g["value"], np.nan)
    fig = px.line(g, x="time_value", y="value", title=f"{gv.upper()} — {CONFIG['source']}/{CONFIG['signal']} ({CONFIG['time_type']})")
    fig.add_scatter(x=g["time_value"], y=g["alert_flag"], mode="markers", name="ALERT (BH)")
    return fig

for gv in CONFIG["geo_values"]:
    fig = plot_geo(gv)
    fig.show()

## NWSS (Wastewater) — ready-made pull (state-week summary)
Loads CDC NWSS public table via Socrata (SODA). We auto-detect state/date/metric columns and build weekly medians by state.

In [None]:
#@title 8) Load NWSS wastewater public data (CDC Socrata) — robust
import pandas as pd

# You can optionally narrow columns to reduce transfer:
# NWSS_URL = ("https://data.cdc.gov/resource/2ew6-ywp6.csv"
#             "?$select=reporting_jurisdiction,date_end,date_start,percentile&$limit=500000")
NWSS_URL = "https://data.cdc.gov/resource/2ew6-ywp6.csv?$limit=500000"

# Avoid DtypeWarning by letting pandas read in larger chunks
ww = pd.read_csv(NWSS_URL, low_memory=False)
print("NWSS rows, cols:", ww.shape)
display(ww.head(3))

state_candidates = ["reporting_jurisdiction", "state", "wwtp_jurisdiction", "jurisdiction"]
date_candidates  = [
    "date_end", "date_start",                      # current schema
    "sample_collect_date", "sample_collection_date",
    "collection_date", "first_sample_date", "date", "sample_date"
]
metric_candidates = [
    "wastewater_percentile_7_day_rolling_average",
    "wastewater_percentile",
    "percentile",
    "ww_percentile"
]

def pick_col(df, candidates, kind):
    for c in candidates:
        if c in df.columns:
            return c
    raise RuntimeError(f"Could not find a {kind} column. Available: {list(df.columns)}")

state_col = pick_col(ww, state_candidates, "state")
date_col  = pick_col(ww, date_candidates, "date")
metric_col= pick_col(ww, metric_candidates, "metric")

# Coerce types explicitly
ww[date_col] = pd.to_datetime(ww[date_col], errors="coerce")
ww[metric_col] = pd.to_numeric(ww[metric_col], errors="coerce")

ww2 = ww[[state_col, date_col, metric_col]].dropna().copy()
ww2 = ww2.rename(columns={state_col: "state", date_col: "date", metric_col: "ww_percentile"})

# Aggregate to weekly-by-state (median) — uses calendar weeks; good enough for corroboration
ww2["week"] = ww2["date"].dt.to_period("W").dt.to_timestamp()
ww_weekly = (ww2.groupby(["state", "week"], as_index=False)
               .agg(ww_percentile=("ww_percentile", "median")))

print("Detected columns:", {"state": state_col, "date": date_col, "metric": metric_col})
print(ww_weekly.shape)
display(ww_weekly.head(10))


In [None]:
#@title 9) Store pulls & outputs in DuckDB (robust + collision-proof)
import duckdb
import pandas as pd

WRITE_MODE = "append"   # or "replace"
SCHEMA = "ss"           # short schema name unlikely to collide with any attached catalog

con = duckdb.connect(CONFIG["duckdb_path"])

# (Optional) show what catalogs are attached so you can spot collisions
dbs = con.execute("PRAGMA database_list").df()
print("Databases attached:")
display(dbs)

# Create our schema (no catalog prefix => use the connected DB)
con.execute(f'CREATE SCHEMA IF NOT EXISTS "{SCHEMA}";')

# Register DataFrames so DuckDB can read them
con.register("df_epidata", df)
con.register("df_alerts", out)
con.register("df_nwss", ww_weekly)

if WRITE_MODE == "replace":
    # Fresh snapshot each run
    con.execute(f'CREATE OR REPLACE TABLE "{SCHEMA}".epidata AS SELECT * FROM df_epidata;')
    con.execute(f'CREATE OR REPLACE TABLE "{SCHEMA}".alerts  AS SELECT * FROM df_alerts;')
    con.execute(f'CREATE OR REPLACE TABLE "{SCHEMA}".nwss    AS SELECT * FROM df_nwss;')
else:
    # Create tables from the DataFrame schemas, then append by column name
    con.execute(f'CREATE TABLE IF NOT EXISTS "{SCHEMA}".epidata AS SELECT * FROM df_epidata LIMIT 0;')
    con.execute(f'CREATE TABLE IF NOT EXISTS "{SCHEMA}".alerts  AS SELECT * FROM df_alerts   LIMIT 0;')
    con.execute(f'CREATE TABLE IF NOT EXISTS "{SCHEMA}".nwss    AS SELECT * FROM df_nwss     LIMIT 0;')

    con.execute(f'INSERT INTO "{SCHEMA}".epidata BY NAME SELECT * FROM df_epidata;')
    con.execute(f'INSERT INTO "{SCHEMA}".alerts  BY NAME SELECT * FROM df_alerts;')
    con.execute(f'INSERT INTO "{SCHEMA}".nwss    BY NAME SELECT * FROM df_nwss;')

# Quick counts for sanity
counts = con.execute(f"""
  SELECT 'epidata' AS table, COUNT(*) AS rows FROM "{SCHEMA}".epidata
  UNION ALL SELECT 'alerts', COUNT(*) FROM "{SCHEMA}".alerts
  UNION ALL SELECT 'nwss',   COUNT(*) FROM "{SCHEMA}".nwss
""").df()

con.close()
print("Saved to", CONFIG["duckdb_path"])
display(counts)




In [None]:
#@title 10) Minimal in-notebook dashboard (Plotly + ipywidgets; Dash-free)
import numpy as np
import plotly.graph_objs as go

def figure_for(gv: str):
    g = out.query("geo_value == @gv").copy()
    g["alert_flag"] = np.where(g["bh_discovery"], g["value"], np.nan)
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=g["time_value"], y=g["value"], mode="lines", name="value"))
    fig.add_trace(go.Scatter(x=g["time_value"], y=g["alert_flag"], mode="markers", name="ALERT (BH)"))
    fig.update_layout(
        title=f"{gv.upper()} — {CONFIG['source']}/{CONFIG['signal']} ({CONFIG['time_type']})",
        xaxis_title="time",
        yaxis_title="value",
        legend_title=""
    )
    return fig

# Try interactive dropdown; fall back to static if ipywidgets unavailable
try:
    import ipywidgets as widgets
    from IPython.display import display, clear_output

    geo_opts = [(g.upper(), g) for g in CONFIG["geo_values"]]
    dd = widgets.Dropdown(options=geo_opts, value=CONFIG["geo_values"][0], description="Geo:")
    out_box = widgets.Output()

    def render(_=None):
        with out_box:
            clear_output(wait=True)
            fig = figure_for(dd.value)
            fig.show()

    dd.observe(render, names="value")
    display(widgets.VBox([dd, out_box]))
    render()  # initial render

    # Also show latest-period alerts table below
    import pandas as pd
    display(alerts_latest)

except Exception as e:
    print("ipywidgets unavailable — showing static charts for each geo.\nReason:", e)
    for gv in CONFIG["geo_values"]:
        fig = figure_for(gv)
        fig.show()
    display(alerts_latest)


In [None]:
#@title 11) Export CSVs (alerts & raw pulls)
from datetime import datetime
ts = datetime.now().strftime('%Y%m%d_%H%M%S')
raw_path = f"epidata_NHSN_raw_{ts}.csv"
alerts_path = f"alerts_NHSN_{ts}.csv"
nwss_path = f"nwss_weekly_{ts}.csv"
df.to_csv(raw_path, index=False)
out.to_csv(alerts_path, index=False)
ww_weekly.to_csv(nwss_path, index=False)
raw_path, alerts_path, nwss_path