# In [15]:
from pathlib import Path
import pandas as pd
import numpy as np


# Root folder holding the input/output directory of every pipeline stage.
SCRIPTS = Path(r"C:\Users\Dylan\OneDrive - Swinburne University\COS70008\Scripts")

# Per-email tables: cleaned emails, risk scores and sentiment scores.
p_emails_clean = SCRIPTS.joinpath("data_clean", "Emails_clean.parquet")
p_risk_email = SCRIPTS.joinpath("risk_analysis", "RiskScores.parquet")
p_sent_email = SCRIPTS.joinpath("sentiment_analysis", "SentimentScores.parquet")

# Emotion scores and thread-level tables.
p_sent_emotion = SCRIPTS.joinpath("nlp_sentiment", "SentimentScores_emotion.parquet")
p_risk_threads = SCRIPTS.joinpath("Threads", "RiskScores_threads.parquet")
p_thread_index = SCRIPTS.joinpath("Threads", "ThreadIndex.parquet")

# Network-graph tables used to map people to communities.
p_comm_meta = SCRIPTS.joinpath("network_graphs", "Communities_with_meta.parquet")
p_comm = SCRIPTS.joinpath("network_graphs", "Communities.parquet")
p_nodeindex = SCRIPTS.joinpath("network_graphs", "NodeIndex.parquet")

# Every result file is written below this directory; create it up front.
OUT_DIR = SCRIPTS.joinpath("risk_analysis", "results")
OUT_DIR.mkdir(parents=True, exist_ok=True)


def file_ok(p: Path) -> bool:
    """Return True only when *p* exists and is a regular file (not a directory)."""
    if not p.exists():
        return False
    return p.is_file()

def read_px(p: Path):
    """Load the Parquet file at *p* with the pyarrow engine, memory mapping off."""
    parquet_options = {"engine": "pyarrow", "memory_map": False}
    return pd.read_parquet(p, **parquet_options)

def safe_merge(left, right, on, how="left"):
    """Merge *right* into *left*, tolerating a missing or empty *right*.

    When *right* is ``None`` or has no rows, a copy of *left* is returned and
    no columns from *right* are added.  Otherwise the result of
    ``left.merge(right, on=on, how=how)`` is returned.
    """
    has_rows = right is not None and len(right) > 0
    if has_rows:
        return left.merge(right, on=on, how=how)
    return left.copy()

def normalize_sentiment_rows(df, label_col, score_col, out_value_col="sent_value"):
    """Add a signed sentiment value column to *df* and return *df*.

    Labels are matched case-insensitively by substring: a label containing
    "pos" yields ``+score``, one containing "neg" yields ``-score`` and any
    other label (including a missing one) yields ``0.0``.  Scores that cannot
    be parsed as numbers count as ``0.0``.

    Args:
        df: Frame to annotate.  It is modified in place.  ``None`` is returned
            unchanged.
        label_col: Name of the column holding the sentiment label.
        score_col: Name of the column holding the sentiment score.
        out_value_col: Name of the float column to create.

    Returns:
        The same *df* object with *out_value_col* added, or ``None``.
    """
    if df is None:
        return df
    if len(df) == 0:
        # Still create the column so callers can select it from an empty frame.
        df[out_value_col] = np.array([], dtype=float)
        return df

    labels = df[label_col].astype("string").str.lower()
    score = pd.to_numeric(df[score_col], errors="coerce").fillna(0.0).to_numpy(dtype=float)

    # na=False: a missing label must count as "no match".  Without it the mask
    # carries pd.NA, whose truth value np.where cannot evaluate.
    is_pos = labels.str.contains("pos", na=False).to_numpy(dtype=bool)
    is_neg = labels.str.contains("neg", na=False).to_numpy(dtype=bool)

    # "pos" takes precedence when a label happens to contain both substrings.
    df[out_value_col] = np.where(is_pos, score, np.where(is_neg, -score, 0.0))
    return df

def dominant_label(df, key_cols, label_col, score_col, out_col):
    """Return one row per key holding the label with the highest score.

    The result has the columns ``key_cols + [out_col]``.  A ``None`` or
    zero-row *df* produces an empty frame with those same columns.
    """
    if df is None or not len(df):
        empty_columns = key_cols + [out_col]
        return pd.DataFrame(columns=empty_columns)

    # Best score first, so the first occurrence of each key is its winner.
    ranked = df.sort_values(score_col, ascending=False)
    winners = ranked.drop_duplicates(key_cols)
    selected = winners[key_cols + [label_col]]
    return selected.rename(columns={label_col: out_col})


# The three core tables are mandatory.  Raise a real exception rather than
# using `assert`, which is stripped when Python runs with -O.
for _required in (p_emails_clean, p_risk_email, p_sent_email):
    if not file_ok(_required):
        raise FileNotFoundError(f"Missing {_required}")

emails_clean = read_px(p_emails_clean)
risk_raw     = read_px(p_risk_email)
sent_email   = read_px(p_sent_email)

# Optional inputs: each stays None when its file is absent.
sent_emotion = read_px(p_sent_emotion) if file_ok(p_sent_emotion) else None
risk_threads = read_px(p_risk_threads) if file_ok(p_risk_threads) else None
thread_index = read_px(p_thread_index) if file_ok(p_thread_index) else None
comm_meta    = read_px(p_comm_meta) if file_ok(p_comm_meta) else None

# The community-mapping step falls back to Communities/NodeIndex whenever
# comm_meta is missing OR has no rows, so load the fallback in both cases.
_need_comm_fallback = comm_meta is None or len(comm_meta) == 0
comm         = read_px(p_comm) if (_need_comm_fallback and file_ok(p_comm)) else None
node_index   = read_px(p_nodeindex) if (_need_comm_fallback and file_ok(p_nodeindex)) else None


# Columns that the joins and aggregations below rely on.
need = {"email_id","person_id","dt_utc"}
missing = need - set(emails_clean.columns)
if missing:
    # Explicit raise instead of `assert` (asserts are stripped under -O);
    # ValueError matches the RiskScores schema check further down.
    raise ValueError(f"Emails_clean missing columns: {missing}")

# Unparseable timestamps become NaT; yyyymm is the month key used for trends.
emails_clean["dt_utc"] = pd.to_datetime(emails_clean["dt_utc"], utc=True, errors="coerce")
emails_clean["yyyymm"] = emails_clean["dt_utc"].dt.strftime("%Y%m")


# --- Per-email risk ----------------------------------------------------------
# `final_score` is the only RiskScores column this step insists on.
if "final_score" not in risk_raw.columns:
    raise ValueError("RiskScores.parquet must include 'final_score' (overall email risk).")
risk_email = risk_raw[["email_id", "final_score"]].copy()
# Non-numeric or missing scores are treated as zero risk.
risk_email["final_score"] = pd.to_numeric(risk_email["final_score"], errors="coerce").fillna(0.0)

# Build dom_risk[email_id, top_risk_cat]: the dominant risk category per email.
if "risk_label" in risk_raw.columns:
    # A ready-made label column is used as-is.
    # NOTE(review): rows are not de-duplicated on email_id here; repeated ids
    # would fan out the later merge. Confirm RiskScores has one row per email.
    dom_risk = risk_raw[["email_id", "risk_label"]].rename(columns={"risk_label": "top_risk_cat"})
else:
    # Otherwise derive the category from the per-category `hits_*` columns.
    hit_cols = [c for c in risk_raw.columns if c.startswith("hits_")]
    if hit_cols:
        # Wide -> long: one row per (email, category) with its hit count.
        long_hits = (risk_raw[["email_id"] + hit_cols]
                     .melt(id_vars=["email_id"], var_name="risk_category", value_name="hits"))
        # Strip the column prefix so only the category name remains.
        long_hits["risk_category"] = long_hits["risk_category"].str.replace(r"^hits_", "", regex=True)
        # Highest hit count first within each email, then keep that first row.
        # NOTE(review): an email whose hits are all equal (e.g. all zero)
        # still receives a category; confirm that is intended.
        dom_risk = (long_hits.sort_values(["email_id","hits"], ascending=[True,False])
                            .drop_duplicates(["email_id"])
                            .rename(columns={"risk_category":"top_risk_cat"})
                            [["email_id","top_risk_cat"]])
    else:
        # No category information at all: every email gets a missing category.
        dom_risk = pd.DataFrame({"email_id": risk_email["email_id"], "top_risk_cat": np.nan})

# --- Per-email sentiment -----------------------------------------------------
# Standardise the generic `label`/`score` column names when they are present.
sent_email = sent_email.rename(columns={"label":"sentiment_label","score":"sentiment_score"})
if "sentiment_score" not in sent_email.columns:
    # Fall back to the first column whose name contains "score", else 0.0.
    cand = [c for c in sent_email.columns if "score" in c.lower()]
    sent_email["sentiment_score"] = pd.to_numeric(sent_email[cand[0]], errors="coerce").fillna(0.0) if cand else 0.0
if "sentiment_label" not in sent_email.columns:
    # Fall back to the first column whose name contains "label", else "neutral".
    cand = [c for c in sent_email.columns if "label" in c.lower()]
    sent_email["sentiment_label"] = sent_email[cand[0]] if cand else "neutral"
# Adds `sent_value`: +score for positive labels, -score for negative, else 0.
sent_email = normalize_sentiment_rows(sent_email, "sentiment_label", "sentiment_score", "sent_value")
# Keep, for each email, the row with the highest sentiment_score.
# NOTE(review): a `sentiment_score` that arrived via the rename is sorted
# without numeric coercion; confirm the column is numeric in the input file.
sent_best = (sent_email.sort_values("sentiment_score", ascending=False)
             .drop_duplicates(["email_id"])
             [["email_id","sentiment_label","sentiment_score","sent_value"]])

# --- Per-email dominant emotion (only when the emotion table has rows) -------
if sent_emotion is not None and len(sent_emotion):
    sent_emotion = sent_emotion.rename(columns={"label":"emotion_label","score":"emotion_score"})
    dom_emotion = dominant_label(sent_emotion, ["email_id"], "emotion_label", "emotion_score", "top_emotion")
else:
    # None makes safe_merge skip the emotion join.
    dom_emotion = None

# --- Join everything onto the email base table --------------------------------
# One row per distinct (email_id, person_id, dt_utc, yyyymm) combination.
em_base = emails_clean[["email_id","person_id","dt_utc","yyyymm"]].drop_duplicates()
# Left joins keep every base row; emails without a match get NaN for now.
em_join = em_base.merge(risk_email, on="email_id", how="left")
# safe_merge returns the left side unchanged when the right side is None/empty,
# so the corresponding columns may be absent afterwards.
em_join = safe_merge(em_join, dom_risk,  on=["email_id"])
em_join = safe_merge(em_join, sent_best, on=["email_id"])
em_join = safe_merge(em_join, dom_emotion, on=["email_id"])

# Unmatched emails count as zero risk / neutral sentiment in later averages.
for c in ["final_score","sentiment_score","sent_value"]:
    if c in em_join.columns:
        em_join[c] = em_join[c].fillna(0.0)


# --- Person -> community mapping ----------------------------------------------
# Prefer Communities_with_meta when it has rows; otherwise use Communities.
if comm_meta is not None and len(comm_meta):
    person_to_comm = comm_meta[["person_id","community_id"]].drop_duplicates()
else:
    # NOTE(review): node_index is required here but never read in this step;
    # the mapping below takes person_id straight from `comm`. Confirm whether
    # NodeIndex is actually needed.
    if comm is None or node_index is None:
        raise FileNotFoundError("Need Communities_with_meta.parquet OR (Communities.parquet + NodeIndex.parquet).")
    person_to_comm = comm[["person_id","community_id"]].drop_duplicates()

# People without a community keep NaN community_id (left join).
# NOTE(review): a person mapped to several communities duplicates their emails.
em_join = em_join.merge(person_to_comm, on="person_id", how="left")


# --- Optional thread-level risk ------------------------------------------------
# Runs only when both thread tables have rows and ThreadIndex has `thread_id`.
# NOTE(review): `email_id` in thread_index and `thread_id` in risk_threads are
# used below without being checked first.
if (risk_threads is not None and len(risk_threads)
        and thread_index is not None and len(thread_index)
        and "thread_id" in thread_index.columns):
    # email_id -> thread_id lookup.
    th_map = thread_index[["email_id","thread_id"]].drop_duplicates()
    rt = risk_threads.rename(columns={"score":"final_score"})
    if "final_score" not in rt.columns:
        # Fall back to the first column whose name contains "score", else 0.0.
        cand = [c for c in rt.columns if "score" in c.lower()]
        rt["final_score"] = pd.to_numeric(rt[cand[0]], errors="coerce").fillna(0.0) if cand else 0.0
    # Mean risk per thread.
    trisk = rt.groupby("thread_id", as_index=False)["final_score"].mean().rename(
        columns={"final_score":"thread_risk_mean"})
    # Attach thread_id and the thread's mean risk to every email.
    em_join = em_join.merge(th_map, on="email_id", how="left").merge(trisk, on="thread_id", how="left")
    # Emails with no thread, or a thread without a score, get 0.0.
    em_join["thread_risk_mean"] = em_join["thread_risk_mean"].fillna(0.0)

# --- Person-level summary -------------------------------------------------------
per_email = em_join.copy()
# risk_score is final_score with any remaining gaps counted as zero risk.
per_email["risk_score"] = per_email["final_score"].fillna(0.0)

# dropna=False keeps rows whose community_id is NaN as their own groups.
grp = per_email.groupby(["person_id","community_id"], dropna=False)
mean_risk      = grp["risk_score"].mean().rename("mean_risk")
# sent_value is absent when the sentiment join was skipped.
mean_sentiment = grp["sent_value"].mean().rename("mean_sentiment") if "sent_value" in per_email.columns else None
n_emails       = grp["email_id"].nunique().rename("n_emails")


# Top risk category per (person, community): the category whose emails have
# the largest summed risk_score.
if "top_risk_cat" in per_email.columns:
    # This groupby uses the default NaN handling, unlike `grp` above, so rows
    # with a NaN community_id may get no top category. TODO confirm intent.
    cat_scores = (per_email.dropna(subset=["top_risk_cat"])
                           .groupby(["person_id","community_id","top_risk_cat"])["risk_score"]
                           .sum().reset_index())
    # Largest summed score first within each pair, then keep that first row.
    idx = (cat_scores.sort_values(["person_id","community_id","risk_score"], ascending=[True,True,False])
                    .drop_duplicates(["person_id","community_id"]))
    top_cat = idx.set_index(["person_id","community_id"])["top_risk_cat"]
else:
    # Empty lookup: every pair maps to a missing category.
    top_cat = pd.Series(dtype="object")

# Assemble one row per (person_id, community_id) group.
summary_idx = mean_risk.index
summary = pd.DataFrame(index=summary_idx).reset_index()
summary = summary.merge(mean_risk.reset_index(), on=["person_id","community_id"])
if mean_sentiment is not None:
    summary = summary.merge(mean_sentiment.reset_index(), on=["person_id","community_id"], how="left")
summary = summary.merge(n_emails.reset_index(), on=["person_id","community_id"])
# Look up each (person_id, community_id) pair in top_cat; unmatched pairs get NaN.
summary["top_risk_cat"] = summary.set_index(["person_id","community_id"]).index.map(top_cat).values


# --- Community-level summary ------------------------------------------------------
# Aggregates the person-level rows; dropna=False keeps the NaN community group.
cgrp = summary.groupby("community_id", dropna=False)
comm_summary = pd.DataFrame({
    "community_id": cgrp.size().index,
    "n_people": cgrp["person_id"].nunique().values,
    # Mean of per-person means: each person counts equally, regardless of
    # how many emails they have.
    "mean_risk": cgrp["mean_risk"].mean().values,
    "mean_sentiment": (cgrp["mean_sentiment"].mean().values if "mean_sentiment" in summary.columns else np.nan),
    "n_emails": cgrp["n_emails"].sum().values
})

# Top category per community: among its members' top categories, the one
# whose holders have the highest average mean_risk.
if "top_risk_cat" in summary.columns:
    cat_c = (summary.dropna(subset=["top_risk_cat"])
                    .groupby(["community_id","top_risk_cat"])
                    .agg(weight=("mean_risk","mean"))
                    .reset_index())
    # Highest weight first within each community, then keep that first row.
    idx2 = (cat_c.sort_values(["community_id","weight"], ascending=[True,False])
                  .drop_duplicates(["community_id"]))
    # Adds top_risk_cat_comm and top_cat_weight; communities without any
    # category keep NaN in both.
    comm_summary = comm_summary.merge(
        idx2.rename(columns={"top_risk_cat":"top_risk_cat_comm","weight":"top_cat_weight"}),
        on="community_id", how="left"
    )


# --- Monthly trends per person and per community -----------------------------------
if "yyyymm" in per_email.columns:
    # Aggregate sentiment only when the column exists. safe_merge skips an
    # empty sentiment table, in which case `sent_value` never reaches
    # per_email; the person-level summary applies the same guard.
    trend_aggs = {"mean_risk": ("risk_score", "mean")}
    if "sent_value" in per_email.columns:
        trend_aggs["mean_sentiment"] = ("sent_value", "mean")

    # dropna=False keeps rows with a missing month, person or community.
    trend_person = (per_email.groupby(["yyyymm","person_id"], dropna=False)
                    .agg(**trend_aggs)
                    .reset_index())
    trend_comm   = (per_email.groupby(["yyyymm","community_id"], dropna=False)
                    .agg(**trend_aggs)
                    .reset_index())
else:
    trend_person = pd.DataFrame(columns=["yyyymm","person_id","mean_risk","mean_sentiment"])
    trend_comm   = pd.DataFrame(columns=["yyyymm","community_id","mean_risk","mean_sentiment"])

# Give both frames the same shape: a `level` tag plus a shared `id` column.
trend_person["level"] = "person"
trend_person = trend_person.rename(columns={"person_id":"id"})
trend_comm["level"] = "community"
trend_comm = trend_comm.rename(columns={"community_id":"id"})
trends = pd.concat([trend_person, trend_comm], ignore_index=True)


# Deliverable table: the person summary in a fixed column order, with
# mean_sentiment present even when no sentiment was aggregated.
deliver = summary.copy()
if "mean_sentiment" not in deliver.columns:
    deliver["mean_sentiment"] = np.nan
deliver = deliver.loc[:, ["person_id","community_id","mean_risk","mean_sentiment","top_risk_cat","n_emails"]]

# Write the three summary tables, without the index, into OUT_DIR.
for _frame, _filename in (
    (deliver, "RiskSentimentSummary.parquet"),
    (summary, "PersonSummary.parquet"),
    (comm_summary, "CommunitySummary.parquet"),
):
    _frame.to_parquet(OUT_DIR / _filename, index=False)

# Normalise column dtypes on both trend frames (modified in place) before
# they are concatenated and written.
for df in (trend_person, trend_comm):
    df["yyyymm"] = df["yyyymm"].astype("string")

    # Cast ids to string so the combined `id` column has one dtype in the
    # Parquet file (presumably person and community ids differ in dtype).
    df["id"] = df["id"].astype("string")
    df["mean_risk"] = pd.to_numeric(df["mean_risk"], errors="coerce").astype("float32")
    if "mean_sentiment" in df.columns:
        df["mean_sentiment"] = pd.to_numeric(df["mean_sentiment"], errors="coerce").astype("float32")
    else:
        # Keep the output schema stable when sentiment was not aggregated.
        df["mean_sentiment"] = np.nan


# Rebuild the combined table from the normalised frames.
trends = pd.concat([trend_person, trend_comm], ignore_index=True)


# Missing months/ids (kept by the dropna=False groupbys) become empty strings.
trends["yyyymm"] = trends["yyyymm"].fillna("")
trends["id"] = trends["id"].fillna("")


trends.to_parquet(OUT_DIR / "Trends_monthly.parquet", index=False)

# Ten highest-risk people: rank by mean_risk, then keep the report columns.
_people_ranked = summary.sort_values("mean_risk", ascending=False)
_people_columns = ["person_id","community_id","mean_risk","mean_sentiment","top_risk_cat","n_emails"]
top_people = _people_ranked.head(10)[_people_columns]

# Ten highest-risk communities, ranked the same way.
_comms_ranked = comm_summary.sort_values("mean_risk", ascending=False)
_comms_columns = ["community_id","mean_risk","mean_sentiment","n_people","n_emails","top_risk_cat_comm"]
top_comms = _comms_ranked.head(10)[_comms_columns]

# Export both rankings as CSV without the index.
top_people.to_csv(OUT_DIR / "Top10_HighRisk_People.csv", index=False)
top_comms.to_csv(OUT_DIR / "Top10_HighRisk_Communities.csv", index=False)

print("✅ Wrote to:", OUT_DIR)


# Output: ✅ Wrote to: C:\Users\Dylan\OneDrive - Swinburne University\COS70008\Scripts\risk_analysis\results
