Name: Arjun Bhan  UNI: AB5666

In [None]:
!pip install portalocker
!pip install torchmetrics

Collecting portalocker
  Downloading portalocker-2.8.2-py3-none-any.whl (17 kB)
Installing collected packages: portalocker
Successfully installed portalocker-2.8.2
Collecting torchmetrics
  Downloading torchmetrics-1.2.0-py3-none-any.whl (805 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m805.2/805.2 kB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
Collecting lightning-utilities>=0.8.0 (from torchmetrics)
  Downloading lightning_utilities-0.10.0-py3-none-any.whl (24 kB)
Installing collected packages: lightning-utilities, torchmetrics
Successfully installed lightning-utilities-0.10.0 torchmetrics-1.2.0


In [None]:
import argparse
import logging
import time

import torch
from torch.utils.data import DataLoader
from torch.utils.data.dataset import random_split
from torchtext.data import get_tokenizer
from torchtext.data.functional import to_map_style_dataset
from torchtext.data.utils import get_tokenizer, ngrams_iterator
from torchtext.datasets import DATASETS
from torchtext.prototype.transforms import load_sp_model, PRETRAINED_SP_MODEL, SentencePieceTokenizer
from torchtext.utils import download_from_url
from torchtext.vocab import build_vocab_from_iterator
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence
import torch.nn.functional as F
from torchtext.vocab import GloVe
from tqdm import tqdm

torch.autograd.set_detect_anomaly(True)


### Information
- torchtext repo: https://github.com/pytorch/text/tree/main/torchtext
- torchtext documentation: https://pytorch.org/text/stable/index.html

### Constants

In [None]:
# Key into torchtext's DATASETS registry.
DATASET = "AG_NEWS"
# Root directory handed to the dataset constructor.
DATA_DIR = ".data"
# Device that collated batches and the loss module are moved to.
DEVICE = "cpu"
# Embedding width; the GloVe vectors loaded in this notebook are 300-dimensional.
EMBED_DIM = 300
# Initial learning rate for SGD (decayed by the StepLR scheduler).
LR = 4.0
BATCH_SIZE = 16
NUM_EPOCHS = 5
# Token id reserved for padding; also used as the embedding's padding_idx.
PADDING_VALUE = 0
PADDING_IDX = PADDING_VALUE

### Get the tokenizer
- Use torchtext's `basic_english` tokenizer, which lowercases the text and splits it into word-level tokens.


In [None]:
# torchtext's "basic_english" tokenizer: lowercases and emits punctuation as separate tokens.
basic_english_tokenizer = get_tokenizer("basic_english")


In [None]:
basic_english_tokenizer("This is some text ...")

['this', 'is', 'some', 'text', '.', '.', '.']

In [None]:
# Notebook-wide tokenizer used by yield_tokens() and text_pipeline().
TOKENIZER = basic_english_tokenizer

### Get the data and get the vocabulary

In [None]:
def yield_tokens(data_iter):
    """Lazily tokenize the text of every ``(label, text)`` pair in ``data_iter``.

    Args:
        data_iter: iterable of two-element ``(label, text)`` samples.

    Yields:
        The token list produced by ``TOKENIZER`` for each sample's text.
    """
    yield from (TOKENIZER(raw_text) for _label, raw_text in data_iter)

In [None]:
# Build the vocabulary from the tokenized training split.
train_iter = DATASETS[DATASET](root=DATA_DIR, split="train")
# '<pad>' is listed first among the specials; this assumes
# build_vocab_from_iterator places specials at the front so that
# '<pad>' gets index 0 (= PADDING_VALUE) -- TODO confirm.
VOCAB = build_vocab_from_iterator(yield_tokens(train_iter), specials=('<pad>', '<unk>'))

# Tokens missing from the vocabulary are mapped to '<unk>'.
VOCAB.set_default_index(VOCAB['<unk>'])

### Get GloVe embeddings ... This will be slow ...

In [None]:
# Load pretrained GloVe vectors with torchtext's default settings
# (the first run downloads and caches them under .vector_cache).
GLOVE = GloVe()

.vector_cache/glove.840B.300d.zip: 2.18GB [06:49, 5.31MB/s]                            
100%|█████████▉| 2196016/2196017 [06:10<00:00, 5926.59it/s]


In [None]:
len(GLOVE), GLOVE.vectors.shape

(2196017, torch.Size([2196017, 300]))

### Helper functions

In [None]:
def text_pipeline(text):
    """Turn a raw string into a list of vocabulary indices.

    The text is tokenized with ``TOKENIZER`` and the tokens are looked up
    in ``VOCAB``.
    """
    tokens = TOKENIZER(text)
    return VOCAB(tokens)

def label_pipeline(label):
    """Convert a dataset label to an integer class index by subtracting one.

    The dataset labels are presumably 1-based, so this yields 0-based indices.
    """
    zero_based = int(label) - 1
    return zero_based

Nice link on collate_fn and DataLoader in PyTorch: https://python.plainenglish.io/understanding-collate-fn-in-pytorch-f9d1742647d3

In [None]:
def collate_batch(batch):
    """Collate ``(label, text)`` samples into padded tensors on ``DEVICE``.

    Args:
        batch: iterable of ``(label, text)`` pairs, as handed over by the
            DataLoader.

    Returns:
        A ``(labels, texts)`` tuple: ``labels`` is an int64 tensor with one
        class index per sample; ``texts`` is an int64 tensor of shape
        ``(batch, longest_sequence)`` where shorter sequences are
        right-padded with ``PADDING_VALUE``.
    """
    labels, sequences = [], []
    for raw_label, raw_text in batch:
        labels.append(label_pipeline(raw_label))
        # torch.tensor already returns a fresh tensor, so no clone/detach is needed.
        sequences.append(torch.tensor(text_pipeline(raw_text), dtype=torch.int64))

    label_tensor = torch.tensor(labels, dtype=torch.int64)
    # Pad explicitly with the shared constant so it stays in sync with the
    # embedding's padding index rather than relying on pad_sequence's default.
    text_tensor = pad_sequence(sequences, batch_first=True, padding_value=PADDING_VALUE)

    return label_tensor.to(DEVICE), text_tensor.to(DEVICE)

### Get the data

In [None]:
# Count the distinct labels in the training split; this sizes the classifier output.
train_iter = DATASETS[DATASET](root=DATA_DIR, split="train")
num_class = len({label for label, _ in train_iter})
print(f"The number of classes is {num_class} ...")

The number of classes is 4 ...


### Set up the model

Good reference on this type of model
- Recurrent CNN: https://ojs.aaai.org/index.php/AAAI/article/view/9513/9372

In [None]:
class CNN1dTextClassificationModel(nn.Module):
    """Text classifier built from three parallel 1-d convolutions.

    Token embeddings are fed to convolutions with kernel sizes 2, 3 and 4.
    Each convolution has a single output channel and is max-pooled over the
    sequence dimension, so the final linear layer receives three features
    per example.

    Args:
        vocab_size: number of rows in the embedding table.
        num_class: number of output logits.
        embed_dim: embedding width; has to match the GloVe vector width when
            ``use_pretrained`` is true.
        use_pretrained: when true, copy GloVe vectors (module-level ``GLOVE``)
            into the rows of tokens that ``VOCAB`` maps to a known GloVe
            word; otherwise call :meth:`init_weights`.
        fine_tune_embeddings: when false, the embedding weights are frozen.
    """

    def __init__(
        self,
        vocab_size,
        num_class,
        embed_dim = 300,
        use_pretrained = True,
        fine_tune_embeddings = True
    ):
        super().__init__()

        # Every layer is created BEFORE any custom initialisation runs:
        # init_weights() touches self.fc, which previously did not exist yet
        # when use_pretrained=False and caused an AttributeError.
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=PADDING_IDX)

        self.cnn2 = nn.Conv1d(in_channels=embed_dim, out_channels=1, kernel_size=2)
        self.cnn3 = nn.Conv1d(in_channels=embed_dim, out_channels=1, kernel_size=3)
        self.cnn4 = nn.Conv1d(in_channels=embed_dim, out_channels=1, kernel_size=4)

        self.fc = nn.Linear(in_features=3, out_features=num_class)

        self.dropout = nn.Dropout(0.3)
        # NOTE: defined but not applied in forward(); kept so the module's
        # attributes stay unchanged.
        self.relu = nn.ReLU()

        # Set to True to print intermediate shapes on the next forward pass.
        self.debug = False

        if use_pretrained:
            self._load_pretrained_embeddings(vocab_size)
        else:
            self.init_weights()

        if not fine_tune_embeddings:
            self.embedding.weight.requires_grad = False

    def _load_pretrained_embeddings(self, vocab_size):
        """Copy GloVe vectors into the embedding rows of known tokens."""
        # no_grad allows in-place writes to the parameter without toggling
        # requires_grad back and forth.
        with torch.no_grad():
            for index in range(vocab_size):
                glove_index = GLOVE.stoi.get(VOCAB.lookup_token(index))
                if glove_index is not None:
                    self.embedding.weight[index, :] = GLOVE.vectors[glove_index]

    def init_weights(self):
        """Uniformly initialise the embedding and output layer in [-0.5, 0.5]."""
        initrange = 0.5
        with torch.no_grad():
            self.embedding.weight.uniform_(-initrange, initrange)
            # uniform_ also overwrites the padding row; restore it to zero so
            # padded positions contribute nothing.
            if self.embedding.padding_idx is not None:
                self.embedding.weight[self.embedding.padding_idx].zero_()

            self.fc.weight.uniform_(-initrange, initrange)
            self.fc.bias.zero_()

    def forward(self, text):
        """Return class logits for a batch of token-id sequences.

        Args:
            text: integer tensor of token ids, indexed as (batch, sequence).

        Returns:
            Tensor of shape (batch, num_class) with unnormalised logits.
        """
        embedded = self.embedding(text)
        if self.debug:
            print('embedding', embedded.shape)

        # Conv1d expects (batch, channels, length).
        embedded = embedded.transpose(1, 2)

        # Sequences shorter than the widest kernel would make Conv1d fail;
        # zero-pad them on the right up to that width.
        min_length = self.cnn4.kernel_size[0]
        if embedded.size(2) < min_length:
            embedded = F.pad(embedded, (0, min_length - embedded.size(2)))

        cnn2 = self.cnn2(embedded)
        if self.debug:
            print('cnn2', cnn2.shape)

        cnn3 = self.cnn3(embedded)
        if self.debug:
            print('cnn3', cnn3.shape)

        cnn4 = self.cnn4(embedded)
        if self.debug:
            print('cnn4', cnn4.shape)

        # Global max-pool over the sequence dimension: one value per conv.
        cnn2 = F.max_pool1d(cnn2, kernel_size=cnn2.size(2)).squeeze(2)
        cnn3 = F.max_pool1d(cnn3, kernel_size=cnn3.size(2)).squeeze(2)
        cnn4 = F.max_pool1d(cnn4, kernel_size=cnn4.size(2)).squeeze(2)
        if self.debug:
            print('cnn2 after max', cnn2.shape)

        cnn_concat = torch.cat((cnn2, cnn3, cnn4), dim=1)
        cnn_concat = self.dropout(cnn_concat)
        if self.debug:
            print('cnn concat', cnn_concat.shape)
            # Print shapes only once.
            self.debug = False

        return self.fc(cnn_concat)

class RecurrentCNNModel(nn.Module):
    """Recurrent CNN text classifier.

    For every position a left context (built left-to-right) and a right
    context (built right-to-left) are computed recurrently from the
    neighbouring context and the neighbouring word embedding. The
    concatenation ``[left context; embedding; right context]`` goes through a
    tanh layer, is max-pooled over the sequence, and is projected to logits.

    Args:
        vocab_size: number of rows in the embedding table.
        num_class: number of output logits.
        e: embedding width; has to match the GloVe vector width when
            ``use_pretrained`` is true.
        use_pretrained: when true, copy GloVe vectors (module-level ``GLOVE``)
            into the rows of tokens that ``VOCAB`` maps to a known GloVe
            word; otherwise call :meth:`init_weights`.
        fine_tune_embeddings: when false, the embedding weights are frozen.
        debug: accepted for interface compatibility. NOTE(review): as in the
            original code the value is not used; shape printing stays off
            unless ``self.debug`` is set to True by hand.
    """

    def __init__(
        self,
        vocab_size,
        num_class = 4,
        e = 300,
        use_pretrained = True,
        fine_tune_embeddings = True,

        debug = True
    ):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, e)

        self.c = 100  # width of the left/right context vectors
        self.h = 100  # width of the hidden layer before pooling
        self.initrange = 0.5

        # All layers are created BEFORE custom initialisation, because
        # init_weights() touches them.
        self.Wl = nn.Linear(self.c, self.c)
        self.Wr = nn.Linear(self.c, self.c)

        self.Wsl = nn.Linear(e, self.c)
        self.Wsr = nn.Linear(e, self.c)

        self.W2 = nn.Linear(self.c * 2 + e, self.h)
        self.W4 = nn.Linear(self.h, num_class)

        # NOTE: dropout is defined but not applied in forward().
        self.dropout = nn.Dropout(0.3)
        self.relu = nn.ReLU()

        self.debug = False

        if use_pretrained:
            self._load_pretrained_embeddings(vocab_size)
        else:
            self.init_weights()

        if not fine_tune_embeddings:
            self.embedding.weight.requires_grad = False

    def _load_pretrained_embeddings(self, vocab_size):
        """Copy GloVe vectors into the embedding rows of known tokens."""
        # no_grad allows in-place writes to the parameter without toggling
        # requires_grad back and forth.
        with torch.no_grad():
            for index in range(vocab_size):
                glove_index = GLOVE.stoi.get(VOCAB.lookup_token(index))
                if glove_index is not None:
                    self.embedding.weight[index, :] = GLOVE.vectors[glove_index]

    def init_weights(self):
        """Uniformly initialise embedding/context weights and zero the biases."""
        bound = self.initrange
        with torch.no_grad():
            self.embedding.weight.uniform_(-bound, bound)
            # The original referenced a nonexistent ``self.W1``; the layer is ``Wl``.
            for layer in (self.Wl, self.Wr, self.Wsl, self.Wsr):
                layer.weight.uniform_(-bound, bound)

            for layer in (self.Wl, self.Wr, self.Wsl, self.Wsr, self.W2, self.W4):
                layer.bias.zero_()

    def forward(self, text):
        """Return class logits for a batch of token-id sequences.

        Args:
            text: integer tensor of token ids, indexed as (batch, sequence).

        Returns:
            Tensor of shape (batch, num_class) with unnormalised logits.
        """
        embedded = self.embedding(text)
        N, L = embedded.size(0), embedded.size(1)

        # Zero boundary context, allocated with the dtype/device of the
        # embeddings so the model also works when it is not on the CPU.
        boundary = embedded.new_zeros(N, self.c)

        # Left contexts: cl[l] = relu(Wl(cl[l-1]) + Wsl(e[l-1])), cl[0] = 0.
        # Collecting steps in a list and stacking avoids in-place writes into
        # a tensor that autograd tracks.
        left_states = [boundary]
        for l in range(1, L):
            left_states.append(
                self.relu(self.Wl(left_states[-1]) + self.Wsl(embedded[:, l - 1, :]))
            )
        cl = torch.stack(left_states, dim=1)

        # Right contexts: cr[l] = relu(Wr(cr[l+1]) + Wsr(e[l+1])), cr[L-1] = 0.
        right_states = [boundary]
        for l in range(L - 2, -1, -1):
            right_states.append(
                self.relu(self.Wr(right_states[-1]) + self.Wsr(embedded[:, l + 1, :]))
            )
        right_states.reverse()  # built back-to-front; restore sequence order
        cr = torch.stack(right_states, dim=1)

        if self.debug:
            print('cr ', cr.shape)

        x = torch.cat((cl, embedded, cr), dim=-1)
        if self.debug:
            print('x ', x.shape)

        y2 = torch.tanh(self.W2(x))
        if self.debug:
            print('y2 ', y2.shape)

        y2 = y2.transpose(1, 2)
        if self.debug:
            print('y2 ', y2.shape)

        # Max over the sequence dimension (last dim after the transpose).
        y3 = torch.max(y2, dim=2)[0]
        if self.debug:
            print('y3 ', y3.shape)

        y4 = self.W4(y3)
        if self.debug:
            print('y4 ', y4.shape)
            # Print shapes only once.
            self.debug = False

        return y4

### Set up the model

In [None]:
# Flag for initialising the embeddings from GloVe.
# The original assignment ended in a stray comma, which made the value the
# tuple ``(True,)`` instead of a bool.
USE_PRETRAINED = True
# Original (misspelled) name kept as an alias for anything that refers to it.
USE_PRETRANED = USE_PRETRAINED

# Flag for updating the embedding weights during training.
FINE_TUNE_EMBEDDINGS = True

# Cross-entropy loss applied to the logits returned by the models.
criterion = torch.nn.CrossEntropyLoss().to(DEVICE)

In [None]:
# Recurrent-CNN classifier, moved to the same device as the batches and the loss.
model = RecurrentCNNModel(vocab_size = len(VOCAB), num_class = num_class).to(DEVICE)

# Plain SGD starting at LR.
optimizer = torch.optim.SGD(model.parameters(), lr = LR)

# Multiply the learning rate by 0.1 after every epoch; step_size is an integer epoch count.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, gamma=0.1, step_size = 1)

### Set up the data

In [None]:
# Load both splits from the same root directory as the earlier cells.
train_iter, test_iter = DATASETS[DATASET](root=DATA_DIR)
train_dataset = to_map_style_dataset(train_iter)
test_dataset = to_map_style_dataset(test_iter)

# Hold out 5% of the training data for validation.
num_train = int(len(train_dataset) * 0.95)
split_train_, split_valid_ = random_split(train_dataset, [num_train, len(train_dataset) - num_train])

# Only the training loader is shuffled; accuracy on the validation and test
# sets does not depend on batch order.
train_dataloader = DataLoader(split_train_, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch)
valid_dataloader = DataLoader(split_valid_, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_batch)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_batch)

### Train the model

In [None]:
def train(dataloader, model, optimizer, criterion, epoch):
    """Run one training epoch and print running accuracy.

    Args:
        dataloader: iterable of ``(label, text)`` batches.
        model: module mapping ``text`` to per-class logits.
        optimizer: optimizer over ``model``'s parameters.
        criterion: loss taking ``(logits, label)``.
        epoch: epoch number, used only in the log line.
    """
    model.train()
    total_acc, total_count = 0, 0
    log_interval = 100

    # tqdm wraps the dataloader itself (not enumerate(...)) so it can read the
    # length and show a total / ETA.
    for idx, (label, text) in enumerate(tqdm(dataloader)):
        optimizer.zero_grad()
        predicted_label = model(text)

        loss = criterion(predicted_label, label)
        loss.backward()

        # Clip the global gradient norm before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.1)

        optimizer.step()
        total_acc += (predicted_label.argmax(1) == label).sum().item()
        total_count += label.size(0)
        if idx % log_interval == 0 and idx > 0:
            print(
                "| epoch {:3d} | {:5d}/{:5d} batches "
                "| accuracy {:8.3f}".format(epoch, idx, len(dataloader), total_acc / total_count)
            )
            # Accuracy is reported per logging window, so reset the counters.
            total_acc, total_count = 0, 0

In [None]:
def evaluate(dataloader, model):
    """Return the classification accuracy of ``model`` over ``dataloader``.

    Args:
        dataloader: iterable of ``(label, text)`` batches.
        model: module mapping ``text`` to per-class logits.

    Returns:
        Fraction of correctly classified samples as a float, or ``0.0`` when
        the dataloader yields no samples (instead of dividing by zero).
    """
    model.eval()
    total_acc, total_count = 0, 0

    with torch.no_grad():
        for label, text in dataloader:
            predicted_label = model(text)
            total_acc += (predicted_label.argmax(1) == label).sum().item()
            total_count += label.size(0)

    if total_count == 0:
        return 0.0
    return total_acc / total_count

In [None]:
# Train for NUM_EPOCHS epochs; after each one, report validation accuracy and
# advance the learning-rate schedule.
rule = "-" * 59
for epoch in range(1, NUM_EPOCHS + 1):
    epoch_start_time = time.time()
    train(train_dataloader, model, optimizer, criterion, epoch)
    accu_val = evaluate(valid_dataloader, model)
    scheduler.step()
    elapsed = time.time() - epoch_start_time
    summary = (
        "| end of epoch {:3d} | time: {:5.2f}s | "
        "valid accuracy {:8.3f} ".format(epoch, elapsed, accu_val)
    )
    print(rule, summary, rule, sep="\n")

# Final check on the held-out test split.
print("Checking the results of test dataset.")
accu_test = evaluate(test_dataloader, model)
print("test accuracy {:8.3f}".format(accu_test))

In [None]:
# Make a Conv Text model, on the same device as the batches and the loss
model = CNN1dTextClassificationModel(vocab_size = len(VOCAB), num_class = num_class).to(DEVICE)

# Set the optimizer to SGD
optimizer = torch.optim.SGD(model.parameters(), lr = LR)

# Set the scheduler to StepLR with gamma=0.1 and step_size = 1 (an integer epoch count)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, gamma=0.1, step_size = 1)

In [None]:
# Train the Conv1d model: after each epoch, report validation accuracy and
# advance the learning-rate schedule.
rule = "-" * 59
for epoch in range(1, NUM_EPOCHS + 1):
    epoch_start_time = time.time()
    train(train_dataloader, model, optimizer, criterion, epoch)
    accu_val = evaluate(valid_dataloader, model)
    scheduler.step()
    elapsed = time.time() - epoch_start_time
    summary = (
        "| end of epoch {:3d} | time: {:5.2f}s | "
        "valid accuracy {:8.3f} ".format(epoch, elapsed, accu_val)
    )
    print(rule, summary, rule, sep="\n")

# Final check on the held-out test split.
print("Checking the results of test dataset.")
accu_test = evaluate(test_dataloader, model)
print("test accuracy {:8.3f}".format(accu_test))

"Why do you think this CNN does not do very well on this data?". Also, please explain why. (Hint: the answer is fairly short)

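The CNN does not do very well on this data because its architecture never applies a nonlinear activation: a ReLU is defined in the model but is not used in the forward pass, so the convolution outputs are only max-pooled and passed straight to the linear layer. As a result, the model cannot capture the data as well as the recurrent CNN (RCNN), which does apply nonlinearities (ReLU and tanh). Nonlinearity is important because it allows the model to learn nonlinear patterns in the data.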