# In [None]:
# ============================
# Social Intelligence Analyst Agent (Schema-agnostic, Autonomous-by-default)
# ============================
# Requires:
#   pip install -U google-genai pandas numpy scikit-learn umap-learn matplotlib tqdm rich
#
# Key features:
# - Robust header/preamble detection + delimiter sniff
# - Column selection based on observed evidence (examples + diagnostics), not name guesses
# - Datetime diagnostics computed for ALL columns
# - No downsampling of dataset
# - Adaptive k (metrics + coherence checks, iterate)
# - Language-aware labeling (no English assumption)
# - Autonomous by default; pauses only when low-confidence or repeated failures
# - Natural-language interjection patches AGENT_STATE safely
# ============================

from __future__ import annotations

import os, re, csv, json, time, traceback
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from tqdm import tqdm
from sklearn.cluster import KMeans
from sklearn.cluster import MiniBatchKMeans
from sklearn.preprocessing import Normalizer
from sklearn.metrics import davies_bouldin_score, calinski_harabasz_score
import umap.umap_ as umap

from rich.console import Console
from rich.panel import Panel

from getpass import getpass
from google import genai
from google.genai import types

console = Console()

# ----------------------------
# Agent State (edit DATA_PATH)
# ----------------------------
AGENT_STATE: Dict[str, Any] = {
    # Input file and output folder. The SIA_DATA_PATH / SIA_OUTPUT_DIR
    # environment variables take precedence, so the notebook can run on another
    # machine without editing the user-specific default path below.
    "DATA_PATH": os.environ.get(
        "SIA_DATA_PATH",
        "/Users/angusmclean/Downloads/2056554290/SocialData.csv",  # <-- set this
    ),
    "OUTPUT_DIR": os.environ.get("SIA_OUTPUT_DIR", "outputs"),
    "artifacts": [],                # file paths registered via add_artifact()
    "run_id": time.strftime("%Y%m%d-%H%M%S"),

    # Autonomous behavior controls
    "AUTO_MODE": True,              # autonomous by default
    "PAUSE_ON_LOW_CONFIDENCE": True,
    "LOW_CONF_THRESHOLD": 0.70,     # 0..1 (LLM-reported confidence)

    # Optional explicit overrides (set by you or via NL interjection)
    # "TEXT_COL": "exact column name",
    # "TIME_COLS": ["one or more time columns"],
    # "ENGAGEMENT_COLS": ["numeric columns"],

    # Adaptive-k controls (can be adjusted by agent or you)
    "K_MIN": 8,
    "K_MAX": 80,
    "K_STEP": 4,
    "K_FINE_WINDOW": 6,
    # Optional override:
    # "FORCE_K": 32,
}

# ----------------------------
# Artifact helpers
# ----------------------------
def ensure_dirs() -> None:
    """Create AGENT_STATE["OUTPUT_DIR"] (including parents) unless it already exists."""
    target_dir = AGENT_STATE["OUTPUT_DIR"]
    os.makedirs(target_dir, exist_ok=True)

def add_artifact(path: str) -> None:
    """Register ``path`` (stored as a string) in AGENT_STATE["artifacts"] once.

    Falsy paths are ignored. A missing or ``None`` artifacts list is replaced
    with a fresh list first.
    """
    if path:
        entry = str(path)
        registry = AGENT_STATE.get("artifacts")
        if registry is None:
            registry = AGENT_STATE["artifacts"] = []
        if entry not in registry:
            registry.append(entry)

def safe_write(path: str, content: str) -> None:
    """Write ``content`` to ``path`` as UTF-8 text and register it as an artifact.

    Missing parent directories are created first; an existing file is overwritten.
    """
    parent_dir = Path(path).parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    with open(path, mode="w", encoding="utf-8") as handle:
        handle.write(content)
    add_artifact(path)

def savefig(path: str) -> None:
    """Save the current matplotlib figure to ``path``, close it, and register it as an artifact.

    The parent directory is created first (as safe_write does), so charts can
    be written into sub-folders that do not exist yet.
    """
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    plt.savefig(path, dpi=200, bbox_inches="tight")
    plt.close()
    add_artifact(path)

def validate_paths() -> None:
    """Fail fast on a missing input file, then make sure the output folder exists.

    Raises:
        FileNotFoundError: if AGENT_STATE["DATA_PATH"] does not exist.
    """
    data_path = AGENT_STATE["DATA_PATH"]
    if os.path.exists(data_path):
        ensure_dirs()
        return
    raise FileNotFoundError(f"DATA_PATH does not exist: {data_path}")

# ----------------------------
# Gemini client
# ----------------------------
def get_gemini_client() -> genai.Client:
    """Return a Gemini client, prompting for an API key only if none is configured.

    The google-genai client reads its key from GOOGLE_API_KEY or GEMINI_API_KEY,
    so the interactive prompt is skipped when either is already set. A key
    entered at the prompt is exported as GOOGLE_API_KEY.

    Raises:
        ValueError: if the user submits an empty key at the prompt.
    """
    if not (os.environ.get("GOOGLE_API_KEY") or os.environ.get("GEMINI_API_KEY")):
        key = getpass("Gemini API key: ").strip()
        if not key:
            raise ValueError("No Gemini API key provided.")
        os.environ["GOOGLE_API_KEY"] = key
    return genai.Client()

# ----------------------------
# Safe natural-language interjection -> state patch
# ----------------------------
ALLOWED_STATE_KEYS = {
    "TEXT_COL", "TIME_COLS", "ENGAGEMENT_COLS",
    "K_MIN", "K_MAX", "K_STEP", "K_FINE_WINDOW", "FORCE_K",
    "AUTO_MODE", "PAUSE_ON_LOW_CONFIDENCE", "LOW_CONF_THRESHOLD",
}

# Keys whose values are lists of column names. A bare string here (a common
# LLM output, e.g. "TIME_COLS": "date") would be iterated character by
# character by list consumers such as prepare_dataframe, so it is wrapped.
_LIST_STATE_KEYS = {"TIME_COLS", "ENGAGEMENT_COLS"}

def apply_state_patch(patch: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Merge the whitelisted keys of ``patch`` into AGENT_STATE.

    Returns ``(applied, ignored)``. ``applied`` holds the values actually
    stored, after wrapping bare strings for list-valued keys. ``ignored`` holds
    entries whose keys are not in ALLOWED_STATE_KEYS; those are never written.
    """
    applied, ignored = {}, {}
    for k, v in patch.items():
        if k not in ALLOWED_STATE_KEYS:
            ignored[k] = v
            continue
        if k in _LIST_STATE_KEYS and isinstance(v, str):
            v = [v]
        AGENT_STATE[k] = v
        applied[k] = v
    return applied, ignored

def nl_to_state_patch(client: genai.Client, user_message: str, context: str) -> Dict[str, Any]:
    """Ask Gemini to turn free-form user guidance into an AGENT_STATE patch.

    The reply is scanned for the span from its first ``{`` to its last ``}``,
    which is parsed as JSON and returned unvalidated (apply_state_patch does the
    key filtering).

    Raises:
        ValueError: if the reply has no brace-delimited span, or (as
            json.JSONDecodeError) if that span is not valid JSON.
    """
    rules = [
        "Return ONLY a valid JSON object.",
        f"Only use these keys: {sorted(ALLOWED_STATE_KEYS)}",
        "If user says 'don't use engagement', set ENGAGEMENT_COLS=[]",
        "If user says 'use X as time', set TIME_COLS=[X] unless they mention multiple",
    ]
    request = {
        "task": "Convert user guidance into a minimal patch to AGENT_STATE.",
        "rules": rules,
        "context": context,
        "user_message": user_message,
    }
    reply = client.models.generate_content(
        model="gemini-2.5-pro",
        contents=json.dumps(request, ensure_ascii=False),
    )
    raw = (reply.text or "").strip()
    found = re.search(r"\{.*\}", raw, flags=re.DOTALL)
    if found is None:
        raise ValueError(f"Could not parse JSON from NL patch response.\nRaw:\n{raw[:1200]}")
    return json.loads(found.group(0))

def maybe_pause_for_user(
    client: genai.Client,
    title: str,
    summary: str,
    force: bool = False
) -> None:
    """Show a checkpoint panel and, when required, wait for user input.

    The panel is always printed. The function returns immediately when
    AUTO_MODE is truthy and ``force`` is False. Otherwise it loops on input():
    an empty line continues, 'stop' (any case) raises SystemExit, and any other
    text is converted by the LLM into a state patch and applied. Errors while
    applying guidance are reported and the prompt repeats.
    """
    console.print(Panel(summary, title=f"CHECKPOINT: {title}", style="cyan"))

    autonomous = AGENT_STATE.get("AUTO_MODE", True)
    if autonomous and not force:
        return

    prompt_text = "Enter=continue | 'stop'=halt | or type guidance in plain English: "
    while True:
        guidance = input(prompt_text).strip()
        if not guidance:
            return
        if guidance.lower() == "stop":
            raise SystemExit("Stopped by user.")
        try:
            patch = nl_to_state_patch(client, guidance, context=f"{title}\n\n{summary}")
            applied, ignored = apply_state_patch(patch)
            console.print(Panel(json.dumps(applied, indent=2, ensure_ascii=False), title="Applied patch", style="green"))
            if ignored:
                console.print(Panel(json.dumps(ignored, indent=2, ensure_ascii=False), title="Ignored keys", style="yellow"))
            console.print(Panel("Patch applied. Enter to continue or add more guidance.", style="green"))
        except Exception as exc:
            console.print(Panel(f"Could not apply guidance: {exc}", title="Interjection error", style="red"))

# ----------------------------
# Stage runner with 3-strikes stop + ask
# ----------------------------
@dataclass
class StageRunner:
    """Run one pipeline stage, retrying until it succeeds.

    Each failure is printed, its traceback is saved to
    ``<OUTPUT_DIR>/last_failed_<name>.txt``, and the stage is retried. After
    ``max_failures`` consecutive failures the user is forced into an
    interactive checkpoint and the counter resets. ``SystemExit`` (e.g. the
    user typing 'stop') is always propagated.
    """

    name: str
    max_failures: int = 3
    failures: int = 0

    def run(self, client: genai.Client, fn):
        """Call ``fn()`` until it returns, and return its result."""
        while True:
            try:
                return fn()
            except SystemExit:
                raise
            except Exception:
                self.failures += 1
                err = traceback.format_exc()
                console.print(Panel(err[-4000:], title=f"ERROR in stage '{self.name}' (attempt {self.failures})", style="red"))
                fail_path = os.path.join(AGENT_STATE["OUTPUT_DIR"], f"last_failed_{self.name}.txt")
                safe_write(fail_path, err)

                if self.failures >= self.max_failures:
                    # Force human interjection after max_failures strikes
                    maybe_pause_for_user(
                        client,
                        title=f"{self.name} failed {self.failures} times",
                        summary=f"Last error saved: {fail_path}\nYou can give guidance like:\n"
                                f"- use column X as text\n- use column Y as time\n- force k to 24\n"
                                f"Then press Enter to retry, or type stop.",
                        force=True
                    )
                    self.failures = 0  # reset after user intervention
                elif AGENT_STATE.get("AUTO_MODE", True):
                    # Autonomous: the checkpoint panel is informational only; retry immediately.
                    maybe_pause_for_user(
                        client,
                        title=f"{self.name} failed",
                        summary=f"Last error saved: {fail_path}\n"
                                f"(Autonomous retry will continue unless you disable AUTO_MODE.)",
                        force=False
                    )
                else:
                    # Not autonomous: exactly one blocking checkpoint per failure.
                    # maybe_pause_for_user already blocks when AUTO_MODE is off,
                    # so a second prompt would ask the user twice.
                    maybe_pause_for_user(
                        client,
                        title=f"{self.name} retry?",
                        summary=f"Last error saved: {fail_path}\nProvide guidance or Enter to retry.",
                        force=True
                    )

# ----------------------------
# CSV header/preamble + delimiter sniff + load
# ----------------------------
def looks_like_metadata(line: str) -> bool:
    """Return True for export-preamble lines shaped like ``Key:, value``.

    Example: "Report:, Bulk Mentions Download". The key must start with an
    ASCII letter and may contain only letters, spaces, underscores and hyphens;
    the colon must be followed (after optional whitespace) by a comma.
    """
    return re.match(r"^\s*[A-Za-z][A-Za-z _-]*:\s*,", line) is not None

def find_real_header_row(path: str, scan_lines: int = 6000) -> int:
    """Return the 0-based line index of the CSV header, skipping export preambles.

    At most ``scan_lines`` lines are read. Leading blank lines and ``Key:, value``
    metadata lines are skipped. The header is the first later non-metadata
    line containing at least two occurrences of one candidate separator
    (, tab ; |). If none qualifies, the index of the first non-blank,
    non-metadata line is returned (or the number of scanned lines if all were
    preamble).
    """
    # utf-8-sig drops a leading byte-order mark. With plain utf-8 the BOM stays
    # glued to line 0 and prevents looks_like_metadata from matching it.
    # Line numbering is unaffected, so the index still lines up with skiprows.
    with open(path, "r", encoding="utf-8-sig", errors="replace") as f:
        lines = []
        for _ in range(scan_lines):
            line = f.readline()
            if not line:
                break
            lines.append(line.rstrip("\n"))

    i = 0
    while i < len(lines) and (lines[i].strip() == "" or looks_like_metadata(lines[i])):
        i += 1

    # first line with enough separators and not metadata
    for j in range(i, len(lines)):
        line = lines[j]
        if looks_like_metadata(line):
            continue
        if max(line.count(","), line.count("\t"), line.count(";"), line.count("|")) >= 2:
            return j

    return i

def sniff_delimiter(path: str, header_row: int) -> str:
    """Guess the field delimiter from up to 40 lines starting at ``header_row``.

    Candidates are comma, tab, semicolon and pipe. Falls back to "," when
    csv.Sniffer cannot decide.
    """
    with open(path, "r", encoding="utf-8", errors="replace") as handle:
        for _ in range(header_row):
            next(handle, "")
        sample = "".join(next(handle, "") for _ in range(40))
    try:
        return csv.Sniffer().sniff(sample, delimiters=",\t;|").delimiter
    except Exception:
        return ","  # acceptable base assumption per your earlier OK

def robust_read_csv(path: str) -> Tuple[pd.DataFrame, int, str]:
    """Load a CSV export that may start with a metadata preamble.

    Returns ``(df, header_row, delimiter)``, where ``header_row`` is the number
    of leading lines skipped. On a parser error it retries with the python
    engine, which silently drops malformed rows. If the file is not valid
    UTF-8 it retries with ``encoding_errors="replace"``, matching the
    ``errors="replace"`` used when the header and delimiter were detected.
    Otherwise a stray byte would pass detection and then abort the load.
    """
    header_row = find_real_header_row(path)
    delim = sniff_delimiter(path, header_row)

    def _read(**extra: Any) -> pd.DataFrame:
        # One read attempt with the C engine, falling back to the python engine.
        try:
            return pd.read_csv(path, skiprows=header_row, sep=delim, **extra)
        except pd.errors.ParserError:
            return pd.read_csv(path, skiprows=header_row, sep=delim,
                               engine="python", on_bad_lines="skip", **extra)

    try:
        df = _read()
    except UnicodeDecodeError:
        df = _read(encoding_errors="replace")
    return df, header_row, delim

# ----------------------------
# Profiling + diagnostics (observe before decide)
# ----------------------------
def profile_dataframe(df: pd.DataFrame, out_path: str, n_examples: int = 5) -> None:
    """Write a plain-text profile of ``df`` to ``out_path``.

    The first line is the shape, followed by "columns:". Then there is one
    JSON line per column with its name, dtype, null rate (4 decimals) and up
    to ``n_examples`` non-null values rendered as strings.
    """
    def _describe(col) -> str:
        series = df[col]
        record = {
            "name": str(col),
            "dtype": str(series.dtype),
            "null_rate": round(float(series.isna().mean()), 4),
            "examples": series.dropna().astype(str).head(n_examples).tolist(),
        }
        return json.dumps(record, ensure_ascii=False)

    report = [f"shape: {df.shape}", "columns:"]
    report.extend(_describe(col) for col in df.columns)
    safe_write(out_path, "\n".join(report))

def datetime_parse_diagnostics(df: pd.DataFrame) -> pd.DataFrame:
    """Measure how well every column parses as datetimes.

    For each column with at least one non-null value, ``pd.to_datetime`` is
    applied with ``errors="coerce"``. Columns with a non-zero parse rate are
    reported with their success rate (over all rows, nulls included), min/max
    parsed value (as strings) and original dtype, sorted by success rate
    descending. If no column parses, an empty frame with the same columns is
    returned instead of raising.
    """
    columns = ["column", "parse_success_rate", "min", "max", "dtype"]
    rows = []
    for c in df.columns:
        s = df[c]
        if s.dropna().empty:
            continue
        try:
            parsed = pd.to_datetime(s, errors="coerce", utc=False)
            rate = float(parsed.notna().mean())
            if rate <= 0:
                continue
            lo, hi = str(parsed.min()), str(parsed.max())
        except (TypeError, ValueError, OverflowError):
            # Some inputs can make to_datetime (or min/max on its result) raise
            # even with errors="coerce", e.g. boolean columns or mixed tz-aware
            # values on some pandas versions. Treat such a column as
            # non-datetime instead of aborting the scan of all columns.
            continue
        rows.append({
            "column": str(c),
            "parse_success_rate": rate,
            "min": lo,
            "max": hi,
            "dtype": str(s.dtype),
        })
    return pd.DataFrame(rows, columns=columns).sort_values("parse_success_rate", ascending=False)

def basic_textiness_stats(df: pd.DataFrame, max_rows_for_stats: int = 2000) -> pd.DataFrame:
    """
    No assumptions about column names.
    Computes stats on string-like columns to help choose a 'text' field.
    (This does NOT change the dataset; it's just diagnostics.)

    Only the first ``max_rows_for_stats`` rows are inspected. Each object or
    string column with at least one non-null value yields one row with
    column, avg_len, p90_len, unique_ratio and nonnull_ratio. Rows are sorted
    by avg_len, then unique_ratio, both descending. If no column qualifies
    (e.g. an all-numeric frame), an empty frame with those columns is returned
    instead of raising on the sort.
    """
    columns = ["column", "avg_len", "p90_len", "unique_ratio", "nonnull_ratio"]
    rows = []
    # limit just for diagnostics cost; does not affect the dataset used later
    sub = df.head(max_rows_for_stats)
    for c in df.columns:
        s = sub[c]
        if s.dtype == "O" or pd.api.types.is_string_dtype(s):
            nonnull = s.dropna().astype(str)
            if nonnull.empty:
                continue
            lens = nonnull.str.len()
            rows.append({
                "column": str(c),
                "avg_len": float(lens.mean()),
                "p90_len": float(lens.quantile(0.9)),
                "unique_ratio": float(nonnull.nunique() / max(len(nonnull), 1)),
                "nonnull_ratio": float(nonnull.shape[0] / max(sub.shape[0], 1)),
            })
    return pd.DataFrame(rows, columns=columns).sort_values(["avg_len", "unique_ratio"], ascending=False)

# ----------------------------
# LLM: propose interpretation (with confidence + evidence)
# ----------------------------
def llm_propose_schema(
    client: genai.Client,
    df: pd.DataFrame,
    dt_diag: pd.DataFrame,
    text_diag: pd.DataFrame,
    profile_path: str
) -> Dict[str, Any]:
    """Ask the LLM which columns hold post text, timestamps and engagement.

    The prompt includes up to 3 non-null example values per column, the top 15
    rows of each diagnostics table and the profile path. Returns the JSON object
    found in the reply. Its requested shape is text_col, time_cols,
    engagement_cols, language_notes, confidence and evidence, but it is not
    validated here.

    Raises:
        ValueError: if the reply has no ``{...}`` span, or (as
            json.JSONDecodeError) if that span is not valid JSON.
    """
    schema_examples = [
        {
            "name": str(col),
            "dtype": str(df[col].dtype),
            "examples": df[col].dropna().head(3).astype(str).tolist(),
        }
        for col in df.columns
    ]

    expected_shape = {
        "text_col": "string",
        "time_cols": ["string", "... (one or more)"],
        "engagement_cols": ["string", "... (optional, empty if uncertain)"],
        "language_notes": "string",
        "confidence": "number 0..1",
        "evidence": {
            "why_text": "string",
            "why_time": "string",
            "why_engagement": "string"
        }
    }
    request = {
        "task": "Infer which columns correspond to post text, timestamps, and optional engagement signals.",
        "hard_rules": [
            "Do not assume column names ahead of evidence.",
            "Use examples + diagnostics only.",
            "Return strict JSON only."
        ],
        "return_format": expected_shape,
        "profile_path_written": profile_path,
        "textiness_diagnostics_top": text_diag.head(15).to_dict(orient="records"),
        "datetime_diagnostics_top": dt_diag.head(15).to_dict(orient="records"),
        "schema_examples": schema_examples
    }

    reply = client.models.generate_content(
        model="gemini-2.5-pro",
        contents=json.dumps(request, ensure_ascii=False),
    )
    raw = (reply.text or "").strip()
    found = re.search(r"\{.*\}", raw, flags=re.DOTALL)
    if found is None:
        raise ValueError(f"LLM did not return JSON.\nRaw:\n{raw[:1200]}")
    return json.loads(found.group(0))

# ----------------------------
# Prepare dataframe using chosen columns (no downsampling)
# ----------------------------
def prepare_dataframe(
    df: pd.DataFrame,
    text_col: str,
    time_cols: List[str],
    engagement_cols: List[str]
) -> Tuple[pd.DataFrame, bool]:
    """Build the analysis frame from the chosen columns (no downsampling).

    The steps are:

    - ``text_col`` is renamed to ``text``. Rows with missing or
      whitespace-only text are dropped.
    - ``time_cols[0]`` is renamed to ``timestamp`` and parsed. Rows that fail
      to parse are dropped.
    - Any further time columns are parsed into ``timestamp__<name>``.
    - ``engagement`` is the row sum of ``engagement_cols`` (non-numeric values
      count as 0), or the constant 1 when no engagement columns are given.

    Returns ``(frame, use_proxy)``, where ``use_proxy`` is True when
    ``engagement`` is the constant proxy. The index is reset to 0..n-1.

    Raises:
        ValueError: if a named column is missing or ``time_cols`` is empty.
    """
    if text_col not in df.columns:
        raise ValueError(f"text_col '{text_col}' not in columns")
    if not time_cols:
        raise ValueError("time_cols must contain at least one column")
    for c in time_cols:
        if c not in df.columns:
            raise ValueError(f"time_col '{c}' not in columns")

    out = df.rename(columns={text_col: "text"})
    # Filter missing text BEFORE casting. astype(str) turns NaN/None into the
    # literal strings "nan"/"None", which would survive dropna and be embedded
    # as if they were real posts.
    raw_text = out["text"]
    keep = raw_text.notna() & (raw_text.astype(str).str.strip() != "")
    out = out.loc[keep].copy()
    out["text"] = out["text"].astype(str)

    primary_time = time_cols[0]
    out = out.rename(columns={primary_time: "timestamp"})
    out["timestamp"] = pd.to_datetime(out["timestamp"], errors="coerce", utc=False)
    out = out.dropna(subset=["timestamp", "text"])

    # Preserve other time columns (parsed) if requested
    for c in time_cols[1:]:
        out[f"timestamp__{c}"] = pd.to_datetime(out[c], errors="coerce", utc=False)

    if engagement_cols:
        for c in engagement_cols:
            if c not in out.columns:
                raise ValueError(f"engagement_col '{c}' not in columns")
        for c in engagement_cols:
            out[c] = pd.to_numeric(out[c], errors="coerce").fillna(0)
        out["engagement"] = out[engagement_cols].sum(axis=1)
        use_proxy = False
    else:
        # explicit proxy behavior (no guessing)
        out["engagement"] = 1
        use_proxy = True

    out = out.reset_index(drop=True)
    return out, use_proxy

# ----------------------------
# Embeddings (full dataset)
# ----------------------------
def embed_texts(client: genai.Client, texts: List[str], batch_size: int = 96) -> np.ndarray:
    """Embed every text with gemini-embedding-001 (task_type CLUSTERING).

    Texts are sent in batches of ``batch_size``. Each one is stringified,
    stripped and cut to 4000 characters. Returns a float32 matrix with one
    row per input text, in input order.
    """
    chunks: List[np.ndarray] = []
    clustering_cfg = types.EmbedContentConfig(task_type="CLUSTERING")
    for offset in tqdm(range(0, len(texts), batch_size), desc="Embedding"):
        window = texts[offset:offset + batch_size]
        payload = [str(item).strip()[:4000] for item in window]
        result = client.models.embed_content(
            model="gemini-embedding-001",
            contents=payload,
            config=clustering_cfg,
        )
        chunks.extend(np.array(emb.values, dtype=np.float32) for emb in result.embeddings)
    return np.vstack(chunks)

# ----------------------------
# Adaptive-k: metrics + coherence checks
# ----------------------------
def kmeans_metrics(Xn: np.ndarray, k: int, use_minibatch: bool = True) -> Dict[str, float]:
    """Fit k-means with ``k`` clusters on ``Xn`` and return quality metrics.

    Returns a dict with these keys:

    - ``k``
    - ``davies_bouldin`` (lower is better)
    - ``calinski_harabasz`` (higher is better)
    - ``inertia``
    - ``fit_seconds`` (wall time of the fit only)

    The two index scores are NaN when the fitted labelling is degenerate
    (see the comment below).
    """
    t0 = time.time()
    if use_minibatch:
        km = MiniBatchKMeans(
            n_clusters=k,
            random_state=42,
            batch_size=4096,
            n_init=3,
            max_no_improvement=10,
            reassignment_ratio=0.01
        )
    else:
        km = KMeans(n_clusters=k, random_state=42, n_init=10)

    labels = km.fit_predict(Xn)
    fit_s = float(time.time() - t0)

    # Both scores require 2 <= n_labels <= n_samples - 1 and raise otherwise.
    # Check the labels actually produced rather than the requested k. A fit can
    # end up with fewer distinct labels than k, and k close to n_samples yields
    # all-singleton clusters.
    n_labels = int(np.unique(labels).size)
    if 1 < n_labels < Xn.shape[0]:
        db = float(davies_bouldin_score(Xn, labels))
        ch = float(calinski_harabasz_score(Xn, labels))
    else:
        db = ch = float("nan")
    inertia = float(km.inertia_)

    return {"k": int(k), "davies_bouldin": db, "calinski_harabasz": ch, "inertia": inertia, "fit_seconds": fit_s}

def compute_representatives(Xn: np.ndarray, labels: np.ndarray, centers: np.ndarray, top_n: int = 10) -> Dict[int, List[int]]:
    """Map each cluster id to the row indices of its most central members.

    Members are ranked by dot product with their cluster's center, highest
    first, and at most ``top_n`` are kept. Clusters with no members map to
    ``[]``. Every row of ``centers`` gets an entry. For l2-normalised rows the
    dot product is cosine similarity scaled by the center's norm, which is
    constant within a cluster, so the ranking matches cosine ranking.
    """
    result: Dict[int, List[int]] = {}
    for cluster_id, center in enumerate(centers):
        members = np.flatnonzero(labels == cluster_id)
        if members.size:
            scores = Xn[members] @ center
            ranking = np.argsort(-scores)
            result[cluster_id] = members[ranking[:top_n]].tolist()
        else:
            result[cluster_id] = []
    return result

def llm_coherence_verdict(
    client: genai.Client,
    df: pd.DataFrame,
    Xn: np.ndarray,
    k: int
) -> Dict[str, Any]:
    """Cluster ``Xn`` into ``k`` groups and ask the LLM whether k looks right.

    A full KMeans (n_init=10) is fitted, and the 8 most central texts per
    cluster (each cut to 240 chars, newlines flattened) are sent to the model.
    Returns its JSON verdict. The requested keys are k, verdict
    (good/too_low/too_high), notes and suggested_k. If the reply contains no
    JSON, a verdict of "unknown" suggesting the same k is returned instead.
    """
    k = int(k)
    model = KMeans(n_clusters=k, random_state=42, n_init=10)
    assignments = model.fit_predict(Xn)
    reps = compute_representatives(Xn, assignments, model.cluster_centers_, top_n=8)

    clusters_payload = [
        {
            "cluster": cid,
            "examples": [df.loc[i, "text"][:240].replace("\n", " ") for i in reps.get(cid, [])],
        }
        for cid in range(k)
    ]

    request = {
        "task": "Assess topic cluster coherence and distinctness.",
        "instructions": [
            "Do not assume a language; judge based on provided text.",
            "Return strict JSON only.",
            "If too many clusters are redundant/overlapping => verdict='too_high'.",
            "If clusters are too broad/mixed => verdict='too_low'.",
            "If clusters look distinct and coherent => verdict='good'."
        ],
        "return_format": {"k": "int", "verdict": "good|too_low|too_high", "notes": "string", "suggested_k": "int"},
        "candidate_k": k,
        "clusters": clusters_payload
    }

    reply = client.models.generate_content(
        model="gemini-2.5-pro",
        contents=json.dumps(request, ensure_ascii=False),
    )
    raw = (reply.text or "").strip()
    found = re.search(r"\{.*\}", raw, flags=re.DOTALL)
    if found is None:
        return {"k": k, "verdict": "unknown", "notes": "No JSON returned", "suggested_k": k}
    return json.loads(found.group(0))

def _coerce_k(value: Any, default: int, n_samples: Optional[int] = None) -> int:
    """Convert an LLM-reported cluster count into a usable int.

    Falls back to ``default`` when ``value`` is missing or not int-convertible
    (``None``, ``"n/a"``, ``inf`` ...). The result is clamped to at least 2
    and, when ``n_samples`` is given, at most ``n_samples - 1``, the range in
    which k-means can fit and the DB/CH metrics are defined.
    """
    try:
        k = int(value)
    except (TypeError, ValueError, OverflowError):
        k = int(default)
    k = max(k, 2)
    if n_samples is not None:
        k = min(k, max(2, n_samples - 1))
    return k


def adaptive_k_search(client: genai.Client, df: pd.DataFrame, Xn: np.ndarray) -> int:
    """Choose the number of topic clusters for ``Xn``.

    Returns int(AGENT_STATE["FORCE_K"]) immediately when it is set. Otherwise
    the search runs in three phases:

    1. Coarse MiniBatch scan of K_MIN..K_MAX in steps of K_STEP. It stops early
       once Davies-Bouldin has not improved MAX_WORSEN_STREAK times in a row
       (only after the first few grid points).
    2. Fine MiniBatch scan of every k within K_FINE_WINDOW of the coarse best.
    3. LLM coherence check of the 3 best k values (lowest DB, then highest CH).
       The first candidate judged "good" wins. Otherwise the median of the
       LLM's suggested k values is used.

    Metric tables and verdicts are written to OUTPUT_DIR. A checkpoint is then
    shown; it blocks when AUTO_MODE is off or every verdict is "unknown". A
    FORCE_K set during that checkpoint overrides the choice.

    Raises:
        ValueError: if the K_MIN / K_MAX / K_STEP grid is empty.
    """
    if AGENT_STATE.get("FORCE_K") is not None:
        return int(AGENT_STATE["FORCE_K"])

    k_min = int(AGENT_STATE.get("K_MIN", 8))
    k_max = int(AGENT_STATE.get("K_MAX", 80))
    k_step = int(AGENT_STATE.get("K_STEP", 4))
    fine_window = int(AGENT_STATE.get("K_FINE_WINDOW", 6))
    n_samples = int(Xn.shape[0])

    # ---------- Phase 1: Coarse scan (MiniBatch) ----------
    ks = list(range(k_min, k_max + 1, k_step))
    if not ks:
        raise ValueError(f"Empty k grid: K_MIN={k_min}, K_MAX={k_max}, K_STEP={k_step}")
    rows = []
    best_db = float("inf")
    worsen_streak = 0
    MAX_WORSEN_STREAK = 5  # early-stop heuristic

    for k in tqdm(ks, desc="K scan (coarse, MiniBatch)"):
        m = kmeans_metrics(Xn, k, use_minibatch=True)
        rows.append(m)

        # early stop if DB keeps worsening as k increases (often indicates over-fragmentation)
        db = m["davies_bouldin"]
        if np.isfinite(db):
            if db < best_db:
                best_db = db
                worsen_streak = 0
            else:
                worsen_streak += 1

        if worsen_streak >= MAX_WORSEN_STREAK and k > (k_min + 3 * k_step):
            break

    coarse_df = pd.DataFrame(rows).sort_values("k")
    cpath = os.path.join(AGENT_STATE["OUTPUT_DIR"], "k_metrics_coarse.csv")
    coarse_df.to_csv(cpath, index=False)
    add_artifact(cpath)

    # pick coarse best: minimize DB then maximize CH
    coarse_best = int(
        coarse_df.sort_values(["davies_bouldin", "calinski_harabasz"], ascending=[True, False]).iloc[0]["k"]
    )

    # ---------- Phase 2: Fine scan around coarse best ----------
    fine_ks = sorted(set(range(max(2, coarse_best - fine_window), coarse_best + fine_window + 1)))
    fine_rows = []
    for k in tqdm(fine_ks, desc="K scan (fine, MiniBatch)"):
        fine_rows.append(kmeans_metrics(Xn, k, use_minibatch=True))

    fine_df = pd.DataFrame(fine_rows).sort_values("k")
    fpath = os.path.join(AGENT_STATE["OUTPUT_DIR"], "k_metrics_fine.csv")
    fine_df.to_csv(fpath, index=False)
    add_artifact(fpath)

    combined = pd.concat([coarse_df, fine_df], ignore_index=True).drop_duplicates(subset=["k"])
    combined = combined.sort_values("k")

    # shortlist top candidates by DB/CH
    shortlist = (
        combined.sort_values(["davies_bouldin", "calinski_harabasz"], ascending=[True, False])
        .head(3)["k"].astype(int).tolist()
    )

    # ---------- Phase 3: LLM coherence checks ----------
    verdicts = []
    for k in shortlist:
        v = llm_coherence_verdict(client, df, Xn, int(k))
        # Record the k that was actually clustered; the LLM's echoed "k" may be
        # missing, null or simply wrong.
        v["candidate_k"] = int(k)
        verdicts.append(v)

    verdict_path = os.path.join(AGENT_STATE["OUTPUT_DIR"], "k_coherence_verdicts.json")
    safe_write(verdict_path, json.dumps(verdicts, indent=2, ensure_ascii=False))

    good = [v for v in verdicts if str(v.get("verdict", "")).lower() == "good"]
    if good:
        chosen = int(good[0]["candidate_k"])
    else:
        # suggested_k comes straight from the LLM: tolerate null/garbage values.
        sugg = [_coerce_k(v.get("suggested_k"), shortlist[0], n_samples) for v in verdicts]
        chosen = int(np.median(sugg)) if sugg else int(shortlist[0])

    # Only pause if not autonomous or coherence is unknown
    unclear = all(str(v.get("verdict", "")).lower() in {"unknown"} for v in verdicts)
    summary = {
        "coarse_best": coarse_best,
        "shortlist": shortlist,
        "verdicts": verdicts,
        "chosen_k": chosen
    }
    maybe_pause_for_user(
        client,
        title="Adaptive K chosen (fast mode)",
        summary=json.dumps(summary, indent=2, ensure_ascii=False),
        force=(not AGENT_STATE.get("AUTO_MODE", True)) or unclear
    )

    if AGENT_STATE.get("FORCE_K") is not None:
        return int(AGENT_STATE["FORCE_K"])
    return chosen

# ----------------------------
# UMAP + charts
# ----------------------------
def plot_umap(df: pd.DataFrame, out_path: str) -> None:
    """Scatter the 2-D UMAP projection coloured by cluster id and save it to ``out_path``.

    Expects ``umap_x``, ``umap_y`` and ``cluster`` columns in ``df``.
    """
    figure, axes = plt.subplots(figsize=(14, 10))
    points = axes.scatter(df["umap_x"], df["umap_y"], c=df["cluster"], s=6, alpha=0.75)
    axes.set(
        title="UMAP Visualization of Conversation Clusters",
        xlabel="UMAP 1",
        ylabel="UMAP 2",
    )
    colorbar = figure.colorbar(points)
    colorbar.set_label("Cluster ID")
    savefig(out_path)

def plot_volume_timeseries(df: pd.DataFrame, out_path: str) -> pd.DataFrame:
    """Plot daily post counts to ``out_path`` and return them.

    The returned frame has one row per calendar day (from the ``timestamp``
    column) with columns ``timestamp`` and ``volume``.
    """
    per_day = df.set_index(df["timestamp"]).resample("D").size()
    ts = per_day.reset_index(name="volume")

    figure, axes = plt.subplots(figsize=(15, 6))
    axes.plot(ts["timestamp"], ts["volume"], marker="o")
    axes.set(
        title="Conversation Volume Over Time",
        xlabel="Date",
        ylabel="Posts (Volume)",
    )
    figure.autofmt_xdate()
    savefig(out_path)
    return ts

def plot_velocity(ts: pd.DataFrame, out_path: str) -> None:
    """Bar-chart the day-over-day change in ``ts["volume"]`` and save it to ``out_path``.

    The first day's change is shown as 0. ``ts`` itself is not modified.
    """
    frame = ts.copy()
    frame["velocity"] = frame["volume"].diff().fillna(0)

    figure, axes = plt.subplots(figsize=(15, 6))
    axes.bar(frame["timestamp"], frame["velocity"])
    axes.set(
        title="Conversation Velocity (Day-over-Day Change)",
        xlabel="Date",
        ylabel="Δ Posts",
    )
    figure.autofmt_xdate()
    savefig(out_path)

def plot_roi(cluster_stats: pd.DataFrame, out_path: str, engagement_is_proxy: bool) -> None:
    """Plot cluster volume vs engagement on log-log axes with mean-split quadrants.

    Dashed lines mark the mean volume and mean engagement. The y label notes
    when engagement is only a volume proxy.
    """
    volume = cluster_stats["volume"]
    engagement = cluster_stats["engagement"]
    y_label = "Engagement Proxy (log)" if engagement_is_proxy else "Engagement (log)"

    _, axes = plt.subplots(figsize=(10, 8))
    axes.scatter(volume, engagement)
    axes.axvline(volume.mean(), linestyle="--")
    axes.axhline(engagement.mean(), linestyle="--")
    axes.set_xscale("log")
    axes.set_yscale("log")
    axes.set(
        xlabel="Cluster Volume (log)",
        ylabel=y_label,
        title="ROI Quadrant: Volume vs Engagement",
    )
    savefig(out_path)

def plot_anomalies(ts: pd.DataFrame, out_path: str, window: int = 7, z: float = 2.0) -> None:
    """Plot daily volume with a rolling mean ± ``z``·std band and mark upward spikes.

    A day is flagged when its volume exceeds the rolling mean plus ``z``
    rolling standard deviations over ``window`` days. The std is treated as 0
    while fewer than two points are in the window. Only upward spikes are
    flagged.
    """
    t = ts.copy()
    t["mean"] = t["volume"].rolling(window, min_periods=1).mean()
    t["std"] = t["volume"].rolling(window, min_periods=1).std().fillna(0)
    t["upper"] = t["mean"] + z * t["std"]
    t["lower"] = t["mean"] - z * t["std"]
    t["is_anomaly"] = t["volume"] > t["upper"]
    spikes = t.loc[t["is_anomaly"]]

    fig, ax = plt.subplots(figsize=(15, 6))
    ax.plot(t["timestamp"], t["volume"], label="Volume")
    ax.plot(t["timestamp"], t["mean"], linestyle="--", label="Rolling Mean")
    # Legend reflects the actual z (previously hard-coded as "±2σ").
    ax.fill_between(t["timestamp"], t["lower"], t["upper"], alpha=0.2, label=f"±{z:g}σ band")
    ax.scatter(spikes["timestamp"], spikes["volume"], label="Anomalies")
    ax.set_title("Anomaly Detection on Volume")
    ax.set_xlabel("Date")
    ax.set_ylabel("Posts (Volume)")
    ax.legend()
    fig.autofmt_xdate()
    savefig(out_path)

# ----------------------------
# Language-aware labeling (no English assumption)
# ----------------------------
def _spread_sample(items: List[str], limit: int) -> List[str]:
    """Return up to ``limit`` items spread evenly over ``items`` (all of them if there are few)."""
    if limit <= 0:
        return []
    step = max(1, len(items) // limit)
    return list(items[::step][:limit])


def detect_language_notes(client: genai.Client, texts: List[str], max_examples: int = 40) -> str:
    """Ask the LLM which language(s) appear in ``texts`` and how to label them.

    Up to ``max_examples`` texts are sent, each cut to 400 chars with newlines
    flattened. They are sampled evenly across the whole list rather than taken
    from the head, so a file sorted by source or date does not skew the
    language estimate. With ``max_examples`` or fewer texts, all of them are
    sent. Returns the model's stripped plain-text answer ("" if empty).
    """
    prompt = {
        "task": "Detect the language(s) present and recommend labeling strategy.",
        "instructions": [
            "Do not assume English.",
            "If multilingual, say so and suggest strategy.",
            "Return short plain text."
        ],
        "examples": [t[:400].replace("\n", " ") for t in _spread_sample(texts, max_examples)]
    }
    resp = client.models.generate_content(model="gemini-2.5-pro", contents=json.dumps(prompt, ensure_ascii=False))
    return (resp.text or "").strip()

def label_clusters(client: genai.Client, df: pd.DataFrame, Xn: np.ndarray, centers: np.ndarray, k: int, language_notes: str) -> Dict[int, str]:
    """Ask the LLM for a short, language-appropriate label for each of the ``k`` clusters.

    Each request carries the 12 most central texts of the cluster (cut to 320
    chars, newlines flattened) plus ``language_notes``. Surrounding quotes are
    stripped from the answer. An empty answer becomes "Cluster <id>", and a
    failed request becomes "Cluster <id> (Label Error)". Requests are spaced
    0.15 s apart.
    """
    reps = compute_representatives(Xn, df["cluster"].to_numpy(), centers, top_n=12)
    instructions = [
        "Do not assume English.",
        "Use the dominant language of the examples; if mixed, use a neutral label or indicate multilingual.",
        "Keep it concise (adapt to language).",
        "Return ONLY the label text."
    ]
    result: Dict[int, str] = {}
    for cid in tqdm(range(k), desc="Labeling clusters"):
        examples = [df.loc[i, "text"][:320].replace("\n", " ") for i in reps.get(cid, [])]
        request = {
            "task": "Create a short topic label for this cluster.",
            "instructions": instructions,
            "language_notes": language_notes,
            "cluster_id": cid,
            "examples": examples
        }
        try:
            reply = client.models.generate_content(
                model="gemini-2.5-pro",
                contents=json.dumps(request, ensure_ascii=False),
            )
            label = (reply.text or "").strip().strip('"').strip("'")
            result[cid] = label or f"Cluster {cid}"
        except Exception:
            result[cid] = f"Cluster {cid} (Label Error)"
        time.sleep(0.15)
    return result

# ----------------------------
# Report
# ----------------------------
def _md_cell(value: Any) -> str:
    """Render ``value`` so it stays inside one Markdown table cell.

    All whitespace runs (including newlines, which would end the row) collapse
    to single spaces, and ``|`` (which would start a new column) is escaped.
    """
    return " ".join(str(value).split()).replace("|", "\\|")


def write_report(df: pd.DataFrame, cluster_stats: pd.DataFrame, labels: Dict[int, str],
                 engagement_is_proxy: bool, language_notes: str, paths: Dict[str, str], out_path: str) -> None:
    """Write the Markdown run report to ``out_path`` and register it as an artifact.

    ``cluster_stats`` needs ``cluster``, ``volume`` and ``engagement`` columns.
    The 12 largest clusters (by volume, then engagement) are tabulated with
    their labels, and clusters missing from ``labels`` show as "Cluster <id>".
    ``paths`` must hold chart files under the keys umap, volume, velocity,
    anomalies and roi. Only their file names are embedded, so the images are
    resolved relative to the report's location.
    """
    cs = cluster_stats.copy()
    cs["label"] = cs["cluster"].map(labels)
    top = cs.sort_values(["volume", "engagement"], ascending=False).head(12)

    proxy_note = ""
    if engagement_is_proxy:
        proxy_note = "\n\n> Engagement is a proxy based on volume because no engagement columns were selected.\n"

    md = f"""# Social Intelligence Analyst Report

**Run ID:** `{AGENT_STATE["run_id"]}`  
**Date:** {pd.Timestamp.now().strftime("%Y-%m-%d")}  
**Posts analyzed:** {len(df):,}  

## Language Notes
{language_notes if language_notes else "N/A"}
{proxy_note}

## Thematic Landscape (UMAP)
![UMAP]({Path(paths["umap"]).name})

## Top Clusters
| Cluster | Label | Volume | Engagement |
|:--:|:--|--:|--:|
"""
    for _, r in top.iterrows():
        cid = int(r["cluster"])
        label = r["label"]
        if pd.isna(label):  # cluster id absent from ``labels``
            label = f"Cluster {cid}"
        # LLM labels may contain newlines or pipes that would break the table.
        md += f"| {cid} | {_md_cell(label)} | {int(r['volume']):,} | {int(r['engagement']):,} |\n"

    md += f"""

## Temporal Intelligence
### Volume
![Volume]({Path(paths["volume"]).name})

### Velocity
![Velocity]({Path(paths["velocity"]).name})

### Anomalies
![Anomalies]({Path(paths["anomalies"]).name})

## Strategic Quadrant
![ROI]({Path(paths["roi"]).name})
"""
    safe_write(out_path, md)

# ----------------------------
# Main agent loop (autonomous by default)
# ----------------------------
def main() -> None:
    """Run the full analysis pipeline as a sequence of named stages.

    Each stage is a local closure executed through
    ``StageRunner(name).run(client, fn)``. Its return value feeds later stages.
    Stage order:

        load -> diagnostics -> schema -> prepare -> language -> embeddings ->
        adaptive_k -> cluster_umap -> labels -> outputs

    Artifacts are written under ``AGENT_STATE["OUTPUT_DIR"]``. They include the
    raw, prepared and clustered CSVs, the diagnostics, the embeddings, the
    labels, the charts and the Markdown report.

    Every stage offers a pause point via ``maybe_pause_for_user``. The schema
    stage forces that pause when AUTO_MODE is off, or when the proposal's
    confidence is below LOW_CONF_THRESHOLD and PAUSE_ON_LOW_CONFIDENCE is set.
    """
    validate_paths()
    ensure_dirs()

    client = get_gemini_client()

    console.print(Panel(json.dumps({
        "DATA_PATH": AGENT_STATE["DATA_PATH"],
        "OUTPUT_DIR": AGENT_STATE["OUTPUT_DIR"],
        "AUTO_MODE": AGENT_STATE.get("AUTO_MODE", True),
        "run_id": AGENT_STATE["run_id"]
    }, indent=2), title="Config"))

    # Stage: load + profile
    def stage_load():
        """Parse the CSV, save a raw copy and a text profile; reject empty results."""
        df_raw, header_row, delim = robust_read_csv(AGENT_STATE["DATA_PATH"])
        raw_path = os.path.join(AGENT_STATE["OUTPUT_DIR"], "raw_loaded_data.csv")
        df_raw.to_csv(raw_path, index=False)
        add_artifact(raw_path)

        profile_path = os.path.join(AGENT_STATE["OUTPUT_DIR"], "data_profile.txt")
        profile_dataframe(df_raw, profile_path)

        summary = {
            "header_row_detected": header_row,
            "delimiter": delim,
            "shape": df_raw.shape,
            "raw_saved": raw_path,
            "profile_saved": profile_path
        }
        maybe_pause_for_user(client, "Loaded + Profiled", json.dumps(summary, indent=2), force=False)

        if df_raw.empty:
            raise ValueError("Parsed DataFrame is empty. File may be metadata-only or delimiter/header detection failed.")
        return df_raw, profile_path

    df_raw, profile_path = StageRunner("load").run(client, stage_load)

    # Stage: diagnostics
    def stage_diagnostics():
        """Compute per-column datetime-parse and 'textiness' diagnostics and save them as CSV."""
        dt_diag = datetime_parse_diagnostics(df_raw)
        dt_path = os.path.join(AGENT_STATE["OUTPUT_DIR"], "datetime_parse_diagnostics.csv")
        dt_diag.to_csv(dt_path, index=False)
        add_artifact(dt_path)

        txt_diag = basic_textiness_stats(df_raw)
        txt_path = os.path.join(AGENT_STATE["OUTPUT_DIR"], "textiness_diagnostics.csv")
        txt_diag.to_csv(txt_path, index=False)
        add_artifact(txt_path)

        summary = {
            "datetime_diag_saved": dt_path,
            "textiness_diag_saved": txt_path,
            "top_datetime_candidates": dt_diag.head(8).to_dict(orient="records"),
            "top_text_candidates": txt_diag.head(8).to_dict(orient="records"),
        }
        maybe_pause_for_user(client, "Diagnostics computed", json.dumps(summary, indent=2, ensure_ascii=False), force=False)
        return dt_diag, txt_diag

    dt_diag, txt_diag = StageRunner("diagnostics").run(client, stage_diagnostics)

    # Stage: propose schema (LLM) with confidence; pause only if low confidence OR user disabled AUTO_MODE
    def stage_schema():
        """Choose text/time/engagement columns from user overrides or an LLM proposal.

        Returns:
            (text_col, time_cols, eng_cols) as (str, list, list).

        Raises:
            ValueError: if no text column or no time columns end up selected.
        """
        # Explicit user overrides (text + time) short-circuit the LLM call.
        if AGENT_STATE.get("TEXT_COL") and AGENT_STATE.get("TIME_COLS") is not None:
            proposal = {
                "text_col": AGENT_STATE["TEXT_COL"],
                "time_cols": AGENT_STATE["TIME_COLS"],
                "engagement_cols": AGENT_STATE.get("ENGAGEMENT_COLS", []),
                "language_notes": "",
                "confidence": 1.0,
                "evidence": {"why_text": "User override", "why_time": "User override", "why_engagement": "User override"}
            }
        else:
            proposal = llm_propose_schema(client, df_raw, dt_diag, txt_diag, profile_path)

        prop_path = os.path.join(AGENT_STATE["OUTPUT_DIR"], "schema_proposal.json")
        safe_write(prop_path, json.dumps(proposal, indent=2, ensure_ascii=False))

        try:
            conf = float(proposal.get("confidence", 0.0) or 0.0)
        except (TypeError, ValueError):
            # A non-numeric confidence (e.g. "high") is treated as 0.0, the
            # lowest confidence, instead of failing the stage.
            conf = 0.0
        force_pause = (not AGENT_STATE.get("AUTO_MODE", True)) or (
            AGENT_STATE.get("PAUSE_ON_LOW_CONFIDENCE", True) and conf < float(AGENT_STATE.get("LOW_CONF_THRESHOLD", 0.70))
        )

        summary = json.dumps(proposal, indent=2, ensure_ascii=False)
        maybe_pause_for_user(client, "Schema proposal", summary, force=force_pause)

        # Re-read overrides, because the user may have set them during the pause above.
        # An override key can hold a None placeholder, and None must mean "not set".
        # dict.get(key, default) would return that None and silently discard the proposal.
        text_col = AGENT_STATE.get("TEXT_COL") or proposal.get("text_col")
        override_time = AGENT_STATE.get("TIME_COLS")
        time_cols = override_time if override_time is not None else proposal.get("time_cols", [])
        override_eng = AGENT_STATE.get("ENGAGEMENT_COLS")
        eng_cols = override_eng if override_eng is not None else proposal.get("engagement_cols", [])

        # Normalise scalars / None into lists.
        if isinstance(time_cols, str):
            time_cols = [time_cols]
        if eng_cols is None:
            eng_cols = []
        if isinstance(eng_cols, str):
            eng_cols = [eng_cols]

        if not text_col or not time_cols:
            raise ValueError("No text_col or time_cols selected. Interject: 'Use column X as text and Y as time'.")
        return str(text_col), list(time_cols), list(eng_cols)

    text_col, time_cols, eng_cols = StageRunner("schema").run(client, stage_schema)

    # Stage: prepare data
    def stage_prepare():
        """Build the analysis DataFrame from the selected columns and save it."""
        df, engagement_is_proxy = prepare_dataframe(df_raw, text_col, time_cols, eng_cols)
        prep_path = os.path.join(AGENT_STATE["OUTPUT_DIR"], "prepared_data.csv")
        df.to_csv(prep_path, index=False)
        add_artifact(prep_path)

        summary = {
            "prepared_shape": df.shape,
            "prepared_saved": prep_path,
            "text_col_used": text_col,
            "time_cols_used": time_cols,
            "engagement_cols_used": eng_cols,
            "engagement_is_proxy": engagement_is_proxy
        }
        maybe_pause_for_user(client, "Prepared dataset", json.dumps(summary, indent=2, ensure_ascii=False), force=False)
        return df, engagement_is_proxy

    df, engagement_is_proxy = StageRunner("prepare").run(client, stage_prepare)

    # Stage: language notes
    def stage_language():
        """Ask the LLM for language notes on the first 60 texts and save them."""
        # Normalise a falsy result to "" so safe_write and the report receive a string.
        notes = detect_language_notes(client, df["text"].astype(str).head(60).tolist()) or ""
        safe_write(os.path.join(AGENT_STATE["OUTPUT_DIR"], "language_notes.txt"), notes)
        maybe_pause_for_user(client, "Language notes", notes or "(empty)", force=False)
        return notes

    language_notes = StageRunner("language").run(client, stage_language)

    # Stage: embeddings (this can be expensive; in AUTO_MODE we still proceed; you can interject by turning AUTO_MODE off)
    def stage_embeddings():
        """Embed every text (no downsampling) and save the matrix as .npy."""
        X = embed_texts(client, df["text"].tolist(), batch_size=96)
        emb_path = os.path.join(AGENT_STATE["OUTPUT_DIR"], "embeddings.npy")
        np.save(emb_path, X)
        add_artifact(emb_path)
        maybe_pause_for_user(client, "Embeddings complete", f"Embeddings shape: {X.shape}\nSaved: {emb_path}", force=False)
        return X

    X = StageRunner("embeddings").run(client, stage_embeddings)
    # Row-wise L2 normalisation so Euclidean k-means/UMAP behave like cosine similarity.
    Xn = Normalizer(norm="l2").fit_transform(X)

    # Stage: adaptive k
    def stage_k():
        """Pick the cluster count via adaptive_k_search and record it."""
        k = adaptive_k_search(client, df, Xn)
        safe_write(os.path.join(AGENT_STATE["OUTPUT_DIR"], "chosen_k.txt"), str(k))
        return int(k)

    k_final = StageRunner("adaptive_k").run(client, stage_k)
    console.print(Panel(f"Chosen k = {k_final}", title="K", style="green"))

    # Stage: final clustering + UMAP
    def stage_cluster_umap():
        """Fit KMeans with k_final, add cluster ids and 2-D UMAP coordinates, save the CSV."""
        km = KMeans(n_clusters=int(k_final), random_state=42, n_init=10)
        clusters = km.fit_predict(Xn)
        centers = km.cluster_centers_

        df2 = df.copy()
        df2["cluster"] = clusters

        reducer = umap.UMAP(n_neighbors=15, min_dist=0.1, n_components=2, random_state=42)
        U = reducer.fit_transform(Xn)
        df2["umap_x"], df2["umap_y"] = U[:, 0], U[:, 1]

        clustered_path = os.path.join(AGENT_STATE["OUTPUT_DIR"], "clustered_data.csv")
        df2.to_csv(clustered_path, index=False)
        add_artifact(clustered_path)

        return df2, centers

    df2, centers = StageRunner("cluster_umap").run(client, stage_cluster_umap)

    # Stage: labels
    def stage_labels():
        """Label each cluster with the LLM and save the mapping as JSON."""
        labels = label_clusters(client, df2, Xn, centers, int(k_final), language_notes)
        labels_path = os.path.join(AGENT_STATE["OUTPUT_DIR"], "cluster_labels.json")
        safe_write(labels_path, json.dumps(labels, indent=2, ensure_ascii=False))
        return labels

    labels_map = StageRunner("labels").run(client, stage_labels)

    # Stage: charts + report
    def stage_outputs():
        """Render all charts, per-cluster stats and the Markdown report."""
        paths = {
            "umap": os.path.join(AGENT_STATE["OUTPUT_DIR"], "umap_clusters.png"),
            "volume": os.path.join(AGENT_STATE["OUTPUT_DIR"], "timeseries_volume.png"),
            "velocity": os.path.join(AGENT_STATE["OUTPUT_DIR"], "velocity.png"),
            "roi": os.path.join(AGENT_STATE["OUTPUT_DIR"], "roi_quadrant.png"),
            "anomalies": os.path.join(AGENT_STATE["OUTPUT_DIR"], "anomalies.png"),
        }

        plot_umap(df2, paths["umap"])
        ts = plot_volume_timeseries(df2, paths["volume"])
        plot_velocity(ts, paths["velocity"])

        cluster_stats = (
            df2.groupby("cluster")
               .agg(volume=("text", "size"), engagement=("engagement", "sum"))
               .reset_index()
               .sort_values("volume", ascending=False)
        )
        cs_path = os.path.join(AGENT_STATE["OUTPUT_DIR"], "cluster_stats.csv")
        cluster_stats.assign(label=cluster_stats["cluster"].map(labels_map)).to_csv(cs_path, index=False)
        add_artifact(cs_path)

        plot_roi(cluster_stats, paths["roi"], engagement_is_proxy=engagement_is_proxy)
        plot_anomalies(ts, paths["anomalies"], window=7, z=2.0)

        report_path = os.path.join(AGENT_STATE["OUTPUT_DIR"], "Social_Intelligence_Report.md")
        write_report(df2, cluster_stats, labels_map, engagement_is_proxy, language_notes, paths, report_path)

        maybe_pause_for_user(
            client,
            "Done",
            "Artifacts:\n" + "\n".join(sorted(AGENT_STATE["artifacts"])),
            force=False
        )

    StageRunner("outputs").run(client, stage_outputs)

    console.print(Panel("Completed successfully.", title="STATUS", style="green"))

# Run the agent. A SystemExit is reported as a stop.
# Any other failure is written to <OUTPUT_DIR>/fatal_error.txt on a best-effort
# basis, printed, and then re-raised.
try:
    main()
except SystemExit as e:
    console.print(Panel(str(e), title="Stopped", style="yellow"))
except Exception:
    err = traceback.format_exc()
    try:
        ensure_dirs()
        fail_path = os.path.join(AGENT_STATE["OUTPUT_DIR"], "fatal_error.txt")
        safe_write(fail_path, err)
    except Exception as write_err:
        # A failure while persisting the log must not replace the original error.
        console.print(Panel(f"Could not write fatal_error.txt: {write_err!r}", title="WARNING", style="yellow"))
    console.print(Panel(err[-4000:], title="FATAL ERROR", style="red"))
    # Bare raise re-raises the original exception; the inner handler has already exited.
    raise