# Fine-tuning BERT with validation

### Loading data and libraries; testing/training split

In [3]:
# Import necessary libraries
import pandas as pd
import os
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertForSequenceClassification, Trainer, TrainingArguments
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score
import shutil

# Load the sentiment scores CSV
excel_path = r"C:/Users/MCOB PHD 14/Dropbox/Charlie's Dissertation/Beige Books/manual_sentiment.csv"
sentiment_data = pd.read_csv(excel_path)

# Define the label function
def label_sentiment(score):
    if score <= -0.3:
        return 0  # Negative
    elif score <= 0.2:
        return 1  # Mixed
    else:
        return 2  # Positive

# Apply the label function to the sentiment scores
sentiment_data['label'] = sentiment_data['human_sentiment'].apply(label_sentiment)

# Define path where text files are stored
text_files_dir = r"C:/Users/MCOB PHD 14/Dropbox/Charlie's Dissertation/Beige Books/selected_chunks2"

# Load the text files and create a DataFrame
text_data = {}
for filename in os.listdir(text_files_dir):
    if filename.endswith('.txt'):
        file_path = os.path.join(text_files_dir, filename)
        with open(file_path, 'r', encoding='utf-8') as file:
            text_data[filename] = file.read()

# Combine text data with sentiment data
text_df = pd.DataFrame(list(text_data.items()), columns=['file_names', 'text'])
combined_data = pd.merge(sentiment_data, text_df, on='file_names')

# Split data into training and testing sets
train_data, test_data = train_test_split(combined_data, test_size=0.2, random_state=42, stratify=combined_data['label'])

# Save training and testing files in separate folders for future use
#train_text_files_dir = r"C:/Users/MCOB PHD 14/Dropbox/Charlie's Dissertation/Beige Books/train_texts"
#test_text_files_dir = r"C:/Users/MCOB PHD 14/Dropbox/Charlie's Dissertation/Beige Books/test_texts"
#os.makedirs(train_text_files_dir, exist_ok=True)
#os.makedirs(test_text_files_dir, exist_ok=True)

for _, row in train_data.iterrows():
    src_path = os.path.join(text_files_dir, row['file_names'])
    dest_path = os.path.join(train_text_files_dir, row['file_names'])
    shutil.copyfile(src_path, dest_path)

for _, row in test_data.iterrows():
    src_path = os.path.join(text_files_dir, row['file_names'])
    dest_path = os.path.join(test_text_files_dir, row['file_names'])
    shutil.copyfile(src_path, dest_path)

## Tokenization

In [4]:
# Tokenize the text data
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

def tokenize_texts(texts, max_length=512):
    tokenized_texts = []
    for text in texts:
        tokenized_input = tokenizer(
            text,
            padding='max_length',   # Pad to max_length
            truncation=True,        # Truncate if longer than max_length
            max_length=max_length,  # Maximum length for input
            return_tensors='pt'     # Return PyTorch tensors
        )
        tokenized_texts.append({
            'input_ids': tokenized_input['input_ids'].squeeze(),
            'attention_mask': tokenized_input['attention_mask'].squeeze()
        })
    return tokenized_texts

# Tokenize training and testing texts
train_texts = train_data['text'].tolist()
train_labels = train_data['label'].tolist()
tokenized_train_texts = tokenize_texts(train_texts)

test_texts = test_data['text'].tolist()
test_labels = test_data['label'].tolist()
tokenized_test_texts = tokenize_texts(test_texts)

# Create custom PyTorch Dataset
class SentimentDataset(Dataset):
    def __init__(self, tokenized_texts, labels):
        self.tokenized_texts = tokenized_texts
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        input_ids = self.tokenized_texts[idx]['input_ids']
        attention_mask = self.tokenized_texts[idx]['attention_mask']
        label = torch.tensor(self.labels[idx])
        return {'input_ids': input_ids, 'attention_mask': attention_mask, 'labels': label}

# Create the training and testing datasets
train_dataset = SentimentDataset(tokenized_train_texts, train_labels)
test_dataset = SentimentDataset(tokenized_test_texts, test_labels)

# Create DataLoaders
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=8, shuffle=False)



## Fine-tuning

In [7]:
# Load the BERT model with a classification head
model = BertForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=3)

# Define training arguments
training_args = TrainingArguments(
    output_dir='./results',          # output directory
    num_train_epochs=3,              # number of training epochs
    per_device_train_batch_size=8,   # batch size for training
    per_device_eval_batch_size=8,    # batch size for evaluation
    warmup_steps=500,                # number of warmup steps
    weight_decay=0.01,               # strength of weight decay
    logging_dir='./logs',            # directory for storing logs
    eval_strategy="epoch",           # Evaluate at the end of each epoch
)

# Initialize the Trainer with the datasets and model
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
)

# Fine-tune the model on the training dataset
trainer.train()

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
                                                 
 33%|███▎      | 100/300 [20:42<38:50, 11.65s/it]

{'eval_loss': 1.0229394435882568, 'eval_runtime': 83.7562, 'eval_samples_per_second': 2.388, 'eval_steps_per_second': 0.298, 'epoch': 1.0}


                                                   
 67%|██████▋   | 200/300 [40:41<19:16, 11.56s/it]

{'eval_loss': 0.783829927444458, 'eval_runtime': 84.6447, 'eval_samples_per_second': 2.363, 'eval_steps_per_second': 0.295, 'epoch': 2.0}


                                                   
100%|██████████| 300/300 [1:01:28<00:00, 12.30s/it]

{'eval_loss': 0.8426475524902344, 'eval_runtime': 84.1755, 'eval_samples_per_second': 2.376, 'eval_steps_per_second': 0.297, 'epoch': 3.0}
{'train_runtime': 3688.6682, 'train_samples_per_second': 0.651, 'train_steps_per_second': 0.081, 'train_loss': 0.953960673014323, 'epoch': 3.0}





TrainOutput(global_step=300, training_loss=0.953960673014323, metrics={'train_runtime': 3688.6682, 'train_samples_per_second': 0.651, 'train_steps_per_second': 0.081, 'total_flos': 631472202547200.0, 'train_loss': 0.953960673014323, 'epoch': 3.0})

## Assess model 

In [8]:
# Evaluate the model on the test dataset
predictions, true_labels, _ = trainer.predict(test_dataset)

# Convert predictions to label indices
predicted_labels = torch.argmax(torch.tensor(predictions), axis=1)

# Print a classification report for the test dataset
report = classification_report(true_labels, predicted_labels, target_names=["Negative", "Mixed", "Positive"])
print(report)

# Calculate and print the accuracy on the test dataset
accuracy = accuracy_score(true_labels, predicted_labels)
print(f'Accuracy on test data: {accuracy:.4f}')



100%|██████████| 25/25 [01:22<00:00,  3.32s/it]

              precision    recall  f1-score   support

    Negative       0.57      0.87      0.69        39
       Mixed       0.56      0.73      0.64        83
    Positive       0.97      0.38      0.55        78

    accuracy                           0.62       200
   macro avg       0.70      0.66      0.62       200
weighted avg       0.72      0.62      0.61       200

Accuracy on test data: 0.6250





In [13]:
# Save model

model.save_pretrained('./BeigeBERT_three_validated')
tokenizer.save_pretrained('./BeigeBERT_three_validated')

# The model and tokenizer can be loaded later for inference


('./BeigeBERT_three_validated\\tokenizer_config.json',
 './BeigeBERT_three_validated\\special_tokens_map.json',
 './BeigeBERT_three_validated\\vocab.txt',
 './BeigeBERT_three_validated\\added_tokens.json')

## RoBERTa test

In [16]:
from transformers import RobertaTokenizer, RobertaForSequenceClassification, Trainer, TrainingArguments
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.utils.class_weight import compute_class_weight
import numpy as np

# Step 1: Load the RoBERTa tokenizer and re-tokenize the text data

# Load the RoBERTa tokenizer
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')

# Function to tokenize text using RoBERTa tokenizer
def tokenize_texts_roberta(texts, max_length=512):
    tokenized_texts = []
    for text in texts:
        tokenized_input = tokenizer(
            text,
            padding='max_length',    # Pad all sequences to max_length
            truncation=True,         # Truncate sequences longer than max_length
            max_length=max_length,   # Set maximum length for sequences
            return_tensors='pt'      # Return PyTorch tensors
        )
        tokenized_texts.append({
            'input_ids': tokenized_input['input_ids'].squeeze(), 
            'attention_mask': tokenized_input['attention_mask'].squeeze()
        })
    return tokenized_texts

# Re-tokenize training and testing texts
tokenized_train_texts_roberta = tokenize_texts_roberta(train_texts)
tokenized_test_texts_roberta = tokenize_texts_roberta(test_texts)

# Step 2: Create PyTorch datasets using re-tokenized data

class SentimentDataset(Dataset):
    def __init__(self, tokenized_texts, labels):
        self.tokenized_texts = tokenized_texts
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        input_ids = self.tokenized_texts[idx]['input_ids']
        attention_mask = self.tokenized_texts[idx]['attention_mask']
        label = torch.tensor(self.labels[idx])
        return {'input_ids': input_ids, 'attention_mask': attention_mask, 'labels': label}

# Create the datasets using re-tokenized data
train_dataset_roberta = SentimentDataset(tokenized_train_texts_roberta, train_labels)
test_dataset_roberta = SentimentDataset(tokenized_test_texts_roberta, test_labels)


# Define training arguments
training_args = TrainingArguments(
    output_dir='./results',          # Output directory for saving model checkpoints
    num_train_epochs=3,              # Number of training epochs
    per_device_train_batch_size=8,   # Batch size for training
    per_device_eval_batch_size=8,    # Batch size for evaluation
    warmup_steps=500,                # Number of warmup steps
    weight_decay=0.01,               # Strength of weight decay
    logging_dir='./logs',            # Directory for storing logs
    eval_strategy="epoch",           # Evaluate at the end of each epoch
)



In [18]:
#Step 4: Initialize the Trainer with the re-tokenized datasets and RoBERTa model

# Load the RoBERTa model with a classification head
model = RobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=3)

# Initialize the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset_roberta,   # Use the re-tokenized training dataset
    eval_dataset=test_dataset_roberta,     # Use the re-tokenized testing dataset
    # Optionally pass class weights to the Trainer
    # compute_metrics=...  # Add custom evaluation metrics if needed
)

# Step 5: Fine-tune the model on the re-tokenized training dataset
trainer.train()

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
                                                 
 33%|███▎      | 100/300 [21:04<39:29, 11.85s/it]

{'eval_loss': 1.0409733057022095, 'eval_runtime': 95.0076, 'eval_samples_per_second': 2.105, 'eval_steps_per_second': 0.263, 'epoch': 1.0}


                                                   
 67%|██████▋   | 200/300 [42:33<20:00, 12.01s/it]

{'eval_loss': 0.7915773987770081, 'eval_runtime': 94.8621, 'eval_samples_per_second': 2.108, 'eval_steps_per_second': 0.264, 'epoch': 2.0}


                                                   
100%|██████████| 300/300 [1:04:11<00:00, 12.84s/it]

{'eval_loss': 0.6947354674339294, 'eval_runtime': 92.7937, 'eval_samples_per_second': 2.155, 'eval_steps_per_second': 0.269, 'epoch': 3.0}
{'train_runtime': 3851.31, 'train_samples_per_second': 0.623, 'train_steps_per_second': 0.078, 'train_loss': 0.9571084594726562, 'epoch': 3.0}





TrainOutput(global_step=300, training_loss=0.9571084594726562, metrics={'train_runtime': 3851.31, 'train_samples_per_second': 0.623, 'train_steps_per_second': 0.078, 'total_flos': 631472202547200.0, 'train_loss': 0.9571084594726562, 'epoch': 3.0})

In [19]:
# Step 6: Evaluate the model on the re-tokenized test dataset

# Evaluate the model on the test dataset
roberta_predictions, roberta_true_labels, _ = trainer.predict(test_dataset_roberta)

# Convert predictions to label indices
roberta_predicted_labels = torch.argmax(torch.tensor(roberta_predictions), axis=1)

# Print a classification report for the test dataset
from sklearn.metrics import classification_report, accuracy_score

roberta_report = classification_report(roberta_true_labels, roberta_predicted_labels, target_names=["Negative", "Mixed", "Positive"])
print(roberta_report)

# Calculate and print the accuracy on the test dataset
roberta_accuracy = accuracy_score(roberta_true_labels, roberta_predicted_labels)
print(f'RoBERTa accuracy on test data: {roberta_accuracy:.4f}')

100%|██████████| 25/25 [01:31<00:00,  3.65s/it]

              precision    recall  f1-score   support

    Negative       0.63      0.79      0.70        39
       Mixed       0.64      0.65      0.65        83
    Positive       0.79      0.68      0.73        78

    accuracy                           0.69       200
   macro avg       0.69      0.71      0.69       200
weighted avg       0.70      0.69      0.69       200

RoBERTa accuracy on test data: 0.6900





In [20]:
# Save the RoBERTa model and tokenizer
model.save_pretrained('./RoBERTa_three_validated')
tokenizer.save_pretrained('./RoBERTa_three_validated')

('./RoBERTa_three_validated\\tokenizer_config.json',
 './RoBERTa_three_validated\\special_tokens_map.json',
 './RoBERTa_three_validated\\vocab.json',
 './RoBERTa_three_validated\\merges.txt',
 './RoBERTa_three_validated\\added_tokens.json')

# VADER test

We're going to:

1. Calculate sentiment on the training texts using VADER.
2. Change those to class labels based on the same thresholds.
3. Calculate the classification error rates.

In [10]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

# Step 1: Calculate Sentiment Scores Using VADER

# Initialize VADER sentiment analyzer
analyzer = SentimentIntensityAnalyzer()

# Calculate VADER sentiment scores for training texts
vader_scores = [analyzer.polarity_scores(text)['compound'] for text in test_texts]

# Step 2: Convert Sentiment Scores to Class Labels

# Define the same label conversion function used for the model
def vader_to_label(score):
    if score < -0.2:
        return 0  # Negative
    elif score <= 0.2:
        return 1  # Mixed
    else:
        return 2  # Positive

# Convert VADER scores to class labels
vader_labels = [vader_to_label(score) for score in vader_scores]

# Step 3: Calculate Classification Error Rates

# Calculate accuracy
vader_accuracy = accuracy_score(test_labels, vader_labels)
print(f'VADER accuracy on training data: {vader_accuracy:.4f}')

# Calculate classification report
vader_classification_report = classification_report(test_labels, vader_labels, target_names=["Negative", "Mixed", "Positive"])
print("Classification Report:\n", vader_classification_report)

# Calculate confusion matrix
vader_confusion_matrix = confusion_matrix(test_labels, vader_labels)
print("Confusion Matrix:\n", vader_confusion_matrix)


VADER accuracy on training data: 0.4950
Classification Report:
               precision    recall  f1-score   support

    Negative       0.52      0.44      0.47        39
       Mixed       0.53      0.10      0.16        83
    Positive       0.49      0.95      0.64        78

    accuracy                           0.49       200
   macro avg       0.51      0.49      0.43       200
weighted avg       0.51      0.49      0.41       200

Confusion Matrix:
 [[17  4 18]
 [15  8 60]
 [ 1  3 74]]


In [11]:
print(vader_classification_report)

              precision    recall  f1-score   support

    Negative       0.52      0.44      0.47        39
       Mixed       0.53      0.10      0.16        83
    Positive       0.49      0.95      0.64        78

    accuracy                           0.49       200
   macro avg       0.51      0.49      0.43       200
weighted avg       0.51      0.49      0.41       200



In [15]:
# Convert vader_labels to positive, negative, and mixed
def vader_to_label(score):
    if score < -0.2:
        return 'negative'
    elif score <= 0.2:
        return 'mixed'
    else:
        return 'positive'
    
vader_labels = [vader_to_label(score) for score in vader_scores]

# Save to a dataframe
vader_df = pd.DataFrame({'file_names': test_data['file_names'], 'vader_sentiment': vader_labels})

vader_df.head()

# Save to a CSV
vader_df.to_csv('vader_sentiment.csv', index=True)