In [1]:
# Install dependencies
%pip install torch==1.11.0+cu113 torchdata==0.3.0 torchtext==0.12.0 -f https://download.pytorch.org/whl/cu113/torch_stable.html
%pip install ipywidgets transformers tqdm

Looking in links: https://download.pytorch.org/whl/cu113/torch_stable.htmlNote: you may need to restart the kernel to use updated packages.


ERROR: Could not find a version that satisfies the requirement torch==1.11.0+cu113 (from versions: 2.0.0, 2.0.1, 2.1.0, 2.1.1, 2.1.2, 2.2.0, 2.2.1)
ERROR: No matching distribution found for torch==1.11.0+cu113



Note: you may need to restart the kernel to use updated packages.


In [2]:
import torch
import torchtext

# Set a fixed value for the random seed to ensure reproducible results
SEED = 1234
# Determine whether a CUDA-compatible GPU is available, and use it if so; otherwise, use the CPU
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Apply the fixed random seed to PyTorch to ensure consistent initialization and random operations
torch.manual_seed(SEED)
# Ensure that any operations performed by cuDNN (a GPU-acceleration library used by PyTorch) are deterministic,
# which can help in reproducing results but may reduce performance
torch.backends.cudnn.deterministic = True


print("PyTorch Version: ", torch.__version__)
print("torchtext Version: ", torchtext.__version__)
print(f"Using {'GPU' if str(DEVICE) == 'cuda' else 'CPU'}.")

PyTorch Version:  2.2.1+cu118
torchtext Version:  0.17.1+cpu
Using GPU.


In [3]:
%pip install ipywidgets
# %jupyter nbextension enable --py widgetsnbextension


Note: you may need to restart the kernel to use updated packages.


In [4]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification

tokenizer = AutoTokenizer.from_pretrained("roberta-base")

In [5]:
len(tokenizer.vocab)

50265

In [6]:
# tokens = tokenizer.tokenize('Hello WORLD how ARE yoU?')

# print(tokens)

In [7]:
# original input string
print(tokenizer(['hello world']))

# input string with tab (\t) character
print(tokenizer(['hello	world']))

# input string with newline (\n) character
print(tokenizer(['''
    hello
    world
''']))

{'input_ids': [[0, 42891, 232, 2]], 'attention_mask': [[1, 1, 1, 1]]}
{'input_ids': [[0, 42891, 50117, 8331, 2]], 'attention_mask': [[1, 1, 1, 1, 1]]}
{'input_ids': [[0, 50118, 1437, 1437, 1437, 20760, 50118, 1437, 1437, 1437, 232, 50118, 2]], 'attention_mask': [[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]]}


In [8]:
print(tokenizer(['hello, world!']))

{'input_ids': [[0, 42891, 6, 232, 328, 2]], 'attention_mask': [[1, 1, 1, 1, 1, 1]]}


In [9]:

# Print only the 'input_ids'
print(tokenizer(['hello world ðŸ‘‹'])['input_ids'])

# Use f-string for formatting (Python 3.6+) to access the token corresponding to id 100
token_with_id_100 = list(tokenizer.get_vocab().keys())[list(tokenizer.get_vocab().values()).index(100)]
print(f"Token with id 100: {token_with_id_100}")

## Or, if you're using an older version of Python, use the .format() method
#print("Token with id 100: {}".format(token_with_id_100))


[[0, 42891, 232, 26964, 13859, 2]]
Token with id 100: I


In [10]:
# tokens = tokenizer.tokenize('Hello WORLD how ARE yoU?')

# indexes = tokenizer.convert_tokens_to_ids(tokens)

# print(indexes)

In [11]:
init_token = tokenizer.cls_token
eos_token = tokenizer.sep_token
pad_token = tokenizer.pad_token
unk_token = tokenizer.unk_token

print(init_token, eos_token, pad_token, unk_token)

<s> </s> <pad> <unk>


In [12]:
init_token_idx = tokenizer.convert_tokens_to_ids(init_token)
eos_token_idx = tokenizer.convert_tokens_to_ids(eos_token)
pad_token_idx = tokenizer.convert_tokens_to_ids(pad_token)
unk_token_idx = tokenizer.convert_tokens_to_ids(unk_token)

print(init_token_idx, eos_token_idx, pad_token_idx, unk_token_idx)

0 2 1 3


In [13]:
init_token_idx = tokenizer.cls_token_id
eos_token_idx = tokenizer.sep_token_id
pad_token_idx = tokenizer.pad_token_id
unk_token_idx = tokenizer.unk_token_id

print(init_token_idx, eos_token_idx, pad_token_idx, unk_token_idx)

0 2 1 3


In [14]:
max_input_length = tokenizer.max_model_input_sizes['roberta-base']

print(max_input_length)

512


In [15]:
from torchtext.vocab import vocab
from torchtext.data.utils import get_tokenizer

class TransformerTokenizer(torch.nn.Module):
    def __init__(self, tokenizer):
        super().__init__()  # Initialize the superclass (torch.nn.Module)
        self.tokenizer = tokenizer  # Store the tokenizer object for later use
    
    def forward(self, input):
        if isinstance(input, list):
            tokens = [] 
            for text in input:  # Iterate over each string in the input list
                tokens.append(self.tokenizer.tokenize(text))
            return tokens  # Return the list of lists of tokens
        elif isinstance(input, str):
            return self.tokenizer.tokenize(input)
        raise ValueError(f"Type {type(input)} is not supported.")
        
tokenizer_vocab = vocab(tokenizer.vocab, min_freq=0)

We will then define our text processing pipeline.

1. First we use the tokenizer to tokenize the text.
2. Then we convert each token to its vocabulary ID.
3. We will then cut the text to a maximum length. Note that the actual length we truncate to is 2 tokens shorter than the maximum length allowed by the model. This is because we will add two more tokens, one at the begginning and one at the end.
4. Add the Beginning of Sentence token a the beginning.
5. Add the End of Sentence token at the end.
6. Convert to tensor and pad

In [16]:
import torchtext.transforms as T

text_transform = T.Sequential(
    TransformerTokenizer(tokenizer),  # Tokenize
    T.VocabTransform(tokenizer_vocab),  # Conver to vocab IDs
    T.Truncate(max_input_length - 2),  # Cut to max length to add BOS and EOS token
    T.AddToken(token=tokenizer_vocab[init_token], begin=True),  # BOS token
    T.AddToken(token=tokenizer_vocab[eos_token], begin=False),  # EOS token
    T.ToTensor(padding_value=tokenizer_vocab[pad_token]),  # Convert to tensor and pad
)

print(text_transform)

Sequential(
  (0): TransformerTokenizer()
  (1): VocabTransform(
    (vocab): Vocab()
  )
  (2): Truncate()
  (3): AddToken()
  (4): AddToken()
  (5): ToTensor()
)


In [17]:
from torchtext.datasets import IMDB
from torchtext.data.functional import to_map_style_dataset
from datasets import load_dataset

# using the split version of the dataset
dataset = load_dataset("dair-ai/emotion", "split")

train_data = to_map_style_dataset(dataset["train"])
valid_data = to_map_style_dataset(dataset["validation"])
test_data = to_map_style_dataset(dataset["test"])

In [18]:
# from torch.utils.data import random_split

print("Full train data:", len(train_data))
print("Full val data:", len(valid_data))
print("Full test data:", len(test_data))

Full train data: 16000
Full val data: 2000
Full test data: 2000


In [19]:
from collections import OrderedDict

labels = ['sadness', 'joy', 'love', 'anger', 'fear', 'surprise']
# labels = ['sadness', 'joy']
label_vocab = vocab(OrderedDict([(label, 1) for label in labels])) #the frequency for each label is set to 1

# default_index = -1
# label_vocab.set_default_index(default_index)
# # assign default unknown token
# label_vocab.set_default_index(vocab['<unk>'])

In [20]:
print(label_vocab.get_stoi())

{'anger': 3, 'sadness': 0, 'joy': 1, 'love': 2, 'fear': 4, 'surprise': 5}


In [21]:
label_transform = T.Sequential(
    T.LabelToIndex(label_vocab.get_itos()),  # Convert to integer
    T.ToTensor(),  # Convert to tensor
)

In [22]:
import torch
from torch.utils.data import DataLoader

BATCH_SIZE = 64

def collate_batch(batch):
    labels, texts = zip(*batch)

    new_labels = []
    for x in labels:
        if x == "sadness":
            new_labels.append(0)
        elif x == "joy":
            new_labels.append(1)
        elif x == "love":
            new_labels.append(2)
        elif x == "anger":
            new_labels.append(3)
        elif x == "fear":
            new_labels.append(4)
        elif x == "surprise":
            new_labels.append(5)
    
    labels = label_transform(new_labels)
    
    texts = text_transform(list(texts))

    return labels.float().to(DEVICE), texts.to(DEVICE)

def _get_dataloader(data):
    return DataLoader(data, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch)

train_dataloader:DataLoader = _get_dataloader(train_data)
valid_dataloader = _get_dataloader(valid_data)
test_dataloader = _get_dataloader(test_data)

In [23]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification

roberta = AutoModelForSequenceClassification.from_pretrained("roberta-base")

Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [24]:
import torch.nn as nn

class RoBERTaGRUSentiment(nn.Module):
    def __init__(self, roberta, hidden_dim, output_dim, n_layers, bidirectional, dropout):        
        super().__init__()
        
        self.roberta = roberta

        # roberta's hidden_size is 768 by default
        self.embedding_dim = roberta.config.to_dict()['hidden_size']

        print("self.embedding_dim", self.embedding_dim)
        
        self.rnn = nn.GRU(self.embedding_dim,
                          hidden_dim,
                          num_layers = n_layers,
                          bidirectional = bidirectional,
                          batch_first = True,
                          dropout = 0 if n_layers < 2 else dropout)
        
        self.out = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
        
        self.dropout = nn.Dropout(dropout)

        
    def forward(self, text):

        with torch.no_grad():
            embedded = self.roberta(text)[0]
        
        _, hidden = self.rnn(embedded)
                

        hidden = self.dropout(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1))

        print("self.out(hidden)", self.out(hidden))

        # hidden has shape 2
        return self.out(hidden)

In [166]:
# import torch
# import torch.nn as nn
# from transformers import AutoModel, AutoTokenizer

# class RoBERTaGRUClassifier(nn.Module):
#     def __init__(self, num_labels, hidden_size=768, gru_hidden_size=128):
#         super(RoBERTaGRUClassifier, self).__init__()
#         self.roberta = roberta
#         self.gru = nn.GRU(input_size=hidden_size, hidden_size=gru_hidden_size, batch_first=True)
#         self.fc = nn.Linear(gru_hidden_size, num_labels)

#     def forward(self, input_ids, attention_mask=None):
#         # Get RoBERTa embeddings
#         outputs = self.roberta(input_ids)
#         last_hidden_state = outputs['last_hidden_state']  # Shape: (batch_size, seq_len, hidden_size)

#         # Pool the last hidden state (CLS token)
#         pooled_output = last_hidden_state[:, 0, :]  # Shape: (batch_size, hidden_size)

#         # Pass through GRU
#         gru_output, _ = self.gru(pooled_output.unsqueeze(1))
#         gru_output = gru_output.squeeze(1)  # Shape: (batch_size, gru_hidden_size)

#         # Classification layer
#         logits = self.fc(gru_output)  # Shape: (batch_size, num_labels)
#         return logits

In [167]:
HIDDEN_DIM = 128  # 254 is better, less than 64 is no very favourable.
OUTPUT_DIM = 6  # We only need one neuron as output
N_LAYERS = 1
BIDIRECTIONAL = True
DROPOUT = 0.25

# model = RoBERTaGRUClassifier(roberta, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS, BIDIRECTIONAL, DROPOUT)
model = RoBERTaGRUClassifier(6)

In [168]:
#parameters refer to the internal variables that are learned during the training process. 
#For a neural network model, 
#parameters typically include weights and biases associated with each layer of the network.
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 345,606 trainable parameters


In [169]:
for name, param in model.named_parameters():                
    if name.startswith('roberta'):
        param.requires_grad = False

In [170]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'The model has {count_parameters(model):,} trainable parameters')

The model has 345,606 trainable parameters


In [171]:
for name, param in model.named_parameters():                
    if param.requires_grad:
        print(name)

gru.weight_ih_l0
gru.weight_hh_l0
gru.bias_ih_l0
gru.bias_hh_l0
fc.weight
fc.bias


In [172]:
import torch.optim as optim

optimizer = optim.Adam(model.parameters())

In [173]:
criterion = nn.BCEWithLogitsLoss()

In [174]:
model = model.to(DEVICE)
criterion = criterion.to(DEVICE)

In [175]:
def binary_accuracy(preds, y):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8
    """

    #round predictions to the closest integer
    rounded_preds = torch.round(torch.sigmoid(preds))
    correct = (rounded_preds == y).float() #convert into float for division 
    acc = correct.sum() / len(correct)
    return acc

In [176]:
from tqdm import tqdm

def train(model, iterator:DataLoader, optimizer, criterion):    
    epoch_loss = 0
    epoch_acc = 0
    
    model.train()
    
    for batch in tqdm(iterator, desc="\tTraining"):
        optimizer.zero_grad()
                
        labels, texts = batch  # Note that this has to match the order in collate_batch

        new_labels = []
        for x in labels:
            if x == "sadness":
                new_labels.append(0)
            elif x == "joy":
                new_labels.append(1)
            elif x == "love":
                new_labels.append(2)
            elif x == "anger":
                new_labels.append(3)
            elif x == "fear":
                new_labels.append(4)
            elif x == "surprise":
                new_labels.append(5)

        predictions = model(texts).squeeze(1)
        loss = criterion(predictions, new_labels)

        ################################# binary???
        acc = binary_accuracy(predictions, new_labels)
        
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item()
        epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [177]:
from tqdm import tqdm

def evaluate(model, iterator, criterion):
    epoch_loss = 0
    epoch_acc = 0
    
    model.eval()
    
    with torch.no_grad():
        for batch in tqdm(iterator, desc="\tEvaluation"):
            labels, texts = batch  # Note that this has to match the order in collate_batch

            new_labels = []
            for x in labels:
                if x == "sadness":
                    new_labels.append(0)
                elif x == "joy":
                    new_labels.append(1)
                elif x == "love":
                    new_labels.append(2)
                elif x == "anger":
                    new_labels.append(3)
                elif x == "fear":
                    new_labels.append(4)
                elif x == "surprise":
                    new_labels.append(5)
                

            predictions = model(texts).squeeze(1)
            loss = criterion(predictions, new_labels)
            acc = binary_accuracy(predictions, new_labels)

            epoch_loss += loss.item()
            epoch_acc += acc.item()
        
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [178]:
import time

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [179]:
N_EPOCHS = 5

best_valid_loss = float('inf')
print(f"Using {'GPU' if str(DEVICE) == 'cuda' else 'CPU'} for training.")

for epoch in range(N_EPOCHS):

    print(f'Epoch: {epoch+1:02}')
    start_time = time.time()
    
    train_loss, train_acc = train(model, train_dataloader, optimizer, criterion)
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    
    valid_loss, valid_acc = evaluate(model, valid_dataloader, criterion)
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. Acc: {valid_acc*100:.2f}%')
    
    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'transformer-model.pt')

Using GPU for training.
Epoch: 01


	Training:   0%|          | 0/250 [00:00<?, ?it/s]


KeyError: 'last_hidden_state'

In [None]:
model.load_state_dict(torch.load('transformer-model.pt'))

# If you want to load a model trained on a GPU, but the current device is on CPU, then you need to explicitly state that
# >>> model.load_state_dict(torch.load('tut6-model.pt', map_location=torch.device('cpu')))

<All keys matched successfully>

In [None]:
test_loss, test_acc = evaluate(model, test_dataloader, criterion)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

	Evaluation: 100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 16/16 [00:22<00:00,  1.42s/it]

Test Loss: 0.614 | Test Acc: 68.95%





## Inference

We'll then use the model to test the sentiment of some sequences. We tokenize the input sequence, trim it down to the maximum length, add the special tokens to either side, convert it to a tensor, add a fake batch dimension and then pass it through our model.

In [None]:
def predict_sentiment(model, sentence):
    model.eval()
    processed_sentence = text_transform([sentence]).to(DEVICE)
    prediction = torch.sigmoid(model(processed_sentence))
    return prediction.item()

In [None]:
predict_sentiment(model, "This film is terrible")

0.2504885196685791

In [None]:
predict_sentiment(model, "This film is great")

0.9023417234420776