<a href="https://colab.research.google.com/github/KrishPro/sentiment-analysis/blob/main/colab-trainer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
from google.colab import drive
drive.mount('/content/drive')

PWD = "/content/drive/MyDrive/Models/sentiment-analysis/bert"
# PWD = ""

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
"""
Written by KrishPro @ KP
"""

from collections import Counter
import numpy as np
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import Vocab, vocab as build_vocab
from torch.nn.utils.rnn import pad_sequence
import torch.utils.data as data
from tqdm import tqdm
import pandas as pd
import torch
import os
import re

PAD_IDX = 0

class Dataset(data.Dataset):
    def __init__(self, batch_size: int) -> None:
        super().__init__()
        
        tqdm.pandas()

        self.tokenizer = get_tokenizer("spacy", language="en_core_web_sm")

        df: pd.DataFrame = self.load_data(self.tokenizer)

        df['text'] = df['text'].map(lambda t: ["[CLS]"] + t)

        self.vocab = self.create_vocab(df['text'], min_freq=2)
        
        df['text'] = df['text'].apply(self.vocab)

        df = df[df['text'].map(len) < 500]

        df['text_len'] = df['text'].map(len)

        df = df.sort_values('text_len')
        df = df.drop('text_len', axis=1).reset_index().drop('index', axis=1)
        
        self.data = df.values.tolist()
        self.data = list(self.chunks(self.data, batch_size))[:-1]

    def __getitem__(self, idx):
        rev, label = self.data[idx]
        rev = [torch.tensor(r) for r in rev]
        label = torch.tensor(label)
        return pad_sequence(rev, padding_value=PAD_IDX), label
    
    def __len__(self):
        return len(self.data)

    @staticmethod
    def chunks(lst, n):
        """Yield successive n-sized chunks from lst."""
        for i in range(0, len(lst), n):
            yield [l[0] for l in lst[i:i + n]], [l[1] for l in lst[i:i + n]]

    def load_data(self, data_dir: str = "/home/krish/Datasets/IMDB-Pos-vs-Neg"):
        if os.path.exists(os.path.join(PWD, "Data/data.json")):
            return pd.read_json(os.path.join(PWD, "Data/data.json"))

        else:
            splits: list[str] = os.listdir(data_dir)

            file_paths = [os.path.join(data_dir, file_name) for file_name in splits]
            dataframe = pd.concat(list(map(pd.read_csv, file_paths)), ignore_index=True)
            dataframe['text'] = dataframe['text'].map(self.clean_text)
            dataframe['text'] = dataframe['text'].progress_map(self.tokenizer)
            return dataframe

    @staticmethod
    def clean_text(text: str) -> str:
        text = text.lower()
        text = text.replace("<br />", "")
        text = re.sub(r'[^\w\s]', "", text)
        return text

    def create_vocab(self, text: pd.Series, min_freq: int) -> Vocab:
        if os.path.exists(os.path.join(PWD, "Data/word-count.pth")):
            counter = torch.load(os.path.join(PWD, "Data/word-count.pth"))
        else:
            counter = Counter()
            for t in tqdm(text):
                counter.update(t)
            torch.save(counter, os.path.join(PWD, "Data/word-count.pth"))


        vocab = build_vocab(counter, min_freq)
        vocab.insert_token("[UNK]", len(vocab))
        vocab.insert_token("[PAD]", PAD_IDX)
        vocab.set_default_index(vocab.get_stoi()["[UNK]"])

        return vocab

def main():
    dataset = Dataset(64)
    dataloader = data.DataLoader(dataset, batch_size=None, shuffle=True, num_workers=os.cpu_count(), pin_memory=True)

    rev, tar = next(iter(dataloader))

    print(rev.shape, tar.shape)

if __name__ == '__main__':
    main()

TypeError: ignored

In [None]:
"""
Written by KrishPro @ KP
"""

from typing import Callable, Union
from torch.nn import functional as F
import torch.nn as nn
import torch
import math

# `PositionalEncoding` is copied from https://pytorch.org/tutorials/beginner/translation_transformer.html#seq2seq-network-using-transformer
class PositionalEncoding(nn.Module):
    def __init__(self,
                 emb_size: int,
                 dropout: float,
                 maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()
        den = torch.exp(- torch.arange(0, emb_size, 2)* math.log(10000) / emb_size)
        pos = torch.arange(0, maxlen).reshape(maxlen, 1)
        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos * den)
        pos_embedding[:, 1::2] = torch.cos(pos * den)
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding: torch.Tensor):
        return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])

class EmbeddingLayer(nn.Module):
    def __init__(self, vocab_size: int, emb_size: int, dropout: float, padding_idx: int):
        super(EmbeddingLayer, self).__init__()
        self.embedding_layer = nn.Embedding(vocab_size, emb_size, padding_idx=padding_idx)
        self.positional_encoding = PositionalEncoding(emb_size, dropout)
        self.emb_size = emb_size

    def forward(self, indices: torch.Tensor):
        assert indices.dtype == torch.long, f"indices.dtype must be torch.long, Got {indices.dtype}"
        
        embeddings: torch.Tensor = self.embedding_layer(indices) * math.sqrt(self.emb_size)
        embeddings: torch.Tensor = self.positional_encoding(embeddings)
        return embeddings

class Bert(nn.Module):
    def __init__(self, d_model: int, vocab_size: int, nhead: int, dim_feedforward: int, num_encoder_layers: int, dropout: float, padding_idx: int, activation:  Union[str, Callable[[torch.Tensor], torch.Tensor]] = F.relu, layer_norm_eps: float = 1e-5, batch_first: bool = False, norm_first: bool = False):
        super(Bert, self).__init__()

        self.pad_idx = padding_idx
        self.embedding_layer = EmbeddingLayer(vocab_size, d_model, dropout, self.pad_idx)

        encoder_layer = nn.TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout,
                                                    activation, layer_norm_eps, batch_first, norm_first)
        encoder_norm = nn.LayerNorm(d_model, eps=layer_norm_eps)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_encoder_layers, encoder_norm)

        self.classifier = nn.Linear(d_model, 1)

    def create_pad_mask(self, r: torch.Tensor):
        return (r == self.pad_idx).T

    def forward(self, r: torch.Tensor):
        # r.shape: (S, N)
        padding_mask: torch.Tensor = self.create_pad_mask(r)
        r: torch.Tensor = self.embedding_layer(r)
        # r.shape: (S, N, E)
        mem: torch.Tensor = self.encoder(r, mask=None, src_key_padding_mask=padding_mask)
        # mem.shape: (S, N, E)
        mem = mem[0] # Taking the encoding for the [CLS] token
        # mem.shape: (N, E)
        output: torch.Tensor = self.classifier(mem)
        # output.shape: (N, 1)
        return torch.sigmoid(output).squeeze(1)

def main():
    bert = Bert(d_model=512, vocab_size=30_000, nhead=8, dim_feedforward=512, num_encoder_layers=6, dropout=0.1, padding_idx=0)

    fake_reviews = torch.randint(0, 29_999, (150, 64))
    fake_outs: torch.Tensor = bert(fake_reviews)

    print(fake_reviews.shape, fake_outs.shape)


if __name__ == '__main__':
    main()

In [None]:
"""
Written by KrishPro @ KP
"""

from torch.utils.tensorboard import SummaryWriter
from torch.utils import data
from data import PAD_IDX, Dataset
from model import Bert
from tqdm import tqdm

import torch.optim as optim
import torch.nn as nn
import torch
import time
import os

LEARNING_RATE = 3e-4
BATCH_SIZE = 64
D_MODEL = 512
NHEAD = 8
NUM_ENCODER_LAYERS = 6
DIM_FEEDFORWARD = D_MODEL * 2
DROPOUT = 0.1

def load_checkpoint(bert: Bert, checkpoint_path: str = "checkpoints/latest.pth"):
    checkpoint_path = os.path.join(PWD, checkpoint_path)
    if os.path.exists(checkpoint_path):
        bert.load_state_dict(torch.load(checkpoint_path))
    return bert

def save_checkpoint(bert: Bert, checkpoint_path: str = "checkpoints/latest.pth"):
    checkpoint_path = os.path.join(PWD, checkpoint_path)
    torch.save(bert.state_dict(), checkpoint_path)

def main():
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    dataset = Dataset(BATCH_SIZE)
    dataloader = data.DataLoader(dataset, batch_size=None, shuffle=True, num_workers=os.cpu_count(), pin_memory=True)

    vocab_size = len(dataset.vocab)

    bert = Bert(D_MODEL, vocab_size, NHEAD, DIM_FEEDFORWARD, NUM_ENCODER_LAYERS, DROPOUT, PAD_IDX)
    bert = load_checkpoint(bert)
    bert = bert.to(device)

    optimizer = optim.Adam(bert.parameters(), lr=LEARNING_RATE, betas=(0.9, 0.98), eps=1e-9)
    criterion = nn.BCELoss() 

    writer = SummaryWriter(os.path.join(PWD, f"runs/{time.time()}"))
    global_step = 0

    optimizer.zero_grad()

    with tqdm(dataloader) as pbar:
        for i, (review, target) in enumerate(pbar):

            predictions = bert(review.to(device))

            loss: torch.Tensor = criterion(predictions, target.to(device).float())

            loss.backward()

            writer.add_scalar("loss", loss.item(), global_step=global_step)
            global_step += 1

            if i % 4 == 0:
                pbar.set_postfix(loss=loss.item())
                optimizer.step()
                optimizer.zero_grad()
        
        save_checkpoint(bert)

if __name__ == "__main__":
    main()