In [None]:
from pathlib import Path

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer

import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader

from nltk.tokenize import RegexpTokenizer
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer

In [None]:
# Read the labelled data and the unlabelled test data from the working directory.
train_val_df_raw, test_df_raw = (
    pd.read_csv(csv_name) for csv_name in ("train.csv", "test.csv")
)

In [None]:
# Hold out 10% of the labelled rows for validation; the fixed random_state keeps the split stable.
train_df_raw, val_df_raw = train_test_split(
    train_val_df_raw,
    test_size=0.1,
    random_state=42,
)

In [None]:
# Preview the first two rows of the training split.
train_df_raw.head(n=2)

In [None]:
# Preview the first two rows of the test data.
test_df_raw.head(n=2)

In [None]:
# Preview the first two rows of the validation split.
val_df_raw.head(n=2)

In [None]:
# Columns that are not used by the model; only the text (and the label) is kept.
unused_columns = ["id", "keyword", "location"]

train_df = train_df_raw.drop(columns=unused_columns)
val_df = val_df_raw.drop(columns=unused_columns)
test_df = test_df_raw.drop(columns=unused_columns)

# The test data gets a placeholder `target` column so that the same dataset
# class (which reads `df.target`) can wrap it.
test_df["target"] = 0

In [None]:
# Raw string: in a plain literal '\w' is an invalid escape sequence, which
# newer Python versions warn about at compile time.
tokenizer = RegexpTokenizer(r'\w+')
# Stored as a set because clean_text checks every token for membership;
# a set lookup is O(1) whereas the original list was scanned linearly.
sw = set(stopwords.words("english"))
ps = PorterStemmer()

In [None]:
def clean_text(text):
    """Lower-case, tokenize, drop stop words and stem ``text``.

    Uses the notebook-level ``tokenizer``, ``sw`` and ``ps`` objects.

    Args:
        text: the string to normalise.

    Returns:
        The stemmed, non-stop-word tokens joined by single spaces.
    """
    stems = []
    for token in tokenizer.tokenize(text.lower()):
        if token in sw:
            continue
        stems.append(ps.stem(token))
    return " ".join(stems)

In [None]:
# Replace the text column of every split with its cleaned form.
# NOTE: re-running this cell applies clean_text again to already-cleaned text.
for split_df in (train_df, val_df, test_df):
    split_df["text"] = split_df["text"].apply(clean_text)

In [None]:
# Fit the vocabulary on the training text only; the resulting count matrix
# is shown as the cell output.
cv = CountVectorizer()
cv.fit_transform(train_df["text"])

In [None]:
# (rows, columns) of the train, test and validation frames, in that order.
tuple(frame.shape for frame in (train_df, test_df, val_df))

In [None]:
class CustomDataset(Dataset):
    """Bag-of-words dataset over a DataFrame with ``text`` and ``target`` columns.

    The text is vectorised with the notebook-level ``cv`` vectorizer once, at
    construction time, and kept as a sparse matrix. Individual rows are
    densified on access, so ``cv.transform`` is no longer called for every
    sample of every epoch.

    Args:
        df: frame providing the ``text`` and ``target`` columns. ``cv`` must
            already be fitted when the dataset is created.
    """

    def __init__(self, df):
        self.df = df
        # One vectorizer call for the whole split instead of one per item.
        self.features = cv.transform(df.text)

    def __len__(self):
        return self.df.shape[0]

    def __getitem__(self, idx):
        """Return ``(X, y)``: a float32 count vector and the row's target value."""
        # List indexing keeps the selection two-dimensional (1 x vocabulary)
        # before it is densified and flattened to a single row.
        X = self.features[[idx]].toarray()[0]
        X = torch.tensor(X, dtype=torch.float32)
        y = self.df.target.iloc[idx]
        return X, y

In [None]:
# Wrap each split in the dataset class (train, test, validation order).
training_data, test_data, val_data = (
    CustomDataset(split) for split in (train_df, test_df, val_df)
)

In [None]:
# Only the training batches are reshuffled each epoch.
train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)
# The test loader must keep dataset order: the predictions collected from it
# are later paired positionally with `test_df_raw.id`, so shuffling here
# would assign predictions to the wrong ids.
test_dataloader = DataLoader(test_data, batch_size=64, shuffle=False)
# Evaluation does not need shuffling either.
val_dataloader = DataLoader(val_data, batch_size=64, shuffle=False)

In [None]:
# Look at a single training batch to check the feature and label shapes.
first_batch = next(iter(train_dataloader), None)
if first_batch is not None:
    x, y = first_batch
    print(x.shape)
    print(y.shape)

In [None]:
# Pick the first available backend: CUDA, then Apple MPS, then CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

In [None]:
# Input width of the model = number of features learned by the vectorizer.
in_features = len(cv.get_feature_names_out())
print(in_features)

In [None]:
class RNNModel(nn.Module):
    """ReLU RNN followed by a linear layer that outputs two class logits.

    Args:
        hidden_dim: size of the RNN hidden state.
        layer_dim: number of stacked RNN layers.
        input_dim: size of each input vector. When omitted, the notebook-level
            ``in_features`` is used, as before.
    """

    def __init__(self, hidden_dim, layer_dim, input_dim=None):
        super().__init__()
        if input_dim is None:
            input_dim = in_features
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim
        self.rnn = nn.RNN(input_dim, hidden_dim, layer_dim, batch_first=True, nonlinearity='relu')
        self.fc = nn.Linear(hidden_dim, 2)

    def forward(self, x):
        """Return logits of shape ``(batch, 2)``.

        Args:
            x: ``(batch, features)`` or ``(batch, seq_len, features)`` tensor.
        """
        # A 2-D input is a batch of independent samples. Without an explicit
        # sequence axis nn.RNN reads it as ONE unbatched sequence, so every
        # sample's output would depend on the samples before it in the batch.
        if x.dim() == 2:
            x = x.unsqueeze(1)
        # h0 is (num_layers, batch, hidden) and follows the input's device/dtype
        # instead of relying on a notebook-level `device` variable.
        h0 = torch.zeros(self.layer_dim, x.size(0), self.hidden_dim, device=x.device, dtype=x.dtype)
        out, _ = self.rnn(x, h0)
        # Classify from the output at the last time step.
        return self.fc(out[:, -1, :])

hidden_dim = 100
layer_dim = 2
# Seed before the model is built so that weight initialisation (and the
# DataLoader shuffling that follows) repeats on Restart & Run All.
torch.manual_seed(42)
model = RNNModel(hidden_dim, layer_dim).to(device)
print(model)

In [None]:
# class NeuralNet(nn.Module):
#     def __init__(self):
#         super().__init__()
#         self.fc1 = nn.Linear(in_features, 128)
#         self.bn1 = nn.BatchNorm1d(128)
#         self.fc2 = nn.Linear(128, 2)

#     def forward(self, x):
#         x = F.relu(self.bn1(self.fc1(x)))
#         x = self.fc2(x)
#         return x

# model = NeuralNet().to(device)
# print(model)

In [None]:
# Cross-entropy over the model's two output logits, optimised with Adam.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-4)

In [None]:
def train(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass over ``dataloader``.

    Puts ``model`` in training mode, moves every batch to the notebook-level
    ``device``, takes one optimizer step per batch and prints the loss on
    every 100th batch (starting with the first).

    Args:
        dataloader: iterable of ``(inputs, labels)`` batches with a ``dataset`` attribute.
        model: callable module producing predictions for a batch.
        loss_fn: callable ``(pred, labels) -> loss tensor``.
        optimizer: optimizer holding the model's parameters.
    """
    size = len(dataloader.dataset)
    model.train()
    for step, (inputs, labels) in enumerate(dataloader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Forward pass and loss for this batch
        batch_loss = loss_fn(model(inputs), labels)

        # Backward pass and parameter update
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        if step % 100:
            continue
        loss = batch_loss.item()
        current = (step + 1) * len(inputs)
        print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [None]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
epochs = 5
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    # One pass over the training batches, then a report on the validation split.
    train(dataloader=train_dataloader, model=model, loss_fn=loss_fn, optimizer=optimizer)
    test(dataloader=val_dataloader, model=model, loss_fn=loss_fn)
print("Done!")

In [None]:
# Collected class predictions for the test rows, filled by the prediction loop.
predicated_ans = list()

In [None]:
# Start from an empty list on every run so re-executing this cell never
# appends a second copy of the predictions.
predicated_ans = []

# Iterate the test dataset strictly in order: the predictions are later paired
# positionally with `test_df_raw.id`, so a shuffled loader would misalign them.
ordered_test_loader = DataLoader(
    test_dataloader.dataset,
    batch_size=test_dataloader.batch_size,
    shuffle=False,
)

# Inference only: eval mode and no gradient tracking.
model.eval()
with torch.no_grad():
    for X, y in ordered_test_loader:
        X = X.to(device)
        ans_np = torch.argmax(model(X), dim=1).cpu().numpy()
        predicated_ans.extend(ans_np.tolist())

In [None]:
# Every test row needs exactly one prediction; fail with a clear message
# before anything is written if the counts disagree.
if len(predicated_ans) != len(test_df_raw):
    raise ValueError(
        f"expected {len(test_df_raw)} predictions, got {len(predicated_ans)}"
    )

ans_df = pd.DataFrame({"id": test_df_raw["id"], "target": predicated_ans})

# Create the output directory first: to_csv does not create missing folders.
output_path = Path("outputs/ans7.csv")
output_path.parent.mkdir(parents=True, exist_ok=True)
ans_df.to_csv(output_path, index=False)