In [None]:
import torch
from datasets import load_dataset
import numpy as np
from torch.utils.data import DataLoader
from torch.optim import AdamW
from transformers import get_scheduler
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import f1_score, classification_report, accuracy_score
import torch
# Last expression of the cell: displays whether PyTorch can see a CUDA device.
torch.cuda.is_available()

This notebook can be used with any of the four datasets after minor modifications. The version shown here is configured for the relabeled version of the mixed-label dataset. The code was written and commented with the help of ChatGPT and Copilot.

In [None]:
# Number of target classes the classifier head is built for.
num_classes = 4

# Load the dataset used in this notebook.
# Alternative dataset (disabled):
#   dataset = load_dataset("ltg/norec_sentence", "ternary")
dataset = load_dataset("Statistikkprosjekt/Mixed")

# Last expression of the cell: displays the loaded dataset object.
dataset

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class CNN_NLP(nn.Module):
    """1-D convolutional classifier over a sequence of feature vectors.

    Several ``Conv1d`` layers with different kernel widths are applied in
    parallel along the sequence axis. Each result is passed through ReLU and
    max-pooled over the whole sequence; the pooled vectors are concatenated,
    regularised with dropout and projected to class logits.
    """

    def __init__(self,
                 vocab_size=1024,
                 filter_sizes=(3, 4, 5),
                 num_filters=(100, 100, 100),
                 num_classes=4,
                 dropout=0.5):
        """Build the convolution branches and the output layer.

        Args:
            vocab_size: Size of the last axis of the input, used as the
                ``in_channels`` of every convolution. Despite its name this
                is the per-token feature size, not a vocabulary size.
            filter_sizes: Kernel width of each convolution branch.
            num_filters: Output channels of each branch; must have the same
                length as ``filter_sizes``.
            num_classes: Number of output logits.
            dropout: Dropout probability applied before the output layer.

        Raises:
            ValueError: If ``filter_sizes`` is empty or its length differs
                from that of ``num_filters``.
        """
        super().__init__()

        # Work on private copies so caller-owned sequences are never shared.
        filter_sizes = list(filter_sizes)
        num_filters = list(num_filters)

        if not filter_sizes:
            raise ValueError("filter_sizes must contain at least one kernel size")
        if len(filter_sizes) != len(num_filters):
            raise ValueError(
                "filter_sizes and num_filters must have the same length, got "
                f"{len(filter_sizes)} and {len(num_filters)}"
            )

        # One convolution branch per (kernel width, channel count) pair.
        self.conv1d_list = nn.ModuleList([
            nn.Conv1d(in_channels=vocab_size,
                      out_channels=out_channels,
                      kernel_size=kernel_size)
            for kernel_size, out_channels in zip(filter_sizes, num_filters)
        ])

        # Fully-connected layer and dropout. The pooled branch outputs are
        # concatenated, so the input width is the total number of filters.
        # int(...) keeps `in_features` a plain Python int.
        self.fc = nn.Linear(int(sum(num_filters)), num_classes)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, input_ids):
        """Compute class logits.

        Args:
            input_ids: Tensor of shape (b, seq_len, vocab_size). Despite the
                name it holds feature vectors, not token ids; it is cast to
                float before use. ``seq_len`` must be at least the largest
                kernel width.

        Returns:
            Tensor of shape (b, num_classes) with unnormalised logits.
        """
        x_embed = input_ids.float()

        # Conv1d expects channels first: (b, vocab_size, seq_len).
        x_reshaped = x_embed.permute(0, 2, 1)

        # Apply CNN and ReLU. Output shape: (b, num_filters[i], L_out)
        x_conv_list = [F.relu(conv1d(x_reshaped)) for conv1d in self.conv1d_list]

        # Max pooling over the full length. Output shape: (b, num_filters[i], 1)
        x_pool_list = [F.max_pool1d(x_conv, kernel_size=x_conv.shape[2])
                       for x_conv in x_conv_list]

        # Concatenate the pooled branches to feed the fully connected layer.
        # Output shape: (b, sum(num_filters))
        x_fc = torch.cat([x_pool.squeeze(dim=2) for x_pool in x_pool_list],
                         dim=1)

        # Compute logits. Output shape: (b, n_classes)
        logits = self.fc(self.dropout(x_fc))

        return logits

In [None]:
from transformers import AutoTokenizer
# Tokenizer loaded from the "ltg/norbert3-base" checkpoint.
# NOTE(review): the encoder loaded in the training cell is
# "ltg/norbert3-large" - confirm both checkpoints share the same vocabulary.
tokenizer = AutoTokenizer.from_pretrained("ltg/norbert3-base")


def tokenize_function(examples):
    """Tokenize the "review" field of a batch of examples.

    Texts are truncated and padded to ``max_length=90`` and the encoding is
    requested as PyTorch tensors.
    """
    encoding = tokenizer(
        examples["review"],
        max_length=90,
        truncation=True,
        padding="max_length",
        return_tensors="pt",
    )
    return encoding



In [None]:
# Upper bound on the number of training epochs per run.
num_epochs = 10

# Run on the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [None]:
# Checkpoint paths, one entry per training run (15 runs):
#   models         -> where the CNN classification head of a run is saved
#   norbert_models -> where the fine-tuned NorBERT encoder of a run is saved
models = [f"example_folder/example_model{run}.pth" for run in range(1, 16)]

norbert_models = [f"example_folder/example_norbert{run}.pth" for run in range(1, 16)]




In [None]:
from transformers import AutoModel


In [None]:
# Fine-tune one NorBERT encoder together with a fresh CNN_NLP head for every
# (CNN checkpoint path, NorBERT checkpoint path) pair. Within a run, the pair
# is saved whenever the validation loss reaches a new minimum.

# Tokenisation and the data loaders are identical for every run, so they are
# built once here instead of once per run.
tokenized_datasets = dataset.map(tokenize_function, batched=True)
tokenized_datasets = tokenized_datasets.remove_columns(["review"])
tokenized_datasets = tokenized_datasets.rename_column("polarity", "labels")
tokenized_datasets.set_format("torch")

train_dataloader = DataLoader(tokenized_datasets["train"], shuffle=True, batch_size=14)
eval_dataloader = DataLoader(tokenized_datasets["validation"], batch_size=10)

# The schedulers are stepped once per training batch, so the linear decay
# has to span every batch of every planned epoch.
num_training_steps = num_epochs * len(train_dataloader)

for cnn_path, norbert_path in zip(models, norbert_models):
    # last: validation loss of the previous epoch (drives early stopping).
    # best: lowest validation loss seen in this run (drives checkpointing).
    last = float("inf")
    best = float("inf")
    # Initialised up front so the early-stopping check below can never hit
    # an undefined name (e.g. when the first validation loss is NaN).
    stale_epochs = 0
    print(cnn_path)

    norbert = AutoModel.from_pretrained("ltg/norbert3-large", trust_remote_code=True)

    # Width of the encoder's hidden states = input channels of the CNN head.
    bert_hidden_size = norbert.config.hidden_size

    model = CNN_NLP(vocab_size=bert_hidden_size, num_classes=num_classes, dropout=0.5,
                    filter_sizes=[2, 3, 4], num_filters=[200, 200, 200])

    # Separate optimizers and linear schedules so the pretrained encoder and
    # the freshly initialised head can use different learning rates.
    optimizer_norbert = AdamW(norbert.parameters(), lr=6e-6)
    optimizer2 = AdamW(model.parameters(), lr=1e-5)
    lr_scheduler_norbert = get_scheduler(
        name="linear", optimizer=optimizer_norbert, num_warmup_steps=0,
        num_training_steps=num_training_steps)
    lr_scheduler2 = get_scheduler(
        name="linear", optimizer=optimizer2, num_warmup_steps=0,
        num_training_steps=num_training_steps)

    # Move both networks to the selected device.
    norbert.to(device)
    model.to(device)

    for epoch in range(num_epochs):
        # ---- training ----
        norbert.train()
        model.train()
        train_loss = 0.0
        train_samples = 0
        for batch in train_dataloader:
            batch = {k: v.to(device) for k, v in batch.items()}
            outputs = norbert(**batch)

            batch_hidden_states = outputs.last_hidden_state
            model_output = model(batch_hidden_states)
            loss = F.cross_entropy(model_output, batch["labels"])

            # .item() detaches the value: summing the loss tensor itself
            # would keep autograd references alive for the whole epoch.
            batch_size = batch["labels"].size(0)
            train_loss += loss.item() * batch_size
            train_samples += batch_size

            loss.backward()

            optimizer_norbert.step()
            optimizer2.step()
            lr_scheduler_norbert.step()
            lr_scheduler2.step()
            optimizer_norbert.zero_grad()
            optimizer2.zero_grad()
        print(f'Epoch {epoch} Loss: {train_loss / train_samples}')

        # ---- validation ----
        norbert.eval()
        model.eval()
        total_loss = 0.0
        total_samples = 0
        with torch.no_grad():
            for batch in eval_dataloader:
                batch = {k: v.to(device) for k, v in batch.items()}
                outputs = norbert(**batch)

                batch_hidden_states = outputs.last_hidden_state
                model_output = model(batch_hidden_states)
                loss = F.cross_entropy(model_output, batch["labels"])

                batch_size = batch["labels"].size(0)
                total_loss += loss.item() * batch_size
                total_samples += batch_size

        average_loss = total_loss / total_samples
        print(f'Evaluation Loss: {average_loss}')

        # ---- checkpointing and early stopping ----
        if average_loss < last:
            # Improved relative to the previous epoch: restart the counter.
            stale_epochs = 0
            if average_loss < best:
                # New minimum for this run: store both networks as whole
                # pickled modules. torch.save does not create directories,
                # so the target folder must already exist.
                torch.save(model, cnn_path)
                torch.save(norbert, norbert_path)
                best = average_loss

        last = average_loss
        # The counter is incremented every epoch (also right after a reset),
        # so "> 2" triggers after two consecutive epochs whose validation
        # loss did not drop below that of the epoch before.
        stale_epochs += 1
        if stale_epochs > 2:
            print(epoch)
            break

In [None]:
import numpy as np
# Load the encoder / CNN head pair saved by the first training run.
#
# The checkpoints were written with torch.save(<module>, path), i.e. they are
# whole pickled modules rather than state dicts:
#   * weights_only=False is needed on PyTorch >= 2.6, where torch.load
#     defaults to weights_only=True and refuses to unpickle full modules.
#   * Unpickling can execute arbitrary code - only load checkpoint files
#     you created yourself or otherwise trust.
#   * map_location=device lets a checkpoint saved on a GPU be loaded on a
#     machine without one.
norbert = torch.load(norbert_models[0], map_location=device, weights_only=False)
model = torch.load(models[0], map_location=device, weights_only=False)

In [None]:
from sklearn.metrics import f1_score, classification_report, accuracy_score, recall_score, precision_score

# Evaluate the loaded encoder + CNN head and report macro-averaged metrics.
#
# NOTE: `eval_dataloader` is created in the training cell from the
# "validation" split, so that cell must have been run first, and the figures
# printed below as "Test ..." are computed on the validation data.

# Both networks must be in eval mode (dropout disabled) and on the device
# the batches are moved to.
norbert.to(device)
model.to(device)
norbert.eval()
model.eval()

all_predictions = []
all_labels = []

# No gradients are needed for inference.
with torch.no_grad():
    for batch in eval_dataloader:
        batch = {k: v.to(device) for k, v in batch.items()}

        outputs = norbert(**batch)
        batch_hidden_states = outputs.last_hidden_state
        model_output = model(batch_hidden_states)

        logits = model_output

        # Predicted class = index of the largest logit.
        predictions = torch.argmax(logits, dim=1)

        all_predictions.extend(predictions.cpu().numpy())
        all_labels.extend(batch["labels"].cpu().numpy())

# Macro averaging weights every class equally, regardless of its frequency.
f1 = f1_score(all_labels, all_predictions, average='macro')
ac = accuracy_score(all_labels, all_predictions)
recall = recall_score(all_labels, all_predictions, average='macro')
precision = precision_score(all_labels, all_predictions, average='macro')
print(f"Test F1 Score: {f1:.3f}")
print(f"Test Accuracy: {ac:.3f}")
print(f"Test Recall: {recall:.3f}")
print(f"Test Precision: {precision:.3f}")

# Per-class precision / recall / F1 table.
report = classification_report(all_labels, all_predictions)
print("Classification Report:\n", report)

In [None]:

# Class ids that occur in the ground truth or in the predictions. Using the
# union (and passing it to confusion_matrix explicitly) keeps the matrix and
# the tick labels aligned even when a class is predicted but never occurs
# among the true labels.
class_labels = np.union1d(all_labels, all_predictions)

# Rows are true labels, columns are predicted labels.
conf_matrix = confusion_matrix(all_labels, all_predictions, labels=class_labels)

# Display the confusion matrix as an annotated heatmap without a colorbar.
plt.figure(figsize=(10, 8))
c = sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", cbar=False,
                xticklabels=class_labels, yticklabels=class_labels)
plt.title("Confusion Matrix")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.show()

# Per-class precision / recall / F1 table.
report = classification_report(all_labels, all_predictions)
print("Classification Report:\n", report)

In [None]:
# Convert the confusion matrix to row percentages: each row (true label)
# sums to 100, so a cell gives the share of that class's samples that
# received the column's predicted label.
total_true_labels = np.sum(conf_matrix, axis=1)
row_totals = total_true_labels[:, np.newaxis]

# Rows without any true samples are left at 0 instead of dividing by zero,
# which would emit RuntimeWarnings and produce NaNs.
percentages = np.divide(
    conf_matrix,
    row_totals,
    out=np.zeros(conf_matrix.shape, dtype=float),
    where=row_totals > 0,
) * 100

In [None]:
# Tick labels must cover every class that can appear as a row or column of
# the matrix, i.e. the union of true and predicted class ids.
class_labels = np.union1d(all_labels, all_predictions)

# Heatmap of the row-percentage confusion matrix, drawn without a colorbar.
c = sns.heatmap(percentages, annot=True, fmt="f", cmap="Blues", cbar=False,
                xticklabels=class_labels, yticklabels=class_labels)
plt.title("Confusion Matrix")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.show()