<a href="https://colab.research.google.com/github/GiorgioMB/UniversityProjects/blob/BERT-Project/BERT_Training.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install datasets
!pip install transformers
!pip install torch torchvision torchaudio
!pip install numpy
!pip install matplotlib

# Initialize dataset and model

In [None]:
from datasets import load_dataset
from transformers import BertTokenizer
import torch
import numpy as np
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import train_test_split
from transformers import BertForSequenceClassification, AdamW
import torch.nn.functional as F
from sklearn.metrics import classification_report, confusion_matrix
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from transformers import AutoTokenizer, AutoModelForSequenceClassification

In [None]:
# Hub identifiers for the sentiment corpus and the pretrained BERT vocabulary.
DATASET_ID = "zeroshot/twitter-financial-news-sentiment"
CHECKPOINT_ID = 'bert-base-uncased'

# `dataset` is indexed by split name ('train', 'validation') in later cells;
# `tokenizer` turns raw tweet text into model inputs.
dataset = load_dataset(DATASET_ID)
tokenizer = BertTokenizer.from_pretrained(CHECKPOINT_ID)

Train subset

In [None]:
# Build the training tensors and loader.
# The columns are materialised with list() because recent `datasets` releases
# return a lazy column object rather than a plain list, and the tokenizer
# expects a list of strings.
train_text_data = list(dataset['train']['text'])
train_numeric_labels = list(dataset['train']['label'])
# The whole split is tokenized in one call, so padding=True pads every example
# to the longest sequence in the split.
tokenized_train_data = tokenizer(train_text_data, padding=True, truncation=True, return_tensors='pt')
train_labels = torch.tensor(train_numeric_labels)
train_data = TensorDataset(
    tokenized_train_data['input_ids'],
    tokenized_train_data['attention_mask'],
    train_labels
)
# Shuffle each epoch so mini-batches are not always seen in dataset order.
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)

Validation subset

In [None]:
# Build the validation tensors and loader.
# The columns are materialised with list() because recent `datasets` releases
# return a lazy column object rather than a plain list, and the tokenizer
# expects a list of strings.
validation_text_data = list(dataset['validation']['text'])
validation_numeric_labels = list(dataset['validation']['label'])
tokenized_validation_data = tokenizer(validation_text_data, padding=True, truncation=True, return_tensors='pt')
validation_labels = torch.tensor(validation_numeric_labels)
validation_dataset = TensorDataset(
    tokenized_validation_data['input_ids'],
    tokenized_validation_data['attention_mask'],
    validation_labels
)
# Evaluation order does not matter, so the loader is left unshuffled.
validation_loader = DataLoader(validation_dataset, batch_size=64, shuffle=False)

Test subset (a 20% sample drawn from the training split)

In [None]:
# Carve a test split out of the tokenized training data: 20% of the example
# indices go to the test set, the remaining 80% stay available for training.
train_dataset = torch.utils.data.TensorDataset(tokenized_train_data['input_ids'], tokenized_train_data['attention_mask'], train_labels)
train_indices, test_indices = train_test_split(range(len(train_dataset)), test_size=0.2, random_state=42)
test_dataset = torch.utils.data.Subset(train_dataset, test_indices)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=64)
# Rebind the training loader to the complementary 80% so the model is never
# fitted on the examples it is later tested on (otherwise the test metrics
# are measured on training data and are optimistically biased).
train_loader = torch.utils.data.DataLoader(
    torch.utils.data.Subset(train_dataset, train_indices),
    batch_size=64,
    shuffle=True,
)

Model

In [None]:
# Three sentiment classes: negative, positive, neutral.
num_labels = 3
# Pretrained BERT encoder with a classification head sized for num_labels.
model = BertForSequenceClassification.from_pretrained(
    'bert-base-uncased',
    num_labels=num_labels,
)
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)

# Early-stopping bookkeeping: stop once `patience` consecutive epochs fail to
# beat the best validation accuracy seen so far.
patience = 3
best_validation_accuracy, no_improvement_count = 0.0, 0

# Training

In [None]:
# Prefer the GPU when PyTorch can see one; otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)


In [None]:
# Move the model's parameters onto the selected device. As the last expression
# of the cell, the returned module is also echoed by the notebook.
model.to(device)

In [None]:
# Fine-tune the model with early stopping on validation accuracy.
# After the loop the model holds the weights of its best validation epoch,
# not the weights of the last (non-improving) epoch.
num_epochs = 20  # Adjust the number of epochs as needed
# Reset the early-stopping state so re-running this cell starts a clean run
# instead of continuing from counters left behind by an earlier run.
best_validation_accuracy = 0.0
no_improvement_count = 0
best_model_state = None  # CPU snapshot of the best weights seen so far
for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    for batch in train_loader:
        optimizer.zero_grad()
        input_ids, attention_mask, batch_labels = batch
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        batch_labels = batch_labels.to(device)
        outputs = model(input_ids, attention_mask=attention_mask)
        loss = F.cross_entropy(outputs.logits, batch_labels)
        loss.backward()
        optimizer.step()

    # ---- validation pass (no gradients needed) ----
    model.eval()
    with torch.no_grad():
        correct_predictions = 0
        total_samples = 0
        for batch in validation_loader:
            input_ids, attention_mask, batch_labels = batch
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            batch_labels = batch_labels.to(device)
            outputs = model(input_ids, attention_mask=attention_mask)
            predicted_labels = outputs.logits.argmax(dim=-1)
            correct_predictions += (predicted_labels == batch_labels).sum().item()
            total_samples += len(batch_labels)

    validation_accuracy = correct_predictions / total_samples

    if validation_accuracy > best_validation_accuracy:
        best_validation_accuracy = validation_accuracy
        no_improvement_count = 0
        # Snapshot on the CPU so the copy does not occupy accelerator memory.
        best_model_state = {
            name: tensor.detach().cpu().clone()
            for name, tensor in model.state_dict().items()
        }
    else:
        no_improvement_count += 1
        if no_improvement_count >= patience:
            print(f'Early stopping triggered at epoch {epoch}')
            break
    print(epoch)

# Roll back to the best checkpoint; without this, the model would keep the
# weights from `patience` epochs after its best validation score.
if best_model_state is not None:
    model.load_state_dict(best_model_state)

# Test

In [None]:
# Evaluate the model on the test loader and print accuracy, a per-class
# report and the confusion matrix.
model.eval()
true_labels = []
predicted_labels = []
# Send batches to wherever the model's weights actually live instead of
# re-deriving a device name that could drift from the one used for training.
device = next(model.parameters()).device
# Pin the class ids so the report and matrix keep a fixed 3-class layout even
# if some class happens to be absent from the test split.
class_ids = [0, 1, 2]
class_names = ["negative", "positive", "neutral"]  # class_names[i] labels class id i

with torch.no_grad():
    for batch in test_loader:
        input_ids, attention_mask, batch_labels = batch
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        batch_labels = batch_labels.to(device)

        outputs = model(input_ids, attention_mask=attention_mask)
        predicted_batch_labels = outputs.logits.argmax(dim=-1)

        true_labels.extend(batch_labels.cpu().tolist())
        predicted_labels.extend(predicted_batch_labels.cpu().tolist())

true_labels = torch.tensor(true_labels)
predicted_labels = torch.tensor(predicted_labels)
accuracy = (true_labels == predicted_labels).sum().item() / len(true_labels)
# scikit-learn works on array-likes; hand it NumPy views of the tensors.
class_report = classification_report(
    true_labels.numpy(),
    predicted_labels.numpy(),
    labels=class_ids,
    target_names=class_names,
)
conf_matrix = confusion_matrix(true_labels.numpy(), predicted_labels.numpy(), labels=class_ids)
print(f"Accuracy: {accuracy:.4f}")
print("Classification Report:")
print(class_report)
print("Confusion Matrix:")
print(conf_matrix)

# Save

In [None]:
# Directory that receives the fine-tuned model and tokenizer files.
# NOTE(review): this looks like a Google Drive path inside Colab — presumably
# Drive has to be mounted before saving; confirm.
save_path = '/content/drive/MyDrive/models'


In [None]:
# Write the model and the tokenizer into the same directory (save_path).
model.save_pretrained(save_path)
tokenizer.save_pretrained(save_path)


# Cross-validation

In [None]:
# Stratified k-fold cross-validation of the fine-tuning recipe.
# Every fold trains a freshly initialised model with its own optimizer and its
# own early-stopping state, so no fold is validated on examples its model has
# already been trained on and no state leaks from one fold to the next.
train_text_data = list(dataset['train']['text'])
train_numeric_labels = list(dataset['train']['label'])
tokenized_train_data = tokenizer(train_text_data, padding=True, truncation=True, return_tensors='pt')
# NumPy labels can be fancy-indexed with the fold index arrays.
train_labels = np.array(train_numeric_labels)
n_splits = 10  # Adjust as needed
num_epochs = 20  # Adjust the number of epochs as needed
train_accuracies = []       # one list of per-epoch accuracies per fold
validation_accuracies = []  # one list of per-epoch accuracies per fold
cross_validator = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
# Only the labels drive a stratified split; X is just a length placeholder.
fold_splits = cross_validator.split(np.zeros(len(train_labels)), train_labels)
for fold, (train_indices, validation_indices) in enumerate(fold_splits):
    print(f"Fold {fold + 1}/{n_splits}")
    train_dataset = torch.utils.data.TensorDataset(
        tokenized_train_data['input_ids'][train_indices],
        tokenized_train_data['attention_mask'][train_indices],
        torch.tensor(train_labels[train_indices])
    )
    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
    validation_dataset = torch.utils.data.TensorDataset(
        tokenized_train_data['input_ids'][validation_indices],
        tokenized_train_data['attention_mask'][validation_indices],
        torch.tensor(train_labels[validation_indices])
    )
    validation_loader = torch.utils.data.DataLoader(validation_dataset, batch_size=64, shuffle=False)

    # Fresh model, optimizer and early-stopping counters for this fold.
    fold_model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=num_labels)
    fold_model.to(device)
    fold_optimizer = torch.optim.AdamW(fold_model.parameters(), lr=1e-5)
    fold_best_accuracy = 0.0
    fold_no_improvement = 0
    fold_train_accuracies = []
    fold_validation_accuracies = []

    for epoch in range(num_epochs):
        # ---- training pass; accuracy is accumulated over the batches ----
        fold_model.train()
        train_correct = 0
        train_seen = 0
        for batch in train_loader:
            fold_optimizer.zero_grad()
            input_ids, attention_mask, batch_labels = batch
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            batch_labels = batch_labels.to(device)
            outputs = fold_model(input_ids, attention_mask=attention_mask)
            loss = F.cross_entropy(outputs.logits, batch_labels)
            loss.backward()
            fold_optimizer.step()
            train_correct += (outputs.logits.argmax(dim=-1) == batch_labels).sum().item()
            train_seen += len(batch_labels)
        train_accuracy = train_correct / train_seen

        # ---- validation pass ----
        fold_model.eval()
        with torch.no_grad():
            correct_predictions = 0
            total_samples = 0
            for batch in validation_loader:
                input_ids, attention_mask, batch_labels = batch
                input_ids = input_ids.to(device)
                attention_mask = attention_mask.to(device)
                batch_labels = batch_labels.to(device)
                outputs = fold_model(input_ids, attention_mask=attention_mask)
                predicted_labels = outputs.logits.argmax(dim=-1)
                correct_predictions += (predicted_labels == batch_labels).sum().item()
                total_samples += len(batch_labels)
        validation_accuracy = correct_predictions / total_samples

        fold_train_accuracies.append(train_accuracy)
        fold_validation_accuracies.append(validation_accuracy)

        if validation_accuracy > fold_best_accuracy:
            fold_best_accuracy = validation_accuracy
            fold_no_improvement = 0
        else:
            fold_no_improvement += 1
            if fold_no_improvement >= patience:
                print(f'Early stopping triggered at epoch {epoch}')
                break
    train_accuracies.append(fold_train_accuracies)
    validation_accuracies.append(fold_validation_accuracies)
    # Drop this fold's model before the next one is created to limit peak memory.
    del fold_model, fold_optimizer

# Plot the per-epoch curves. A fold that stopped early has fewer points than
# num_epochs, so each curve is drawn against its own epoch range.
plt.figure(figsize=(10, 6))
for fold, (fold_train_curve, fold_validation_curve) in enumerate(zip(train_accuracies, validation_accuracies)):
    epochs_run = range(len(fold_train_curve))
    plt.plot(epochs_run, fold_train_curve, label=f"Fold {fold + 1} Train")
    plt.plot(epochs_run, fold_validation_curve, label=f"Fold {fold + 1} Validation")

plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Training and Validation Accuracies')
plt.legend()
plt.grid(True)
plt.show()


# Find best learning rate


In [None]:
# Learning-rate range test: train while the learning rate grows exponentially
# from start_lr to end_lr, record the loss at every step, and report the rate
# at which the lowest loss was observed.
# A throwaway model and optimizer are used so the sweep starts from the
# pretrained weights and leaves the fine-tuned `model` and its `optimizer`
# untouched. Batches come from whatever `train_loader` is currently bound.
start_lr = 1e-7
end_lr = 1e-3
num_epochs = 10
total_steps = num_epochs * len(train_loader)

lr_model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=num_labels)
lr_model.to(device)
lr_optimizer = torch.optim.AdamW(lr_model.parameters(), lr=start_lr)
# LambdaLR multiplies the optimizer's initial rate (start_lr) by the returned
# factor: 1 at step 0, rising to end_lr / start_lr at the final step.
lr_finder = torch.optim.lr_scheduler.LambdaLR(
    lr_optimizer,
    lambda step: (end_lr / start_lr) ** (step / max(total_steps - 1, 1)),
)
lr_values = []
loss_values = []
diverged = False
for epoch in range(num_epochs):
    lr_model.train()
    print(epoch)
    for batch in train_loader:
        lr_optimizer.zero_grad()
        input_ids, attention_mask, batch_labels = batch
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        batch_labels = batch_labels.to(device)
        outputs = lr_model(input_ids, attention_mask=attention_mask)
        loss = F.cross_entropy(outputs.logits, batch_labels)
        if not torch.isfinite(loss):
            # A NaN/inf loss would corrupt the min() lookup below; stop here.
            diverged = True
            break
        loss.backward()
        lr_optimizer.step()
        # Record the rate that produced this step's loss, then advance it.
        lr_values.append(lr_optimizer.param_groups[0]['lr'])
        loss_values.append(loss.item())

        lr_finder.step()
    if diverged:
        print(f"Loss diverged at learning rate {lr_optimizer.param_groups[0]['lr']}; stopping sweep")
        break

# Release the throwaway model once the sweep is done.
del lr_model, lr_optimizer

best_lr_index = loss_values.index(min(loss_values))
best_lr = lr_values[best_lr_index]
print(f"Best learning rate: {best_lr}")