# Text classification with A Transformer

The following notebook shows how to perform text classification with an LSTM, or Long Short Term Memory, network: a special case of an RNN.

Here we utilize learned embeddings, which we initialize with pretrained embeddings, specifically 50-dimensional GloVe.

In [136]:
import torch

First we read in the pretrained embeddings. Looking at the first few lines of the file, we can see that each line is the word followed by numbers in each dimesnion of the embedding, all of which are separated by spaces

In [137]:
read_file = open('glove.6B.50d.txt', "r", encoding="utf8")
lines = read_file.readlines()
print_first = 5
i = 1
for line in lines:
  if i > print_first:
    break
  print(line)
  print()
  i += 1

the 0.418 0.24968 -0.41242 0.1217 0.34527 -0.044457 -0.49688 -0.17862 -0.00066023 -0.6566 0.27843 -0.14767 -0.55677 0.14658 -0.0095095 0.011658 0.10204 -0.12792 -0.8443 -0.12181 -0.016801 -0.33279 -0.1552 -0.23131 -0.19181 -1.8823 -0.76746 0.099051 -0.42125 -0.19526 4.0071 -0.18594 -0.52287 -0.31681 0.00059213 0.0074449 0.17778 -0.15897 0.012041 -0.054223 -0.29871 -0.15749 -0.34758 -0.045637 -0.44251 0.18785 0.0027849 -0.18411 -0.11514 -0.78581


, 0.013441 0.23682 -0.16899 0.40951 0.63812 0.47709 -0.42852 -0.55641 -0.364 -0.23938 0.13001 -0.063734 -0.39575 -0.48162 0.23291 0.090201 -0.13324 0.078639 -0.41634 -0.15428 0.10068 0.48891 0.31226 -0.1252 -0.037512 -1.5179 0.12612 -0.02442 -0.042961 -0.28351 3.5416 -0.11956 -0.014533 -0.1499 0.21864 -0.33412 -0.13872 0.31806 0.70358 0.44858 -0.080262 0.63003 0.32111 -0.46765 0.22786 0.36034 -0.37818 -0.56657 0.044691 0.30392


. 0.15164 0.30177 -0.16763 0.17684 0.31719 0.33973 -0.43478 -0.31086 -0.44999 -0.29486 0.16608 0.11963 -0.41328 -0.4

You're probably wondering what these numbers mean. For a nice explanation, check out [this medium article](https://towardsdatascience.com/light-on-math-ml-intuitive-guide-to-understanding-glove-embeddings-b13b4f19c010), or if you're brave enough, check out the [original paper](https://nlp.stanford.edu/pubs/glove.pdf) 


Now we need to put the above embeddings in a format where our neural network can access each words's embedding and modify them during the course of its training. Here is a toy example of what we want to do:

In [138]:
# toy example of how we access the embeddings:

from torch import nn

weight = torch.FloatTensor([[1, 2.3, 3], [4, 5.1, 6.3]]) # weight is a FloatTensor containing pretrained weights
embedding = nn.Embedding.from_pretrained(weight)

# Get embeddings for index 1
input = torch.LongTensor([1])
embedding(input)

tensor([[4.0000, 5.1000, 6.3000]])

Now, lets do the same thing for our GloVe embeddings

In [139]:
vocab,embeddings = [],[]

with open('glove.6B.50d.txt','rt', encoding="utf8") as fi:
    full_content = fi.read() # read the file
    full_content = full_content.strip() # remove leading and trailing whitespace
    full_content = full_content.split('\n') # split the text into a list of lines

for i in range(len(full_content)):
    i_word = full_content[i].split(' ')[0] # get the word at the start of the line
    i_embeddings = [float(val) for val in full_content[i].split(' ')[1:]] # get the embedding of the word in an array
    # add the word and the embedding to our lists
    vocab.append(i_word)
    embeddings.append(i_embeddings)

# convert our lists to numpy arrays:
import numpy as np
vocab_npa = np.array(vocab)
embs_npa = np.array(embeddings)

Adding tokens and their corresponding embeddings to handle padding and unknown words:

In [140]:
# insert tokens for padding and unknown words into our vocab
vocab_npa = np.insert(vocab_npa, 0, '<pad>')
vocab_npa = np.insert(vocab_npa, 1, '<unk>')
print(vocab_npa[:10])

# make embeddings for these 2:
# -> for the '<pad>' token, we set it to all zeros
# -> for the '<unk>' token, we set it to the mean of all our other embeddings

pad_emb_npa = np.zeros((1, embs_npa.shape[1])) 
unk_emb_npa = np.mean(embs_npa, axis=0, keepdims=True) 

#insert embeddings for pad and unk tokens to embs_npa.
embs_npa = np.vstack((pad_emb_npa,unk_emb_npa,embs_npa))
print(embs_npa.shape)

['<pad>' '<unk>' 'the' ',' '.' 'of' 'to' 'and' 'in' 'a']
(400002, 50)


Our embedding layer should have dimensions =  ***number of words in vocab*** $\times$ ***number of dimensions in embedding***

In [141]:
import torch
my_embedding_layer = torch.nn.Embedding.from_pretrained(torch.from_numpy(embs_npa).float())

# sanity check
assert my_embedding_layer.weight.shape == embs_npa.shape
print(my_embedding_layer.weight.shape)

torch.Size([400002, 50])


Now let's try using this layer to get the embeddings of each word in a sentence 

In [142]:
sentence = 'this is an example'
tokens = sentence.split(' ')
for token in tokens:
  token_id_in_vocab = np.where(vocab_npa == token)
  embedding = my_embedding_layer(torch.LongTensor([token_id_in_vocab]))
  print(token)
  print(embedding)
  print()

this
tensor([[[[ 5.3074e-01,  4.0117e-01, -4.0785e-01,  1.5444e-01,  4.7782e-01,
            2.0754e-01, -2.6951e-01, -3.4023e-01, -1.0879e-01,  1.0563e-01,
           -1.0289e-01,  1.0849e-01, -4.9681e-01, -2.5128e-01,  8.4025e-01,
            3.8949e-01,  3.2284e-01, -2.2797e-01, -4.4342e-01, -3.1649e-01,
           -1.2406e-01, -2.8170e-01,  1.9467e-01,  5.5513e-02,  5.6705e-01,
           -1.7419e+00, -9.1145e-01,  2.7036e-01,  4.1927e-01,  2.0279e-02,
            4.0405e+00, -2.4943e-01, -2.0416e-01, -6.2762e-01, -5.4783e-02,
           -2.6883e-01,  1.8444e-01,  1.8204e-01, -2.3536e-01, -1.6155e-01,
           -2.7655e-01,  3.5506e-02, -3.8211e-01, -7.5134e-04, -2.4822e-01,
            2.8164e-01,  1.2819e-01,  2.8762e-01,  1.4440e-01,  2.3611e-01]]]])

is
tensor([[[[ 6.1850e-01,  6.4254e-01, -4.6552e-01,  3.7570e-01,  7.4838e-01,
            5.3739e-01,  2.2239e-03, -6.0577e-01,  2.6408e-01,  1.1703e-01,
            4.3722e-01,  2.0092e-01, -5.7859e-02, -3.4589e-01,  2.1664e-01,

We now know how to get the embedding of a word, meaning we can now feed these words to a neural network and train it to learn any task we want using text as an input! Now, let's begin working on our dataset:

The dataset we are using, `imdb_reviews.csv` is uploaded to google drive so just use the following command to download the file into the local environment.

To try using your own dataset, you could replace the drive id with the id of another file, or if you're working locally then just download the .csv and read it using `pandas.read_csv('path_to_file')`

In [143]:
import pandas as pd
from sklearn.model_selection import train_test_split

# load train.csv into a pandas dataframe
# we can use any other NLP binary classfication problem here as long as
# there are only 2 columns: 'review' and 'label'
# where review contains text and label contains 0/1

df = pd.read_csv("imdb_reviews.csv")
df.head()

train_prop = 0.7 # 70% for training set
test_prop = 0.2 # 20% for test set
val_prop = 0.1 # 10% for validation set

# split the data into training and validation sets
train_val_df, test_df = train_test_split(df, test_size=test_prop, shuffle=True, random_state=11)
df, val_df = train_test_split(train_val_df, test_size=val_prop/(train_prop+val_prop), shuffle=True, random_state=11)

# print the number of rows in each set
print(f"Train:       {len(df)} rows")
print(f"Test:        {len(test_df)} rows")
print(f"Validation:  {len(val_df)} rows")

Train:       43508 rows
Test:        12431 rows
Validation:  6216 rows


We want to be able to classify the above reviews, so that we can predict the label of a new review with our model.

First, we want to be turn our dataframe into an object of the Pytorch's Dataset class so we can easily split our data into batches and feed it to our neural network:

In [144]:
class TransformerDataset(torch.utils.data.Dataset):
    def __init__(self, df, vocab, max_seq_length, pad_token, unk_token):
        # make a list of our labels
        self.labels = df.label.tolist()

        # make a dictionary converting each word to its id in the vocab, as well
        # as the reverse lookup
        self.word2idx = {term:idx for idx,term in enumerate(vocab)}
        self.idx2word = {idx:word for word,idx in self.word2idx.items()} 
        
        self.pad_token,self.unk_token = pad_token,unk_token

        self.input_ids = [] 
        self.sequence_lens = [] 
        self.labels = []

        for i in range(df.shape[0]):
            # clean up each sentence and turn it into tensor containing the  
            # token ids of each word. Also add padding to make them all the 
            # same length as the longest sequence
            input_ids,sequence_len = self.convert_text_to_input_ids(
                df.iloc[i].review,
                pad_to_len = max_seq_length) 
            
            self.input_ids.append(input_ids.reshape(-1))
            self.sequence_lens.append(sequence_len)
            self.labels.append(df.iloc[i].label)
        
        #sanity checks
        assert len(self.input_ids) == df.shape[0]
        assert len(self.sequence_lens) == df.shape[0]
        assert len(self.labels) == df.shape[0]
    
    def convert_text_to_input_ids(self,text,pad_to_len):
        # truncate excess words (beyond the length we should pad to)
        words = text.strip().split()[:pad_to_len]

        # add padding till we've reached desired length 
        deficit = pad_to_len - len(words) 
        words.extend([self.pad_token]*deficit)

        # replace words with their id
        for i in range(len(words)):
            if words[i] not in self.word2idx:
                # if word is not in vocab, then use <unk> token
                words[i] = self.word2idx[self.unk_token] 
            else:
                # else find the id associated with the word 
                words[i] = self.word2idx[words[i]] 
        return torch.Tensor(words).long(),pad_to_len - deficit

    def __len__(self):
        # Make dataset compatible with len() function
        return len(self.input_ids)
    
    def __getitem__(self, i):
        # for the ith indexm return a dictionary containing id, length and label
        sample_dict = dict()
        sample_dict['input_ids'] = self.input_ids[i].reshape(-1)
        sample_dict['sequence_len'] = torch.tensor(self.sequence_lens[i]).long()
        sample_dict['labels'] = torch.tensor(self.labels[i]).type(torch.FloatTensor)
        return sample_dict

Defining the Transformer (Encoder) model:

In [145]:
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

class TransformerEncoder(torch.nn.Module):
    def __init__(self, config):
        super(TransformerEncoder, self).__init__()
        
        # use the pretrained embeddings and check whether or not we should
        # freeze embeddings from our config dict
        pretrained_embeddings = config['pretrained_embeddings'] if 'pretrained_embeddings' in config else None
        freeze_embeddings = config['freeze_embeddings'] if 'freeze_embeddings' in config else False
        if pretrained_embeddings is not None:
            # use pretrained embeddings
            self.vocab_size = pretrained_embeddings.shape[0]
            self.embedding_dim = pretrained_embeddings.shape[1]
            self.embedding = torch.nn.Embedding.from_pretrained(
                torch.from_numpy(pretrained_embeddings).float(),
                freeze=freeze_embeddings
                )
        else:
            # use randomly initialized embeddings
            assert 'vocab' in config and 'embedding_dim' in config
            self.vocab_size = config['vocab'].shape[0]
            self.embedding_dim = config['embedding_dim']
            if freeze_embeddings:
                # why would you do this?
                print(
                    'WARNING:Freezing Randomly Initialized Embeddings!!😭😭😭'
                    )
            self.embedding = torch.nn.Embedding(
                self.vocab_size,
                self.embedding_dim,
                freeze = freeze_embeddings
                )
        
        # store some values from the config 
        self.hidden_size = config['hidden_size']
        self.hidden_size_2 = config['hidden_size_2']
        self.encoder_layer_cnt = config['encoder_layer_cnt']
        self.dropout = config['dropout_prob']
        self.n_heads = config['n_heads']

        self.pos_encoder = PositionalEncoding(self.embedding_dim, self.dropout)
        encoder_layer = nn.TransformerEncoderLayer(self.embedding_dim, self.n_heads, self.hidden_size, self.dropout, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, self.encoder_layer_cnt)
        self.fc1 = nn.Linear(self.embedding_dim, self.hidden_size_2)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(self.hidden_size_2, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, batch):
        x = batch['input_ids'].to(device) # lookup token ids for our inputs

        # LAYERS                          INPUT TO EACH LAYER
        x = self.embedding(x)           # torch.Size([32, 100])
        x = self.pos_encoder(x)         # torch.Size([32, 100, 50])
        x = self.transformer_encoder(x) # torch.Size([32, 100, 50])
        x = torch.mean(x, dim = 1)      # torch.Size([32, 100, 50])
        x = self.fc1(x)                  # torch.Size([32, 50])
        x = self.relu(x)
        x = self.fc2(x)                  # torch.Size([32, 50])
        x = self.sigmoid(x)             # torch.Size([32, 1])
        return x                        # FINAL = torch.Size([32, 1])
    
    def get_embedding_dims(self):
        return self.vocab_size, self.embedding_dim

In the above model, we used the functions pack_padded_sequence() and its inverse to pass input to the model

In [146]:
config = {
    #model configurations
    'batch_size':16,
    'max_seq_length':1000,
    'lr':0.0001,
    'label_count':2,
    'n_heads': 5,
    'dropout_prob': 1/50,
    'hidden_size':768,
    'encoder_layer_cnt':3,
    'hidden_size_2': 16,

    #embeddings configurations
    'pretrained_embeddings':embs_npa,
    'freeze_embeddings':True,
    'vocab':vocab_npa,
    'pad_token':'<pad>',
    'unk_token':'<unk>',

    #data
    'train_df': df, #TODO: set val and test to appropriate
    'val_df': df,
    'test_df': df,
}

In [147]:
from torch import nn, optim
from torch.utils.data import DataLoader

train_dataset = TransformerDataset(
    df = config['train_df'], 
    vocab = config['vocab'],
    max_seq_length = config['max_seq_length'],
    pad_token = config['pad_token'],
    unk_token = config['unk_token']
)
val_dataset = TransformerDataset(
    df = config['val_df'], 
    vocab = config['vocab'],
    max_seq_length = config['max_seq_length'],
    pad_token = config['pad_token'],
    unk_token = config['unk_token']
)
test_dataset = TransformerDataset(
    df = config['test_df'], 
    vocab = config['vocab'],
    max_seq_length = config['max_seq_length'],
    pad_token = config['pad_token'],
    unk_token = config['unk_token']
)

train_loader = DataLoader(train_dataset, batch_size = config['batch_size'], shuffle = True)
val_loader = DataLoader(val_dataset, batch_size = config['batch_size'], shuffle = True)
test_loader = DataLoader(test_dataset, batch_size = config['batch_size'], shuffle = True)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = TransformerEncoder(config)
model.to(device)
model.train()

loss_criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr = config['lr'])

In [148]:
from sklearn.metrics import f1_score
from sklearn.metrics import roc_auc_score
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay


def validation(model, val_loader):
    with torch.no_grad():
        all_outputs = []
        all_labels = []
        for data in val_loader:
            labels = data['labels'].to(device)
            outputs = model(data)
            all_outputs = all_outputs + torch.round(outputs.squeeze()).tolist()
            all_labels = all_labels + labels.tolist()
        
        accuracy = sum([i==j for i, j in zip(all_outputs, all_labels)]) / len(all_labels)
        f1 = f1_score(y_pred= all_outputs, y_true=all_labels)

        return accuracy, f1

def testing(model, test_loader):
    with torch.no_grad():
        all_outputs = []
        all_labels = []
        all_scores = []
        for data in test_loader:
            labels = data['labels'].to(device)
            outputs = model(data)
            all_scores = all_scores + (outputs.squeeze()).tolist()
            all_outputs = all_outputs + torch.round(outputs.squeeze()).tolist()
            all_labels = all_labels + labels.tolist()
        accuracy = sum([i==j for i, j in zip(all_outputs, all_labels)]) / len(all_labels)
        f1 = f1_score(y_pred= all_outputs, y_true=all_labels)
        roc = roc_auc_score(y_score= all_scores, y_true=all_labels)
        cm = confusion_matrix(all_labels, all_outputs)
        disp = ConfusionMatrixDisplay(confusion_matrix=cm)
        disp.plot(cmap = 'magma')
        plt.show()
        print('Test Statistics:')
        print()
        print(f"Accuracy     : {accuracy}")
        print(f"F1 score     : {f1}")
        print(f"AUC ROC score: {roc}")
        return 

In [150]:
from tqdm.notebook import tqdm

epochs = 5
for epoch in range(1, epochs + 1):
  progress_bar = tqdm(train_loader, leave=False)
  losses = []
  accuracies = []
  total = 0
  for data in progress_bar:
    target = data['labels'].to(device)
    optimizer.zero_grad()
    output = model(data)
    loss = loss_criterion(output.squeeze(), target)
    loss.backward()
    # torch.nn.utils.clip_grad_norm_(model.parameters(), 3)
    optimizer.step()
    accuracy = torch.sum(target == torch.round(output.squeeze())) / target.shape[0]

    losses.append(loss.item())
    accuracies.append(accuracy.item())
    total += 1

    progress_bar.set_description(f'Loss: {loss.item():.3f}, Train Accuracy: {accuracy:.3f}')
  
  val_accuracy, val_f1 = validation(model, val_loader)
  print(f'Epoch: {epoch}')
  print(f'Training   | Loss: {(sum(losses) / total):.4f} | Accuracy: {(sum(accuracies) / total):.2f}% ')
  print(f'Validation | F1:   {val_f1:.4f} | Accuracy: {val_accuracy:.2f}% ')

  0%|          | 0/2720 [00:00<?, ?it/s]

Epoch: 1
Training   | Loss: 0.3800 | Accuracy: 0.83% 
Validation | F1:   0.8346 | Accuracy: 0.83% 


  0%|          | 0/2720 [00:00<?, ?it/s]

Epoch: 2
Training   | Loss: 0.3795 | Accuracy: 0.83% 
Validation | F1:   0.8357 | Accuracy: 0.83% 


  0%|          | 0/2720 [00:00<?, ?it/s]

Epoch: 3
Training   | Loss: 0.3814 | Accuracy: 0.83% 
Validation | F1:   0.8350 | Accuracy: 0.83% 


  0%|          | 0/2720 [00:00<?, ?it/s]

KeyboardInterrupt: 

Now we will test our model using the test dataset:

In [None]:
testing(model, test_loader)

### Save model:

In [151]:
torch.save(model.state_dict(), r'D:\Python\Transformer_Classification\checkpoints\v2.pt')

### Load existing model:

In [149]:
model = TransformerEncoder(config)
model.to(device)
model.load_state_dict(torch.load(r'D:\Python\Transformer_Classification\checkpoints\v1.pt'))
model.train()

TransformerEncoder(
  (embedding): Embedding(400002, 50)
  (pos_encoder): PositionalEncoding(
    (dropout): Dropout(p=0.02, inplace=False)
  )
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0): TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=50, out_features=50, bias=True)
        )
        (linear1): Linear(in_features=50, out_features=768, bias=True)
        (dropout): Dropout(p=0.02, inplace=False)
        (linear2): Linear(in_features=768, out_features=50, bias=True)
        (norm1): LayerNorm((50,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((50,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.02, inplace=False)
        (dropout2): Dropout(p=0.02, inplace=False)
      )
      (1): TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=50, out_feature