# Financial Sentiment Analysis with FinBERT

This notebook trains a custom binary sentiment classifier using the ProsusAI/finbert base model.
- **Model**: ProsusAI/finbert (base model, pre-trained on financial text)
- **Task**: Binary classification (positive vs negative sentiment)
- **Dataset**: Stock market tweets


## Table of Contents

1. **Imports** - All required libraries
2. **Data Preprocessing** - Text cleaning and label conversion
3. **Load and Explore Data** - Dataset loading and exploration
4. **Custom Dataset Class** - PyTorch dataset implementation
5. **Model Architecture** - FinBERT backbone + custom binary head
6. **Training Utilities** - Training loop, metrics, plotting functions
7. **Initialize Model and Data** - Setup for manual training (optional)
8. **Training Strategy** - Two-phase training approach (optional manual run)
9. **Complete Training & Testing Pipeline** - `train_and_evaluate()` function
10. **Test Function** - Comprehensive testing with confusion matrix
11. **Inference Function** - Predict sentiment on custom text
12. **Example Predictions** - Test the trained model

---

## 1. Imports


In [2]:
import re
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
from torch.optim import AdamW
from transformers import AutoTokenizer, AutoModel
from sklearn.metrics import f1_score, accuracy_score, confusion_matrix, classification_report
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np


## 2. Data Preprocessing


In [3]:
def clean_text(text):
    """Clean and normalize text for sentiment analysis."""
    text = str(text)
    text = re.sub(r"http\S+", "", text)                      # Remove URLs
    text = re.sub(r"@\w+", "", text)                          # Remove mentions
    text = re.sub(r"^user:\s*", "", text, flags=re.IGNORECASE)  # Remove 'user:' prefix
    text = re.sub(r"^user\s*", "", text, flags=re.IGNORECASE)   # Remove 'user' prefix
    text = re.sub(r"[\"]+", "", text)                        # Remove quotes
    text = re.sub(r"\s+", " ", text).strip()                 # Normalize whitespace
    return text

def labels_zero_one(y: int) -> int:
    """Convert sentiment labels from {-1, 1} to {0, 1}."""
    # input: -1 (negative) or 1 (positive)
    # output: 0 (negative) or 1 (positive)
    return 1 if int(y) == 1 else 0
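
As a quick check of the two helpers above, the sketch below runs them on a made-up tweet. The sample string is an assumption for illustration, not a row from the dataset, and the helper definitions are repeated only so the snippet runs on its own.

```python
import re

def clean_text(text):
    """Same cleaning steps as the notebook helper, repeated for a standalone demo."""
    text = str(text)
    text = re.sub(r"http\S+", "", text)                         # Remove URLs
    text = re.sub(r"@\w+", "", text)                            # Remove mentions
    text = re.sub(r"^user:\s*", "", text, flags=re.IGNORECASE)  # Remove 'user:' prefix
    text = re.sub(r"^user\s*", "", text, flags=re.IGNORECASE)   # Remove 'user' prefix
    text = re.sub(r"[\"]+", "", text)                           # Remove quotes
    text = re.sub(r"\s+", " ", text).strip()                    # Normalize whitespace
    return text

def labels_zero_one(y):
    """Map 1 to 1 and every other value to 0."""
    return 1 if int(y) == 1 else 0

# Hypothetical sample tweet, invented for this example
sample = 'user: AAPL looks strong http://t.co/abc @trader "buy"'
cleaned = clean_text(sample)
mapped = [labels_zero_one(y) for y in (-1, 1)]

print(cleaned)  # AAPL looks strong buy
print(mapped)   # [0, 1]
```

Note that `labels_zero_one` maps every value other than 1 to 0, so an unexpected label (for example 0) would silently be treated as negative.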


## 3. Load and Explore Data


In [None]:
# Load the dataset
df = pd.read_csv("data/stock_data.csv")
df["Text"] = df["Text"].astype(str).apply(clean_text)

print(f"Dataset size: {len(df)} samples")
print(f"\nOriginal Sentiment distribution:")
print(df["Sentiment"].value_counts())
print(f"\nClass distribution:")
pos_count = (df["Sentiment"] == 1).sum()
neg_count = (df["Sentiment"] == -1).sum()
print(f"  Positive (1):  {pos_count} ({pos_count/len(df)*100:.1f}%)")
print(f"  Negative (-1): {neg_count} ({neg_count/len(df)*100:.1f}%)")
print(f"  Imbalance ratio: {pos_count/neg_count:.2f}:1")
print(f"\nFirst few samples:")
print(df.head())


Dataset size: 5791 samples

Sentiment distribution:
Sentiment
 1    3685
-1    2106
Name: count, dtype: int64

First few samples:
                                                Text  Sentiment
0  Kickers on my watchlist XIDE TIT SOQ PNK CPW B...          1
1  AAP MOVIE. 55% return for the FEA/GEED indicat...          1
2  I'd be afraid to short AMZN - they are looking...          1
3                                    MNTA Over 12.00          1
4                                      OI Over 21.37          1


### 3.1 Handle Class Imbalance


In [5]:
def balance_dataset(df, method='undersample', random_state=42):
    """
    Balance dataset by handling class imbalance.
    
    Args:
        df: DataFrame with 'Text' and 'Sentiment' columns
        method: 'undersample' (remove majority), 'oversample' (duplicate minority), or 'none'
        random_state: Random seed for reproducibility
    
    Returns:
        Balanced DataFrame
    """
    pos_df = df[df["Sentiment"] == 1]
    neg_df = df[df["Sentiment"] == -1]
    
    if method == 'undersample':
        # Undersample majority class to match minority
        min_count = min(len(pos_df), len(neg_df))
        pos_balanced = pos_df.sample(n=min_count, random_state=random_state)
        neg_balanced = neg_df.sample(n=min_count, random_state=random_state)
        
    elif method == 'oversample':
        # Oversample minority class to match majority
        max_count = max(len(pos_df), len(neg_df))
        pos_balanced = pos_df.sample(n=max_count, replace=True, random_state=random_state)
        neg_balanced = neg_df.sample(n=max_count, replace=True, random_state=random_state)
        
    elif method == 'none':
        return df
    
    else:
        raise ValueError(f"Unknown method: {method}. Use 'undersample', 'oversample', or 'none'")
    
    # Combine and shuffle
    balanced_df = pd.concat([pos_balanced, neg_balanced], ignore_index=True)
    balanced_df = balanced_df.sample(frac=1, random_state=random_state).reset_index(drop=True)
    
    return balanced_df


def compute_class_weights(df):
    """
    Compute class weights for imbalanced dataset.
    Useful for weighted loss during training.
    
    Args:
        df: DataFrame with 'Sentiment' column
    
    Returns:
        pos_weight: Weight for positive class (as torch tensor)
    """
    pos_count = (df["Sentiment"] == 1).sum()
    neg_count = (df["Sentiment"] == -1).sum()
    
    # pos_weight = neg / pos: below 1 when positives are the majority (their loss
    # is scaled down), above 1 when positives are the minority (scaled up)
    pos_weight = neg_count / pos_count
    
    return torch.tensor([pos_weight], dtype=torch.float32)


In [8]:
# Balance the dataset using undersampling (removing excess positive samples)
df_balanced = balance_dataset(df, method='undersample', random_state=42)

print("Balanced Sentiment distribution:")
print(df_balanced["Sentiment"].value_counts())

# Save balanced dataset
df_balanced.to_csv("data/stock_data_balanced.csv", index=False)


Balanced Sentiment distribution:
Sentiment
 1    2106
-1    2106
Name: count, dtype: int64


### Options for Handling Imbalanced Data

There are multiple strategies to handle class imbalance:

1. **Undersampling (Used Above)** ✅
   - Randomly removes samples from majority class to match minority
   - Pros: Fast, prevents model from being biased towards majority
   - Cons: Loses data (went from 5,791 → 4,212 samples)

2. **Oversampling** 
   - Duplicates minority class samples to match majority
   - Pros: Keeps all original data
   - Cons: Risk of overfitting on duplicated samples
   - To use: `balance_dataset(df, method='oversample')`

3. **Class Weights** ⚖️
   - Uses weighted loss to penalize errors on minority class more
   - Pros: Keeps all data, no duplication
   - Cons: Needs tuning
   - To use: Set `use_class_weights=True` in `train_and_evaluate()`

**Current strategy:** Undersampling (balanced dataset with 2,106 samples per class)
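
The three options can be compared with simple arithmetic on the class counts reported in Section 3. This is a minimal sketch in plain Python; the variable names are illustrative, and `pos_weight` uses the same `neg_count / pos_count` formula as `compute_class_weights()`.

```python
# Class counts taken from the dataset summary printed in Section 3
pos_count, neg_count = 3685, 2106

imbalance_ratio = pos_count / neg_count
undersampled_size = 2 * min(pos_count, neg_count)  # both classes cut to the minority size
oversampled_size = 2 * max(pos_count, neg_count)   # both classes grown to the majority size
pos_weight = neg_count / pos_count                 # weight applied to positive examples

print(f"Imbalance ratio:   {imbalance_ratio:.2f}:1")
print(f"Undersampled size: {undersampled_size}")
print(f"Oversampled size:  {oversampled_size}")
print(f"pos_weight:        {pos_weight:.4f}")
```

Because the positive class is the majority here, `pos_weight` comes out below 1, so the weighted loss scales down the contribution of positive examples rather than scaling up the negatives.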


## 4. Custom Dataset Class


In [9]:
class FinancialSentimentDataset(Dataset):
    """PyTorch Dataset for financial sentiment analysis."""
    
    def __init__(self, csvPath: Path, tokenizer, max_len: int = 128):
        csvPath = Path(csvPath)
        if not csvPath.exists():
            raise ValueError(f"CSV not found: {csvPath}")

        df = pd.read_csv(csvPath)
        # Expect columns: Text, Sentiment
        self.texts = [clean_text(t) for t in df["Text"].astype(str).tolist()]
        self.labels = [labels_zero_one(y) for y in df["Sentiment"].tolist()]
        self.tokenizer = tokenizer
        self.maxLen = max_len
    
    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        label = self.labels[idx]

        encoding = self.tokenizer(
            text,
            truncation=True,          # Cut off long texts
            padding='max_length',     # Pad shorter texts
            max_length=self.maxLen,
            return_tensors='pt'       # PyTorch tensors
        )

        return {
            # squeeze(0) drops only the batch dimension added by return_tensors='pt'
            'input_ids': encoding['input_ids'].squeeze(0),
            'attention_mask': encoding['attention_mask'].squeeze(0),
            "label": torch.tensor(label, dtype=torch.long)
        }
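
With `truncation=True` and `padding='max_length'`, every item ends up with exactly `max_len` token IDs plus a mask that marks the real tokens. The sketch below imitates that on already-tokenized input. The helper `pad_or_truncate` and the token IDs are hypothetical, and the real tokenizer additionally takes care of special tokens when it truncates.

```python
def pad_or_truncate(token_ids, max_len, pad_id=0):
    """Return (ids, attention_mask), both of length max_len."""
    ids = list(token_ids[:max_len])        # cut off anything past max_len
    mask = [1] * len(ids)                  # 1 marks a real token
    n_pad = max_len - len(ids)
    return ids + [pad_id] * n_pad, mask + [0] * n_pad  # 0 marks padding

# Arbitrary illustrative IDs, not produced by the FinBERT tokenizer
short_ids, short_mask = pad_or_truncate([101, 4518, 102], max_len=6)
long_ids, long_mask = pad_or_truncate(list(range(1, 11)), max_len=6)

print(short_ids, short_mask)
print(long_ids, long_mask)
```

Padding every item to the same length is what lets the default `DataLoader` collation stack items into a batch without a custom `collate_fn`.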


## 5. Model Architecture

We use a custom binary classification head on top of the FinBERT base model.


In [10]:
class FinbertBackbone(nn.Module):
    """FinBERT encoder backbone."""
    
    def __init__(self, modelName: str = "ProsusAI/finbert"):
        super().__init__()
        self.encoder = AutoModel.from_pretrained(modelName)
        self.hiddenSize = self.encoder.config.hidden_size  # 768 for BERT-base

    def forward(self, input_ids, attention_mask):
        out = self.encoder(input_ids=input_ids, attention_mask=attention_mask, return_dict=True)
        cls = out.last_hidden_state[:, 0]  # Extract [CLS] token embedding
        return cls  # [batch_size, hidden_size]


class BinaryHead(nn.Module):
    """Binary classification head."""
    
    def __init__(self, inFeatures: int, pDrop: float = 0.1):
        super().__init__()
        self.dropout = nn.Dropout(pDrop)
        self.fc = nn.Linear(inFeatures, 1)  # Single logit for binary classification

    def forward(self, x):
        x = self.dropout(x)
        logits = self.fc(x).squeeze(-1)  # [batch_size]
        return logits


class FinbertBinaryClf(nn.Module):
    """Complete model: FinBERT backbone + custom binary classification head."""
    
    def __init__(self, modelName: str = "ProsusAI/finbert", pDrop: float = 0.1):
        super().__init__()
        self.backbone = FinbertBackbone(modelName)
        self.head = BinaryHead(self.backbone.hiddenSize, pDrop)

    def forward(self, input_ids, attention_mask):
        feats = self.backbone(input_ids, attention_mask)
        logits = self.head(feats)
        return logits
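
The head emits one logit per example rather than two. A short sketch of why that is sufficient for binary classification: a sigmoid applied to a single logit `z` gives the same positive-class probability as a two-class softmax over the logits `[0, z]`. The snippet uses only the standard library, and the function names are illustrative.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def softmax(logits):
    m = max(logits)                               # subtract the max for stability
    exps = [math.exp(v - m) for v in logits]
    total = sum(exps)
    return [e / total for e in exps]

for z in (-2.0, 0.0, 1.5):
    p_sigmoid = sigmoid(z)
    p_softmax = softmax([0.0, z])[1]              # probability of the second class
    print(f"z={z:+.1f}  sigmoid={p_sigmoid:.6f}  softmax={p_softmax:.6f}")
```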


## 6. Training Utilities


In [11]:
def getLoaders(csvPath, tokenizer, maxLen=128, batchSize=8, valFrac=0.2, seed=42):
    """Create train and validation data loaders."""
    ds = FinancialSentimentDataset(csvPath, tokenizer, maxLen)
    valLen = int(len(ds) * valFrac)
    trainLen = len(ds) - valLen
    gen = torch.Generator().manual_seed(seed)
    trainDs, valDs = random_split(ds, [trainLen, valLen], generator=gen)
    return (
        DataLoader(trainDs, batch_size=batchSize, shuffle=True),
        DataLoader(valDs, batch_size=batchSize, shuffle=False),
    )


def step(model, batch, device, posWeight=None, train=True, optimizer=None, clip=1.0):
    """Perform one training/validation step."""
    input_ids = batch["input_ids"].to(device)
    attention_mask = batch["attention_mask"].to(device)
    labels = batch["label"].float().to(device)  # 0/1 as float for BCE

    # Build the autograd graph only when training; validation steps skip it
    with torch.set_grad_enabled(train):
        logits = model(input_ids, attention_mask)
        # Compute loss with optional class weighting (pos_weight=None means unweighted)
        loss = F.binary_cross_entropy_with_logits(logits, labels, pos_weight=posWeight)

    # Backward pass and optimization (only in training mode)
    if train:
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

    # Get predictions
    probs = torch.sigmoid(logits).detach().cpu().numpy()
    preds = (probs >= 0.5).astype(int)
    y_true = labels.detach().cpu().numpy().astype(int)
    
    return loss.item(), preds, y_true


def runEpoch(model, loader, device, train, optimizer=None, posWeight=None):
    """Run one epoch of training or validation."""
    model.train(train)
    losses, allPreds, allTrue = [], [], []
    
    for batch in loader:
        loss, preds, y_true = step(model, batch, device, posWeight, train, optimizer)
        losses.append(loss)
        allPreds.extend(preds.tolist())
        allTrue.extend(y_true.tolist())
    
    acc = accuracy_score(allTrue, allPreds)
    f1 = f1_score(allTrue, allPreds)
    avg_loss = float(sum(losses) / max(1, len(losses)))
    
    return avg_loss, acc, f1
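
For reference, the loss that `step` computes can be written out for a single example. The sketch follows the documented formula for binary cross-entropy with logits and a `pos_weight`: the positive-label term is multiplied by the weight and the negative-label term is left unchanged. It is plain Python, numerically naive, and meant only as an illustration.

```python
import math

def bce_with_logits(logit, label, pos_weight=1.0):
    """Per-example binary cross-entropy on a raw logit (illustrative, not stabilized)."""
    p = 1.0 / (1.0 + math.exp(-logit))
    return -(pos_weight * label * math.log(p) + (1 - label) * math.log(1.0 - p))

w = 2106 / 3685  # neg_count / pos_count for the unbalanced dataset

pos_plain = bce_with_logits(0.3, 1)
pos_weighted = bce_with_logits(0.3, 1, pos_weight=w)
neg_plain = bce_with_logits(0.3, 0)
neg_weighted = bce_with_logits(0.3, 0, pos_weight=w)

print(f"positive example: plain={pos_plain:.4f}  weighted={pos_weighted:.4f}")
print(f"negative example: plain={neg_plain:.4f}  weighted={neg_weighted:.4f}")
```

Only the loss on positive examples changes, which is why a `pos_weight` below 1 shifts the model's attention toward the negative class.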


In [12]:
def plot_training_history(history):
    """Plot training and validation metrics over epochs."""
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))
    
    # Plot Loss
    axes[0].plot(history['train_loss'], 'b-', label='Train Loss', linewidth=2)
    axes[0].plot(history['val_loss'], 'r-', label='Val Loss', linewidth=2)
    axes[0].set_xlabel('Epoch', fontsize=12)
    axes[0].set_ylabel('Loss', fontsize=12)
    axes[0].set_title('Training and Validation Loss', fontsize=14, fontweight='bold')
    axes[0].legend(fontsize=10)
    axes[0].grid(True, alpha=0.3)
    
    # Plot Accuracy
    axes[1].plot(history['train_acc'], 'b-', label='Train Accuracy', linewidth=2)
    axes[1].plot(history['val_acc'], 'r-', label='Val Accuracy', linewidth=2)
    axes[1].set_xlabel('Epoch', fontsize=12)
    axes[1].set_ylabel('Accuracy', fontsize=12)
    axes[1].set_title('Training and Validation Accuracy', fontsize=14, fontweight='bold')
    axes[1].legend(fontsize=10)
    axes[1].grid(True, alpha=0.3)
    
    # Plot F1 Score
    axes[2].plot(history['train_f1'], 'b-', label='Train F1', linewidth=2)
    axes[2].plot(history['val_f1'], 'r-', label='Val F1', linewidth=2)
    axes[2].set_xlabel('Epoch', fontsize=12)
    axes[2].set_ylabel('F1 Score', fontsize=12)
    axes[2].set_title('Training and Validation F1 Score', fontsize=14, fontweight='bold')
    axes[2].legend(fontsize=10)
    axes[2].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.savefig('training_history.png', dpi=150, bbox_inches='tight')
    plt.show()
    

def evaluate_model(model, loader, device):
    """Evaluate model and return detailed metrics."""
    model.eval()
    all_preds, all_true, all_probs = [], [], []
    total_loss = 0.0
    
    with torch.no_grad():
        for batch in loader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["label"].float().to(device)
            
            logits = model(input_ids, attention_mask)
            loss = F.binary_cross_entropy_with_logits(logits, labels)
            total_loss += loss.item()
            
            probs = torch.sigmoid(logits).cpu().numpy()
            preds = (probs >= 0.5).astype(int)
            
            all_probs.extend(probs.tolist())
            all_preds.extend(preds.tolist())
            all_true.extend(labels.cpu().numpy().astype(int).tolist())
    
    avg_loss = total_loss / len(loader)
    acc = accuracy_score(all_true, all_preds)
    f1 = f1_score(all_true, all_preds)
    
    return avg_loss, acc, f1, all_preds, all_true, all_probs


## 7. Initialize Model and Data


In [13]:
# Configuration
CSV_PATH = "data/stock_data.csv"
MODEL_NAME = "ProsusAI/finbert"  # Base FinBERT model
MAX_LEN = 128
BATCH_SIZE = 8
VAL_FRACTION = 0.2
DROPOUT = 0.1
SEED = 42

# Initialize tokenizer
print(f"Loading tokenizer from {MODEL_NAME}...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

# Create data loaders
print(f"Creating data loaders...")
trainLoader, valLoader = getLoaders(
    CSV_PATH, 
    tokenizer, 
    maxLen=MAX_LEN, 
    batchSize=BATCH_SIZE,
    valFrac=VAL_FRACTION,
    seed=SEED
)

print(f"Train batches: {len(trainLoader)}")
print(f"Validation batches: {len(valLoader)}")

# Initialize model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"\nUsing device: {device}")

print(f"\nLoading model from {MODEL_NAME}...")
model = FinbertBinaryClf(MODEL_NAME, pDrop=DROPOUT).to(device)
print(f"Model loaded successfully!")


Loading tokenizer from ProsusAI/finbert...



Creating data loaders...
Train batches: 580
Validation batches: 145

Using device: cpu

Loading model from ProsusAI/finbert...



Model loaded successfully!
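
The batch counts printed above follow directly from the split logic in `getLoaders`: the validation length is `int(len(ds) * valFrac)`, the rest goes to training, and each loader yields a ceiling-divided number of batches because the last partial batch is kept. A small sketch of that arithmetic (the helper name `split_sizes` is hypothetical):

```python
import math

def split_sizes(n_samples, val_frac=0.2, batch_size=8):
    """Return (train_len, val_len, train_batches, val_batches)."""
    val_len = int(n_samples * val_frac)
    train_len = n_samples - val_len
    return (
        train_len,
        val_len,
        math.ceil(train_len / batch_size),
        math.ceil(val_len / batch_size),
    )

full = split_sizes(5791)      # unbalanced dataset used in this section
balanced = split_sizes(4212)  # undersampled dataset from Section 3.1

print(full)      # (4633, 1158, 580, 145)
print(balanced)  # (3370, 842, 422, 106)
```

The 580 and 145 batches reported above therefore correspond to the full 5,791-sample file at `CSV_PATH`, not the balanced one.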



## 8. Training Strategy: Two-Phase Approach

### Phase 1: Warmup (Freeze Encoder)
Train only the custom classification head while keeping the encoder frozen.

### Phase 2: Fine-tuning (Unfreeze Encoder)
Fine-tune the entire model with a lower learning rate.
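
The freeze-then-unfreeze pattern can be tried on a toy model before running it on FinBERT. The sketch below is an assumption-laden stand-in: `TinyClf` is a hypothetical two-layer network whose `backbone` and `head` attributes mirror the names used in this notebook, and no training is performed.

```python
import torch.nn as nn
from torch.optim import AdamW

class TinyClf(nn.Module):
    """Hypothetical stand-in with the same backbone/head layout as FinbertBinaryClf."""
    def __init__(self):
        super().__init__()
        self.backbone = nn.Linear(16, 8)
        self.head = nn.Linear(8, 1)

    def forward(self, x):
        return self.head(self.backbone(x)).squeeze(-1)

def count_trainable(module):
    return sum(p.numel() for p in module.parameters() if p.requires_grad)

model = TinyClf()
total_params = count_trainable(model)

# Phase 1: freeze the backbone so only the head receives gradient updates
for p in model.backbone.parameters():
    p.requires_grad = False
head_only_params = count_trainable(model)
warmup_opt = AdamW(
    (p for p in model.parameters() if p.requires_grad), lr=2e-4, weight_decay=0.01
)

# Phase 2: unfreeze everything and build a new optimizer with a lower learning rate
for p in model.backbone.parameters():
    p.requires_grad = True
finetune_opt = AdamW(model.parameters(), lr=2e-5, weight_decay=0.01)

print(f"trainable during warmup: {head_only_params} of {total_params}")
```

A fresh optimizer is created for the second phase because the first one was built from the filtered parameter list and never saw the backbone weights.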


In [14]:
# PHASE 1: WARMUP - Train only the head
print("="*60)
print("PHASE 1: Warmup - Training classification head only")
print("="*60)

# Initialize history tracking
history = {
    'train_loss': [],
    'train_acc': [],
    'train_f1': [],
    'val_loss': [],
    'val_acc': [],
    'val_f1': []
}

# Freeze encoder parameters
for p in model.backbone.parameters():
    p.requires_grad = False

# Optimizer for head only
optimizer = AdamW(
    filter(lambda p: p.requires_grad, model.parameters()), 
    lr=2e-4, 
    weight_decay=0.01
)

# Warmup training
WARMUP_EPOCHS = 1
for epoch in range(WARMUP_EPOCHS):
    trLoss, trAcc, trF1 = runEpoch(model, trainLoader, device, train=True, optimizer=optimizer)
    vaLoss, vaAcc, vaF1 = runEpoch(model, valLoader, device, train=False)
    
    # Track metrics
    history['train_loss'].append(trLoss)
    history['train_acc'].append(trAcc)
    history['train_f1'].append(trF1)
    history['val_loss'].append(vaLoss)
    history['val_acc'].append(vaAcc)
    history['val_f1'].append(vaF1)
    
    print(f"[Warmup Epoch {epoch}]")
    print(f"  Train: loss={trLoss:.4f}, acc={trAcc:.3f}, f1={trF1:.3f}")
    print(f"  Val:   loss={vaLoss:.4f}, acc={vaAcc:.3f}, f1={vaF1:.3f}")


PHASE 1: Warmup - Training classification head only
[Warmup Epoch 0]
  Train: loss=0.5761, acc=0.698, f1=0.790
  Val:   loss=0.5980, acc=0.708, f1=0.786


In [None]:
# PHASE 2: FINE-TUNING - Train entire model
print("\n" + "="*60)
print("PHASE 2: Fine-tuning - Training entire model")
print("="*60)

# Unfreeze all encoder parameters
for p in model.backbone.parameters():
    p.requires_grad = True

# New optimizer with lower learning rate for entire model
optimizer = AdamW(model.parameters(), lr=2e-5, weight_decay=0.01)

# Main training
TRAIN_EPOCHS = 3
best_f1 = 0.0

for epoch in range(TRAIN_EPOCHS):
    trLoss, trAcc, trF1 = runEpoch(model, trainLoader, device, train=True, optimizer=optimizer)
    vaLoss, vaAcc, vaF1 = runEpoch(model, valLoader, device, train=False)
    
    # Track metrics
    history['train_loss'].append(trLoss)
    history['train_acc'].append(trAcc)
    history['train_f1'].append(trF1)
    history['val_loss'].append(vaLoss)
    history['val_acc'].append(vaAcc)
    history['val_f1'].append(vaF1)
    
    print(f"[Epoch {epoch}]")
    print(f"  Train: loss={trLoss:.4f}, acc={trAcc:.3f}, f1={trF1:.3f}")
    print(f"  Val:   loss={vaLoss:.4f}, acc={vaAcc:.3f}, f1={vaF1:.3f}")
    
    # Save best model
    if vaF1 > best_f1:
        best_f1 = vaF1
        torch.save(model.state_dict(), "finbert_custom_head_best.pt")
        print(f"  → New best model saved! (F1={best_f1:.3f})")

# Save final model
torch.save(model.state_dict(), "finbert_custom_head_final.pt")
print(f"\n✓ Training complete! Best validation F1: {best_f1:.3f}")

# Plot training history
print("\nGenerating training plots...")
plot_training_history(history)


## 9. Complete Training & Testing Pipeline


In [None]:
def train_and_evaluate(
    csv_path, 
    model_name="ProsusAI/finbert",
    max_len=128,
    batch_size=8,
    val_frac=0.2,
    warmup_epochs=1,
    train_epochs=3,
    warmup_lr=2e-4,
    train_lr=2e-5,
    dropout=0.1,
    seed=42,
    save_best=True,
    use_class_weights=False
):
    """
    Complete training and evaluation pipeline.
    
    Args:
        csv_path: Path to CSV data file
        model_name: HuggingFace model name
        max_len: Maximum sequence length
        batch_size: Training batch size
        val_frac: Validation fraction
        warmup_epochs: Number of warmup epochs (head only)
        train_epochs: Number of fine-tuning epochs
        warmup_lr: Learning rate for warmup phase
        train_lr: Learning rate for fine-tuning phase
        dropout: Dropout probability
        seed: Random seed
        save_best: Whether to save best model
        use_class_weights: Use weighted loss for imbalanced classes (set to False if using balanced data)
    
    Returns:
        model: Trained model
        history: Training history dictionary
        test_results: Test evaluation results
    """
    
    # 1. Setup
    print("🚀 STARTING TRAINING PIPELINE")
    print("="*80)
    
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Device: {device}")
    
    # 2. Load tokenizer and data
    print(f"\n📚 Loading data and tokenizer...")
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    trainLoader, valLoader = getLoaders(
        csv_path, tokenizer, max_len, batch_size, val_frac, seed
    )
    print(f"  Train batches: {len(trainLoader)}")
    print(f"  Val batches: {len(valLoader)}")
    
    # 3. Check for class weights
    pos_weight = None
    if use_class_weights:
        df_temp = pd.read_csv(csv_path)
        pos_weight = compute_class_weights(df_temp).to(device)
        print(f"  ⚖️  Using class weights: pos_weight={pos_weight.item():.3f}")
    else:
        print(f"  ℹ️  Not using class weights (assuming balanced data)")
    
    # 3. Initialize model
    print(f"\n🤖 Initializing model: {model_name}")
    model = FinbertBinaryClf(model_name, pDrop=dropout).to(device)
    
    # 4. Track metrics
    history = {
        'train_loss': [], 'train_acc': [], 'train_f1': [],
        'val_loss': [], 'val_acc': [], 'val_f1': []
    }
    
    # 5. PHASE 1: Warmup
    print(f"\n{'='*80}")
    print(f"PHASE 1: Warmup Training (Head Only) - {warmup_epochs} epoch(s)")
    print(f"{'='*80}")
    
    for p in model.backbone.parameters():
        p.requires_grad = False
    
    optimizer = AdamW(
        filter(lambda p: p.requires_grad, model.parameters()), 
        lr=warmup_lr, weight_decay=0.01
    )
    
    for epoch in range(warmup_epochs):
        trLoss, trAcc, trF1 = runEpoch(model, trainLoader, device, train=True, optimizer=optimizer, posWeight=pos_weight)
        vaLoss, vaAcc, vaF1 = runEpoch(model, valLoader, device, train=False, posWeight=pos_weight)
        
        history['train_loss'].append(trLoss)
        history['train_acc'].append(trAcc)
        history['train_f1'].append(trF1)
        history['val_loss'].append(vaLoss)
        history['val_acc'].append(vaAcc)
        history['val_f1'].append(vaF1)
        
        print(f"[Warmup {epoch}] Train: loss={trLoss:.4f} acc={trAcc:.3f} f1={trF1:.3f} | "
              f"Val: loss={vaLoss:.4f} acc={vaAcc:.3f} f1={vaF1:.3f}")
    
    # 6. PHASE 2: Fine-tuning
    print(f"\n{'='*80}")
    print(f"PHASE 2: Fine-tuning (Full Model) - {train_epochs} epoch(s)")
    print(f"{'='*80}")
    
    for p in model.backbone.parameters():
        p.requires_grad = True
    
    optimizer = AdamW(model.parameters(), lr=train_lr, weight_decay=0.01)
    best_f1 = 0.0
    
    for epoch in range(train_epochs):
        trLoss, trAcc, trF1 = runEpoch(model, trainLoader, device, train=True, optimizer=optimizer, posWeight=pos_weight)
        vaLoss, vaAcc, vaF1 = runEpoch(model, valLoader, device, train=False, posWeight=pos_weight)
        
        history['train_loss'].append(trLoss)
        history['train_acc'].append(trAcc)
        history['train_f1'].append(trF1)
        history['val_loss'].append(vaLoss)
        history['val_acc'].append(vaAcc)
        history['val_f1'].append(vaF1)
        
        print(f"[Epoch {epoch}] Train: loss={trLoss:.4f} acc={trAcc:.3f} f1={trF1:.3f} | "
              f"Val: loss={vaLoss:.4f} acc={vaAcc:.3f} f1={vaF1:.3f}")
        
        # Track the best validation F1 even when checkpoints are not being saved
        if vaF1 > best_f1:
            best_f1 = vaF1
            if save_best:
                torch.save(model.state_dict(), "finbert_custom_head_best.pt")
                print(f"  ✓ Best model saved (F1={best_f1:.3f})")
    
    # 7. Save final model
    torch.save(model.state_dict(), "finbert_custom_head_final.pt")
    print(f"\n✓ Training complete! Best F1: {best_f1:.3f}")
    
    # 8. Plot training history
    print(f"\n📊 Generating plots...")
    plot_training_history(history)
    
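    # 9. Evaluate on validation set
    # test_model() is defined in Section 10; run that cell before calling this function
    print(f"\n🧪 Final Evaluation (validation set):")
    test_results = test_model(model, valLoader, device, show_confusion_matrix=True)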
    
    return model, history, test_results


In [None]:
# Run the complete training and evaluation pipeline
# Using the balanced dataset so that neither class dominates training
model, history, test_results = train_and_evaluate(
    csv_path="data/stock_data_balanced.csv",  # Using balanced dataset
    model_name="ProsusAI/finbert",
    max_len=128,
    batch_size=8,
    val_frac=0.2,
    warmup_epochs=1,
    train_epochs=3,
    warmup_lr=2e-4,
    train_lr=2e-5,
    dropout=0.1,
    seed=42,
    save_best=True,
    use_class_weights=False  # False because we're using balanced data
)


## 10. Test Function with Detailed Evaluation


In [None]:
def test_model(model, test_loader, device, show_confusion_matrix=True):
    """
    Comprehensive test function with detailed metrics and visualizations.
    
    Args:
        model: Trained model
        test_loader: DataLoader for test data
        device: Device to run on
        show_confusion_matrix: Whether to plot confusion matrix
    
    Returns:
        Dictionary with test metrics
    """
    print("="*60)
    print("TESTING MODEL")
    print("="*60)
    
    # Evaluate model
    test_loss, test_acc, test_f1, preds, labels, probs = evaluate_model(model, test_loader, device)
    
    # Print metrics
    print(f"\n📊 Test Results:")
    print(f"  Loss:     {test_loss:.4f}")
    print(f"  Accuracy: {test_acc:.3f} ({test_acc*100:.1f}%)")
    print(f"  F1 Score: {test_f1:.3f}")
    
    # Classification report
    print(f"\n📋 Detailed Classification Report:")
    print(classification_report(labels, preds, target_names=['Negative', 'Positive'], digits=3))
    
    # Confusion Matrix
    if show_confusion_matrix:
        cm = confusion_matrix(labels, preds)
        
        plt.figure(figsize=(8, 6))
        plt.imshow(cm, interpolation='nearest', cmap='Blues')
        plt.title('Confusion Matrix', fontsize=16, fontweight='bold')
        plt.colorbar()
        
        classes = ['Negative', 'Positive']
        tick_marks = np.arange(len(classes))
        plt.xticks(tick_marks, classes, fontsize=12)
        plt.yticks(tick_marks, classes, fontsize=12)
        
        # Add text annotations
        thresh = cm.max() / 2.
        for i in range(cm.shape[0]):
            for j in range(cm.shape[1]):
                plt.text(j, i, format(cm[i, j], 'd'),
                        ha="center", va="center",
                        color="white" if cm[i, j] > thresh else "black",
                        fontsize=14)
        
        plt.ylabel('True Label', fontsize=12)
        plt.xlabel('Predicted Label', fontsize=12)
        plt.tight_layout()
        plt.savefig('confusion_matrix.png', dpi=150, bbox_inches='tight')
        plt.show()
    
    return {
        'loss': test_loss,
        'accuracy': test_acc,
        'f1': test_f1,
        'predictions': preds,
        'labels': labels,
        'probabilities': probs
    }
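
To make the confusion matrix and the reported scores easier to read, the sketch below derives precision, recall, F1, and accuracy by hand from a tiny set of labels. The label lists are invented for illustration; the matrix layout (rows are true labels, columns are predicted labels, negative class first) matches what `confusion_matrix` returns for 0/1 labels.

```python
def binary_confusion(y_true, y_pred):
    """Return [[tn, fp], [fn, tp]] for 0/1 labels."""
    tn = fp = fn = tp = 0
    for t, p in zip(y_true, y_pred):
        if t == 1 and p == 1:
            tp += 1
        elif t == 1 and p == 0:
            fn += 1
        elif t == 0 and p == 1:
            fp += 1
        else:
            tn += 1
    return [[tn, fp], [fn, tp]]

# Made-up labels, not model output
y_true = [1, 1, 1, 0, 0, 1]
y_pred = [1, 0, 1, 0, 1, 1]

(tn, fp), (fn, tp) = binary_confusion(y_true, y_pred)
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1 = 2 * precision * recall / (precision + recall)
accuracy = (tp + tn) / len(y_true)

print(f"tn={tn} fp={fp} fn={fn} tp={tp}")
print(f"precision={precision:.3f} recall={recall:.3f} f1={f1:.3f} accuracy={accuracy:.3f}")
```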


In [None]:
# Test the model on validation set
test_results = test_model(model, valLoader, device, show_confusion_matrix=True)


## 11. Inference on Custom Text


In [None]:
@torch.no_grad()
def predictTexts(texts, tokenizer, model, maxLen=128, threshold=0.5, device=None):
    """Predict sentiment for one or more texts.
    
    Args:
        texts: Single text string or list of texts
        tokenizer: HuggingFace tokenizer
        model: Trained model
        maxLen: Maximum sequence length
        threshold: Classification threshold (default 0.5)
        device: Device to run inference on
    
    Returns:
        labels: Binary labels (0=negative, 1=positive)
        probs: Probability of the positive class for each text
    """
    device = device or next(model.parameters()).device
    model.eval()
    
    if isinstance(texts, str):
        texts = [texts]
    
    # Apply the same cleaning that FinancialSentimentDataset applies to training text
    texts = [clean_text(t) for t in texts]
    
    enc = tokenizer(
        texts, 
        truncation=True, 
        padding="max_length", 
        max_length=maxLen, 
        return_tensors="pt"
    )
    
    logits = model(enc["input_ids"].to(device), enc["attention_mask"].to(device))
    probs = torch.sigmoid(logits).cpu().numpy()
    labels = (probs >= threshold).astype(int)  # 1=positive, 0=negative
    
    return labels, probs


## 12. Example Predictions


In [None]:
# Test the model on some example texts
test_texts = [
    "Stock prices are soaring! Great returns expected.",
    "Market crash imminent, investors panic selling.",
    "Company reports record profits and strong growth.",
    "Bankruptcy fears as debt levels continue to rise."
]

labels, probs = predictTexts(test_texts, tokenizer, model, device=device)

print("\nPredictions:")
print("="*80)
for text, label, prob in zip(test_texts, labels, probs):
    sentiment = "POSITIVE" if label == 1 else "NEGATIVE"
    confidence = prob if label == 1 else 1 - prob  # prob is P(positive)
    print(f"Text: {text}")
    print(f"Prediction: {sentiment} (confidence: {confidence:.3f})")
    print("-"*80)
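
The value returned by `predictTexts` is the sigmoid output, that is, the probability of the positive class, and the label depends on the chosen `threshold`. The sketch below shows how the same probabilities map to labels under two thresholds and how a confidence for the predicted class can be derived. The helper `decide` and the probability values are hypothetical.

```python
def decide(prob_positive, threshold=0.5):
    """Return (label, confidence in the predicted label)."""
    label = 1 if prob_positive >= threshold else 0
    confidence = prob_positive if label == 1 else 1.0 - prob_positive
    return label, confidence

# Made-up probabilities, not model output
probs = [0.92, 0.55, 0.08]

default = [decide(p) for p in probs]
strict = [decide(p, threshold=0.6) for p in probs]

for p, (d_label, d_conf), (s_label, s_conf) in zip(probs, default, strict):
    print(f"P(positive)={p:.2f}  threshold 0.5 -> {d_label} ({d_conf:.2f})  "
          f"threshold 0.6 -> {s_label} ({s_conf:.2f})")
```

Raising the threshold trades recall on the positive class for precision; borderline cases such as 0.55 flip to negative.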
