In [None]:
!wget https://nlp.stanford.edu/data/glove.6B.zip
!unzip glove.6B.zip

In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from torchvision import transforms
from torch import nn
import torchsummary
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd
from datasets import load_dataset
%matplotlib inline

In [None]:
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [None]:
# Load the IMDB dataset and keep a handle on each of its two splits.
dataset = load_dataset("imdb")
training_set, testing_set = dataset["train"], dataset["test"]

EDA

In [None]:
# Build one frame per split and rename the columns so they stay distinguishable
# once the two frames are placed next to each other.
train_df = pd.DataFrame(training_set).rename(
    columns={"text": "training text", "label": "training label"}
)
test_df = pd.DataFrame(testing_set).rename(
    columns={"text": "testing text", "label": "testing label"}
)

# axis=1 puts the frames side by side (aligned on the row index), not stacked.
df = pd.concat([train_df, test_df], axis=1)
df.tail()

In [None]:
# One pie chart per split showing how many examples carry each label.
fig, axes = plt.subplots(1, 2, figsize=(7, 7))

train_counts = df["training label"].value_counts()
test_counts = df["testing label"].value_counts()

panels = (
    (train_counts, "train label distribution"),
    (test_counts, "test label distribution"),
)
for ax, (counts, title) in zip(axes, panels):
    ax.pie(x=counts.values, labels=counts.index)
    ax.set_title(title)

plt.tight_layout()
plt.show()

Embedding and model

In [None]:
# GLOVE PRETRAINED EMBEDDINGS
# embedding_index maps each token to its vector (float32 numpy array). Every line of the
# file is parsed as "<token> <v1> <v2> ...", separated by whitespace.
embedding_index = {}
# Decode explicitly as UTF-8: without it open() uses the platform default encoding, which
# can fail on any non-ASCII token. Iterating the file object streams it line by line
# instead of holding every line in memory at once as readlines() does.
with open("glove.6B.100d.txt", encoding="utf-8") as file:
    for line in file:
        parts = line.split()
        if not parts:
            continue  # tolerate blank lines (e.g. a trailing newline at the end of the file)
        word = parts[0]
        embedding_index[word] = np.array(parts[1:], dtype=np.float32)

In [None]:
# Preview only: number of loaded tokens plus the first token and the start of its vector.
# Displaying the whole dictionary would flood the cell output.
first_word, first_vector = next(iter(embedding_index.items()))
len(embedding_index), first_word, first_vector[:5]

In [None]:
# CLEAN THE TRAINING DATA
def keep_alphabet(text: str):
    """Return `text` with everything except letters and plain spaces removed.

    Each literal "<br />" is first replaced by a single space. Characters are kept when
    `str.isalpha()` is true or when they are exactly " "; digits, punctuation and other
    whitespace (tabs, newlines) are dropped without leaving a separator behind.
    """
    without_breaks = text.replace("<br />", " ")
    return "".join(ch for ch in without_breaks if ch == " " or ch.isalpha())

In [None]:
def prepare_lists_of_words(text: str):
    """Clean `text` with keep_alphabet, lowercase it and split it on whitespace.

    Returns the resulting list of word strings (empty when nothing survives cleaning).
    """
    cleaned = keep_alphabet(text)
    lowered = cleaned.lower()
    return lowered.split()

Preparing the dataset

In [None]:
# Running word -> occurrence count table, filled in by count_freq.
frequency_of_words = dict()


def count_freq(words: list):
    """Add one to the global `frequency_of_words` count for every item in `words`.

    Words seen for the first time start at 1. Nothing is returned; the module-level
    dictionary is updated in place.
    """
    for token in words:
        if token in frequency_of_words:
            frequency_of_words[token] += 1
        else:
            frequency_of_words[token] = 1

In [None]:
# Count every word of every training review.
# The table is emptied first so that re-running this cell does not add the same counts
# a second time (which would change which words pass the frequency cut-off below).
frequency_of_words.clear()
for example in training_set["text"]:
    count_freq(prepare_lists_of_words(example))

In [None]:
# (word, count) pairs over all training examples, most frequent first.
frequencies_list = sorted(
    frequency_of_words.items(),
    key=lambda pair: pair[1],
    reverse=True,
)

# Ids are handed out by frequency rank starting at 2 (0 and 1 are reserved for padding
# and unknown words); only words counted more than 5 times get an entry.
word_to_index = {
    word: position
    for position, (word, count) in enumerate(frequencies_list, start=2)
    if count > 5
}
word_to_index.update({"<PAD>": 0, "<UNK>": 1})

# Peek at the first few entries of the word -> id mapping.
list(word_to_index.items())[:5]

Setting up the embedding matrix

In [None]:
# Seeded generator so the shared <UNK> vector is the same on every run.
unk_rng = np.random.default_rng(42)
UNK = unk_rng.random((1, 100))  # uniform values in [0, 1), one 100-d vector
PAD = np.zeros((1, 100), dtype=np.float32)

# One row per id. word_to_index already contains <PAD> and <UNK>, so the row count is
# simply the largest id + 1 (no extra rows are needed for the special tokens).
embedding_matrix = np.zeros((max(word_to_index.values()) + 1, 100))  # num ids x 100 (embedding size)
embedding_matrix[word_to_index["<PAD>"]] = PAD  # set up the <PAD>
embedding_matrix[word_to_index["<UNK>"]] = UNK  # set up the <UNK>

for word, index in word_to_index.items():
    if word in ("<PAD>", "<UNK>"):
        # The special tokens were filled in above. They have no GloVe entry, so letting
        # them fall through would overwrite the all-zero <PAD> row with the UNK vector.
        continue
    # Use the GloVe vector when the word has one, otherwise the shared <UNK> vector.
    embedding_matrix[index] = embedding_index.get(word, UNK)

In [None]:
# Preview only: the matrix shape and the first values of the first three rows.
embedding_matrix.shape, embedding_matrix[:3, :5]

In [None]:
def words_to_id(listofwords: list):
    """Map every word in `listofwords` to its id in the global `word_to_index`.

    Words without an entry are mapped to 1, the id reserved for <UNK>. Returns a new
    list of ints in the same order as the input.
    """
    return [word_to_index.get(word, 1) for word in listofwords]


# Quick check on a few sample words.
words_to_id(['this', 'movie', 'curiousyellow'])

In [None]:
def pad_truncate(list_of_elements, max_length=100):
    """Return a copy of `list_of_elements` that is exactly `max_length` items long.

    Longer inputs are cut to their first `max_length` items; shorter inputs are extended
    at the end with the <PAD> id taken from the global `word_to_index`.

    The input list is never modified. (Previously the padding branch appended to the
    caller's list in place while the truncation branch returned a new list.)

    Args:
        list_of_elements: list of ids.
        max_length: target length of the result.

    Returns:
        A new list of length `max_length`.
    """
    trimmed = list(list_of_elements[:max_length])
    missing = max_length - len(trimmed)
    if missing > 0:
        # Looked up only when padding is actually needed.
        trimmed.extend([word_to_index["<PAD>"]] * missing)
    return trimmed

In [None]:
"""
PROCESSING THE DATASET

1- prepare_lists_of_words(text): Cleans raw text into a list of words.

2- words_to_id(list_of_words): Converts words to their integer IDs.

3- pad_truncate(list_of_ids): Forces the ID list to be max_length.

"""

In [None]:
def _encode_split(split):
    """Encode one dataset split into parallel lists of padded id sequences and labels.

    Each example's "text" goes through prepare_lists_of_words -> words_to_id ->
    pad_truncate; its "label" is collected unchanged.

    Returns:
        (encoded_texts, labels): two lists with one entry per example in `split`.
    """
    encoded_texts = []
    labels = []
    for example in split:
        ids = words_to_id(prepare_lists_of_words(example["text"]))
        encoded_texts.append(pad_truncate(ids))
        labels.append(example["label"])
    return encoded_texts, labels


# Same preprocessing for both splits, written once instead of as two copied loops.
X_train, y_train = _encode_split(training_set)
X_test, y_test = _encode_split(testing_set)

# Sanity check: length of the first encoded training example.
len(X_train[0])

In [None]:
# Turn the encoded splits into int64 tensors (ids feed the embedding layer, labels feed
# the cross-entropy loss).
X_train, y_train, X_test, y_test = (
    torch.tensor(values, dtype=torch.long)
    for values in (X_train, y_train, X_test, y_test)
)

torch_train_dataset = TensorDataset(X_train, y_train)
torch_test_dataset = TensorDataset(X_test, y_test)

# Batches of 64; only the training data is shuffled.
train_loader = DataLoader(torch_train_dataset, shuffle=True, batch_size=64)
test_loader = DataLoader(torch_test_dataset, shuffle=False, batch_size=64)

In [None]:
class TextCNN(nn.Module):
    """CNN text classifier over frozen pretrained word embeddings.

    Token ids are embedded, passed through four parallel 1-D convolutions (kernel sizes
    2, 3, 4 and 5, i.e. 2- to 5-grams), each followed by ReLU and a max over the whole
    sequence. The four pooled feature vectors are concatenated, passed through dropout
    and projected to the output logits.

    Layer sizes are derived from `embedding_matrix` and the pooling adapts to the input
    length, so the model is not tied to 100-d embeddings or to sequences of length 100.
    Input sequences must still be at least 5 tokens long (the largest kernel size).

    Args:
        embedding_matrix: 2-D numpy array (or tensor) of shape
            (vocabulary size, embedding dimension) holding the pretrained vectors.
        num_filters: output channels of each n-gram convolution.
        num_classes: number of output logits.
        dropout: dropout probability applied to the concatenated features.
    """

    def __init__(self, embedding_matrix, num_filters=64, num_classes=2, dropout=0.5):
        super().__init__()

        vocab_size, embedding_dim = embedding_matrix.shape

        self.relu = nn.ReLU()

        # Embedding layer initialised with the pretrained weights. copy_ converts the
        # dtype (e.g. float64 numpy -> float32 parameter) while copying.
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embedding_dim)
        with torch.no_grad():
            self.embedding.weight.copy_(torch.as_tensor(embedding_matrix))

        # Freeze the params, we don't want to fine-tune the embeddings for now.
        self.embedding.weight.requires_grad = False

        # Conv1d expects (batch size, in_channels = embedding dimension, sequence length).
        # AdaptiveMaxPool1d(1) takes the max over however many positions the convolution
        # produced (sequence length - kernel size + 1), so no length is hard-coded.

        # 2-gram convs
        self.gram2conv = nn.Conv1d(in_channels=embedding_dim, out_channels=num_filters, kernel_size=2)
        self.pool2 = nn.AdaptiveMaxPool1d(output_size=1)

        # 3-gram convs
        self.gram3conv = nn.Conv1d(in_channels=embedding_dim, out_channels=num_filters, kernel_size=3)
        self.pool3 = nn.AdaptiveMaxPool1d(output_size=1)

        # 4-gram convs
        self.gram4conv = nn.Conv1d(in_channels=embedding_dim, out_channels=num_filters, kernel_size=4)
        self.pool4 = nn.AdaptiveMaxPool1d(output_size=1)

        # 5-gram convs
        self.gram5conv = nn.Conv1d(in_channels=embedding_dim, out_channels=num_filters, kernel_size=5)
        self.pool5 = nn.AdaptiveMaxPool1d(output_size=1)

        # dropout
        self.dropout = nn.Dropout(p=dropout)

        # linear layer over the four concatenated n-gram feature vectors
        self.linear = nn.Linear(in_features=4 * num_filters, out_features=num_classes)

    def forward(self, x):
        """Compute class logits for a batch of id sequences.

        Args:
            x: integer tensor of shape (batch size, sequence length).

        Returns:
            Tensor of shape (batch size, num_classes) with unnormalised logits.
        """
        # (batch size, sequence length) -> (batch size, sequence length, embedding dimension)
        x = self.embedding(x)

        # Conv1d wants channels before length: swap the last two axes to get
        # (batch size, embedding dimension, sequence length).
        x = x.permute(0, 2, 1)

        branches = (
            (self.gram2conv, self.pool2),
            (self.gram3conv, self.pool3),
            (self.gram4conv, self.pool4),
            (self.gram5conv, self.pool5),
        )

        pooled = []
        for conv, pool in branches:
            features = self.relu(conv(x))   # (batch size, num_filters, positions)
            features = pool(features)       # (batch size, num_filters, 1): max of each filter
            pooled.append(torch.squeeze(features, dim=2))  # (batch size, num_filters)

        # (batch size, 4 * num_filters)
        total_features = torch.cat(pooled, dim=1)

        total_features = self.dropout(total_features)
        total_features = self.linear(total_features)

        return total_features


Training Loop

In [None]:
# Build the classifier on the selected device, then wrap it with torch.compile.
model = torch.compile(TextCNN(embedding_matrix=embedding_matrix).to(device))

In [None]:
# torchsummary.summary(model, input_size=(100,))

In [None]:
# Cross-entropy on the output logits, optimised with AdamW.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(
    params=model.parameters(),
    weight_decay=0.1,
    lr=0.001,
)

In [None]:
def get_accuracy(loader):
    """Return the fraction of examples in `loader` that the global `model` gets right.

    The model is put in eval mode and gradients are disabled while predicting. Afterwards
    the model is returned to whichever mode (train or eval) it was in before the call,
    also when an error interrupts the evaluation. (Previously it was always switched to
    train mode.)

    Args:
        loader: iterable of (X, y) batches exposing `loader.dataset`, whose length is
            used as the total number of examples.

    Returns:
        Number of correct predictions divided by `len(loader.dataset)`.
    """
    was_training = model.training
    model.eval()
    correct = 0
    try:
        with torch.no_grad():
            for X, y in loader:
                X, y = X.to(device), y.to(device)
                predictions = torch.argmax(model(X), dim=1)
                correct += (predictions == y).sum().item()
    finally:
        model.train(was_training)
    return correct / len(loader.dataset)


In [None]:
def train(epochs, eval_every=5):
    """Train the global `model` on `train_loader` for `epochs` epochs.

    Uses the global `optimizer`, `loss_fn` and `device`. After every epoch the mean batch
    loss is printed; every `eval_every`-th epoch the train and test accuracy (via
    get_accuracy) are printed on the same line.

    Args:
        epochs: number of passes over `train_loader`.
        eval_every: positive int; accuracies are reported when the 1-based epoch number
            is a multiple of it.
    """
    for epoch in range(epochs):

        # Make sure dropout is active even if the model was left in eval mode earlier.
        model.train()

        epoch_loss = 0

        for X, y in train_loader:

            X, y = X.to(device), y.to(device)

            optimizer.zero_grad()

            output = model(X)

            loss = loss_fn(output, y)

            epoch_loss += loss.item()  # keep running loss

            loss.backward()

            optimizer.step()

        print(f"Epoch {epoch + 1}   Training Loss= {epoch_loss/len(train_loader)} ", end="")
        if (epoch + 1) % eval_every == 0:
            print(f"   Train Accuracy={get_accuracy(train_loader)}   Test Accuracy= {get_accuracy(test_loader)}")
        else:
            print()  # just finish the line


In [None]:
# Run the training loop for ten epochs.
train(epochs=10)