### Setup

The following directories and files are not included in the repository. Extract them from the `.zip` files so that they are laid out in the following structure:
```md
├── models
│   └── BiLSTM
│       ├── config.json
│       └── model.safetensors
└── raw_data
    ├── balancedtest.csv
    └── fulltrain.csv
```

In [None]:
import pandas as pd
import torch
import torch.nn as nn
import numpy as np
import torchtext

from sklearn.metrics import accuracy_score, precision_recall_fscore_support 
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from torch.utils.data import Dataset, DataLoader
from torchtext.vocab import GloVe

In [None]:
# Report whether PyTorch can see a CUDA-capable GPU.
print(torch.cuda.is_available())
# Run on the GPU when one is available, otherwise fall back to the CPU.
# (A bare `torch.cuda.device(0)` call used to sit here; it only constructs a
# context-manager object and selects nothing unless used in a `with` block,
# so it was removed.)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
SEED = 42 # fixed random state so sampling/splitting is reproducible and runs are comparable
PARTITION_SIZE = 500 # number of rows sampled when enable_all_data is False; lower it on slow machines, raise it for results closer to the full-data run
enable_all_data = True # True = use every row; set to False to subsample PARTITION_SIZE training rows and PARTITION_SIZE test rows (e.g. when preprocessing takes too long)

### Pre-processing

In [None]:
# Load the training CSV. header=None treats the first row as data, so the
# columns get integer names.
df = pd.read_csv('/kaggle/input/lun-raw/fulltrain.csv', header=None, index_col = False)
df.head()  # preview the first rows (displayed as the cell output in a notebook)

In [None]:
# Optionally shrink the training frame to a reproducible random subset.
if not enable_all_data:
    df = df.sample(n=PARTITION_SIZE, random_state=SEED)

# Column 0 holds the class label, shifted down by one here (presumably the
# file uses 1-based labels -- confirm); column 1 holds the text.
y = df.iloc[:, 0] - 1
X = df.iloc[:, 1]

# Show how many rows fall into each class.
y.value_counts()

### Dataset

In [None]:
# Hold out 20% of the rows for validation, seeded for reproducibility.
# NOTE(review): the visible training cell builds its dataset from the full
# X / y rather than X_train / y_train -- confirm whether this split is still needed.
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=SEED)

In [None]:
class CustomDataset(Dataset):
    """Map-style dataset that turns raw text rows into sequences of word vectors.

    Each item is ``(vectors, label)``. ``vectors`` has one row per
    whitespace-separated token of the text and is moved to the module-level
    ``device`` before being returned. Sequences are not padded, so items
    generally have different lengths.

    Args:
        X: pandas Series of texts (accessed positionally via ``.iloc``).
        y: pandas Series of labels aligned with ``X``.
        glove: pretrained-vectors object exposing ``get_vecs_by_tokens`` and
            ``dim`` (e.g. ``torchtext.vocab.GloVe``).
    """

    def __init__(self, X, y, glove):
        self.X = X
        self.y = y
        self.glove = glove

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, index):
        """Return ``(vectors, label)`` for the sample at position ``index``."""
        text = self.X.iloc[index]
        label = self.y.iloc[index]

        # Missing CSV cells are read by pandas as NaN floats, not strings;
        # treat them like empty text instead of failing on ``.split()``.
        tokens = text.split() if isinstance(text, str) else []

        if not tokens:
            # torch.stack fails on an empty list, and an LSTM cannot consume
            # a zero-length sequence, so emit a single all-zero vector.
            vectors = torch.zeros(1, self.glove.dim)
        else:
            # Pretrained vocabularies such as GloVe 6B are uncased, so fall
            # back to the lower-cased token before resorting to the
            # out-of-vocabulary vector.
            vectors = self.glove.get_vecs_by_tokens(tokens, lower_case_backup=True)

        return vectors.to(device), label

In [None]:
# Pretrained GloVe "6B" word vectors with 300 dimensions, loaded via torchtext.
glove = GloVe(name='6B', dim=300)
# max_length = 120000

In [None]:
# train_dataset = CustomDataset(X_train, y_train, glove, max_length)
# val_dataset = CustomDataset(X_val, y_val, glove, max_length)
# Build the training dataset from the full X / y (the train/validation
# variants above are disabled).
train_dataset = CustomDataset(X, y, glove)

In [None]:
# One sample per batch, reshuffled on every pass over the data.
# NOTE(review): batch_size=1 is presumably required because CustomDataset
# returns unpadded, variable-length sequences -- confirm.
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
# val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)

### Model

In [None]:

class BiLSTM(nn.Module):
    """Bidirectional LSTM classifier over sequences of word vectors.

    Args:
        embedding_dim: size of each input vector.
        hidden_dim: hidden size of each LSTM direction.
        num_layers: number of stacked LSTM layers.
        num_classes: number of output logits. Defaults to 4, the value that
            was previously hard-coded.
    """

    def __init__(self, embedding_dim, hidden_dim, num_layers, num_classes=4):
        super().__init__()

        self.hidden_size, self.num_layers = hidden_dim, num_layers

        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, batch_first=True, bidirectional=True)
        # Both directions are concatenated, hence 2 * hidden_dim input features.
        self.fc = nn.Linear(hidden_dim * 2, num_classes)

    def forward(self, text):
        """Return class logits of shape ``(batch, num_classes)``.

        Args:
            text: float tensor of shape ``(batch, seq_len, embedding_dim)``.
        """
        # nn.LSTM starts from zero hidden/cell states when none are passed,
        # so no explicit h0/c0 is needed. This also removes the dependency on
        # a module-level ``device``: the states live wherever the input does.
        out, _ = self.lstm(text)

        # Sequence summary: the forward direction has seen the whole sequence
        # at the last time step, the backward direction at the first one.
        summary = torch.cat((out[:, -1, :self.hidden_size], out[:, 0, self.hidden_size:]), dim=1)

        # Project the summary onto the class logits.
        return self.fc(summary)

### Training

In [None]:
# 300-dimensional inputs, 256 hidden units per direction, 2 LSTM layers;
# the model is moved to the selected device.
model = BiLSTM(300, 256, 2).to(device)

In [None]:
# AdamW over all model parameters with a learning rate of 2e-5.
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
# Cross-entropy loss applied to the logits returned by the model.
criterion = nn.CrossEntropyLoss()

In [None]:
EPOCHS = 5

# Number of optimisation steps taken so far, counted across all epochs.
n = 0

for epoch in range(EPOCHS):
    model.train()
    for texts, labels in train_loader:
        texts = texts.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(texts)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        n += 1

        # Log the loss of the most recent batch every 100 steps. The loss is
        # a raw cross-entropy value, not a percentage, so it is printed
        # without a '%' suffix.
        if n % 100 == 0:
            print(f'Epoch [{epoch+1}/{EPOCHS}], Step [{n}], Loss: {loss.item():.4f}')

#     # Validation
#     model.eval()
#     with torch.no_grad():
#         correct = 0
#         total = 0
#         for texts, labels in val_loader:
#             texts = texts.to(device)
#             labels = labels.to(device)
#             outputs = model(texts)
#             _, predicted = torch.max(outputs.data, 1)
#             total += labels.size(0)
#             correct += (predicted == labels).sum().item()

#         print(f'Epoch [{epoch+1}/{EPOCHS}], Val Accuracy: {100 * correct / total:.2f}%')
    

### Save model

In [None]:
# Persist only the model's state_dict (its parameters and buffers), not the module object.
torch.save(model.state_dict(), "/kaggle/working/BiLSTM")

### Evaluation

In [None]:
# TEST DATA
# header=None mirrors how the training CSV is read. Without it pandas would
# consume the first data row as column names and silently drop one sample.
# NOTE(review): assumes balancedtest.csv has the same header-less layout as
# fulltrain.csv -- confirm.
test_df = pd.read_csv('/kaggle/input/lun-raw/balancedtest.csv', header=None, index_col = False)
# Seed the optional subsample so repeated runs evaluate on the same rows, and
# cap it at the number of available rows so sampling cannot fail.
test_df = test_df if enable_all_data else test_df.sample(n=min(PARTITION_SIZE, len(test_df)), random_state=SEED)

In [None]:
# Same column handling as for the training frame: column 1 is the text,
# column 0 the label.
X_test = test_df.iloc[:, 1]
y_test = test_df.iloc[:, 0] - 1  # shift labels down by one, as done for the training labels

In [None]:
# Wrap the test data in the same dataset class used for training; iterate in
# file order (no shuffling), one sample per batch.
test_dataset = CustomDataset(X_test, y_test, glove)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

In [None]:
# Switch the model to evaluation mode and reset the running tallies.
model.eval()
y_true, y_pred = [], []
correct, total = 0, 0

with torch.no_grad():  # gradients are not needed for evaluation
    for texts, labels in test_loader:
        texts, labels = texts.to(device), labels.to(device)

        # Predicted class = index of the largest logit in each row.
        logits = model(texts)
        predicted = torch.max(logits.data, 1)[1]

        # Running accuracy counters.
        batch_hits = (predicted == labels).sum().item()
        total += labels.size(0)
        correct += batch_hits

        # Collect ground truth and predictions for the metric functions.
        y_true.extend(labels.cpu().numpy())
        y_pred.extend(predicted.cpu().numpy())

In [None]:
# Overall accuracy plus macro-averaged precision / recall / F1 over the
# collected test predictions (the fourth return value is discarded).
test_accuracy = accuracy_score(y_true, y_pred)
test_precision, test_recall, test_f1, _ = precision_recall_fscore_support(y_true, y_pred, average='macro')

In [None]:
# Print the aggregate test metrics, each to 8 decimal places.
print(f'Test Accuracy: {test_accuracy:.8f},\tTest Precision: {test_precision:.8f},\tTest Recall: {test_recall:.8f},\tTest f1: {test_f1:.8f}')

In [None]:
# Show how many test samples were assigned to each predicted class.
print(pd.Series(y_pred).value_counts())

In [None]:
# Per-class precision / recall / F1.
# The label set is pinned so that position i of every returned array is class
# i even when some class is missing from both y_true and y_pred (possible on
# a small subsample). Without `labels=` the arrays shrink to the classes
# actually present, and indexing them with range(4) would mislabel rows or
# raise IndexError.
class_labels = [0, 1, 2, 3]
class_test_precision, class_test_recall, class_test_f1, class_ = precision_recall_fscore_support(
    y_true, y_pred, labels=class_labels
)
for i in class_labels:
    print(f'Class {i}:\tTest Precision: {class_test_precision[i]:.8f},\tTest Recall: {class_test_recall[i]:.8f},\tTest f1: {class_test_f1[i]:.8f}')