In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Recursively list every file under the Kaggle input directory, one full path per line.
for folder, _, names in os.walk('/kaggle/input'):
    for name in names:
        print(os.path.join(folder, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/twitter_validation.csv
/kaggle/input/twitter_training.csv


# Vanilla RNN classification
Twitter sentiment analysis using Vanilla RNN.

The same setup covers the vanilla RNN, GRU and LSTM variants.

Little tweaking is needed when switching between RNN variants.

In [2]:
# import libraries
import pandas as pd
import numpy as np

from sklearn.preprocessing import LabelEncoder
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader

import time
from tqdm.auto import tqdm 

In [3]:
# Read the training and validation CSVs, supplying the four column labels via `names=`.
csv_columns = ['ID', 'Game', 'Sentiment', 'Text']
train = pd.read_csv('/kaggle/input/twitter_training.csv', names=csv_columns)
test = pd.read_csv('/kaggle/input/twitter_validation.csv', names=csv_columns)
del csv_columns  # only needed while reading; removed to keep the namespace tidy

# Cell output: (rows, columns) of each split.
(train.shape, test.shape)

((74682, 4), (1000, 4))

In [4]:
# train = train.iloc[:5000, :]

### Text preprocessing

In [5]:
# Clean both splits: drop rows whose 'Text' repeats an earlier row, then drop rows
# whose 'Text' is missing. The frames are rebound instead of mutated in place;
# the surviving rows and their index labels are the same either way.
train = train.drop_duplicates(subset=['Text']).dropna(subset=['Text'])
test = test.drop_duplicates(subset=['Text']).dropna(subset=['Text'])

In [6]:
# count vectorizer
# Single source of truth for the label encoding, shared by both splits.
SENTIMENT_TO_ID = {'Negative': 0, 'Positive': 1, 'Neutral': 2, 'Irrelevant': 3}


def _encode_sentiment(labels):
    """Map a Series of sentiment strings to an int64 array of class ids.

    Raises:
        ValueError: if any label is missing or not a key of SENTIMENT_TO_ID.
            Without this check `.map` would silently produce NaN, which cannot
            be converted to a valid integer class id later on.
    """
    encoded = labels.map(SENTIMENT_TO_ID)
    unmapped = encoded.isna()
    if unmapped.any():
        unknown = sorted(set(labels[unmapped].astype(str)))
        raise ValueError(f'Unmapped sentiment labels: {unknown}')
    return encoded.astype('int64').values


# dtype=np.float32: the dense bag-of-words matrix is by far the largest object in
# this notebook. The default int64 output takes twice the memory and is converted
# to float32 anyway when the tensors are built, so produce float32 directly.
# Token counts are small integers and are represented exactly in float32.
vectorizer = CountVectorizer(dtype=np.float32)
X_train = vectorizer.fit_transform(train['Text']).toarray()
y_train = _encode_sentiment(train['Sentiment'])

# Transform only: the test split must reuse the vocabulary fitted on the training text.
X_test = vectorizer.transform(test['Text']).toarray()
y_test = _encode_sentiment(test['Sentiment'])

# The raw frames are no longer needed once features and labels are extracted.
del train, test

### Dataset to tensor

In [7]:
# X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
# y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(device)

# X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
# y_test_tensor = torch.tensor(y_test, dtype=torch.long).to(device)

In [8]:
# Use the GPU when one is visible to torch, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'


class CustomDataset(torch.utils.data.Dataset):
    """Dataset holding features (and optional labels) as tensors on `device`.

    Features are stored as float32 and labels as int64 (`torch.long`). Both are
    converted and moved to `device` once, at construction time.
    """

    def __init__(self, x, y=None):
        features = torch.tensor(x, dtype=torch.float32)
        self.x = features.to(device)
        if y is None:
            # Unlabelled data: __getitem__ returns a placeholder instead.
            self.y = None
        else:
            targets = torch.tensor(y, dtype=torch.long)
            self.y = targets.to(device)

    def __len__(self):
        """Number of samples (size of the first dimension of the features)."""
        return len(self.x)

    def __getitem__(self, idx):
        """Return `(features, label)`; the label is a (1, 1) float32 placeholder when unlabelled."""
        sample = self.x[idx]
        if self.y is None:
            return sample, torch.empty((1, 1), dtype=torch.float32)
        return sample, self.y[idx]

In [9]:
# Wrap the feature matrices and label vectors of each split in a CustomDataset.
train_dataset = CustomDataset(x=X_train, y=y_train)
test_dataset = CustomDataset(x=X_test, y=y_test)

### DataLoader

In [10]:
# Batches of 64 samples; only the training loader shuffles its data.
train_dataloader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)

### Building the Network

In [45]:
# create RNN model
class VanillaRNN(nn.Module):
    """Single-layer recurrent classifier: recurrent layer -> last time step -> linear head.

    Args:
        input_size: number of features per time step.
        hidden_size: size of the recurrent hidden state.
        output_size: number of classes (size of the logits vector).
        rnn_type: which recurrent layer to use: 'rnn' (default, the original
            behaviour), 'gru' or 'lstm'. Case-insensitive.

    Raises:
        ValueError: if `rnn_type` is not one of the supported names.
    """

    _RECURRENT_LAYERS = {'rnn': nn.RNN, 'gru': nn.GRU, 'lstm': nn.LSTM}

    def __init__(self, input_size, hidden_size, output_size, rnn_type='rnn'):
        super().__init__()
        kind = str(rnn_type).lower()
        if kind not in self._RECURRENT_LAYERS:
            raise ValueError(
                f"rnn_type must be one of {sorted(self._RECURRENT_LAYERS)}, got {rnn_type!r}"
            )
        self.hidden_size = hidden_size
        self.rnn_type = kind
        # The attribute keeps the name `rnn` whichever layer type is selected.
        self.rnn = self._RECURRENT_LAYERS[kind](input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return class logits of shape (batch, output_size).

        `x` is expected as (batch, seq_len, input_size) because the recurrent
        layer is built with `batch_first=True`.
        """
        # Zero initial state created with the input's own device and dtype, so the
        # module does not depend on a global `device` and also works after
        # `.double()` / `.half()`.
        h0 = x.new_zeros(1, x.size(0), self.hidden_size)
        # Only the LSTM takes a (hidden, cell) pair; the cell state is allocated
        # solely in that case.
        state = (h0, torch.zeros_like(h0)) if self.rnn_type == 'lstm' else h0
        out, _ = self.rnn(x, state)
        # out is (batch, seq_len, hidden); classify from the last time step only.
        return self.fc(out[:, -1, :])

In [46]:
# Model dimensions: one input feature per column of the vectorised text matrix,
# a 256-unit hidden state, and four output classes.
input_size = X_train.shape[1]
hidden_size = 256
output_size = 4

# Build the network, then move its parameters to the selected device.
model = VanillaRNN(input_size=input_size, hidden_size=hidden_size, output_size=output_size)
model = model.to(device)

# Loss and optimiser.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=0.001)
# optimizer = optim.SGD(params=model.parameters(), lr=0.1)

# accuracy function
def accuracy_fn(y_true, y_pred):
    """Return the percentage (0-100) of positions where `y_pred` equals `y_true`."""
    n_matches = (y_true == y_pred).sum().item()
    return (n_matches / len(y_true)) * 100

### Training

In [47]:
def train_step(model: torch.nn.Module,
               data_loader: torch.utils.data.DataLoader,
               criterion: torch.nn.Module,
               optimizer: torch.optim.Optimizer,
               accuracy_fn,
               device: torch.device = device):
    """Run one training epoch over `data_loader` and print the mean loss and accuracy.

    Args:
        model: network to optimise; called on inputs with a length-1 sequence
            dimension inserted at position 1.
        data_loader: yields `(inputs, labels)` batches.
        criterion: loss taking `(logits, labels)`.
        optimizer: optimiser already bound to `model`'s parameters.
        accuracy_fn: callable `(y_true, y_pred) -> percentage`.
        device: device each batch is moved to before the forward pass
            (a no-op when the batch already lives there).

    Returns:
        `(mean_loss, mean_accuracy)` as Python floats, averaged over batches.
    """
    train_loss, train_acc = 0.0, 0.0

    # Training mode only needs to be set once per epoch, not once per batch.
    model.train()
    for inputs, labels in data_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        outputs = model(inputs.unsqueeze(1))

        # No squeeze(): logits are already (batch, classes). Squeezing would turn a
        # final batch of size 1 into a 1-D tensor and break the loss computation.
        loss = criterion(outputs, labels)

        # .item() detaches the value; summing the tensor itself would keep autograd
        # history alive for every batch of the epoch.
        train_loss += loss.item()
        train_acc += accuracy_fn(labels, outputs.argmax(dim=1))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    train_loss /= len(data_loader)
    train_acc /= len(data_loader)
    print(f'Train loss: {train_loss:.5f} | Train accuracy: {train_acc:.2f}%')
    return train_loss, train_acc

In [48]:
def test_step(data_loader: torch.utils.data.DataLoader,
              model: torch.nn.Module,
              criterion: torch.nn.Module,
              accuracy_fn):
    
    test_loss, test_acc = 0, 0
    
    model.eval()
    with torch.inference_mode():
        for X, y in data_loader:
            pred = model(X.unsqueeze(1))
            # _, predicted = torch.max(pred, 1)
            
            test_loss += criterion(pred.squeeze(), y)
            test_acc += accuracy_fn(y, pred.argmax(dim=1))
            
        test_loss /= len(data_loader)
        test_acc /= len(data_loader)
        print(f"Test loss: {test_loss:.5f} | Test accuracy: {test_acc:.2f}%\n")

In [49]:
epochs = 5

# Arguments passed unchanged to both the training and the evaluation step.
shared_args = dict(model=model, criterion=criterion, accuracy_fn=accuracy_fn)

for epoch in tqdm(range(epochs)):
    print(f'Epoch: {epoch}\n-------------------')
    # One optimisation pass over the training data ...
    train_step(data_loader=train_dataloader, optimizer=optimizer, device=device, **shared_args)
    # ... followed by an evaluation pass over the held-out data.
    test_step(data_loader=test_dataloader, **shared_args)

  0%|          | 0/5 [00:00<?, ?it/s]

Epoch: 0
-------------------
Train loss: 0.67213 | Train accuracy: 74.60%
Test loss: 0.22371 | Test accuracy: 93.16%

Epoch: 1
-------------------
Train loss: 0.28199 | Train accuracy: 89.99%
Test loss: 0.17949 | Test accuracy: 94.86%

Epoch: 2
-------------------
Train loss: 0.20648 | Train accuracy: 92.61%
Test loss: 0.17353 | Test accuracy: 95.05%

Epoch: 3
-------------------
Train loss: 0.17391 | Train accuracy: 93.67%
Test loss: 0.19119 | Test accuracy: 95.03%

Epoch: 4
-------------------
Train loss: 0.15269 | Train accuracy: 94.47%
Test loss: 0.18712 | Test accuracy: 95.41%



Reference: 
[Twitter Sentiment Analysis](https://www.kaggle.com/code/dheekumar/rnn-model)