In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from datasets import load_dataset
from sklearn.metrics import classification_report, confusion_matrix
# from transformers import BertModel, BertTokenizer
from transformers import AutoModel, AutoTokenizer

import torch
from torch import optim
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset


In [None]:
# Parameters
batch_size = 64         # number of examples per mini-batch in every DataLoader
num_epochs = 5          # number of passes over the training set
num_samples = 10        # stochastic forward passes averaged per batch during training/validation
learning_rate = 0.0005  # learning rate for the gradient descent optimizer, related to the step size

# Seed the random number generators so that shuffling, weight initialisation
# and dropout sampling are repeatable after "Restart Kernel -> Run All".
random_seed = 42
np.random.seed(random_seed)
torch.manual_seed(random_seed)

In [None]:
# Load the train / validation / test splits of the "irony" subset of tweet_eval.
cache_dir = "./data_cache_"
subset_name = "irony"

# Keyword arguments shared by the three load_dataset calls below.
# NOTE(review): `ignore_verifications` is reported as deprecated in newer
# `datasets` releases - confirm against the installed version before upgrading.
_load_kwargs = dict(
    name=subset_name,
    ignore_verifications=True,
    cache_dir=cache_dir,
)

train_dataset_ = load_dataset("tweet_eval", split="train", **_load_kwargs)
print(f"Training dataset with {len(train_dataset_)} instances loaded")

val_dataset_ = load_dataset("tweet_eval", split="validation", **_load_kwargs)
print(f"Validation dataset with {len(val_dataset_)} instances loaded")

test_dataset_ = load_dataset("tweet_eval", split="test", **_load_kwargs)
print(f"Test dataset with {len(test_dataset_)} instances loaded")

# Number of distinct label values present in the training split.
_train_labels = train_dataset_['label']
num_classes_ = np.unique(_train_labels).size



Training dataset with 2862 instances loaded




Validation dataset with 955 instances loaded




Test dataset with 784 instances loaded


In [None]:
# Create the tokenizer for the "roberta-base" checkpoint.
# Earlier BERT variant, kept for reference:
# tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
tokenizer = AutoTokenizer.from_pretrained("roberta-base")

# Tokenize a batch of examples. Every text is padded or truncated to exactly
# 100 tokens; the fields of the returned encoding (the notebook later reads
# 'input_ids' and 'attention_mask') become columns of the mapped dataset.
def tokenize_function(dataset):
    """Run the module-level `tokenizer` over ``dataset['text']`` with fixed-length padding."""
    return tokenizer(
        dataset['text'],
        truncation=True,
        max_length=100,
        padding="max_length",
    )

# Tokenize every split in batched mode; each name is rebound to its mapped dataset.
train_dataset_, val_dataset_, test_dataset_ = [
    split.map(tokenize_function, batched=True)
    for split in (train_dataset_, val_dataset_, test_dataset_)
]



  0%|          | 0/1 [00:00<?, ?ba/s]



In [None]:
# convert dataset to dataloader
def convert_to_data_loader(dataset, num_classes, shuffle=True, loader_batch_size=None):
    """Wrap a tokenized dataset in a ``DataLoader`` of tensor triples.

    Args:
        dataset: mapping-like object exposing the columns 'input_ids',
            'attention_mask' and 'label' (anything ``np.array`` can convert).
        num_classes: accepted for call compatibility; not used here.
        shuffle: whether the loader reshuffles the examples on every pass.
            Defaults to True, which is what every existing caller gets;
            pass False for a deterministic evaluation order.
        loader_batch_size: mini-batch size. When None (the default) the
            module-level ``batch_size`` is used.

    Returns:
        A ``DataLoader`` yielding ``(input_ids, attention_mask, label)``
        batches, with the labels cast to ``torch.long``.
    """
    # Convert the list-like columns to tensors.
    input_tensor = torch.from_numpy(np.array(dataset['input_ids']))
    # The attention mask travels with the ids because the model's forward() needs both.
    atten_tensor = torch.from_numpy(np.array(dataset['attention_mask']))
    label_tensor = torch.from_numpy(np.array(dataset['label'])).long()

    tensor_dataset = TensorDataset(input_tensor, atten_tensor, label_tensor)

    # Only fall back to the notebook-wide setting when no explicit size is given.
    effective_batch_size = batch_size if loader_batch_size is None else loader_batch_size
    loader = DataLoader(tensor_dataset, batch_size=effective_batch_size, shuffle=shuffle)

    return loader

# Build one DataLoader per split (train, validation, test - in that order).
train_loader, val_loader, test_loader = (
    convert_to_data_loader(split, num_classes_)
    for split in (train_dataset_, val_dataset_, test_dataset_)
)

## Model (模型部分)

In [None]:
class RoBertaClassifier(nn.Module):
    """RoBERTa encoder followed by a small feed-forward classification head.

    The head is ``Linear(hidden_size, n_hidden) -> ReLU -> dropout`` feeding
    two parallel linear layers: ``predict`` (class logits) and ``get_var``
    (a second output of the same shape, returned as ``sigma``).

    Dropout in the head is applied on every forward pass, independent of
    ``train()`` / ``eval()``, so repeated calls on the same input yield
    different samples (this is what the sampling loops in this notebook
    average over).

    Args:
        freeze_bert: if truthy, the encoder weights get ``requires_grad=False``
            and only the head is trained; otherwise they are trainable.
        aleatoric_use: if truthy, ``forward`` returns ``(logits, sigma)``;
            otherwise it returns ``logits`` only.
        num_labels: output width of both ``predict`` and ``get_var``.
        n_hidden: width of the hidden layer of the head (default 50).
        dropout_p: dropout probability used in the head (default 0.2).
    """

    def __init__(self, freeze_bert, aleatoric_use, num_labels, n_hidden=50, dropout_p=0.2):
        super(RoBertaClassifier, self).__init__()

        self.aleatoric_use = aleatoric_use
        self.dropout_p = dropout_p

        # use the pretrained model corresponding to the previous tokenizer
        # self.bert = BertModel.from_pretrained("bert-base-uncased")
        self.bert = AutoModel.from_pretrained("roberta-base")
        self.config = self.bert.config
        # Configuration of "roberta-base", recorded for reference:
        #   "_name_or_path": "roberta-base",
        #   "architectures": ["RobertaForMaskedLM"],
        #   "attention_probs_dropout_prob": 0.1,
        #   "bos_token_id": 0,
        #   "classifier_dropout": null,
        #   "eos_token_id": 2,
        #   "hidden_act": "gelu",
        #   "hidden_dropout_prob": 0.1,
        #   "hidden_size": 768,
        #   "initializer_range": 0.02,
        #   "intermediate_size": 3072,
        #   "layer_norm_eps": 1e-05,
        #   "max_position_embeddings": 514,
        #   "model_type": "roberta",
        #   "num_attention_heads": 12,
        #   "num_hidden_layers": 12,
        #   "pad_token_id": 1,
        #   "position_embedding_type": "absolute",
        #   "transformers_version": "4.21.0",
        #   "type_vocab_size": 1,
        #   "use_cache": true,
        #   "vocab_size": 50265

        # The head consumes one encoder hidden vector per example.
        n_input = self.config.hidden_size

        # add dense layers to act as the classifier
        self.classifier = nn.Linear(n_input, n_hidden)

        self.predict = nn.Linear(n_hidden, num_labels)   # predicted output
        self.get_var = nn.Linear(n_hidden, num_labels)   # predicted variance

        # Freeze or unfreeze the whole encoder in one pass.
        trainable = not freeze_bert
        for param in self.bert.parameters():
            param.requires_grad = trainable

    def forward(self, input_ids, attention_mask):
        """Return class logits, plus ``sigma`` when ``aleatoric_use`` is set.

        Args:
            input_ids: token ids, passed straight to the encoder.
            attention_mask: padding mask, passed straight to the encoder.

        Returns:
            ``logits`` or ``(logits, sigma)``; both have one row per example
            and ``num_labels`` columns.
        """
        # get the outputs of the roberta model
        bert_outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)

        # Last hidden state of the first token of every sequence, used as the
        # sentence representation for classification.
        first_token_state = bert_outputs[0][:, 0, :]

        hidden = F.relu(self.classifier(first_token_state))
        # training=True is deliberate: it keeps dropout active in eval mode as
        # well (it is also F.dropout's default), which the sampling loops need.
        hidden = F.dropout(hidden, p=self.dropout_p, training=True)

        logits = self.predict(hidden)
        if self.aleatoric_use:  # consider aleatoric uncertainty --> two sets of output
            sigma = self.get_var(hidden)
            return logits, sigma
        return logits

In [None]:
# Model used for the epistemic-uncertainty experiment: the encoder is frozen
# (freeze_bert=True) and forward() returns logits only (aleatoric_use=False).
model_epis = RoBertaClassifier(freeze_bert=True, aleatoric_use=False, num_labels=num_classes_)
# Bare last expression so the notebook displays the module structure.
model_epis

Some weights of the model checkpoint at roberta-base were not used when initializing RobertaModel: ['lm_head.bias', 'lm_head.layer_norm.weight', 'lm_head.decoder.weight', 'lm_head.dense.weight', 'lm_head.layer_norm.bias', 'lm_head.dense.bias']
- This IS expected if you are initializing RobertaModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


RoBertaClassifier(
  (bert): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(50265, 768, padding_idx=1)
      (position_embeddings): Embedding(514, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0): RobertaLayer(
          (attention): RobertaAttention(
            (self): RobertaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): RobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((7

In [None]:
def aleatoric_loss_function(labels, pred_y, sigma, eps=1e-8):
    """Variance-weighted squared-error loss.

    Computes ``sum(0.5 * (labels - pred_y)**2 / sigma**2 + 0.5 * log(sigma**2))``
    over all elements and divides by the size of the first dimension of
    ``labels``.

    Args:
        labels: target tensor.
        pred_y: predicted tensor, broadcastable against ``labels``.
        sigma: predicted spread, broadcastable against ``labels``; it is
            squared, so its sign does not matter.
        eps: lower bound applied to ``sigma**2`` so that ``sigma == 0`` does
            not produce ``log(0)`` / a division by zero (inf or nan loss).
            Values of ``sigma**2`` at or above ``eps`` are unaffected.

    Returns:
        A scalar tensor holding the loss.
    """
    N = labels.size(0)
    # Clamp the variance instead of adding eps so ordinary inputs give the
    # exact same result as the unclamped formula.
    variance = (sigma ** 2).clamp_min(eps)
    logvar = torch.log(variance)
    loss = torch.sum(0.5 * torch.exp(-logvar) * (labels - pred_y) ** 2 + 0.5 * logvar)
    return loss / N

In [None]:
def apply_dropout(m):
    """Put dropout modules back into training mode.

    Meant to be passed to ``Module.apply`` right after ``Module.eval()`` so
    that dropout keeps sampling while every other layer stays in evaluation
    mode.

    The previous check, ``type(m) == F.dropout``, compared a module's type
    with a function and was therefore never true, making this a no-op.

    Note that the check matches every dropout *module* in the model it is
    applied to, not just those of a classification head.

    Args:
        m: a submodule, as supplied by ``Module.apply``.
    """
    dropout_types = (nn.Dropout, nn.Dropout2d, nn.Dropout3d, nn.AlphaDropout)
    if isinstance(m, dropout_types):
        m.train()

# net.eval()
# net.apply(apply_dropout)

In [None]:
def _safe_log(probs, eps=1e-12):
    """Natural log of `probs`, with values below `eps` clamped so log(0) cannot occur."""
    return torch.log(probs.clamp_min(eps))


def _mc_mean_probs(model, input_ids, attention_mask, n_samples):
    """Average the softmax output of `n_samples` forward passes of `model`."""
    sampled = [F.softmax(model(input_ids, attention_mask), dim=1) for _ in range(n_samples)]
    return torch.stack(sampled).mean(dim=0)


def _predictive_entropy(probs):
    """Batch mean of the per-row entropy ``-sum_c p_c * log(p_c)`` of `probs`."""
    return torch.mean(torch.sum(-probs * _safe_log(probs), dim=1))


def train_epistemic(num_epochs, model, train_dataloader, dev_dataloader, validate_every_epoch=False):
    """Train `model` with sampled predictions and plot accuracy, loss and entropy.

    For every batch the model is run ``num_samples`` times, the softmax
    outputs are averaged, and the negative log-likelihood of that average is
    minimised with Adam. The entropy of the averaged distribution is logged
    as the epistemic uncertainty.

    Reads the module-level names ``learning_rate``, ``num_samples`` and
    ``apply_dropout``.

    Args:
        num_epochs: number of passes over ``train_dataloader``.
        model: module called as ``model(input_ids, attention_mask)`` and
            returning logits with one row per example.
        train_dataloader: iterable of ``(input_ids, attention_mask, labels)``.
        dev_dataloader: iterable of the same triples, used for validation.
        validate_every_epoch: when False (default) validation runs once,
            after the final epoch. Previously the validation epoch was
            hard-coded to the fifth one, which is the same thing for
            ``num_epochs == 5`` but never validated shorter runs. When True,
            validation runs after every epoch and full curves are plotted.

    Returns:
        The same ``model`` object, after training.
    """
    # loss_fn = nn.CrossEntropyLoss()  # = softmax+NLLLOSS
    loss_fn = nn.NLLLoss()  # applied to the log of the averaged probabilities
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Per-epoch history for plotting. Validation has its own x-axis because
    # it may be recorded for fewer epochs than training.
    train_epochs = []
    dev_epochs = []
    train_losses_set = []
    train_accuracy_set = []
    dev_losses_set = []
    dev_accuracy_set = []
    train_uncertainty_set = []
    dev_uncertainty_set = []

    for e in range(num_epochs):
        # Track performance on the training set as we are learning...
        total_correct = 0
        total_trained = 0
        train_losses = []
        train_epis_uncertainty = []

        model.train()  # Put the model in training mode.

        for batch_input_ids, batch_atten, batch_labels in train_dataloader:

            optimizer.zero_grad()  # Reset the optimizer

            # Mean over the sampled predictions: p_c
            prob_ave = _mc_mean_probs(model, batch_input_ids, batch_atten, num_samples)

            # NLL of the averaged distribution; its entropy is only logged, so
            # it is computed on a detached copy.
            batch_loss = loss_fn(_safe_log(prob_ave), batch_labels)
            batch_epis_uncertainty = _predictive_entropy(prob_ave.detach())
            print("my batch_loss:{}".format(batch_loss))
            print("batch_epis_uncertainty:{}".format(batch_epis_uncertainty))

            # Back-propagate and update the weights.
            batch_loss.backward()
            optimizer.step()

            # Record the loss from this batch to keep track of progress.
            train_losses.append(batch_loss.item())
            train_epis_uncertainty.append(batch_epis_uncertainty.item())

            # Count correct labels so we can compute accuracy on the training set
            pred_y = prob_ave.argmax(dim=1)
            total_correct += (pred_y == batch_labels).sum().item()
            total_trained += batch_labels.size(0)

        train_accuracy = total_correct/total_trained*100

        print("Epoch: {}/{}".format((e+1), num_epochs),
              "Training Loss: {:.4f}".format(np.mean(train_losses)),
              "Training Uncertainty: {:.4f}".format(np.mean(train_epis_uncertainty)),
              "Training Accuracy: {:.4f}%".format(train_accuracy))

        train_epochs.append(e+1)
        train_losses_set.append(np.mean(train_losses))
        train_accuracy_set.append(train_accuracy)
        train_uncertainty_set.append(np.mean(train_epis_uncertainty))

        # Validation is expensive (num_samples forward passes per batch), so by
        # default it only runs after the last epoch.
        is_last_epoch = (e == num_epochs - 1)
        if not (validate_every_epoch or is_last_epoch):
            continue

        model.eval()  # Switch model to evaluation mode
        model.apply(apply_dropout)  # Keep the dropout layer open when testing

        total_correct = 0
        total_trained = 0
        dev_losses = []
        dev_epis_uncertainties = []

        # No gradients are needed for validation; skipping them saves memory.
        with torch.no_grad():
            for dev_input_ids, dev_atten, dev_labels in dev_dataloader:

                dev_prob_ave = _mc_mean_probs(model, dev_input_ids, dev_atten, num_samples)

                # Compute the loss for the development data
                dev_loss = loss_fn(_safe_log(dev_prob_ave), dev_labels)
                dev_epis_uncertainty = _predictive_entropy(dev_prob_ave)

                # Save the loss on the dev set
                dev_losses.append(dev_loss.item())
                dev_epis_uncertainties.append(dev_epis_uncertainty.item())

                # Count the number of correct predictions
                dev_output_y = dev_prob_ave.argmax(dim=1)
                total_correct += (dev_output_y == dev_labels).sum().item()
                total_trained += dev_labels.size(0)

        dev_accuracy = total_correct/total_trained*100

        print("Epoch: {}/{}".format((e+1), num_epochs),
              "Validation Loss: {:.4f}".format(np.mean(dev_losses)),
              "Validation Uncertainty: {:.4f}".format(np.mean(dev_epis_uncertainties)),
              "Validation Accuracy: {:.4f}%".format(dev_accuracy))

        dev_epochs.append(e+1)
        dev_losses_set.append(np.mean(dev_losses))
        dev_accuracy_set.append(dev_accuracy)
        dev_uncertainty_set.append(np.mean(dev_epis_uncertainties))

    # Plotting to show the accuracy and loss. Validation series are plotted
    # against the epochs they were actually measured at, so x and y always
    # have the same length.
    plt.figure()
    plt.plot(train_epochs, train_accuracy_set, 'ob--', label='Train Accuracy')
    plt.plot(dev_epochs, dev_accuracy_set, 'or--', label='Validation Accuracy')
    plt.legend()
    plt.xlabel('Number of epochs')
    plt.ylabel('Accuracy(%)')

    plt.figure()
    plt.plot(train_epochs, train_losses_set, 'ob--', label='Train Loss')
    plt.plot(dev_epochs, dev_losses_set, 'or--', label='Validation Loss')
    plt.legend()
    plt.xlabel('Number of epochs')
    plt.ylabel('Loss')

    plt.figure()
    plt.plot(train_epochs, train_uncertainty_set, 'ob--', label='Train Uncertainty')
    plt.plot(dev_epochs, dev_uncertainty_set, 'or--', label='Validation Uncertainty')
    plt.legend()
    plt.xlabel('Number of epochs')
    plt.ylabel('Epistemic Uncertainty')

    return model

In [None]:
# Train the epistemic-uncertainty model on the training loader, validating on
# the validation loader. train_epistemic returns the model object it was given.
trained_model = train_epistemic(num_epochs=num_epochs, model=model_epis, train_dataloader=train_loader, dev_dataloader=val_loader)

my batch_loss:0.6754757761955261
batch_epis_uncertainty:0.6904257535934448
my batch_loss:0.6756528615951538
batch_epis_uncertainty:0.6904394030570984
my batch_loss:0.6864073276519775
batch_epis_uncertainty:0.6910537481307983
my batch_loss:0.6924782395362854
batch_epis_uncertainty:0.690927267074585
my batch_loss:0.6659243702888489
batch_epis_uncertainty:0.6905658841133118
my batch_loss:0.6685757637023926
batch_epis_uncertainty:0.6911106705665588
my batch_loss:0.6910408735275269
batch_epis_uncertainty:0.6907455325126648
my batch_loss:0.6840200424194336
batch_epis_uncertainty:0.690031886100769
my batch_loss:0.6833774447441101
batch_epis_uncertainty:0.6891690492630005
my batch_loss:0.6890801787376404
batch_epis_uncertainty:0.6896067261695862
my batch_loss:0.692340075969696
batch_epis_uncertainty:0.6902782917022705
my batch_loss:0.6918727159500122
batch_epis_uncertainty:0.6899918913841248
my batch_loss:0.6702067255973816
batch_epis_uncertainty:0.690862238407135
my batch_loss:0.6873812675476

In [None]:
# # for test
# for i, (batch_input_ids, batch_atten, batch_labels) in enumerate(train_loader):
#   golden_labels = batch_labels.view(-1,1)
#   y_hat = torch.zeros(batch_size, num_classes_)
#   y_hat = y_hat.scatter(1, golden_labels, 1)
#   print(y_hat.size(0))
#   break

64


In [None]:
def predict_nn(trained_model, test_loader, n_samples=5):
    """Predict labels by averaging several sampled forward passes.

    The model is switched to evaluation mode and the module-level
    ``apply_dropout`` is applied to it. Every batch is then run through the
    model ``n_samples`` times, the softmax outputs are averaged, and the
    class with the highest mean probability is taken as the prediction.

    Args:
        trained_model: module called as ``trained_model(input_ids, attention_mask)``
            and returning logits with one row per example.
        test_loader: iterable of ``(input_ids, attention_mask, labels)`` batches.
        n_samples: number of forward passes averaged per batch. Defaults to 5,
            the value that used to be hard-coded here.

    Returns:
        ``(gold_labs, pred_labs)``: two lists of ints of equal length, in the
        order the loader yielded the examples.
    """
    trained_model.eval()  # Switch model to evaluation mode
    trained_model.apply(apply_dropout)  # Keep the dropout layer open when testing

    gold_labs = []  # gold labels to return
    pred_labs = []  # predicted labels to return

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        for inputs, atten, labels in test_loader:
            sampled = [F.softmax(trained_model(inputs, atten), dim=1) for _ in range(n_samples)]
            test_prob_avg = torch.stack(sampled).mean(dim=0)

            # take the label with biggest averaged probability as the predicted label
            predicted_labels = test_prob_avg.argmax(dim=1)

            gold_labs.extend(labels.tolist())
            pred_labs.extend(predicted_labels.tolist())

    return gold_labs, pred_labs

In [None]:
# Predict on the test split. classification_report, confusion_matrix and pd
# come from the notebook's import cell rather than being imported mid-notebook.
gold_labs, pred_labs = predict_nn(trained_model, test_loader)

# classification report
# NOTE(review): assumes label 0 is "not irony" and label 1 is "irony" - confirm
# against the dataset's label mapping.
target_names = ['not irony', 'irony']
print("The classification report is:")
print(classification_report(gold_labs, pred_labs, target_names=target_names))

# confusion matrix: rows are the gold labels, columns the predicted labels
cm = confusion_matrix(gold_labs, pred_labs)
df_cm = pd.DataFrame(cm, index=target_names, columns=target_names)
print("The confusion matrix is:")
print(df_cm)