# 08 - Sentiment Analysis and RAG Chatbot

In [None]:
import os, json, warnings
from pathlib import Path
from typing import Dict, Any

import numpy as np
import pandas as pd

# Embedding backend selection, overridable through the environment.
# EMB_BACKEND accepts: auto | sbert | tfidf
EMB_BACKEND = os.getenv("EMB_BACKEND", "auto")
EMB_MODEL = os.getenv("EMB_MODEL", "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")

# Dataset directory. The default keeps the original machine-specific location;
# set HEALTHAI_DATA_DIR to run elsewhere without editing this file.
DATA = Path(os.getenv("HEALTHAI_DATA_DIR", "D:/HealthAI Project/data"))

# Directory that receives the index artefacts; created eagerly at import time.
OUT = Path("Models") / "RAG_Model"
OUT.mkdir(parents=True, exist_ok=True)

# Chatbot CSV to index. CHATBOT_CSV overrides the default file inside DATA.
CSV = os.getenv("CHATBOT_CSV", str(DATA / "doctor_consultation_chatbot_multilingual.csv"))

def read_csv_robust(path: str | Path) -> pd.DataFrame:
    """Try common encodings and both engines. Only pass low_memory to the C engine."""
    path = str(path)
    for enc in ["utf-8", "utf-8-sig", "cp1252", "latin1"]:
        # Try C engine with low_memory (fast path)
        try:
            return pd.read_csv(path, encoding=enc, engine="c", low_memory=False)
        except Exception:
            pass
        # Try python engine WITHOUT low_memory (python engine doesn't support it)
        try:
            return pd.read_csv(path, encoding=enc, engine="python")
        except Exception:
            pass
    # Last resort: python engine, ignore encoding errors; DO NOT pass low_memory here
    return pd.read_csv(path, engine="python", encoding_errors="ignore")

def get_corpus(df: pd.DataFrame) -> list[tuple[str, Dict[str, Any]]]:
    """Turn the chatbot dataframe into de-duplicated ``(question, metadata)`` pairs.

    Expected CSV columns (matched case-insensitively):
      - id, user_query, bot_response, intent, specialty
    ``user_query`` (or ``question``) is the text that gets indexed and
    ``bot_response`` (or ``answer``) is the reply. When neither name is
    present, the first and second columns are used instead.

    Rows whose question or answer is missing (NaN/None/NA) or blank are
    skipped. Missing ``intent``/``specialty`` become ``""`` rather than the
    text ``"nan"``. A missing ``id`` falls back to the row label, and NumPy
    scalar ids are converted to plain Python values so the metadata can be
    serialised with ``json``. Only the first occurrence of each
    ``(question, answer)`` pair is kept, in input order.

    Returns:
        A list of ``(question, metadata)`` tuples; empty when the frame has
        no columns or no usable rows.
    """
    if len(df.columns) == 0:
        return []

    def _missing(value: Any) -> bool:
        # Only scalars can be "missing"; pd.isna on containers returns arrays.
        return bool(pd.api.types.is_scalar(value) and pd.isna(value))

    def _text(value: Any) -> str:
        # str(NaN) would be "nan", so map missing values to "" explicitly.
        return "" if _missing(value) else str(value).strip()

    cols = {str(c).lower(): c for c in df.columns}
    q_col = cols.get("user_query") or cols.get("question") or list(df.columns)[0]
    a_col = cols.get("bot_response") or cols.get("answer") or (
        list(df.columns)[1] if len(df.columns) > 1 else q_col
    )
    id_col = cols.get("id", "id")
    intent_col = cols.get("intent", "intent")
    specialty_col = cols.get("specialty", "specialty")

    seen: set[tuple[str, str]] = set()
    corpus: list[tuple[str, Dict[str, Any]]] = []
    for i, row in df.iterrows():
        q = _text(row.get(q_col, ""))
        a = _text(row.get(a_col, ""))
        if not q or not a:
            continue

        # De-duplicate on (question, answer), keeping the first occurrence.
        key = (q, a)
        if key in seen:
            continue
        seen.add(key)

        rec_id = row.get(id_col, i)
        if _missing(rec_id):
            rec_id = i
        if isinstance(rec_id, np.generic):
            rec_id = rec_id.item()

        meta = {
            "row":        int(i),
            "id":         rec_id,
            "user_query": q,
            "bot_response": a,
            "intent":     _text(row.get(intent_col, "")),
            "specialty":  _text(row.get(specialty_col, "")),
        }
        corpus.append((q, meta))
    return corpus

def build_sbert_matrix(texts: list[str]):
    """Embed ``texts`` with the Sentence-Transformers model named by ``EMB_MODEL``.

    Returns ``(embeddings, info)``: the encoder output (requested as a NumPy
    array with ``normalize_embeddings=True``) and a dict recording the
    backend and model name. Any failure (missing package, model load,
    encoding) is reported through ``warnings.warn`` and signalled by
    returning ``(None, None)`` so the caller can fall back to another backend.
    """
    try:
        from sentence_transformers import SentenceTransformer

        encoder = SentenceTransformer(EMB_MODEL)
        embeddings = encoder.encode(
            texts,
            show_progress_bar=True,
            convert_to_numpy=True,
            normalize_embeddings=True,
        )
    except Exception as exc:
        warnings.warn(f"SBERT path failed: {exc}")
        return None, None
    return embeddings, {"backend": "sbert", "model": EMB_MODEL}

def build_tfidf_matrix(texts: list[str], dim: int = 384):
    """Build dense, L2-normalised TF-IDF vectors, reduced with SVD when possible.

    The vectorizer uses relaxed pruning (``min_df=1``, ``max_df=1.0``) to
    avoid "After pruning, no terms remain" on small corpora. Truncated SVD is
    applied with ``min(dim, n_features - 1, n_docs - 1)`` components. It is
    skipped when there is at most one feature, at most two documents, or
    fewer than two components would remain; in that case the normalised
    TF-IDF matrix is densified as-is.

    Side effect: the fitted vectorizer and the SVD (or ``None``) are dumped
    to ``OUT / "tfidf_svd.joblib"`` for use at inference time.

    Returns:
        ``(matrix, cfg)`` where ``cfg`` records the backend, the output
        dimension and whether SVD was applied.
    """
    from sklearn.decomposition import TruncatedSVD
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.preprocessing import normalize

    vectorizer = TfidfVectorizer(
        min_df=1,
        max_df=1.0,
        ngram_range=(1, 2),
        strip_accents="unicode",
        sublinear_tf=True,
    )
    tfidf = vectorizer.fit_transform(texts)

    n_docs, n_feats = tfidf.shape
    n_comp = min(dim, n_feats - 1, n_docs - 1)
    # SVD only makes sense with enough documents, features and components.
    use_svd = bool(n_feats > 1 and n_docs > 2 and n_comp >= 2)

    if use_svd:
        reducer = TruncatedSVD(n_components=n_comp, random_state=42)
        matrix = normalize(reducer.fit_transform(tfidf))
    else:
        reducer = None
        matrix = normalize(tfidf, norm="l2").toarray()

    cfg = {"backend": "tfidf", "dim": int(matrix.shape[1]), "svd": use_svd}
    # Persist the fitted transformers so queries can be embedded identically.
    import joblib
    joblib.dump({"vectorizer": vectorizer, "svd": reducer}, OUT / "tfidf_svd.joblib")
    return matrix, cfg

def save_faiss(X: np.ndarray, metas: list[Dict[str, Any]], backend_cfg: Dict[str, Any]):
    """Write the FAISS index, per-vector metadata and config into ``OUT``.

    Files written:
      - ``index.faiss``: an ``IndexFlatIP`` (inner-product) index over ``X``
      - ``meta.jsonl``:  one JSON object per line, in the same order as the
        rows of ``X`` so line *k* describes vector *k*
      - ``config.json``: ``{"dim": <vector width>, **backend_cfg}``

    Args:
        X: 2-D matrix of embeddings, one row per record.
        metas: metadata dicts, one per row of ``X``.
        backend_cfg: backend description merged into ``config.json``.

    Raises:
        ValueError: if ``X`` is not 2-D or its row count differs from
            ``len(metas)``; either would leave the index and ``meta.jsonl``
            out of step.
    """
    # FAISS expects a C-contiguous float32 matrix; astype alone may keep a
    # non-contiguous layout.
    vectors = np.ascontiguousarray(X, dtype="float32")
    if vectors.ndim != 2:
        raise ValueError(f"X must be a 2-D matrix, got {vectors.ndim} dimension(s)")
    if vectors.shape[0] != len(metas):
        raise ValueError(
            f"X has {vectors.shape[0]} rows but {len(metas)} metadata entries were given"
        )

    import faiss

    d = int(vectors.shape[1])
    index = faiss.IndexFlatIP(d)
    index.add(vectors)
    faiss.write_index(index, str(OUT / "index.faiss"))

    with open(OUT / "meta.jsonl", "w", encoding="utf-8") as f:
        f.writelines(json.dumps(m, ensure_ascii=False) + "\n" for m in metas)
    with open(OUT / "config.json", "w", encoding="utf-8") as f:
        json.dump({"dim": d, **backend_cfg}, f, indent=2)

def main():
    """Build the retrieval index from the chatbot CSV and write it to disk.

    Reads ``CSV``, extracts the corpus, embeds the questions with the first
    backend that succeeds (SBERT then TF-IDF when ``EMB_BACKEND`` is ``auto``
    or ``sbert``; TF-IDF only otherwise), saves the index and prints a
    one-line summary. Exits via ``SystemExit`` when there is nothing to index
    or no backend produced embeddings.
    """
    recs = get_corpus(read_csv_robust(CSV))
    if not recs:
        raise SystemExit("No records to index (check your CSV).")

    texts = [question for question, _ in recs]
    metas = [meta for _, meta in recs]

    if EMB_BACKEND in ("auto", "sbert"):
        backends = ["sbert", "tfidf"]
    else:
        backends = ["tfidf"]

    X, info = None, None
    for backend in backends:
        builder = build_sbert_matrix if backend == "sbert" else build_tfidf_matrix
        X, info = builder(texts)
        if X is not None:
            break

    if X is None:
        raise SystemExit("Failed to build embeddings with any backend.")

    save_faiss(X, metas, info)
    print(f"Built RAG index | backend={info.get('backend')} | dim={X.shape[1]} | n={len(texts)}")


if __name__ == "__main__":
    main()


Built RAG index | backend=tfidf | dim=83 | n=84




In [None]:
import json, re
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support, matthews_corrcoef
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.utils.class_weight import compute_class_weight
import joblib

# Input dataset and the directory receiving the trained sentiment artefacts.
DATA = Path("data", "patient_feedback.csv")
OUT = Path("Models", "Sentiment_Model")
OUT.mkdir(parents=True, exist_ok=True)

# Reproducibility seed and the two split fractions handed to
# train_test_split in main().
SEED = 42
TEST_SIZE = 0.15
VAL_SPLIT = 0.50

def robust_read_csv(path: Path) -> pd.DataFrame:
    """Read a CSV, trying several encodings with both pandas parser engines.

    For each encoding the C engine is tried first, then the python engine.
    ``low_memory`` is passed to the C engine only: pandas rejects that option
    for the python engine, which previously made every python-engine attempt
    in this loop fail before the file was even opened. If nothing parses, a
    final python-engine read that ignores undecodable bytes is attempted and
    its error, if any, propagates.
    """
    for enc in ("utf-8", "utf-8-sig", "cp1252", "latin1"):
        for eng in ("c", "python"):
            extra = {"low_memory": False} if eng == "c" else {}
            try:
                return pd.read_csv(path, encoding=enc, engine=eng, **extra)
            except Exception:
                # Deliberate best effort: fall through to the next combination.
                continue
    return pd.read_csv(path, engine="python", encoding_errors="ignore")

def normalize_header(s: str) -> str:
    """Canonicalise a column header for comparison.

    Non-string input is converted with ``str``. Non-breaking spaces become
    ordinary spaces, the text is stripped and lower-cased, and every run of
    whitespace is collapsed to a single space.
    """
    text = s if isinstance(s, str) else str(s)
    text = text.replace("\xa0", " ")
    text = text.strip().lower()
    return re.sub(r"\s+", " ", text)

def pick_col(df: pd.DataFrame, candidates) -> str|None:
    """Return the original name of the first column matching ``candidates``.

    Matching runs in two passes over the normalised headers, and candidate
    order is the priority order within each pass:
      1. equality once spaces are removed from both sides;
      2. the space-free candidate appearing anywhere inside the space-free
         header.
    Returns ``None`` when neither pass finds a column.
    """
    def squash(name):
        return name.replace(" ", "")

    # (space-free normalised header, original column name), in column order.
    headers = [
        (squash(key), original)
        for key, original in {normalize_header(c): c for c in df.columns}.items()
    ]

    # Pass 1: exact match, ignoring spaces.
    for cand in candidates:
        wanted = squash(normalize_header(cand))
        for compact, original in headers:
            if compact == wanted:
                return original

    # Pass 2: substring match, ignoring spaces.
    for cand in candidates:
        wanted = squash(normalize_header(cand))
        for compact, original in headers:
            if wanted in compact:
                return original

    return None

def load_feedback(path: Path):
    """Load the feedback CSV and return ``(frame, labels)``.

    The frame has exactly two columns: ``text`` (stripped feedback text) and
    ``label`` (``negative`` / ``neutral`` / ``positive``, or whatever other
    value the label column holds that is not in the alias table).
    ``labels`` is the sorted list of distinct labels in the frame.

    Labels come from a sentiment/label column when one exists; otherwise
    they are derived from a ratings column (``<= 2`` negative, ``>= 4``
    positive, everything else, including unparseable values, neutral).

    Missing cells are kept as missing while cleaning instead of being cast
    to the text ``"nan"``: rows without text are dropped, and missing or
    blank labels become ``"neutral"``.

    Raises:
        ValueError: if no text column is found, or if neither a label nor a
            ratings column is found.
    """
    df = robust_read_csv(path)

    # Replace non-breaking spaces in real strings only. Casting the whole
    # column with astype(str) would turn NaN into the literal "nan", which
    # fillna()/dropna() can no longer detect.
    for c in df.columns:
        if df[c].dtype == object:
            df[c] = df[c].map(lambda v: v.replace("\xa0", " ") if isinstance(v, str) else v)

    text_col = pick_col(df, ["feedback", "review", "comment", "text"])
    label_col = pick_col(df, ["sentiment label", "sentiment", "label"])
    rate_col = pick_col(df, ["ratings", "rating", "stars", "score"])
    if text_col is None:
        raise ValueError(f"No text column found. Columns: {list(df.columns)}")
    if label_col is None and rate_col is None:
        raise ValueError("Need a label or ratings column.")

    if label_col is not None:
        raw = df[label_col]
        label = raw.astype(str).str.lower().str.strip().replace({
            "neg": "negative", "-1": "negative", "negative": "negative",
            "pos": "positive", "1": "positive", "positive": "positive",
            "neu": "neutral", "0": "neutral", "neutral": "neutral",
        })
        # Missing or blank labels default to neutral.
        label = label.mask(raw.isna() | (label == ""), "neutral")
    else:
        def rate2label(x):
            try:
                r = float(str(x).strip())
            except ValueError:
                return "neutral"
            if r <= 2:
                return "negative"
            if r >= 4:
                return "positive"
            return "neutral"
        label = df[rate_col].apply(rate2label)

    # Drop rows with no text before stringifying, then drop blank texts.
    text = df[text_col]
    present = text.notna()
    text = text[present].astype(str).str.strip()
    out = pd.DataFrame({"text": text, "label": label[present]}).dropna()
    out = out[out["text"].str.len() > 0]

    labels = sorted(out["label"].unique())
    return out, labels

def evaluate(y_true, y_pred):
    """Compute summary classification metrics for predicted vs. true labels.

    Inputs are converted to NumPy arrays first, so plain lists work as well
    as arrays (the element-wise accuracy comparison needs array semantics).

    Returns:
        A dict of floats with keys ``accuracy``, ``precision_w``,
        ``recall_w``, ``f1_w`` (support-weighted averages, with undefined
        values reported as 0) and ``mcc`` (Matthews correlation coefficient).
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    prec, rec, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average="weighted", zero_division=0
    )
    mcc = matthews_corrcoef(y_true, y_pred)
    acc = (y_true == y_pred).mean()
    return {
        "accuracy": float(acc),
        "precision_w": float(prec),
        "recall_w": float(rec),
        "f1_w": float(f1),
        "mcc": float(mcc),
    }

def main():
    """Train, evaluate and persist the TF-IDF + LinearSVC sentiment classifier.

    Steps:
      1. Load the feedback data and map each label to an integer id.
      2. Hold out ``TEST_SIZE`` of the rows (stratified), then split that
         hold-out into validation and test parts using ``VAL_SPLIT``.
      3. Fit the pipeline on the training part with balanced class weights.
      4. Print validation and test metrics.
      5. Save the fitted pipeline, the label maps and a small adapter file
         into ``OUT``.
    """
    # Local import: the classifier is only needed here.
    from sklearn.svm import LinearSVC

    df, labels = load_feedback(DATA)
    label2id = {l: i for i, l in enumerate(labels)}
    id2label = {i: l for l, i in label2id.items()}
    # assign() returns a new frame, so this never writes into a filtered view.
    df = df.assign(label_id=df["label"].map(label2id))

    tr, holdout = train_test_split(
        df, test_size=TEST_SIZE, random_state=SEED, stratify=df["label_id"]
    )
    va, ev = train_test_split(
        holdout, test_size=VAL_SPLIT, random_state=SEED, stratify=holdout["label_id"]
    )

    Xtr, ytr = tr["text"].tolist(), tr["label_id"].to_numpy()
    Xva, yva = va["text"].tolist(), va["label_id"].to_numpy()
    Xev, yev = ev["text"].tolist(), ev["label_id"].to_numpy()

    # Balanced weights computed from the training labels only.
    classes = np.array(sorted(np.unique(ytr)))
    cw = compute_class_weight(class_weight="balanced", classes=classes, y=ytr)
    w = {int(c): float(v) for c, v in zip(classes, cw)}

    pipe = Pipeline([
        ("tfidf", TfidfVectorizer(analyzer="word", ngram_range=(1, 2),
                                  min_df=2, max_df=0.98,
                                  strip_accents="unicode", sublinear_tf=True)),
        ("clf", LinearSVC(class_weight=w, random_state=SEED)),
    ])
    pipe.fit(Xtr, ytr)
    print("[val]", evaluate(yva, pipe.predict(Xva)))
    print("[test]", evaluate(yev, pipe.predict(Xev)))

    OUT.mkdir(parents=True, exist_ok=True)
    joblib.dump(pipe, OUT/"sklearn_model.joblib")
    # Note: json turns the integer keys of id2label into strings.
    (OUT/"label_map.json").write_text(json.dumps({"labels": labels, "label2id": label2id, "id2label": id2label}, indent=2), encoding="utf-8")
    (OUT/"model_adapter.json").write_text(json.dumps({"type":"sklearn","path":str(OUT/"sklearn_model.joblib")}), encoding="utf-8")
    print(f"Saved → {OUT}")


if __name__ == "__main__":
    main()

[val] {'accuracy': 0.88, 'precision_w': 0.8881270903010033, 'recall_w': 0.88, 'f1_w': 0.8825038035209737, 'mcc': 0.7105516893606464}
[test] {'accuracy': 0.92, 'precision_w': 0.9189083820662768, 'recall_w': 0.92, 'f1_w': 0.918609022556391, 'mcc': 0.7906954926148669}
Saved → Models\sentiment
