In [1]:
import torch
import torchvision
import torchtext
import numpy as np
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import pandas as pd

np.random.seed(42)

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
data = pd.read_csv("data/IMDB Dataset.csv")
data.head()

Unnamed: 0,review,sentiment
0,One of the other reviewers has mentioned that ...,positive
1,A wonderful little production. <br /><br />The...,positive
2,I thought this was a wonderful way to spend ti...,positive
3,Basically there's a family where a little boy ...,negative
4,"Petter Mattei's ""Love in the Time of Money"" is...",positive


In [4]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 50000 entries, 0 to 49999
Data columns (total 2 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   review     50000 non-null  object
 1   sentiment  50000 non-null  object
dtypes: object(2)
memory usage: 781.4+ KB


In [5]:
data["review"] = data["review"].replace({"<br /><br />":""}, regex = True)
data["sentiment"].replace({"negative":0, "positive" : 1}, inplace = True)
data["sentiment"] = data["sentiment"].astype(np.int16)

In [6]:
from sklearn.model_selection import train_test_split

train_set_full, test_set = train_test_split(data, 
                                            test_size = 0.1, stratify = data["sentiment"], 
                                            random_state = 42)
train_set, valid_set = train_test_split(train_set_full, 
                                        test_size = 5000, stratify = train_set_full["sentiment"], 
                                        random_state = 42)

In [7]:
tokenizer = torchtext.data.utils.get_tokenizer("basic_english")

def generate_tokens(text_data):
    for text in text_data:
        yield tokenizer(text)

In [8]:
vocab_size = 15000

vocab = torchtext.vocab.build_vocab_from_iterator(generate_tokens(train_set["review"]), specials = ["<unk>"],
                                          max_tokens = vocab_size)
vocab.set_default_index(vocab["<unk>"])

In [9]:
max_length = max(data["review"].apply(lambda x: len(tokenizer(x))))

In [10]:
def padding(reviews, tokenizer = tokenizer, vocab = vocab, length = max_length):
    reviews = reviews.values
    encoded_text = [(vocab(tokenizer(review)).float(), label) for review, label in reviews]
    
    padded_features = [(torch.nn.functional.pad(torch.tensor(encoded_review),
                                   (0, length - len(encoded_review)),
                                   mode='constant', 
                                   value=0), label) for encoded_review, label in encoded_text]
    return padded_features

In [11]:
class MovieDataset(Dataset):
    def __init__(self, text_data, vocab, tokenizer, length):
        text_data = text_data.values
        encoded_text = [(vocab(tokenizer(review)), label) for review, label in text_data]
        padded_features = [(torch.nn.functional.pad(torch.tensor(review),
                                   (0, length - len(review)), 
                                   mode='constant', 
                                   value=0), label) for review, label in encoded_text]
        self.encoded_text = padded_features
        
    def __len__(self):
        return len(self.encoded_text)
    
    def __getitem__(self, idx):
        seq = self.encoded_text[idx][0]
        target_sentiment = self.encoded_text[idx][1] 
        return seq.clone().detach(), target_sentiment

In [12]:
batch_size = 32

train_loader = DataLoader(MovieDataset(train_set, vocab, tokenizer, max_length), 
                          batch_size = batch_size, shuffle = True)
valid_loader = DataLoader(MovieDataset(valid_set, vocab, tokenizer, max_length), batch_size = batch_size, shuffle = True)
test_loader = DataLoader(MovieDataset(test_set, vocab, tokenizer, max_length), batch_size = batch_size, shuffle = True)

In [13]:
import math

class PositionalEncoder(torch.nn.Module):
    def __init__(self, max_length, embed_size, dtype = np.float32):
        super().__init__()
        self.dropout = torch.nn.Dropout(0.2)
        assert embed_size % 2 == 0, "even size required"
        
        p, i = torch.meshgrid(2 * torch.arange(embed_size // 2), torch.arange(max_length), 
                              indexing = "ij")
        pos_embed = torch.empty(1, max_length, embed_size)
        pos_embed[0, :, ::2] = torch.sin(p / 10000 ** (i / embed_size)).T
        pos_embed[0, :, 1::2] = torch.cos(p / 10000 ** (i / embed_size)).T
        self.register_buffer("pos_embed", pos_embed)
        
    def forward(self, x):
        x = x + self.pos_embed[:x.size(0)]
        return self.dropout(x)

In [14]:
class TransformerEncoder(torch.nn.Module):
    def __init__(self, vocab_size, max_length, embed_size):
        super().__init__()
        self.embedding = torch.nn.Sequential(
            torch.nn.Embedding(vocab_size, embed_size),
            PositionalEncoder(max_length, embed_size)
        )
        self.multihead = torch.nn.MultiheadAttention(embed_size, 1, dropout = 0.2, batch_first = True)
        self.FeedForward = torch.nn.Sequential(
            torch.nn.Linear(embed_size, 128),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.3),
            torch.nn.Linear(128, embed_size),
            torch.nn.Dropout(0.4) 
        )
        self.layer_norm1 = torch.nn.LayerNorm(embed_size)
        self.layer_norm2 = torch.nn.LayerNorm(embed_size)
        self.Classification = torch.nn.Sequential(
            torch.nn.Linear(embed_size, 128),
            torch.nn.ReLU(),
            torch.nn.Dropout(0.3),
            torch.nn.Linear(128, 1),
        )
        
    def forward(self, x):
        mask = torch.not_equal(x, 0)
        x = self.embedding(x)
        skip = x
        x, _ = self.multihead(x, x, x, key_padding_mask = mask)
        x = torch.add(x, skip)
        x = self.layer_norm1(x)
        skip = x
        x = self.FeedForward(x)
        x = torch.add(x, skip)
        x = self.layer_norm2(x)
        x = x.mean(dim = 1)
        x = self.Classification(x)
        return x.squeeze()

In [15]:
model = TransformerEncoder(vocab_size, max_length, 64)
loss_fn = torch.nn.BCEWithLogitsLoss()
optimizer = torch.optim.NAdam(model.parameters(), lr = 0.001)
model = model.to(device)

In [16]:
def train(dataloader, valid_dataloader, model, loss_fn, optimizer, device, function = None):
    size = len(dataloader.dataset)
    model.train()
    train_correct = 0
    for batch, (x, y) in enumerate(dataloader):
        x, y = x.to(device), y.to(device).float()
        pred = model(x)
        loss = loss_fn(pred, y)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
            
        train_correct += ((pred > 0.5) == y).type(torch.float).sum().item()
        
        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(x)
            
            print(f"loss: {loss:>5f}  [{current:>3d}/{size:>3d}]")
    print(f"Final Training Accuracy: {(100*train_correct / size):>0.1f}%")
    model.eval()
    size = len(valid_dataloader.dataset)
    num_batches = len(valid_dataloader)
    valid_loss, correct = 0, 0
    with torch.no_grad():
        for x, y in valid_dataloader:
            x, y = x.to(device), y.to(device).float()
            pred = model(x)
            valid_loss += loss_fn(pred, y).item()
            correct += ((pred > 0.5) == y).type(torch.float).sum().item()
    if function:
        function(valid_loss, model)
    valid_loss /= num_batches
    correct /= size
    print(f"Validation Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {valid_loss:>8f} \n")
            
    
            
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for x, y in dataloader:
            x, y = x.to(device), y.to(device).float()
            pred = model(x)
            test_loss += loss_fn(pred, y).item()
            correct += ((pred > 0.5) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [17]:
epochs = 10
for epoch in range(epochs):
    print(f"Epoch {epoch+1}\n-------------------------------")
    train(train_loader, valid_loader, model, loss_fn, optimizer, device)

Epoch 1
-------------------------------
loss: 0.728312  [ 32/40000]
loss: 0.698378  [3232/40000]
loss: 0.699329  [6432/40000]
loss: 0.694687  [9632/40000]
loss: 0.694447  [12832/40000]
loss: 0.695529  [16032/40000]
loss: 0.692197  [19232/40000]
loss: 0.693455  [22432/40000]
loss: 0.692359  [25632/40000]
loss: 0.691468  [28832/40000]
loss: 0.689224  [32032/40000]
loss: 0.692275  [35232/40000]
loss: 0.690408  [38432/40000]
Final Training Accuracy: 50.0%


  return torch._native_multi_head_attention(


Validation Error: 
 Accuracy: 50.0%, Avg loss: 0.693105 

Epoch 2
-------------------------------
loss: 0.692193  [ 32/40000]
loss: 0.695631  [3232/40000]
loss: 0.702477  [6432/40000]
loss: 0.696355  [9632/40000]
loss: 0.695912  [12832/40000]
loss: 0.690812  [16032/40000]
loss: 0.695397  [19232/40000]
loss: 0.693265  [22432/40000]
loss: 0.694485  [25632/40000]
loss: 0.701045  [28832/40000]
loss: 0.695461  [32032/40000]
loss: 0.694243  [35232/40000]
loss: 0.686387  [38432/40000]
Final Training Accuracy: 50.0%
Validation Error: 
 Accuracy: 50.0%, Avg loss: 0.695965 

Epoch 3
-------------------------------
loss: 0.700880  [ 32/40000]
loss: 0.694241  [3232/40000]
loss: 0.691354  [6432/40000]
loss: 0.689961  [9632/40000]
loss: 0.688124  [12832/40000]
loss: 0.683701  [16032/40000]
loss: 0.704618  [19232/40000]
loss: 0.684549  [22432/40000]
loss: 0.705065  [25632/40000]
loss: 0.669506  [28832/40000]
loss: 0.652000  [32032/40000]
loss: 0.587934  [35232/40000]
loss: 0.662937  [38432/40000]
Fin

In [18]:
test(test_loader, model, loss_fn)

Test Error: 
 Accuracy: 87.7%, Avg loss: 0.375420 



In [23]:
example_pos = "That was one of the best movies I've ever seen!"
example_pos = torch.nn.functional.pad(torch.tensor(vocab(tokenizer(example_pos))),
                                   (0, max_length - len(tokenizer(example_pos))), 
                                   mode='constant', 
                                   value=0)
model(example_pos.to(device).unsqueeze(0)) > 0.5
#True = positive, False = negative

tensor(True, device='cuda:0')

In [28]:
example_neg = "That movie was horrible. I fell asleep halfway through."
example_neg = torch.nn.functional.pad(torch.tensor(vocab(tokenizer(example_neg))),
                                   (0, max_length - len(tokenizer(example_neg))), 
                                   mode='constant', 
                                   value=0)
model(example_neg.to(device).unsqueeze(0)) > 0.5

tensor(False, device='cuda:0')

In [30]:
example_neu = "The movie was okay. I liked the part where the main character saved everyone, but I did start falling asleep."
example_neu = torch.nn.functional.pad(torch.tensor(vocab(tokenizer(example_neu))),
                                   (0, max_length - len(tokenizer(example_neu))), 
                                   mode='constant', 
                                   value=0)
model(example_neu.to(device).unsqueeze(0)) > 0.5

tensor(False, device='cuda:0')