# SwanBERT Pretraining on Labelled Finance Sentiment Classification Data (Financial PhraseBank)

### Import Libraries

In [5]:
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification, DistilBertConfig, DataCollatorWithPadding
from datasets import Dataset
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split
import torch
from torch.optim import AdamW
from sklearn.metrics import accuracy_score, f1_score

### Prepare Data for Training

In [None]:
# Using sentences with all levels of agreeance (>50%)
file_path = './FinancialPhraseBank/Data/Sentences_50Agree.txt'

texts = []
labels = []

with open(file_path, 'r', encoding='latin-1') as f:
    for line in f:
        line = line.strip() # Remove newline characters
        if line:    # Skip empty lines
            text, label = line.rsplit('@', 1)
            texts.append(text.strip())
            labels.append(label.strip())

label_map = {'negative': 0, 'neutral': 1, 'positive': 2}
numeric_labels = [label_map[label] for label in labels]

train_texts, test_texts, train_labels, test_labels = train_test_split(
    texts, numeric_labels, 
    test_size=0.2, 
    stratify=numeric_labels,
    random_state=42
)

# Create Dataset objects
train_dataset = Dataset.from_dict({
    'text': train_texts,
    'label': train_labels
})
test_dataset = Dataset.from_dict({
    'text': test_texts,
    'label': test_labels
})

# Tokenization
tokenizer = DistilBertTokenizer.from_pretrained("./financial-corpus-distilbert")

def tokenize_function(examples):
    return tokenizer(
        examples['text'], 
        truncation=True, 
        max_length=512,
        padding=False
    )

tokenized_train = train_dataset.map(tokenize_function, batched=True)
tokenized_test = test_dataset.map(tokenize_function, batched=True)

# Remove text column as we don't need it anymore
tokenized_train = tokenized_train.remove_columns(['text'])
tokenized_test = tokenized_test.remove_columns(['text'])

# Set format for PyTorch
tokenized_train.set_format('torch')
tokenized_test.set_format('torch')

# Create DataLoader with padding collator
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

batch_size = 16  # Adjust based on your GPU memory

train_loader = DataLoader(
    tokenized_train,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=data_collator
)

test_loader = DataLoader(
    tokenized_test,
    batch_size=batch_size,
    shuffle=False,  # No need to shuffle test data
    collate_fn=data_collator
)

# Verification
sample_batch = next(iter(train_loader))
print("Batch verification successful!")
print({k: v.shape for k, v in sample_batch.items()})

Map: 100%|██████████| 3876/3876 [00:02<00:00, 1747.51 examples/s]
Map: 100%|██████████| 970/970 [00:00<00:00, 1638.31 examples/s]

Batch verification successful!
{'input_ids': torch.Size([16, 58]), 'attention_mask': torch.Size([16, 58]), 'labels': torch.Size([16])}





### Train and Save SwanBERT Using Custom Training (Slanted Triangular Learning Rates, Gradual Unfreezing, and Discriminitive Fine-Tuning)

In [7]:
# Model needs to be reconfigured for sentiment classification from a masked language model
config = DistilBertConfig.from_pretrained(
    "./financial-corpus-distilbert",
    num_labels=3,
    id2label={0: "negative", 1: "neutral", 2: "positive"},
    label2id=label_map,
    architectures=["DistilBertForSequenceClassification"]
)

model = DistilBertForSequenceClassification.from_pretrained(
    "./financial-corpus-distilbert",
    config=config,
    ignore_mismatched_sizes=True
)

# Slanted triangular learning rate
def get_slanted_triangular_lr(optimizer, num_steps, max_lr=2e-5, cut_frac=0.1, ratio=32):
    """
    STLR schedule: Linear warmup followed by linear decay.
    Args:
        optimizer: Optimizer object.
        num_steps: Total training steps.
        max_lr: Peak learning rate.
        cut_frac: Fraction of steps for warmup (default: 10%).
        ratio: Controls steepness of decay (higher = steeper).
    """
    step_size = max_lr / (num_steps * cut_frac)
    def lr_lambda(step):
        if step < num_steps * cut_frac:
            # Warmup phase: Linear increase
            return step / (num_steps * cut_frac)
        else:
            # Annealing phase: Linear decrease
            return 1 / (ratio * (step / (num_steps * cut_frac) - 1) + 1)
    
    scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)
    return scheduler

# Disriminative fine-tuning
def get_optimizer(model, lr=1e-4, discr_rate=0.85):
    """
    Assign lower learning rates to earlier layers.
    Args:
        discr_rate: Discount factor for layer-wise LR (e.g., 0.85 means layer l-1 has LR = 0.85 * layer l).
    """
    no_decay = ["bias", "LayerNorm.weight"]
    optimizer_grouped_parameters = []
    
    # Iterate through layers (DistilBERT has 6 transformer layers)
    for i in range(6, -1, -1):  # From top layer (6) to embeddings (0)
        layer_params = {
            "params": [p for n, p in model.named_parameters() 
                      if f"layer.{i}." in n and not any(nd in n for nd in no_decay)],
            "lr": lr * (discr_rate ** (6 - i)),  # Decrease LR for lower layers
        }
        optimizer_grouped_parameters.append(layer_params)
    
    # Add remaining parameters (e.g., classifier head)
    optimizer_grouped_parameters.append({
        "params": [p for n, p in model.named_parameters() 
                  if not any(f"layer.{i}." in n for i in range(6)) and not any(nd in n for nd in no_decay)],
        "lr": lr,  # Higher LR for classifier
    })
    
    return AdamW(optimizer_grouped_parameters)


# Gradual unfreezing
def unfreeze_layers(model, current_epoch, total_epochs):
    """
    Gradually unfreeze layers starting from the top.
    Args:
        current_epoch: Current epoch number.
        total_epochs: Total epochs planned.
    """
    layers_to_unfreeze = int((current_epoch / total_epochs) * 6)  # DistilBERT has 6 layers
    for i in range(6):
        for param in model.distilbert.transformer.layer[i].parameters():
            param.requires_grad = (i >= (6 - layers_to_unfreeze))


num_epochs = 10

# Initialize optimizer with discriminative LR
optimizer = get_optimizer(model, lr=1e-4, discr_rate=0.9)

# STLR scheduler
total_steps = len(train_loader) * num_epochs
scheduler = get_slanted_triangular_lr(optimizer, total_steps)

# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Training loop
for epoch in range(num_epochs):
    model.train()
    unfreeze_layers(model, epoch, num_epochs)
    
    # Initialize epoch-level training loss
    train_loss = 0.0
    
    for batch in train_loader:
        batch = {k: v.to(device) for k, v in batch.items()}
        
        optimizer.zero_grad()
        outputs = model(**batch)
        loss = outputs.loss
        loss.backward()
        optimizer.step()
        scheduler.step()
        
        # Accumulate training loss (per batch)
        train_loss += loss.item()
    
    # Calculate average training loss for the epoch
    avg_train_loss = train_loss / len(train_loader)
    
    # Validation phase
    model.eval()
    val_loss = 0.0
    
    # Initialize lists to store predictions and true labels
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in test_loader:
            batch = {k: v.to(device) for k, v in batch.items()}
            outputs = model(**batch)
            
            val_loss += outputs.loss.item()

            logits = outputs.logits
            preds = torch.argmax(logits, dim=1)
            
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(batch['labels'].cpu().numpy())
    
    # Calculate metrics 
    avg_val_loss = val_loss / len(test_loader)
    accuracy = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='weighted')

    # Print both losses
    print(f"Epoch {epoch+1}/{num_epochs} | "
          f"Train Loss: {avg_train_loss:.4f} | "
          f"Val Loss: {avg_val_loss:.4f} | "
          f"Accuracy: {accuracy:.4f} | "
          f"F1-Score: {f1:.4f}")

# Save model
model.save_pretrained('./sentiment_model_custom')
tokenizer.save_pretrained('./sentiment_model_custom')

Some weights of the model checkpoint at ./financial-corpus-distilbert were not used when initializing DistilBertForSequenceClassification: ['vocab_layer_norm.bias', 'vocab_layer_norm.weight', 'vocab_projector.bias', 'vocab_transform.bias', 'vocab_transform.weight']
- This IS expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at ./financial-corpus-distilbert and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_c

Epoch 1/10 | Train Loss: 0.9091 | Val Loss: 0.7346 | Accuracy: 0.6856 | F1-Score: 0.6469
Epoch 2/10 | Train Loss: 0.6857 | Val Loss: 0.6910 | Accuracy: 0.7041 | F1-Score: 0.6738
Epoch 3/10 | Train Loss: 0.6415 | Val Loss: 0.6575 | Accuracy: 0.7093 | F1-Score: 0.6876
Epoch 4/10 | Train Loss: 0.6083 | Val Loss: 0.6371 | Accuracy: 0.7124 | F1-Score: 0.6944
Epoch 5/10 | Train Loss: 0.5760 | Val Loss: 0.6029 | Accuracy: 0.7412 | F1-Score: 0.7308
Epoch 6/10 | Train Loss: 0.5309 | Val Loss: 0.5659 | Accuracy: 0.7691 | F1-Score: 0.7628
Epoch 7/10 | Train Loss: 0.4909 | Val Loss: 0.5370 | Accuracy: 0.7794 | F1-Score: 0.7748
Epoch 8/10 | Train Loss: 0.4620 | Val Loss: 0.5148 | Accuracy: 0.7866 | F1-Score: 0.7830
Epoch 9/10 | Train Loss: 0.4387 | Val Loss: 0.5000 | Accuracy: 0.7948 | F1-Score: 0.7923
Epoch 10/10 | Train Loss: 0.4199 | Val Loss: 0.4873 | Accuracy: 0.7969 | F1-Score: 0.7950


('./sentiment_model_custom\\tokenizer_config.json',
 './sentiment_model_custom\\special_tokens_map.json',
 './sentiment_model_custom\\vocab.txt',
 './sentiment_model_custom\\added_tokens.json')