In [None]:
#%% packages
import pandas as pd
import numpy as np
from collections import Counter
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

import seaborn as sns
import matplotlib.pyplot as plt
import os

In [None]:
#%% data import
# Load the tweet CSV from the data directory and discard every row that has a
# missing value in any column, then preview the first rows.
DATA_DIR = '../data'
twitter_file = os.path.join(DATA_DIR, 'Tweets.csv')
df = (
    pd.read_csv(twitter_file)
    .dropna()
)
df.head()

In [None]:
# Per-column count of missing values (isna is the canonical name of isnull).
df.isna().sum()

In [None]:
# Drop rows whose tweet text or selected text is missing.
# The frame is modified in place, so `df` keeps referring to the same object.
required_text_columns = ['text', 'selected_text']
df.dropna(subset=required_text_columns, inplace=True)

In [None]:
# %% get class values based on categories
# Integer class id for each sentiment label; stored in a new 'class' column.
# Labels missing from the mapping become NaN (pandas Series.map semantics).
cat_id = dict(neutral=1, negative=0, positive=2)

df['class'] = df['sentiment'].map(cat_id)

In [None]:
#%% Hyperparameters
# Number of passes over the training loader in the training loop.
NUM_EPOCHS = 80
# Mini-batch size of the training DataLoader.
BATCH_SIZE = 512

In [None]:
#%% separate independent and dependent features
# X holds the raw tweet strings, y the integer class ids from the 'class' column.
# to_numpy() always returns a NumPy ndarray, whereas .values may hand back a
# pandas ExtensionArray depending on the column dtype / pandas version.
X = df['text'].to_numpy()
y = df['class'].to_numpy()

In [None]:
# %% train test split
# Hold out TEST_DATA_RATIO of the samples for evaluation; the fixed
# random_state makes the split repeatable across runs.
TEST_DATA_RATIO = 0.5
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=TEST_DATA_RATIO,
    random_state=43,
)
print(f"X train: {X_train.shape}, y train: {y_train.shape}\nX test: {X_test.shape}, y test: {y_test.shape}")


In [None]:
# Vectorize the tweets with a CountVectorizer (used here in the role of a
# one-hot encoder). The vocabulary is learned from the training texts only and
# then re-used to encode the test texts, so both matrices share their columns.
one_hot = CountVectorizer()
X_train_onehot = one_hot.fit_transform(raw_documents=X_train)
X_test_onehot = one_hot.transform(raw_documents=X_test)

In [None]:
# Huge sparse matrix here!
X_train_onehot

In [None]:
# Encoded test tweets; produced by the same fitted vectorizer as the training matrix.
X_test_onehot

In [None]:
#%% Dataset class
from torch.utils.data import Dataset

class SentimentData(Dataset):
    """In-memory dataset of vectorized texts and their integer class labels.

    Args:
        X: Feature matrix with one row per sample. Either an object exposing
            ``toarray()`` (e.g. the sparse matrix returned by CountVectorizer)
            or an already dense array-like.
        y: Array-like of integer class ids, one per row of ``X``.

    Attributes:
        X: ``float32`` tensor holding the dense features.
        y: ``int64`` tensor holding the labels.
        len: Number of samples.
    """

    def __init__(self, X, y):
        super().__init__()
        # Sparse inputs are densified; dense inputs are taken as they are.
        dense = X.toarray() if hasattr(X, "toarray") else X
        self.X = torch.tensor(dense, dtype=torch.float32)
        # Convert straight to int64: going through a float32 tensor first would
        # silently corrupt label values above 2**24.
        self.y = torch.tensor(y, dtype=torch.long)
        self.len = len(self.X)

    def __len__(self):
        """Return the number of samples."""
        return self.len

    def __getitem__(self, index):
        """Return the ``(features, label)`` pair stored at ``index``."""
        return self.X[index], self.y[index]
    
# Wrap the vectorized train and test splits as torch datasets.
train_ds = SentimentData(X=X_train_onehot, y=y_train)
test_ds = SentimentData(X=X_test_onehot, y=y_test)


In [None]:
# %% Dataloader
# Training batches are reshuffled every epoch; the test loader keeps dataset order.
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
# NOTE(review): 15000 is presumably meant to cover the whole test split in a
# single batch - confirm against the dataset size.
test_loader = DataLoader(dataset=test_ds, batch_size=15000)

In [None]:
# %% Model
class SentimentModel(nn.Module):
    """Feed-forward classifier with a single hidden layer.

    Args:
        NUM_FEATURES: Size of each input vector.
        NUM_CLASSES: Number of output classes.
        HIDDEN: Width of the hidden layer (default 10).
    """

    def __init__(self, NUM_FEATURES, NUM_CLASSES, HIDDEN = 10):
        super().__init__()
        self.linear = nn.Linear(in_features=NUM_FEATURES, out_features=HIDDEN)
        self.linear2 = nn.Linear(in_features=HIDDEN, out_features=NUM_CLASSES)
        self.relu = nn.ReLU()
        # dim=1 normalizes across the class dimension of a (batch, classes) tensor
        self.log_softmax = nn.LogSoftmax(dim=1)

    def forward(self, x):
        """Return per-class log-probabilities for a batch of feature vectors."""
        hidden = self.relu(self.linear(x))
        logits = self.linear2(hidden)
        return self.log_softmax(logits)

In [None]:
#%% Model, Loss and Optimizer
# One input feature per vocabulary entry, three sentiment classes.
model = SentimentModel(NUM_FEATURES = X_train_onehot.shape[1], NUM_CLASSES = 3)

# SentimentModel.forward already ends in LogSoftmax, i.e. it emits
# log-probabilities. NLLLoss is the loss that consumes log-probabilities;
# CrossEntropyLoss expects raw logits and would apply log-softmax a second time.
criterion = nn.NLLLoss()
# AdamW with its default learning rate and weight decay.
optimizer = torch.optim.AdamW(model.parameters())

In [None]:
# %% Model Training
# Mini-batch training for NUM_EPOCHS epochs. For every epoch the per-batch
# losses are summed (not averaged) and the total is recorded in train_losses.

train_losses = []
for epoch in range(NUM_EPOCHS):
    epoch_loss = 0
    for X_batch, y_batch in train_loader:
        # clear gradients left over from the previous step
        optimizer.zero_grad()
        log_probs = model(X_batch)
        loss = criterion(log_probs, y_batch.long())
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    train_losses.append(epoch_loss)
    print(f"Epoch {epoch}, Loss: {epoch_loss}")

In [None]:
# %%
# Summed training loss per epoch (x axis: zero-based epoch index).
epoch_axis = list(range(len(train_losses)))
sns.lineplot(x=epoch_axis, y=train_losses)
plt.show()

In [None]:
# Predict a class for every test sample.
# Predictions are gathered per batch and concatenated afterwards, so the result
# covers the whole test set even when the loader yields more than one batch
# (assigning inside the loop would keep only the last batch).
was_training = model.training
model.eval()  # inference behaviour for mode-dependent layers, if any
batch_preds = []
with torch.no_grad():  # no gradients needed for inference
    for X_batch, y_batch in test_loader:
        y_test_pred_log = model(X_batch)
        # the most likely class is the arg-max over the class dimension
        batch_preds.append(torch.argmax(y_test_pred_log, dim = 1))
model.train(was_training)  # restore whatever mode the model was in before
# An empty loader yields an empty prediction tensor instead of an undefined name.
y_test_pred = torch.cat(batch_preds) if batch_preds else torch.empty(0, dtype=torch.long)

In [None]:
# %%
# Move the predictions to the CPU as a NumPy array. reshape(-1) guarantees a
# 1-D result; squeeze() would turn a single prediction into a 0-d array, which
# the scikit-learn metrics do not accept.
y_test_pred_np = y_test_pred.reshape(-1).cpu().numpy()

In [None]:
# %%
# Share of test samples whose predicted class equals the true class.
acc = accuracy_score(y_pred=y_test_pred_np, y_true = y_test)
# Format with a format spec: rounding first and multiplying by 100 afterwards
# can expose floating-point noise such as 68.30000000000001.
f"The accuracy of the model is {acc * 100:.1f}%."

In [None]:
# %%
# Baseline: accuracy obtained by always predicting the most frequent test class.
most_common_cnt = Counter(y_test).most_common(1)[0][1]
print(f"Naive Classifier: {np.round(most_common_cnt / len(y_test) * 100, 1)} %")


In [None]:
#%% Confusion Matrix
# confusion_matrix takes (y_true, y_pred); with that order rows are the true
# classes and columns the predicted classes. Passing them the other way round
# transposes the matrix.
cm = confusion_matrix(y_true=y_test, y_pred=y_test_pred_np)
ax = sns.heatmap(cm, annot=True, fmt=".0f")
ax.set_xlabel("Predicted class")
ax.set_ylabel("True class")
plt.show()
