In [1]:
from transformers import RobertaForSequenceClassification
from transformers import AdamW
import torch
from torch.nn import CrossEntropyLoss
from torch.utils.data import DataLoader, TensorDataset
import os
from d2l import torch as d2l
from sklearn.metrics import accuracy_score

In [2]:
# Root directory of the IMDb dataset. read_imdb() joins it with a split
# folder ('train' or 'test') and a label folder ('pos' or 'neg');
# load_data_imdb() reads this module-level name directly.
data_dir = "data/aclImdb"

In [3]:
#@save
def read_imdb(data_dir, is_train):
    """Read the IMDb review dataset text sequences and labels.

    Args:
        data_dir: Root directory of the dataset. It must contain a
            ``train`` and a ``test`` folder, each holding a ``pos`` and a
            ``neg`` sub-folder with one review per file.
        is_train: If true, read the ``train`` split; otherwise ``test``.

    Returns:
        A ``(data, labels)`` tuple of two equal-length lists: the raw
        review strings and their labels (1 for positive, 0 for negative).
        All positive reviews come first, followed by all negative ones;
        within each class the reviews are ordered by file name.

    Raises:
        FileNotFoundError: If an expected split/label directory is missing.
    """
    ### YOUR CODE HERE
    data = []
    labels = []

    # Named `split` rather than `dir` so the builtin dir() is not shadowed.
    split = 'train' if is_train else 'test'

    # Label ids: 1 for positive reviews, 0 for negative reviews.
    for label_name, label_id in (('pos', 1), ('neg', 0)):
        dir_path = os.path.join(data_dir, split, label_name)
        # os.listdir() returns entries in arbitrary order; sorting makes the
        # sample order reproducible across runs and machines.
        for file_name in sorted(os.listdir(dir_path)):
            file_path = os.path.join(dir_path, file_name)
            # Skip anything that is not a regular file (e.g. a stray
            # sub-directory), which open() could not read as a review.
            if not os.path.isfile(file_path):
                continue
            with open(file_path, 'r', encoding='utf-8') as f:
                data.append(f.read())
            labels.append(label_id)
    ### END OF YOUR CODE
    return data, labels


In [4]:
#@save
def load_data_imdb(batch_size, num_steps=500):
    """Return data iterators and the vocabulary of the IMDb review dataset.

    Both splits are read with ``read_imdb`` from the module-level
    ``data_dir``, tokenized with ``d2l.tokenize(..., token='word')`` and
    mapped to indices of a single vocabulary built from the training
    tokens only (``min_freq=5``, with ``'<pad>'`` as a reserved token).

    Args:
        batch_size: Batch size used by both returned data loaders.
        num_steps: Sequence length passed to ``d2l.truncate_pad`` for every
            review; ``vocab['<pad>']`` is used as the padding index.

    Returns:
        A ``(train_iter, test_iter, vocab)`` tuple. Each iterator yields
        ``(features, labels)`` batches, where ``features`` holds vocabulary
        indices and ``labels`` is a ``float32`` tensor of 0/1 values.
    """
    ### YOUR CODE HERE
    train_data = read_imdb(data_dir, is_train=True)
    test_data = read_imdb(data_dir, is_train=False)

    train_tokens = d2l.tokenize(train_data[0], token='word')
    test_tokens = d2l.tokenize(test_data[0], token='word')

    # One vocabulary, built from the training split only, encodes both
    # splits. The separate train/test vocabularies that used to be built
    # here were never used and only cost time and memory.
    vocab = d2l.Vocab(train_tokens, min_freq=5, reserved_tokens=['<pad>'])
    pad_index = vocab['<pad>']

    def _encode(token_lines):
        """Map tokenized reviews to a tensor of truncated/padded indices."""
        return torch.tensor(
            [d2l.truncate_pad(vocab[line], num_steps, pad_index)
             for line in token_lines])

    train_features = _encode(train_tokens)
    test_features = _encode(test_tokens)

    train_labels = torch.tensor(train_data[1], dtype=torch.float32)
    test_labels = torch.tensor(test_data[1], dtype=torch.float32)

    train_dataset = TensorDataset(train_features, train_labels)
    test_dataset = TensorDataset(test_features, test_labels)

    train_iter = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Evaluation does not depend on sample order, so the test loader is not
    # shuffled; this keeps evaluation batches deterministic.
    test_iter = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    ### END OF YOUR CODE
    return train_iter, test_iter, vocab

In [None]:
# Fine-tune a RoBERTa sequence classifier on the IMDb iterators and report
# training / test accuracy after every epoch.
device = torch.device('cuda' if torch.cuda.is_available() else "cpu")
print(device)
model_name = 'roberta-base' # Can change to other models of roberta
max_len = 512  # not referenced in this cell; sequence length comes from load_data_imdb
batch_size = 4
epochs = 3
learning_rate = 1e-6
eps = 1e-8

train_iter, test_iter, vocab = load_data_imdb(batch_size)

# One test-accuracy entry is appended per epoch.
test_accuracies = []

# Load RoBERTa model with classification head (2 classes for sentiment)
model = RobertaForSequenceClassification.from_pretrained(model_name, num_labels=2)
model.to(device)

optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, eps=eps)

# Not used by the loops below: the model returns its own loss when `labels`
# is passed to the forward call.
criterion = CrossEntropyLoss()

# NOTE(review): input ids below are indices of the d2l vocabulary built in
# load_data_imdb, not ids produced by RoBERTa's own tokenizer, so they
# presumably do not line up with the pretrained embedding table (and could
# exceed its size for a large vocabulary) -- confirm whether the matching
# RoBERTa tokenizer should be used to encode the reviews instead.
pad_index = vocab['<pad>']

print("Starting Training")

# The loop variable is `epoch` so the `epochs` hyperparameter is not
# overwritten (re-running the cell would otherwise train for fewer epochs).
for epoch in range(epochs):
    model.train()
    total_loss = 0.0
    # Number of training samples seen so far. It must start at 0: starting
    # at len(train_iter) (the batch count) inflated the denominator and
    # understated the reported training accuracy.
    total_num = 0
    correct = 0
    for features, labels in train_iter:
        optimizer.zero_grad()

        input_ids = features.to(device)
        # Positions holding the padding index are masked out of attention.
        attention_mask = (features != pad_index).to(device)
        # Labels are stored as float32 by load_data_imdb; the classification
        # loss needs integer class ids.
        labels = labels.to(device, dtype=torch.long)

        # Forward pass
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        logits = outputs.logits

        total_loss += loss.item()

        loss.backward()
        optimizer.step()

        _, predicted = torch.max(logits, 1)
        total_num += labels.size(0)
        correct += (predicted == labels).sum().item()

    train_acc = 100 * correct / total_num
    print(f"Epoch {epoch}; Training accuracy: {train_acc:.2f}%")

    model.eval()
    predictions, true_labels = [], []

    with torch.no_grad():
        for features, labels in test_iter:
            # Move data to device
            input_ids = features.to(device)
            attention_mask = (features != pad_index).to(device)

            # Forward pass
            outputs = model(input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            predictions.extend(torch.argmax(logits, dim=-1).cpu().tolist())
            # Labels never need to visit the device here; cast them to
            # integer class ids so they match the predictions' type.
            true_labels.extend(labels.to(dtype=torch.long).tolist())

    accuracy = accuracy_score(true_labels, predictions)
    test_accuracies.append(accuracy)
    print(f"Test Accuracy: {accuracy:.4f}")