# Sentiment Model Evaluation

This notebook evaluates the FinBERT sentiment classifier against the targets defined in `docs/metrics_and_evaluation.md`.

## 1. Setup

In [None]:
import os
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch

from transformers import AutoTokenizer
from sklearn.metrics import classification_report as sk_classification_report, precision_recall_curve
from scipy import stats

from backend.utils.config import config
from backend.utils.sentiment_data import (
    load_news_articles,
    load_prices_for_labeling,
    create_labeled_dataset,
    split_sentiment_data,
    SentimentDataset,
)
from backend.utils.sentiment_metrics import (
    compute_all_sentiment_metrics,
    compute_confusion_matrix,
    sentiment_price_correlation,
    compare_to_baseline,
    precision_score as sm_precision,
    recall_score as sm_recall,
    f1_score as sm_f1,
)
from models.sentiment_classifier import FinBERTSentimentClassifier

%matplotlib inline

# Plot styling: seaborn's white-grid theme for the figures drawn below
sns.set_style('whitegrid')

# Evaluate on the GPU when torch can see one, otherwise on the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

## 2. Load Model and Data

In [None]:
# Resolve the checkpoint to evaluate: the SENTIMENT_CHECKPOINT environment variable
# wins; otherwise use the dated default and fall back to the "best" checkpoint.
checkpoint_path = os.environ.get('SENTIMENT_CHECKPOINT', 'models/checkpoints/finbert_sentiment_20241118_120000.pth')
if not os.path.exists(checkpoint_path):
    fallback = 'models/checkpoints/best_sentiment.pth'
    print(f"Checkpoint not found at {checkpoint_path}. Falling back to {fallback} (if exists).")
    # Fail fast with an actionable message instead of an opaque loader error.
    if not os.path.exists(fallback):
        raise FileNotFoundError(
            f"No sentiment checkpoint found at {checkpoint_path} or {fallback}; "
            "set SENTIMENT_CHECKPOINT to a valid checkpoint file."
        )
    checkpoint_path = fallback

model, metadata = FinBERTSentimentClassifier.load_checkpoint(checkpoint_path, device=device)

# Evaluation settings: values stored in the checkpoint metadata take precedence,
# with the project config (or a literal default) as the fallback.
model_name = metadata.get('model_name', getattr(model, 'model_name', config.finbert_model_name))
max_length = int(metadata.get('max_length', config.finbert_max_length))
batch_size = int(metadata.get('batch_size', config.finbert_batch_size))
labeling_strategy = metadata.get('labeling_strategy', 'price_change')
# NOTE(review): the window keys are spelled 'test_start' and 'test_end_date' —
# confirm both match what the training script writes; a missing key yields None.
test_start = metadata.get('test_start')
test_end_date = metadata.get('test_end_date')

# Tokenizer aligned with model (AutoTokenizer is imported in the setup cell)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Log model details
param_counts = model.count_parameters()
trainable_params = param_counts.get('trainable', sum(p.numel() for p in model.parameters() if p.requires_grad))
print(json.dumps({
    "checkpoint_path": checkpoint_path,
    "model_name": model_name,
    "max_length": max_length,
    "batch_size": batch_size,
    "trainable_params": int(trainable_params),
    "metadata_keys": list(metadata.keys())
}, indent=2))

In [None]:
# Locate the raw news and price data through the project config
news_dir = str(config.get_raw_data_dir('news'))
prices_dir = str(config.get_raw_data_dir('prices'))

# Articles are requested with the checkpoint's test_end_date as end_date
news_df = load_news_articles(news_dir, end_date=test_end_date)
if news_df.empty:
    print("Warning: No news loaded from", news_dir)

prices_df = load_prices_for_labeling(prices_dir)
if prices_df.empty:
    print("Warning: No prices loaded from", prices_dir)

# Label the articles, then drop the time-of-day from the publication timestamps
labeled_df = create_labeled_dataset(news_df, price_df=prices_df, labeling_strategy=labeling_strategy)
labeled_df['published_at'] = pd.to_datetime(labeled_df['published_at']).dt.normalize()

# Use the test window stored with the checkpoint when there is one;
# otherwise take the third element returned by split_sentiment_data.
if not test_start:
    _train_df, _val_df, test_df = split_sentiment_data(labeled_df)
else:
    test_mask = labeled_df['published_at'] >= pd.to_datetime(test_start)
    if test_end_date:
        test_mask = test_mask & (labeled_df['published_at'] <= pd.to_datetime(test_end_date))
    test_df = labeled_df.loc[test_mask].copy()

# Model inputs (raw text) and integer targets
test_texts = test_df['text'].tolist()
test_labels = test_df['label'].astype(int).values

print(f"Test size: {len(test_labels)}")
print("Class distribution (%):")
class_share = pd.Series(test_labels).value_counts(normalize=True).sort_index()
print(class_share.apply(lambda share: round(share * 100, 2)))


In [None]:
# Inference: get predictions, confidence, and per-class probabilities
import numpy as np

def batched_predict_with_probs(model, tokenizer, texts, device, max_length=512, batch_size=32):
    """Run the classifier over ``texts`` in mini-batches.

    Args:
        model: Callable as ``model(input_ids, attention_mask)`` returning logits
            with one row per text and classes along dim 1. Switched to eval mode.
        tokenizer: Callable returning a mapping with ``'input_ids'`` and
            ``'attention_mask'`` tensors for a batch of texts.
        texts: Sliceable sequence of input strings.
        device: Device the encoded tensors are moved to before the forward pass.
        max_length: Truncation length passed to the tokenizer.
        batch_size: Number of texts per forward pass; must be >= 1.

    Returns:
        Tuple ``(preds, confs, probs)`` of NumPy arrays: predicted class index
        per text, the probability of that class, and the full per-class
        probability matrix. For empty ``texts`` the arrays are empty
        (``probs`` has shape ``(0, 0)`` because the class count is unknown).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    model.eval()

    # np.concatenate / np.vstack reject an empty list, so handle "nothing to
    # score" explicitly instead of failing with an unrelated error message.
    if len(texts) == 0:
        return (
            np.empty(0, dtype=np.int64),
            np.empty(0, dtype=np.float32),
            np.empty((0, 0), dtype=np.float32),
        )

    all_preds, all_confs, all_probs = [], [], []
    for start in range(0, len(texts), batch_size):
        batch_texts = texts[start:start + batch_size]
        enc = tokenizer(
            batch_texts,
            padding=True,
            truncation=True,
            max_length=max_length,
            return_tensors='pt'
        )
        input_ids = enc['input_ids'].to(device)
        attention_mask = enc['attention_mask'].to(device)
        with torch.no_grad():
            logits = model(input_ids, attention_mask)
            probs = torch.softmax(logits, dim=1)
            # Highest class probability and its index for every row
            confs, preds = torch.max(probs, dim=1)
        all_probs.append(probs.cpu().numpy())
        all_confs.append(confs.cpu().numpy())
        all_preds.append(preds.cpu().numpy())
    return np.concatenate(all_preds), np.concatenate(all_confs), np.vstack(all_probs)

# Score the test set with the batch size resolved from checkpoint metadata /
# project config rather than a separate hard-coded value.
y_pred, conf_max, probs_all = batched_predict_with_probs(
    model, tokenizer, test_texts, device, max_length=max_length, batch_size=batch_size
)

# Continuous sentiment score: Positive prob - Negative prob
# (column 0 minus column 2); with fewer than three classes fall back to the
# top-class confidence.
continuous_sentiment = probs_all[:, 0] - probs_all[:, 2] if probs_all.shape[1] >= 3 else conf_max

# Preview the first few predictions next to their true labels
pd.DataFrame({
    'text': test_texts[:5],
    'true': test_labels[:5],
    'pred': y_pred[:5],
    'conf': conf_max[:5]
})


## 3. Classification Metrics

Targets (Section 2.1): macro F1 >= 0.80, and precision and recall >= 0.75 for every class.

In [None]:
# Classification metrics and threshold checks
# Class ids in the order used throughout this notebook: 0=Positive, 1=Neutral, 2=Negative
CLASS_LABELS = [0, 1, 2]
CLASS_NAMES = ['Positive', 'Neutral', 'Negative']
MACRO_F1_TARGET = 0.80
PER_CLASS_TARGET = 0.75

prec = sm_precision(test_labels, y_pred)
rec = sm_recall(test_labels, y_pred)
f1d = sm_f1(test_labels, y_pred)

print("Classification report (sklearn):")
# Passing `labels` pins the three classes to the three target names; without it
# sklearn raises when a class is missing from both the labels and the predictions.
print(sk_classification_report(
    test_labels,
    y_pred,
    labels=CLASS_LABELS,
    target_names=CLASS_NAMES,
    zero_division=0,
))

print("Precision per-class:", prec['per_class'], "macro:", round(prec['macro'], 4))
print("Recall per-class:", rec['per_class'], "macro:", round(rec['macro'], 4))
print("F1 per-class:", f1d['per_class'], "macro:", round(f1d['macro'], 4))

# Pass only if macro F1 and every per-class precision/recall clear their targets
pass_status = bool(
    f1d['macro'] >= MACRO_F1_TARGET and
    np.all(np.asarray(prec['per_class']) >= PER_CLASS_TARGET) and
    np.all(np.asarray(rec['per_class']) >= PER_CLASS_TARGET)
)
print("Pass thresholds (Section 2.1):", pass_status)


## 4. Confusion Matrix

Target: No off-diagonal cell > 15%

In [None]:
# Confusion matrix: compute it once, then draw the normalized matrix as a heatmap
cm_dict = compute_confusion_matrix(test_labels, y_pred)
cm_norm = np.array(cm_dict['normalized_matrix'])

class_ticks = ['Pos', 'Neu', 'Neg']
fig, ax = plt.subplots(figsize=(6, 5))
sns.heatmap(
    cm_norm,
    annot=True,
    fmt='.2f',
    cmap='Blues',
    xticklabels=class_ticks,
    yticklabels=list(class_ticks),
    ax=ax,
)
ax.set_ylabel('Actual')
ax.set_xlabel('Predicted')

# Flag the figure title when the helper reports a failed off-diagonal check
if cm_dict['threshold_passed']:
    warning = ''
else:
    warning = ' - Warning: Off-diagonal > 15%'
ax.set_title('Confusion Matrix (normalized)' + warning)

fig.tight_layout()
fig.savefig('sentiment_confusion.png', dpi=300, bbox_inches='tight')
plt.show()

cm_dict


## 5. Sentiment-Price Correlation

Target: |rho| >= 0.15. A lag is flagged as significant only when its correlation also has p < 0.05.

In [None]:
# Sentiment-Price correlation analysis (daily aggregation)
# A lag counts as significant when |correlation| and the p-value both clear these
MIN_ABS_CORR = 0.15
MAX_P_VALUE = 0.05


def is_significant(corr, p_value):
    """Return True when a lag's correlation clears both significance thresholds."""
    return abs(corr) >= MIN_ABS_CORR and p_value < MAX_P_VALUE


# Aggregate sentiment by date: mean continuous score of all articles per day
sent_df = test_df.copy()
sent_df['date'] = pd.to_datetime(sent_df['published_at']).dt.normalize()
sent_df['continuous_senti'] = continuous_sentiment

daily_sent = sent_df.groupby('date')['continuous_senti'].mean().sort_index()

# Compute equal-weight market daily returns from price data:
# per-ticker close-to-close change, then the cross-sectional mean per day.
# (Named `prices_daily` so the frame is not clobbered by p-value loop variables.)
prices_daily = prices_df.copy()
prices_daily['date'] = pd.to_datetime(prices_daily['Date']).dt.normalize()
prices_daily = prices_daily.sort_values(['Ticker', 'date'])
prices_daily['ret_1d'] = prices_daily.groupby('Ticker')['Close'].pct_change()

mkt_ret = prices_daily.groupby('date')['ret_1d'].mean().dropna().sort_index()

# Align by date index; only dates present in both series survive the inner join
aligned = pd.concat([daily_sent, mkt_ret], axis=1, join='inner').dropna()
aligned.columns = ['sent', 'ret']
if aligned.empty:
    print("Warning: No overlapping dates between daily sentiment and market returns")

lags = [1, 2, 3, 5, 7]
price_corr = sentiment_price_correlation(aligned['sent'], aligned['ret'], lags=lags)

# Collect per-lag results; a lag missing from the output is treated as
# "no correlation" (0.0, p=1.0).
lag_vals = list(lags)
lag_results = [
    price_corr.get(f'lag_{lag}', {'correlation': 0.0, 'p_value': 1.0})
    for lag in lags
]
corr_vals = [res['correlation'] for res in lag_results]
pvals = [res['p_value'] for res in lag_results]

# Plot correlations by lag; significant lags get the darker bar colour
bar_colors = [
    '#4e79a7' if is_significant(corr, p_value) else '#a0cbe8'
    for corr, p_value in zip(corr_vals, pvals)
]
fig, ax = plt.subplots(figsize=(6, 4))
ax.bar(lag_vals, corr_vals, color=bar_colors)
ax.axhline(0, color='gray', linewidth=0.8)
ax.set_title('Sentiment vs Forward Returns Correlation by Lag (daily, eq-weight)')
ax.set_xlabel('Forward lag (days)')
ax.set_ylabel('Pearson correlation')
for lag, corr, p_value in zip(lag_vals, corr_vals, pvals):
    # Put the p-value just beyond the bar tip, above or below depending on sign
    ax.text(
        lag,
        corr + np.sign(corr) * 0.01,
        f"p={p_value:.3f}",
        ha='center',
        va='bottom' if corr >= 0 else 'top',
        fontsize=8,
    )
fig.tight_layout()
fig.savefig('sentiment_price_correlation.png', dpi=300, bbox_inches='tight')
plt.show()

significant_lags = [
    lag for lag, corr, p_value in zip(lag_vals, corr_vals, pvals)
    if is_significant(corr, p_value)
]
print('Significant lags (|rho|>=0.15, p<0.05):', significant_lags)

price_corr


## 6. Baseline Comparison

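Compare the model against a lexicon baseline (VADER if it is installed, otherwise TextBlob, otherwise a small built-in keyword list); report the improvement and the McNemar test p-value (if available).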


In [None]:
# Baseline comparison (VADER/TextBlob if available, else simple lexicon)
def _banded_class(score, band=0.05):
    """Map a polarity score to a class id: 0 if >= band, 2 if <= -band, else 1."""
    if score >= band:
        return 0
    if score <= -band:
        return 2
    return 1


def _sign_class(score):
    """Map a score to a class id by sign: 0 if positive, 2 if negative, else 1."""
    if score > 0:
        return 0
    if score < 0:
        return 2
    return 1


def _vader_scores(texts):
    """Return VADER compound scores; imports lazily because the package is optional."""
    from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
    analyzer = SentimentIntensityAnalyzer()
    return [analyzer.polarity_scores(t)['compound'] for t in texts]


def _textblob_scores(texts):
    """Return TextBlob polarity scores; imports lazily because the package is optional."""
    from textblob import TextBlob
    return [TextBlob(t).sentiment.polarity for t in texts]


def _lexicon_scores(texts):
    """Score texts with a tiny keyword list: (pos - neg) / max(1, pos + neg)."""
    pos_words = {'beat','surge','gain','strong','growth','bullish','upgrade','outperform'}
    neg_words = {'miss','fall','loss','weak','decline','bearish','downgrade','underperform'}
    scores = []
    for text in texts:
        tokens = str(text).lower().split()
        pos = sum(w in pos_words for w in tokens)
        neg = sum(w in neg_words for w in tokens)
        scores.append((pos - neg) / max(1, (pos + neg)))
    return scores


# Try the optional baselines in order. Any failure (missing package or a scoring
# error) still falls through to the next option, but the reason is reported
# instead of being swallowed silently.
baseline_scores = None
baseline_name = 'Lexicon'
score_to_class = _sign_class
for candidate_name, candidate_scorer in (('VADER', _vader_scores), ('TextBlob', _textblob_scores)):
    try:
        baseline_scores = candidate_scorer(test_texts)
    except Exception as exc:
        print(f"{candidate_name} baseline unavailable ({type(exc).__name__}: {exc}); trying the next option.")
        continue
    baseline_name = candidate_name
    score_to_class = _banded_class
    break

if baseline_scores is None:
    # Simple lexicon fallback; errors here are not caught
    baseline_scores = _lexicon_scores(test_texts)

baseline_pred = np.array([score_to_class(s) for s in baseline_scores], dtype=int)

comp = compare_to_baseline(test_labels, y_pred, baseline_pred, metric='f1')
print({'baseline': baseline_name, **comp})

# McNemar results are read defensively so a missing key yields a clear message
# instead of a KeyError / format error.
mcnemar_stat = comp.get('test_stat')
mcnemar_p = comp.get('p_value')
if mcnemar_stat is None or mcnemar_p is None:
    print("McNemar's test: not available (no test_stat/p_value in comparison result)")
else:
    print(f"McNemar's test: statistic={mcnemar_stat:.4f}, p-value={mcnemar_p:.4f}")


## 7. Error Analysis

Inspect common failure modes and sample misclassifications.
