In [1]:
import numpy as np
import pandas as pd
import torch
import torchtext
import zipfile
import pathlib
from pathlib import Path
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torchtext.data.functional import numericalize_tokens_from_iterator
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader
from tqdm.auto import tqdm


import logging
import os
import warnings
warnings.filterwarnings("ignore")

# Due to warning when initializing the "spacy" tokenizer
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # disable tensorflow logging
logging.getLogger('tensorflow').disabled = True  # disable tensorflow warning messages



In [4]:
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cpu'

In [5]:

df = pd.read_csv('artifacts/train_cleaned.csv')
df_valid = pd.read_csv('artifacts/valid_cleaned.csv')


# limit df
df = df[:10000]

df_valid = df_valid[:1000]

In [6]:
df.duplicated().sum()

162

In [7]:
import pandas as pd
import torch

from typing import Tuple

class CustomDataset(torch.utils.data.Dataset):
    """Custom dataset class for text classification."""
    
    def __init__(self,text,target):
        self.text = text
        self.target = target
        
    def __len__(self) -> int:
        """Return length of dataset."""
        return len(self.text)
    
    def __getitem__(self, index: int) -> Tuple[str, int]:
        """Return item at given index."""
       
        text = self.text[index]
        target = self.target[index]
        
        return text, target
       

In [8]:

train_data = CustomDataset(df['tweet'],df['label'])
test_data = CustomDataset(df_valid['tweet'],df_valid['label'])


In [9]:
next(iter(train_data))

('im getting on borderlands and i will murder you all ,', 3)

In [10]:
# creating vocabulary  
    
tokenizer = get_tokenizer("spacy")

def yield_tokens(data_iter):
    for text in data_iter:
        text = text.lower()
        
        yield tokenizer(text)
        
        
token_generator = yield_tokens(df['tweet'])
        
vocab = build_vocab_from_iterator(token_generator, specials=["<UNK>"])
vocab.set_default_index(vocab["<UNK>"])

In [11]:
def collate_fn(samples):
    # Separate the texts and targets from the samples in a batch
    texts, targets = zip(*samples)
    
    # Tokenize the texts
    tokenized_texts = [tokenizer(text.lower()) for text in texts]
    
    # Convert the tokenized texts to numerical values
    text_indices = [torch.tensor(vocab(token)) for token in tokenized_texts]
    
    # Pad the text sequences to have the same length
    padded_texts = torch.nn.utils.rnn.pad_sequence(text_indices, batch_first=True)
    
    # Convert the targets to tensors
    target_tensor = torch.tensor(targets)
    
    return padded_texts, target_tensor


In [12]:
data_loader = DataLoader(train_data,batch_size=3,collate_fn=collate_fn)

In [13]:
count=0
for i in data_loader:
    count+=1
    print(i[0].shape)
    print(i[0])
    if count==3:
        break


torch.Size([3, 13])
tensor([[   3,  140,  217,   15,   26,    7,    3,   88, 1141,   17,   37,    4,
            0],
        [   3,  128,  374,    6,    2, 2867,    7,    3,   88,  731,   17,   37,
            4],
        [   3,  140,  217,   15,   26,    7,    3,   88,  731,   17,   37,    4,
            0]])
torch.Size([3, 14])
tensor([[   3,  140,  374,   15,   26,    7,    3,   88, 1141,   17,   37,    4,
            0,    0],
        [   3,  140,  217,   15,   26,   63,    7,    3,   88, 1141,   17,   38,
           37,    4],
        [   3,  140,  217,  221,   26,    7,    3,   85, 1141,   17,   37,    4,
            0,    0]])
torch.Size([3, 62])
tensor([[  25,    3,  800,    8,  388,  292,  432,  228,   10,  101,    1,    1,
            1,   50,   17,   39,   44,  170,    3,  128,    8,  412,  206,  541,
            7,  968,   12,   55,   11,   21,  229,  431,    1,   25,    3,  647,
            6,  188,  417,    8, 5416,   10,   21,  237,    1,    1,  138,   12,
            2,

In [14]:

train_dataloader = DataLoader(train_data,batch_size=20,collate_fn=collate_fn)
test_dataloader = DataLoader(test_data,batch_size=20,collate_fn=collate_fn)

### write a class for RNN/LSTM

In [15]:
import torch.nn as nn

num_classes = df['label'].nunique()

class RNNClassify(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_size,num_classes=4):
        super().__init__()
        
        # Define the embedding layer
        self.embed = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        
        self.rnn = nn.RNN(embed_dim, hidden_size,batch_first=True)
        
        self.linear = nn.Linear(hidden_size, num_classes)
        
        # Initialize the weights of the module
        self.init_weights()
        
    def init_weights(self):
        initrange = 0.5
        self.embed.weight.data.uniform_(-initrange, initrange)
        self.rnn.weight_ih_l0.data.uniform_(-initrange, initrange)
        self.rnn.weight_hh_l0.data.uniform_(-initrange, initrange)
        self.linear.weight.data.uniform_(-initrange, initrange)
        self.linear.bias.data.zero_()
        
    def forward(self, input):
        # Embed the input
        embedded = self.embed(input)
        #print('embedded shape:',embedded.shape)
        
        # Pass the embedded input through the RNN layer
        output, hidden = self.rnn(embedded)
        #print('rnn output shape:',output.shape)
        #print('rnn hidden shape:',hidden.shape)
        
        output = output[:, -1, :]  # taking last output of RNN
        #print('rnn last output shape:',output.shape)
        
        # Pass the output through the linear layer
        output = self.linear(output)
        
        # Return the output
        return output


In [16]:
VOCAB_SIZE = len(vocab)
VOCAB_SIZE

10690

In [17]:
model_rnn = RNNClassify(vocab_size=VOCAB_SIZE,embed_dim=64,hidden_size=16)

In [18]:
import torch.nn as nn

num_classes = df['label'].nunique()

class LSTMClassify(nn.Module):
    
    def __init__(self, vocab_size, embed_dim, hidden_size,num_classes=4):
        super().__init__()
        
        # Define the embedding layer
        self.embed = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        
        self.lstm = nn.LSTM(embed_dim, hidden_size,batch_first=True)
        
        self.linear = nn.Linear(hidden_size, num_classes)

        
    def forward(self, input):
        # Embed the input
        embedded = self.embed(input)
        #print('embedded shape:',embedded.shape)
        
        # Pass the embedded input through the LSTM layer
        output, (hidden,cell) = self.lstm(embedded)
        
        
        output = output[:, -1, :] 
        #print('LSTM last output shape:',output.shape)
        
        # Pass the output through the linear layer
        output = self.linear(output)
        
        # Return the output
        return output


In [19]:
import torch.nn as nn

class LSTMClassify(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_size, num_classes=4, num_layers=2, bidirectional=True, dropout=0.2):
        super().__init__()
        
        # Define the embedding layer
        self.embed = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        
        self.lstm = nn.LSTM(embed_dim, hidden_size, num_layers=num_layers, batch_first=True, bidirectional=bidirectional, dropout=dropout)
        
        lstm_output_size = hidden_size
        if bidirectional:
            lstm_output_size *= 2
        
        self.linear = nn.Linear(lstm_output_size, num_classes)
        
    def forward(self, input):
        # Embed the input
        embedded = self.embed(input)
        
        # Pass the embedded input through the LSTM layer
        output, (_, _) = self.lstm(embedded)
        
        output = output[:, -1, :]  # taking the last output of the LSTM
        
        # Pass the output through the linear layer
        output = self.linear(output)
        
        # Return the output
        return output


In [20]:
model_lstm = LSTMClassify(vocab_size=VOCAB_SIZE,embed_dim=64,hidden_size=16)

### Write train-test loop for Mini-batch Gradient Descent 

```

optimizer = torch.optim.Adam(model_rnn.parameters(),lr=0.001)
loss_fn = torch.nn.CrossEntropyLoss()  # remember it gives logits (row outputs)

epochs = 10

for epoch in tqdm(range(epochs)):
    train_loss, train_acc = 0, 0

    for batch, (X, y) in enumerate(train_dataloader):
        model_rnn.train()
        X, y = X.to(device), y.to(device)
            
        y_logits = model_rnn(X)
        loss = loss_fn(y_logits, y)
        
        train_loss += loss.item()
        train_acc += (y_logits.argmax(1) == y).sum().item() / len(y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    train_loss /= len(train_dataloader)  # Calculate average loss per epoch
    train_acc /= len(train_dataloader)  # Calculate average accuracy per epoch

    print(f'Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}')
    
    
    # evaluate model
    with torch.inference_mode():
        model_rnn.eval()
        
        test_loss,test_acc = 0,0
        for batch, (X_test, y_test) in enumerate(test_dataloader):
            
            X_test, y_test = X_test.to(device), y_test.to(device)
            
            y_logits = model_rnn(X_test)

            loss = loss_fn(y_logits, y_test)

            test_loss += loss.item()

            # Compute accuracy
            test_preds = y_logits.argmax(dim=1)
            test_acc += (test_preds == y_test).sum().item() / len(y_test)
        
        test_loss /= len(test_dataloader)  # Calculate average loss per epoch
        test_acc /= len(test_dataloader)  # Calculate average accuracy per epoch


        
        print(f'Epoch {epoch+1}/{epochs}, Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}')
        
        print('--'*25)


```

### let's create a function for `train` and `test`

In [199]:
def train_step(model,
               dataloader,
               loss_fn,
               optimizer:torch.optim,
               device=device):
    
    train_loss,train_acc = 0,0
    for batch,(X,y) in enumerate(dataloader):
        model.train()
        X, y = X.to(device), y.to(device)
            
        y_logits = model(X)

        loss = loss_fn(y_logits, y)

        train_loss += loss
        train_acc += (y_logits.argmax(1) == y).sum().item() / len(y)

        optimizer.zero_grad()
        loss.backward()

        optimizer.step()
    
    
    train_loss /= len(dataloader)
    train_acc /= len(dataloader)
        
    return train_loss,train_acc

In [200]:
def test_step(model,
               dataloader,
               loss_fn,
               device=device):
    
    with torch.inference_mode():
        model.eval()
        
        test_loss,test_acc = 0,0
        for batch, (X_test, y_test) in enumerate(dataloader):
            
            X_test, y_test = X_test.to(device), y_test.to(device)
            
            y_logits = model(X_test)

            loss = loss_fn(y_logits, y_test)

            test_loss += loss.item()

            # Compute accuracy
            test_preds = y_logits.argmax(dim=1)
            test_acc += (test_preds == y_test).sum().item() / len(y_test)
            
        
        test_loss /= len(dataloader)  
        test_acc /= len(dataloader) 


    return test_loss,test_acc 



In [201]:
optimizer = torch.optim.Adam(model_lstm.parameters(),lr=0.01)
loss_fn = torch.nn.CrossEntropyLoss()  # remember it gives logits (row outputs)

epochs = 20

for epoch in tqdm(range(epochs)):
    train_loss, train_accuracy = train_step(model_lstm,train_dataloader,loss_fn,optimizer)
    print(f'Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}')
    
    test_loss, test_accuracy = test_step(model_lstm,test_dataloader,loss_fn)
    print(f'Epoch {epoch+1}/{epochs}, Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')
    print('--'*25)   
        
        

  0%|          | 0/20 [00:00<?, ?it/s]

Epoch 1/20, Train Loss: 1.3304, Train Accuracy: 0.3676
Epoch 1/20, Test Loss: 1.4537, Test Accuracy: 0.2770
--------------------------------------------------
Epoch 2/20, Train Loss: 1.3429, Train Accuracy: 0.3691
Epoch 2/20, Test Loss: 1.4543, Test Accuracy: 0.2770
--------------------------------------------------
Epoch 3/20, Train Loss: 1.3446, Train Accuracy: 0.3649
Epoch 3/20, Test Loss: 1.4331, Test Accuracy: 0.2780
--------------------------------------------------
Epoch 4/20, Train Loss: 1.3269, Train Accuracy: 0.3793
Epoch 4/20, Test Loss: 1.4564, Test Accuracy: 0.2790
--------------------------------------------------
Epoch 5/20, Train Loss: 1.3154, Train Accuracy: 0.3871
Epoch 5/20, Test Loss: 1.4335, Test Accuracy: 0.2850
--------------------------------------------------


KeyboardInterrupt: 

In [None]:
for triple in zip(train_data[0],train_data[1],train_data[2]):
    print(triple)