In [None]:
import os
import pandas as pd
import numpy as np
from tqdm import tqdm
import joblib

import kagglehub
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from keras.preprocessing.text import Tokenizer
from keras_preprocessing.sequence import pad_sequences
from sentence_transformers import SentenceTransformer

import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import TensorDataset, DataLoader, Dataset, random_split

from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping
from keras.utils import to_categorical
from keras.layers import Embedding, GRU, LSTM, SimpleRNN, Dense, Dropout

# path = kagglehub.dataset_download("datatattle/covid-19-nlp-text-classification")
# os.rename(path, './datasets')

In [None]:
# Read the COVID-19 tweet CSV from the local datasets folder and show its
# (rows, columns) shape as the cell output.
# NOTE(review): this loads the file named *_test.csv — confirm that the test
# split (rather than the train split) is the intended modelling data.
csv_path = './datasets/Corona_NLP_test.csv'
df = pd.read_csv(csv_path)
df.shape

In [None]:
# Preview the first rows of the freshly loaded frame to inspect its columns.
df.head()

In [None]:
# Ordinal encoding of the five sentiment labels, from most negative (-2)
# to most positive (+2).
sent_scores = dict(zip(
    ['Extremely Negative', 'Negative', 'Neutral', 'Positive', 'Extremely Positive'],
    range(-2, 3),
))

# Numeric score per tweet; an unknown label raises KeyError.
df['sentiment_score'] = df['Sentiment'].apply(lambda label: sent_scores[label])
# One indicator column per sentiment label.
dumm = pd.get_dummies(df['Sentiment'])

# Columns to keep, in output order, mapped to their snake_case names.
column_names = {
    'OriginalTweet': 'text',
    'sentiment_score': 'sentiment_score',
    'Extremely Negative': 'extremely_negative',
    'Negative': 'negative',
    'Neutral': 'neutral',
    'Positive': 'positive',
    'Extremely Positive': 'extremely_positive',
}
df = df.join(dumm)[list(column_names)].rename(columns=column_names)
df.head()

In [None]:
# Two views of the target: the ordinal score (-2..2) and the indicator
# matrix with one column per sentiment class.
y_num = df['sentiment_score'].to_numpy()
y_cat = df.drop(columns=['text', 'sentiment_score']).to_numpy()

In [None]:
# tfidf = TfidfVectorizer(stop_words='english')

# X_tfidf = tfidf.fit_transform(df['text'])
# X_tfidf

In [None]:
# Turn each tweet into a fixed-length sequence of integer word ids.
# num_words=5000 must stay in sync with the vocabulary size of the embedding
# layer in LSTMModel; maxlen=128 is the padded/truncated sequence length.
tweet_texts = df['text'].values
tokenizer = Tokenizer(num_words=5000)
tokenizer.fit_on_texts(tweet_texts)
tokenized = tokenizer.texts_to_sequences(tweet_texts)
X_tokenized = pad_sequences(tokenized, maxlen=128)
X_tokenized

In [None]:
# Sentence-embedding model; the (currently commented-out) embedding cell below
# calls embedding.encode(...) on batches of tweets.
# NOTE(review): 'labse' presumably resolves to the LaBSE checkpoint — confirm
# the identifier (and its casing) resolves on a fresh environment.
embedding = SentenceTransformer('labse')

In [None]:
def batched(iterable, batch_size):
    """Yield successive lists of items drawn from ``iterable``.

    Each yielded list contains exactly ``batch_size`` items, except possibly
    the final one, which holds whatever remains. An empty iterable yields
    nothing.
    """
    chunk = []
    for element in iterable:
        chunk.append(element)
        if len(chunk) != batch_size:
            continue
        # Chunk is full: emit it and start a new list so the emitted one is
        # never mutated afterwards.
        yield chunk
        chunk = []
    if len(chunk) > 0:
        yield chunk

In [None]:
# embeddings = []

# for batch in tqdm(batched(df['text'].values, 50), total = len(df) / 50):
#     embeddings.extend(embedding.encode(batch))
    
# joblib.dump(embeddings, './embeddings.joblib')

# keras

In [None]:
# X_train, X_test, y_train, y_test = train_test_split(X_tokenized, y_cat)

# model = Sequential()

# model.add(Embedding(input_dim = 5000, output_dim = 64, input_length = 128))
# model.add(LSTM(64, return_sequences=True))
# model.add(Dropout(.2))
# model.add(LSTM(64))
# model.add(Dropout(.2))
# model.add(Dense(5, activation='softmax'))

# model.compile(optimizer=Adam(learning_rate=.001), loss='categorical_crossentropy')

# model.fit(X_train, y_train, epochs=15, batch_size=32)

In [None]:
# X_train, X_test, y_train, y_test = train_test_split(embeddings, y_cat)

# model = Sequential()

# model.add(Dense(64, activation='relu'))
# model.add(Dropout(.2))
# model.add(Dense(32, activation='relu'))
# model.add(Dropout(.2))
# model.add(Dense(5, activation='softmax'))

# model.compile(optimizer=Adam(learning_rate=.001), loss='categorical_crossentropy')

# model.fit(X_train, y_train, epochs=100, batch_size=32)

# pytorch

In [None]:
class TokenizedDataset(Dataset):
    """Map-style dataset pairing token-id sequences with labels.

    ``X`` and ``y`` are each copied into a ``torch.long`` tensor when the
    dataset is built; item ``i`` is the pair ``(X[i], y[i])``.
    """

    def __init__(self, X, y):
        self.X, self.y = (torch.tensor(data, dtype=torch.long) for data in (X, y))

    def __getitem__(self, i):
        return self.X[i], self.y[i]

    def __len__(self):
        # Number of samples = length of the feature tensor's first dimension.
        return len(self.X)
    
class EmbeddedDataset(Dataset):
    """Map-style dataset over precomputed embeddings stored in a joblib dump.

    Args:
        embeddings_file: Path of the joblib file to load. Its content is
            converted with ``np.array`` and stored as a ``torch.float32``
            tensor, so it must be array-like (e.g. a list of equal-length
            vectors).
        y: Labels, stored as a ``torch.long`` tensor. Item ``i`` of the
            dataset is ``(X[i], y[i])``.
    """

    def __init__(self, embeddings_file, y):
        # Load from the path the caller passed in (previously the argument was
        # ignored and './embeddings.joblib' was always read).
        # NOTE(review): joblib.load unpickles the file — only load trusted dumps.
        loaded = joblib.load(embeddings_file)
        self.X = torch.tensor(np.array(loaded), dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, i):
        features = self.X[i]
        label = self.y[i]
        return features, label

In [None]:
class LSTMModel(nn.Module):
    """Two stacked LSTMs over token ids, producing one logit vector per sequence.

    Args:
        vocab_size: Number of rows in the embedding table; token ids must be
            in ``[0, vocab_size)``.
        embedding_dim: Size of each token embedding.
        hidden1: Hidden size of the first LSTM.
        hidden2: Hidden size of the second LSTM.
        num_classes: Number of output logits.
        dropout: Dropout probability applied after each LSTM.

    The defaults reproduce the original fixed architecture, so ``LSTMModel()``
    keeps working unchanged.
    """

    def __init__(self, vocab_size=5000, embedding_dim=64, hidden1=32, hidden2=16,
                 num_classes=5, dropout=.5):
        super().__init__()
        self.embedding_layer = nn.Embedding(vocab_size, embedding_dim)
        # batch_first=True: inputs arrive as (batch, seq_len); without it
        # nn.LSTM would treat the batch dimension as time.
        self.lstm1 = nn.LSTM(embedding_dim, hidden1, batch_first=True)
        self.lstm2 = nn.LSTM(hidden1, hidden2, batch_first=True)
        # nn.LSTM's own ``dropout=`` only acts between stacked layers inside
        # one module, so on single-layer LSTMs it does nothing (and warns).
        # An explicit Dropout module applies the intended regularisation.
        self.dropout = nn.Dropout(dropout)
        self.final = nn.Linear(hidden2, num_classes)

    def forward(self, x):
        """Map token ids of shape (batch, seq_len) to logits of shape (batch, num_classes)."""
        x = self.embedding_layer(x)
        x, _ = self.lstm1(x)
        x = self.dropout(x)
        x, _ = self.lstm2(x)
        # Classify from the last time step only, giving one prediction per
        # sequence instead of one per token.
        # NOTE(review): assumes sequences are left-padded so the last step is a
        # real token (the keras pad_sequences default) — confirm.
        last_step = self.dropout(x[:, -1, :])
        logits = self.final(last_step)
        return logits
    
# class PreEmbeddedModel(nn.Module):
#     def __init__(self):
#         super().__init__()
#         self.l1 = nn.Linear()
#         self.lstm1 = nn.LSTM(64, 32, dropout=.2)
#         self.lstm2 = nn.LSTM(32, 32, dropout=.2)
#         self.final = nn.Linear(32, 5)
        
#     def forward(self, x):
#         x = self.embedding(x)
#         x, _ = self.lstm1(x)
#         x, _ = self.lstm2(x)
#         logits = self.final(x)
#         return logits

In [None]:
def train(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass over ``dataloader`` and return the mean batch loss.

    Args:
        dataloader: Iterable yielding ``(X, y)`` batches.
        model: Module called as ``model(X)``; put into training mode here.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: Optimizer stepped once per batch.

    Returns:
        The average of the per-batch losses as a Python float, or ``nan`` when
        the dataloader yields no batches.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0
    for X, y in dataloader:
        # Clear gradients first so anything left over from before this call
        # cannot leak into the first step.
        optimizer.zero_grad()
        pred = model(X)
        loss = loss_fn(pred, y)
        loss.backward()
        optimizer.step()
        # .item() detaches the value, so no autograd graph is kept alive.
        total_loss += loss.item()
        num_batches += 1
    if num_batches == 0:
        return float('nan')
    return total_loss / num_batches

def test(dataloader, model, loss_fn):
    model.eval()
    num_batches = len(dataloader)
    test_loss = 0
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            
    return test_loss

In [None]:
# nn.CrossEntropyLoss with integer targets expects one class index per sample,
# not a one-hot row, so collapse the indicator matrix to indices 0..4
# (column order: extremely_negative .. extremely_positive).
class_labels = y_cat.argmax(axis=1)
tokenized_data = TokenizedDataset(X_tokenized, class_labels)

# 80/20 train/test split.
train_size = int(.8 * len(tokenized_data))
test_size = len(tokenized_data) - train_size

# Seeded generator so the split is the same on every Restart & Run All.
split_generator = torch.Generator().manual_seed(42)
train_tok, test_tok = random_split(tokenized_data, [train_size, test_size], generator=split_generator)
train_loader = DataLoader(train_tok, batch_size=20, shuffle=True)
# Evaluation order does not affect the loss, so no shuffling is needed.
test_loader = DataLoader(test_tok, batch_size=20, shuffle=False)

lstm_model = LSTMModel()

loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(lstm_model.parameters(), lr=.001)

max_epochs = 5
for e in range(max_epochs):
    train_loss = train(train_loader, lstm_model, loss_fn, optimizer)
    test_loss = test(test_loader, lstm_model, loss_fn)
    print(f'----- EPOCH: {e} -----')
    # float() prints a plain number whether train() returns a float or a
    # scalar tensor.
    print(f'Train loss: {float(train_loss)}')
    print(f'Test loss: {test_loss}')