In [None]:
import re
import math
import pandas as pd
import numpy as np

from transformers import pipeline
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler

In [None]:
# --- Configuration -----------------------------------------------------------
# Hugging Face model id handed to the sentiment pipeline in label_with_finbert.
MODEL_NAME = "ProsusAI/finbert"
# Default name of the DataFrame column that holds the text to score.
TEXT_COL = "content"
# Default path of the CSV written by label_with_finbert.
OUTPUT_CSV = "zerodha_filtered.csv"

# Hand-labelled (text, stress score) pairs. label_with_finbert uses the scores
# as regression targets when fitting the Ridge model behind `stress_score_1_10`.
FEW_SHOT_EXAMPLES = [
    (
        "SIP executed successfully, portfolio performing well.",
        1,
    ),
    (
        "Bought more shares today, happy with returns.",
        1,
    ),
    (
        "Slight dip in my portfolio, not worried yet.",
        3,
    ),
    (
        "SIP failed this month and I'm a bit worried.",
        4,
    ),
    (
        "Orders failing repeatedly, I'm losing confidence.",
        6,
    ),
    (
        "Sold at a loss due to app issues, very upset.",
        8,
    ),
    (
        "Margin call today, I'm extremely worried and panicking.",
        9,
    ),
    (
        "APP CRASHED while placing order!! Lost a lot!!!",
        10,
    ),
    (
        "Huge 20% loss in portfolio and can't login to sell.",
        10,
    ),
    (
        "Brokerage is high but not urgent.",
        3,
    ),
]

# Lower-case phrases counted by extract_features via plain substring matching
# against the lower-cased text (so overlapping entries can each contribute).
STRONG_STRESS_KW = [
    "panic",
    "panic-sell",
    "sold everything",
    "sold all",
    "margin call",
    "lost all",
    "lost everything",
    "urgent",
    "help immediately",
    "call support",
    "sold at loss",
    "sold at a loss",
    "panic sell",
    "margincall",
]

# Helpers: feature extraction
# A 1-3 digit number with an optional "." or "," fractional part, followed by
# an optional single whitespace character and "%". Group 1 is the number.
PERCENT_RE = re.compile(r"\b(-?\d{1,3}(?:[.,]\d+)?)\s?%")
# A rupee amount: either the rupee sign followed by digits, or "rs"/"rs."/"inr"
# followed by digits (case-insensitive). The sign is written as the escape
# \u20b9 so the pattern cannot be corrupted by a wrong file encoding again; the
# second alternative (\u00e2\u201a\u00b9) is that same sign after a UTF-8 ->
# cp1252 mis-decode, kept so text that was itself mis-decoded still matches.
RUPEE_RE = re.compile(
    r"(?:\u20b9|\u00e2\u201a\u00b9)\s?\d[\d,]*(?:\.\d+)?"
    r"|\b(?:rs\.?|inr)\s?\d[\d,]*(?:\.\d+)?\b",
    re.I,
)
# A single exclamation mark (callers count the matches).
EXCLAM_RE = re.compile(r"!")
# A standalone run of two or more ASCII capital letters.
ALLCAP_WORD_RE = re.compile(r"\b[A-Z]{2,}\b")

In [None]:
def extract_features(text: str) -> dict:
    """Compute simple surface features of one piece of text.

    Parameters
    ----------
    text : str
        The text to analyse. ``None`` and float NaN are treated as an empty
        string; any other value is converted with ``str()``.

    Returns
    -------
    dict
        Keys, in insertion order:

        - ``excl_count``: number of ``!`` characters.
        - ``allcaps_count``: number of ``ALLCAP_WORD_RE`` matches.
        - ``allcaps_ratio``: ``allcaps_count`` divided by the number of
          ``\\w+`` words, or 0.0 when there are no words.
        - ``max_percent``: largest absolute value among ``PERCENT_RE``
          matches, or 0.0 when there are none.
        - ``rupee_flag``: 1 if ``RUPEE_RE`` matches anywhere, else 0.
        - ``strong_kw_count``: how many ``STRONG_STRESS_KW`` entries occur as
          substrings of the lower-cased text.
        - ``char_len``: length of the text in characters.
        - ``q_count``: number of ``?`` characters.
        - ``dot_count``: number of ``.`` characters.
    """
    # Float NaN is what pandas uses for missing values; str(nan) would yield
    # the three-letter word "nan" and distort char_len and the word count.
    if text is None or (isinstance(text, float) and math.isnan(text)):
        t = ""
    else:
        t = str(text)

    allcaps = ALLCAP_WORD_RE.findall(t)
    words = re.findall(r"\b\w+\b", t)

    perc_vals = []
    for captured in PERCENT_RE.findall(t):
        try:
            # Commas are stripped before conversion, so "12,5" parses as 125.
            # NOTE(review): confirm whether decimal commas should be supported.
            perc_vals.append(abs(float(captured.replace(",", ""))))
        except ValueError:
            # Only an unparseable capture is skipped; other errors propagate.
            continue

    s = t.lower()

    features = {}
    features["excl_count"] = len(EXCLAM_RE.findall(t))
    features["allcaps_count"] = len(allcaps)
    features["allcaps_ratio"] = (len(allcaps) / len(words)) if words else 0.0
    features["max_percent"] = max(perc_vals) if perc_vals else 0.0
    features["rupee_flag"] = 1 if RUPEE_RE.search(t) else 0
    features["strong_kw_count"] = sum(1 for kw in STRONG_STRESS_KW if kw in s)
    features["char_len"] = len(t)
    features["q_count"] = t.count("?")
    features["dot_count"] = t.count(".")
    return features

In [None]:
# Helper: reduce one row of pipeline output to (pos, neg, neu) probabilities
def _split_sentiment_scores(scores):
    """Collapse one list of ``{"label", "score"}`` dicts into three numbers.

    Scores are summed into positive / negative / neutral buckets according to
    whether the lower-cased label contains "pos", "neg" or "neu". If nothing
    lands in any bucket, the labels are ranked by score: when there are exactly
    two labels and the top one contains "0", the top score is taken as negative
    and the other as positive; otherwise the top score is taken as positive and
    the runner-up (if any) as negative.

    Returns
    -------
    tuple of float
        ``(pos, neg, neu)``.
    """
    by_label = {d["label"].lower(): d["score"] for d in scores}
    pos = neg = neu = 0.0
    for label, prob in by_label.items():
        if "pos" in label:
            pos += prob
        elif "neg" in label:
            neg += prob
        elif "neu" in label:
            neu += prob

    if pos + neg + neu == 0:
        ranked = sorted(by_label.items(), key=lambda kv: kv[1], reverse=True)
        if ranked:
            top_prob = ranked[0][1]
            runner_up = ranked[1][1] if len(ranked) > 1 else 0.0
            if "0" in ranked[0][0] and len(ranked) == 2:
                neg, pos = top_prob, runner_up
            else:
                pos, neg = top_prob, runner_up
    return pos, neg, neu


# Main function (apply to a DataFrame `finance_rows`)
def label_with_finbert(finance_rows: pd.DataFrame,
                       text_col: str = TEXT_COL,
                       model_name: str = MODEL_NAME,
                       few_shot_examples=FEW_SHOT_EXAMPLES,
                       output_csv: str = OUTPUT_CSV):
    """Score every row of ``finance_rows`` for sentiment and a 1-10 stress level.

    Steps: run the sentiment pipeline on ``finance_rows[text_col]``, compute
    ``extract_features`` for each text, fit a ``StandardScaler`` + ``Ridge``
    regressor on the few-shot examples (same features plus the three sentiment
    probabilities), and predict a stress score for every row.

    Parameters
    ----------
    finance_rows : pd.DataFrame
        Input rows; not modified.
    text_col : str
        Column holding the text. Missing values are scored as empty strings.
    model_name : str
        Model id passed to ``transformers.pipeline`` as model and tokenizer.
    few_shot_examples : sequence of (text, score)
        Labelled examples used to fit the regressor. Must not be empty.
    output_csv : str
        Where to write the result. A falsy value (``None`` or ``""``) skips
        the write.

    Returns
    -------
    pd.DataFrame
        A copy of ``finance_rows`` with a fresh 0..n-1 index plus the columns
        ``polarity``, ``finbert_pos_prob``, ``finbert_neg_prob``,
        ``finbert_neu_prob``, ``excl_count``, ``allcaps_ratio``,
        ``max_percent``, ``rupee_flag``, ``strong_kw_count`` and
        ``stress_score_1_10``.

    Raises
    ------
    KeyError
        If ``text_col`` is not a column of ``finance_rows``.
    ValueError
        If ``few_shot_examples`` is empty.
    """
    if not few_shot_examples:
        raise ValueError("few_shot_examples must contain at least one (text, score) pair")

    # Read the text before loading the model so a wrong column name fails fast.
    # Missing values are blanked BEFORE the str conversion: astype(str) alone
    # turns NaN into the literal word "nan", which a later fillna cannot undo.
    column = finance_rows[text_col]
    texts = column.astype(object).where(column.notna(), "").astype(str).tolist()

    print("Loading FinBERT model:", model_name)
    # NOTE(review): newer transformers releases flag `return_all_scores` as
    # deprecated in favour of `top_k=None`; kept as-is so behaviour does not
    # depend on the installed version - confirm before upgrading.
    sentiment_pipe = pipeline("sentiment-analysis", model=model_name, tokenizer=model_name, return_all_scores=True)

    print(f"Running sentiment model on {len(texts)} rows (this may take a while)...")
    # Skip the model call entirely when there is nothing to score.
    all_scores = sentiment_pipe(texts, truncation=True) if texts else []

    row_probs = [_split_sentiment_scores(scores) for scores in all_scores]
    pos_probs = [p[0] for p in row_probs]
    neg_probs = [p[1] for p in row_probs]
    neu_probs = [p[2] for p in row_probs]

    # Binary label: anything whose negative probability is below 0.5 is
    # reported as "positive", including rows where neutral dominates.
    polarity = ["negative" if neg >= 0.5 else "positive" for neg in neg_probs]

    # Training frame from the few-shot examples. The same score-splitting
    # helper is used here and above so train and inference features agree.
    fs_texts = [t for t, _ in few_shot_examples]
    fs_scores = np.array([s for _, s in few_shot_examples], dtype=float)
    fs_probs = [_split_sentiment_scores(scores) for scores in sentiment_pipe(fs_texts, truncation=True)]

    fs_df = pd.DataFrame([extract_features(t) for t in fs_texts]).fillna(0.0)
    fs_df["finbert_pos"] = [p[0] for p in fs_probs]
    fs_df["finbert_neg"] = [p[1] for p in fs_probs]
    fs_df["finbert_neu"] = [p[2] for p in fs_probs]

    # Inference frame, forced into the training column order. The reindex also
    # gives an empty input the full set of feature columns.
    feat_df = pd.DataFrame([extract_features(t) for t in texts]).fillna(0.0)
    feat_df["finbert_pos"] = pos_probs
    feat_df["finbert_neg"] = neg_probs
    feat_df["finbert_neu"] = neu_probs
    feat_df = feat_df.reindex(columns=fs_df.columns, fill_value=0.0)

    scaler = StandardScaler()
    model = Ridge(alpha=1.0)
    model.fit(scaler.fit_transform(fs_df.values), fs_scores)

    if len(feat_df):
        y_pred = model.predict(scaler.transform(feat_df.values))
    else:
        # Nothing to predict; avoid handing a zero-row array to scikit-learn.
        y_pred = np.empty(0)
    # Round to the nearest integer, then clamp into the 1..10 scale.
    y_pred_clipped = np.clip(np.round(y_pred).astype(int), 1, 10)

    # reset_index makes the positional lists/columns below line up row by row.
    out = finance_rows.copy().reset_index(drop=True)
    out["polarity"] = polarity
    out["finbert_pos_prob"] = pos_probs
    out["finbert_neg_prob"] = neg_probs
    out["finbert_neu_prob"] = neu_probs

    for col in ("excl_count", "allcaps_ratio", "max_percent", "rupee_flag", "strong_kw_count"):
        out[col] = feat_df[col]
    out["stress_score_1_10"] = y_pred_clipped

    if output_csv:
        out.to_csv(output_csv, index=False)
        print(f"Saved labeled data to {output_csv} ({len(out)} rows).")
    return out


In [None]:
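# Example usage - uncomment once a DataFrame named `finance_rows` with a "content" column is loaded:
# labeled_df = label_with_finbert(finance_rows, text_col="content", model_name=MODEL_NAME)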
