# Sample Usage — Bayesian Email Link Scoring

In [12]:

import pandas as pd
import numpy as np
from dataclasses import dataclass
from math import gamma
import numpy as np

def normalize_emails(df: pd.DataFrame) -> pd.DataFrame:
    """Return a trimmed copy of a raw email frame with canonical column names.

    ``message_id`` is renamed to ``email_id`` and ``date`` to ``email_ts``;
    ``email_ts`` is parsed to UTC datetimes (unparseable values become NaT).
    Only the columns email_id / email_ts / subject / body / from / to are
    kept (those that exist, in their original order), and rows missing
    either ``email_id`` or ``email_ts`` are dropped. The input is not modified.
    """
    wanted = ("email_id", "email_ts", "subject", "body", "from", "to")

    renamed = df.rename(columns={"message_id": "email_id", "date": "email_ts"}).copy()
    renamed["email_ts"] = pd.to_datetime(renamed["email_ts"], utc=True, errors="coerce")

    selected = [col for col in renamed.columns if col in wanted]
    trimmed = renamed[selected]
    return trimmed.dropna(subset=["email_id", "email_ts"])

# Lower-case terms/phrases counted by simple_text_score.
ELECTRICITY_LEXICON = {
    "caiso", "ancillary", "congestion", "schedule",
    "ricochet", "death star", "fat boy",
    "outage", "bid", "megawatt", "load",
    "rto", "iso", "lmp",
    "price cap", "overcharge",
    "reserve", "balancing", "settlement",
}

def simple_text_score(df: pd.DataFrame) -> pd.Series:
    """Score each row by how many lexicon terms occur in its subject + body.

    Missing subject/body values are treated as empty strings and the text is
    lower-cased before matching. Matching is plain substring containment, so
    a term also counts when it appears inside a longer word (e.g. "iso"
    inside "caiso"). Counts are divided by the largest count in ``df`` (or by
    1 when no row has a hit) and clipped to [0, 1], so the score is relative
    to the frame being scored.
    """
    def count_terms(blob):
        return sum(term in blob for term in ELECTRICITY_LEXICON)

    subjects = df["subject"].fillna("")
    bodies = df["body"].fillna("")
    hits = (subjects + " " + bodies).str.lower().apply(count_terms)

    peak = hits.max()
    denom = peak if peak > 0 else 1
    return (hits / denom).clip(0, 1)

def prepare_candidates(enron_df: pd.DataFrame, anom_df: pd.DataFrame) -> pd.DataFrame:
    """Join normalized emails to their anomaly candidates and derive features.

    Emails are normalized with ``normalize_emails`` and inner-joined to
    ``anom_df`` on ``email_id``. ``episode_ts`` is parsed to UTC and
    ``time_delta_sec`` is set to email time minus episode time in seconds
    (negative when the email precedes the episode). A ``text_score`` column
    is computed with ``simple_text_score`` only if the joined frame does not
    already have one.
    """
    emails = normalize_emails(enron_df)
    cand = emails.merge(anom_df, on="email_id", how="inner")

    cand["episode_ts"] = pd.to_datetime(cand["episode_ts"], utc=True, errors="coerce")
    offset = cand["email_ts"] - cand["episode_ts"]
    cand["time_delta_sec"] = offset.dt.total_seconds()

    # Respect a caller-supplied text_score; otherwise fall back to the lexicon score.
    if "text_score" in cand.columns:
        return cand
    cand["text_score"] = simple_text_score(cand)
    return cand

@dataclass
class Prior:
    """Prior belief that a candidate email is linked to an episode."""

    # Prior probability of a link; score_df converts it to odds as pi / (1 - pi).
    pi: float = 1e-3

@dataclass
class ChannelParams:
    """Parameters of the four evidence channels combined by score_df.

    The ``*_pos`` fields describe linked (positive) pairs and the ``*_neg``
    fields describe unlinked (negative) pairs.
    """

    # Time channel: Laplace scales applied to time_delta_sec.
    time_scale_pos: float
    time_scale_neg: float

    # Text channel: Beta shape parameters applied to text_score.
    text_alpha_pos: float
    text_beta_pos: float
    text_alpha_neg: float
    text_beta_neg: float

    # Graph channel: intercept plus a weight dict keyed by
    # "bias", "neg_hops" and "same_comm" (the keys score_df reads).
    graph_beta0: float
    graph_beta: dict

    # Anomaly channel: log-normal mu/sigma applied to anomaly_score.
    anom_mu_pos: float
    anom_sigma_pos: float
    anom_mu_neg: float
    anom_sigma_neg: float

def _laplace_pdf(x, scale): return (1.0/(2.0*scale)) * np.exp(-np.abs(x)/scale)

def _beta_pdf(z, a, b):
    z = np.clip(z, 1e-6, 1-1e-6)
    B = gamma(a)*gamma(b)/gamma(a+b)
    return (z**(a-1))*((1-z)**(b-1))/B

def _lognormal_pdf(x, mu, sigma):
    x = np.maximum(x, 1e-12)
    return (1.0/(x*sigma*np.sqrt(2*np.pi))) * np.exp(-((np.log(x)-mu)**2)/(2*sigma**2))

def _sigmoid(u): return 1.0/(1.0+np.exp(-u))

def _safe_ratio(a, b, eps=1e-12): return (a+eps)/(b+eps)

def fit_params_from_labels(df_pos: pd.DataFrame, df_neg: pd.DataFrame, prior_pi: float = 1e-3):
    """Estimate channel parameters from labelled positive / negative seed rows.

    Both frames must provide ``time_delta_sec``, ``text_score`` and
    ``anomaly_score`` columns.

    * Time: Laplace scale = mean absolute ``time_delta_sec`` (+1e-6).
    * Text: Beta shapes by the method of moments on ``text_score``.
    * Anomaly: log-normal mu / sigma = mean / sample std of log(``anomaly_score``).
    * Graph: the weights are fixed constants here, not fitted.

    Small seed sets are handled explicitly: a one-row frame has an undefined
    sample variance (NaN with ddof=1), which is treated as zero spread
    instead of poisoning every parameter with NaN, and the Beta concentration
    (alpha + beta) is capped so that zero-spread seeds do not yield shapes in
    the 1e8 range, which overflow ``math.gamma`` when the density is evaluated.
    Empty frames are not handled: their fitted parameters are NaN.

    Returns:
        ``(Prior(prior_pi), ChannelParams(...))``.
    """
    # Cap on the fitted Beta concentration (alpha + beta). Kept well below the
    # ~171 limit of math.gamma so the fitted shapes remain usable with a
    # gamma-based density.
    max_concentration = 150.0

    def finite_or_zero(value):
        # Sample variance/std of a single observation is NaN; read it as "no spread".
        return float(value) if np.isfinite(value) else 0.0

    tpos = abs(df_pos["time_delta_sec"]).mean() + 1e-6
    tneg = abs(df_neg["time_delta_sec"]).mean() + 1e-6

    def fit_beta(x):
        # Method of moments: t = m(1-m)/v - 1, alpha = m*t, beta = (1-m)*t.
        x = np.clip(x.astype(float), 1e-6, 1-1e-6)
        m = x.mean()
        v = finite_or_zero(x.var()) + 1e-9
        t = min(m*(1-m)/v - 1, max_concentration)
        # Floor both shapes: t is <= 0 when the sample is over-dispersed.
        a = max(m*t, 1e-3)
        b = max((1-m)*t, 1e-3)
        return a, b

    ap, bp = fit_beta(df_pos["text_score"])
    an, bn = fit_beta(df_neg["text_score"])

    def fit_logn(x):
        x = np.clip(x.astype(float), 1e-12, None)
        lx = np.log(x)
        return float(lx.mean()), finite_or_zero(lx.std(ddof=1)) + 1e-6

    mup, sigp = fit_logn(df_pos["anomaly_score"])
    mun, sign = fit_logn(df_neg["anomaly_score"])

    params = ChannelParams(
        time_scale_pos=tpos, time_scale_neg=tneg,
        text_alpha_pos=ap, text_beta_pos=bp, text_alpha_neg=an, text_beta_neg=bn,
        graph_beta0=0.0, graph_beta={"bias":0.0, "neg_hops":0.7, "same_comm":0.9},
        anom_mu_pos=mup, anom_sigma_pos=sigp, anom_mu_neg=mun, anom_sigma_neg=sign
    )
    return Prior(prior_pi), params

def score_df(df: pd.DataFrame, prior: Prior, params: ChannelParams) -> pd.DataFrame:
    """Score candidate rows with a naive-Bayes style product of likelihood ratios.

    ``df`` must provide ``time_delta_sec``, ``text_score``,
    ``graph_shortest_hops``, ``graph_same_community`` and ``anomaly_score``.
    Four likelihood ratios are computed per row:

    * ``lr_time``  - Laplace density of ``time_delta_sec`` (pos vs. neg scale)
    * ``lr_text``  - Beta density of ``text_score`` (pos vs. neg shapes)
    * ``lr_graph`` - odds s / (1 - s) of a logistic score over bias,
      negated hop count and same-community flag
    * ``lr_anom``  - log-normal density of ``anomaly_score`` (pos vs. neg)

    They are multiplied with the prior odds and converted to a posterior
    probability. The density helpers are the module-level ``_laplace_pdf``,
    ``_beta_pdf``, ``_lognormal_pdf``, ``_sigmoid`` and ``_safe_ratio``
    (previously re-defined verbatim inside this function).

    Returns:
        A copy of ``df`` with ``posterior_prob`` and the four ``lr_*``
        columns added, sorted by ``posterior_prob`` descending with a fresh
        0..n-1 index. ``df`` itself is not modified.
    """
    # Graph features; hops are negated so a shorter path raises the score.
    bias = np.ones(len(df))
    neg_hops = -df["graph_shortest_hops"].astype(float)
    same_comm = df["graph_same_community"].astype(float)

    lr_time = _safe_ratio(
        _laplace_pdf(df["time_delta_sec"], params.time_scale_pos),
        _laplace_pdf(df["time_delta_sec"], params.time_scale_neg)
    )
    lr_text = _safe_ratio(
        _beta_pdf(df["text_score"], params.text_alpha_pos, params.text_beta_pos),
        _beta_pdf(df["text_score"], params.text_alpha_neg, params.text_beta_neg)
    )
    lin = (params.graph_beta0
           + params.graph_beta["bias"]*bias
           + params.graph_beta["neg_hops"]*neg_hops
           + params.graph_beta["same_comm"]*same_comm)
    # Clip the logistic score away from 0/1 so the odds below stay finite.
    s = np.clip(_sigmoid(lin), 1e-6, 1-1e-6)
    lr_graph = s/(1-s)
    lr_anom = _safe_ratio(
        _lognormal_pdf(df["anomaly_score"], params.anom_mu_pos, params.anom_sigma_pos),
        _lognormal_pdf(df["anomaly_score"], params.anom_mu_neg, params.anom_sigma_neg)
    )

    # Posterior odds = prior odds x product of the channel likelihood ratios.
    prior_odds = prior.pi/(1-prior.pi)
    odds = prior_odds * lr_time * lr_text * lr_graph * lr_anom
    post = odds/(1+odds)

    out = df.copy()
    out["posterior_prob"] = np.clip(post, 0, 1)
    out["lr_time"] = lr_time
    out["lr_text"] = lr_text
    out["lr_graph"] = lr_graph
    out["lr_anom"] = lr_anom
    return out.sort_values("posterior_prob", ascending=False).reset_index(drop=True)

In [13]:

import pandas as pd

# Six hand-written sample emails; each tuple follows the field order given in zip().
enron_df = pd.DataFrame([
    dict(zip(("message_id", "date", "subject", "body", "from", "to", "file"), row))
    for row in (
        (
            "<18782981.1075855378110.JavaMail.evans@thyme>",
            "2001-05-14 23:39:00Z",
            "CAISO congestion and price cap discussion",
            "Team, CAISO congestion charges may spike; discuss LMP and schedule impacts. Keep this internal.",
            "allen-p@enron.com",
            "energy-team@enron.com",
            "allen-p/_sent_mail/1.",
        ),
        (
            "<15464986.1075855378456.JavaMail.evans@thyme>",
            "2001-05-04 20:51:00Z",
            "Ancillary services bids and balancing market",
            "Please review ancillary bids vs balancing requirements; potential outage risk noted.",
            "allen-p@enron.com",
            "grid-ops@enron.com",
            "allen-p/_sent_mail/10.",
        ),
        (
            "<20010515.083000.iris@enron>",
            "2001-05-15 08:30:00Z",
            "Press talking points for analysts",
            "Keep messaging strong on capacity; avoid mentioning the transmission constraint.",
            "ir@enron.com",
            "exec-staff@enron.com",
            "ir/_sent_mail/77.",
        ),
        (
            "<20010515.081200.ops@enron>",
            "2001-05-15 08:12:00Z",
            "Outage schedule and Ricochet routing",
            "Routing via out-of-state tie may trigger ‘ricochet’. Monitor settlement exposure.",
            "ops@enron.com",
            "trading-floor@enron.com",
            "ops/_sent_mail/42.",
        ),
        (
            "<20010513.120000.random@enron>",
            "2001-05-13 12:00:00Z",
            "Lunch plans",
            "Anyone up for sushi near the office?",
            "hr@enron.com",
            "all@enron.com",
            "hr/_sent_mail/5.",
        ),
        (
            "<20010514.230000.ops2@enron>",
            "2001-05-14 23:00:00Z",
            "Settlement check: balancing & reserve",
            "Check settlement diffs on reserve and balancing; potential overcharge flagged.",
            "ops2@enron.com",
            "settlements@enron.com",
            "ops/_sent_mail/55.",
        ),
    )
])

# Every candidate row below points at the same episode and timestamp.
episode_ts = pd.Timestamp("2001-05-15 09:00:00Z")

# One candidate per sample email:
# (email_id, graph_shortest_hops, graph_same_community, anomaly_score)
anom_df = pd.DataFrame([
    {
        "email_id": email_id,
        "episode_id": "CAISO-20010515-AM",
        "episode_ts": episode_ts,
        "graph_shortest_hops": hops,
        "graph_same_community": same_community,
        "anomaly_score": score,
    }
    for email_id, hops, same_community, score in (
        ("<18782981.1075855378110.JavaMail.evans@thyme>", 1, 1, 3.2),
        ("<15464986.1075855378456.JavaMail.evans@thyme>", 2, 1, 2.1),
        ("<20010515.083000.iris@enron>", 2, 1, 4.0),
        ("<20010515.081200.ops@enron>", 1, 1, 5.1),
        ("<20010513.120000.random@enron>", 4, 0, 0.2),
        ("<20010514.230000.ops2@enron>", 2, 1, 2.6),
    )
])

# Quick size check of the two sample frames.
print(f"Samples ready: {len(enron_df)} emails, {len(anom_df)} candidates")


Samples ready: 6 emails, 6 candidates


In [14]:

import pandas as pd

# Join the sample emails to their anomaly candidates and derive the features.
candidates = prepare_candidates(enron_df, anom_df)

# Heuristic seed labels for the demo:
#   positives = the two highest anomaly scores, plus anything within one hour of the episode
#   negatives = a fixed-seed random sample of the remaining rows (up to twice the positives)
top_anom = candidates.nlargest(2, "anomaly_score")
near_time = candidates.loc[candidates["time_delta_sec"].abs().lt(3600)]
pos_seed = pd.concat([top_anom, near_time]).drop_duplicates(subset="email_id")

neg_pool = candidates.loc[~candidates["email_id"].isin(pos_seed["email_id"])]
if len(neg_pool):
    neg_seed = neg_pool.sample(n=min(2 * len(pos_seed), len(neg_pool)), random_state=7)
else:
    # Nothing left to sample from: fall back to the positive seeds.
    neg_seed = pos_seed

# Fit channel parameters on the seeds, then score every candidate.
prior, params = fit_params_from_labels(pos_seed, neg_seed, prior_pi=1e-3)
scored = score_df(candidates, prior, params)

# Show the ranked candidates together with the features behind each score.
scored.loc[:, [
    "email_id", "episode_id", "posterior_prob", "time_delta_sec",
    "text_score", "graph_shortest_hops", "graph_same_community",
    "anomaly_score", "subject",
]].head(10)


Unnamed: 0,email_id,episode_id,posterior_prob,time_delta_sec,text_score,graph_shortest_hops,graph_same_community,anomaly_score,subject
0,<20010515.083000.iris@enron>,CAISO-20010515-AM,0.0304102,-1800.0,0.0,2,1,4.0,Press talking points for analysts
1,<20010515.081200.ops@enron>,CAISO-20010515-AM,0.002888415,-2880.0,0.666667,1,1,5.1,Outage schedule and Ricochet routing
2,<18782981.1075855378110.JavaMail.evans@thyme>,CAISO-20010515-AM,5.456189e-09,-33660.0,1.0,1,1,3.2,CAISO congestion and price cap discussion
3,<20010514.230000.ops2@enron>,CAISO-20010515-AM,5.809741e-12,-36000.0,0.666667,2,1,2.6,Settlement check: balancing & reserve
4,<15464986.1075855378456.JavaMail.evans@thyme>,CAISO-20010515-AM,2.158811e-14,-907740.0,0.666667,2,1,2.1,Ancillary services bids and balancing market
5,<20010513.120000.random@enron>,CAISO-20010515-AM,1.3184430000000001e-23,-162000.0,0.0,4,0,0.2,Lunch plans
