In [None]:
import datasets
from datasets import load_dataset
import warnings
import pandas as pd
warnings.filterwarnings("ignore")

# Load the "yelp_polarity" dataset through Hugging Face `datasets`;
# later cells read its 'train' and 'test' splits.
dataset = load_dataset("yelp_polarity")

In [None]:
# Bare expression: the notebook renders the dataset object's repr as the cell output.
dataset

In [None]:
# Materialise the training split as a DataFrame for inspection.
df_train = pd.DataFrame(dataset['train'])
print(df_train.shape)
# head() must be the LAST expression of the cell to be rendered; when it was
# followed by print(), its result was silently discarded.
df_train.head()

In [None]:
# Materialise the test split as a DataFrame for inspection.
df_test = pd.DataFrame(dataset['test'])
print(df_test.shape)
# head() must be the LAST expression of the cell to be rendered; when it was
# followed by print(), its result was silently discarded.
df_test.head()

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import re
from sklearn.model_selection import train_test_split

import numpy as np
from collections import Counter
from tqdm import tqdm

In [None]:
# Work on copies so the frames built in the earlier cells stay untouched
train_data = df_train.copy()
test_data = df_test.copy()

# Hold out 20% of the training frame as a validation set (seeded split)
train_data, val_data = train_test_split(train_data, test_size=0.2, random_state=42)

# Pull the review text and label columns out of each split as NumPy arrays
train_texts, train_labels = train_data['text'].values, train_data['label'].values
val_texts, val_labels = val_data['text'].values, val_data['label'].values
test_texts, test_labels = test_data['text'].values, test_data['label'].values

# Report the size of every split
print('Train data:', train_texts.shape, train_labels.shape)
print('Validation data:', val_texts.shape, val_labels.shape)
print('Test data:', test_texts.shape, test_labels.shape)

In [None]:
# Normalise a document and keep only its first 100 words
def clean_text(text):
    """Lower-case `text`, keep only a-z letters, and return its first 100 words.

    Steps, in order:
      1. lower-case the input;
      2. delete every character that is neither a-z nor whitespace
         (characters are removed, not replaced by a space);
      3. collapse whitespace runs to single spaces and trim the ends;
      4. drop one-character words;
      5. keep at most the first 100 remaining words.

    Returns the surviving words joined by single spaces ('' if none survive).
    """
    letters_only = re.sub(r'[^a-z\s]', '', text.lower())
    collapsed = re.sub(r'\s+', ' ', letters_only).strip()

    kept_words = []
    for token in collapsed.split():
        if len(token) > 1:
            kept_words.append(token)

    return ' '.join(kept_words[:100])

# Clean every document of each split; the results are plain Python lists of strings
train_texts = list(map(clean_text, train_texts))
val_texts = list(map(clean_text, val_texts))
test_texts = list(map(clean_text, test_texts))

In [None]:
# Sanity check: show the first training document after cleaning
print(train_texts[0])

In [None]:
# Build the vocabulary: count every token of the (cleaned) training documents.
# Only the training split is used, so validation/test tokens can be unseen.
vocabulary = Counter(token for document in train_texts for token in document.split())
vocab_size = len(vocabulary)
print(vocab_size)

In [None]:
# Give every vocabulary word an integer id, in first-seen order.
# Ids start at 1 because 0 is reserved for padding.
word_to_idx = dict(zip(vocabulary, range(1, len(vocabulary) + 1)))
print(list(word_to_idx.items())[:5])

In [None]:
def _encode_known(document):
    """Turn a training document into word ids; every token must be in `word_to_idx`."""
    return [word_to_idx[token] for token in document.split()]


def _encode_with_fallback(document):
    """Turn a document into word ids, sending tokens missing from `word_to_idx` to 0."""
    return [word_to_idx.get(token, 0) for token in document.split()]


# The vocabulary was built from the training split, so a strict lookup is used there
train_texts = list(map(_encode_known, train_texts))
print(train_texts[0])

# Validation/test documents may contain unseen words; those get id 0
# (the same id that is reserved for padding)
val_texts = list(map(_encode_with_fallback, val_texts))
test_texts = list(map(_encode_with_fallback, test_texts))

In [None]:
# Padding
max_length = 256


def pad_or_truncate(doc, length=max_length):
    """Return `doc` (a list of word ids) as a list of exactly `length` ids.

    Shorter documents are right-padded with 0 (the padding id). Longer documents
    are cut to `length`: without the slice, a negative multiplier makes
    `[0] * (length - len(doc))` an empty list, so the row would keep its
    original length and the later `torch.tensor(...)` call would fail on
    ragged rows.
    """
    return doc[:length] + [0] * (length - len(doc))


# Bring every document of every split to the same fixed length
train_texts = [pad_or_truncate(doc) for doc in train_texts]
val_texts = [pad_or_truncate(doc) for doc in val_texts]
test_texts = [pad_or_truncate(doc) for doc in test_texts]

In [None]:
# Convert the training split to PyTorch tensors:
# word ids as int64 (required for embedding lookups), labels as float32 (for BCELoss)
X_train, y_train = (
    torch.tensor(train_texts, dtype=torch.long),
    torch.tensor(train_labels, dtype=torch.float32),
)
print(X_train.shape, y_train.shape)

In [None]:
# Validation split: int64 word ids, float32 labels
X_val, y_val = (
    torch.tensor(val_texts, dtype=torch.long),
    torch.tensor(val_labels, dtype=torch.float32),
)
print(X_val.shape, y_val.shape)

# Test split: same dtypes as above
X_test, y_test = (
    torch.tensor(test_texts, dtype=torch.long),
    torch.tensor(test_labels, dtype=torch.float32),
)
print(X_test.shape, y_test.shape)

In [None]:
class SentimentAnalysisModel(nn.Module):
    """Bag-of-embeddings classifier.

    Each token id is embedded, the embeddings are averaged over the sequence
    dimension, and a linear layer followed by a sigmoid produces the output.

    Args:
        vocab_size: number of rows in the embedding table (must exceed the
            largest token id that will be fed in).
        embedding_dim: size of each embedding vector.
        output_dim: number of output units of the linear layer.
        padding_idx: token id used for padding. Its embedding row is
            initialised to zeros and receives no gradient. Defaults to 0.
            Pass None to disable this.
    """

    def __init__(self, vocab_size, embedding_dim, output_dim, padding_idx=0):
        super().__init__()
        # padding_idx keeps the padding row at zero and excludes it from gradients
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        self.out = nn.Linear(embedding_dim, output_dim)

    def forward(self, text):
        """Return sigmoid outputs for a batch of token-id sequences.

        `text` is indexed as (batch, sequence): the embeddings are averaged
        over dim=1. The mean runs over every position, padding included, so it
        divides by the padded sequence length.
        """
        embedded = self.embedding(text)
        return torch.sigmoid(self.out(embedded.mean(dim=1)))

In [None]:
# Build the classifier. The embedding table needs vocab_size + 1 rows because
# word ids start at 1 and id 0 is used for padding.
model = SentimentAnalysisModel(
    vocab_size=vocab_size + 1,
    embedding_dim=100,
    output_dim=1,
)

# Binary cross-entropy on the model's sigmoid outputs, optimised with Adam
criterion = nn.BCELoss()
optimizer = optim.Adam(params=model.parameters(), lr=3e-4)

In [None]:
print(X_train[0].shape)
# build a data loader
# TensorDataset indexes the two tensors in lockstep and yields the same
# (text, label) pairs as list(zip(X_train, y_train)), without first
# materialising one Python tuple of tensor views per training sample.
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
# Shuffled mini-batches of 256 samples
train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)

In [None]:
# Silence Python warnings for the rest of the notebook.
import warnings
# Blanket filter: ignore every warning category and message.
warnings.filterwarnings("ignore")
# Message-specific filter for warnings whose text starts with "Intel MKL WARNING".
# NOTE(review): the blanket filter above already covers anything this one would match.
warnings.filterwarnings('ignore', "Intel MKL WARNING")

In [None]:
# Move everything to the GPU when one is available, otherwise stay on the CPU.
# (A hard-coded 'cuda' device makes every .to(device) below fail on CPU-only machines.)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
model = model.to(device)
criterion = criterion.to(device)
# NOTE(review): train_loader was built from the tensors X_train/y_train referred to
# before this rebinding, and the training loop moves each batch itself, so these
# device copies of the training tensors look unused in the visible cells - confirm.
X_train = X_train.to(device)
y_train = y_train.to(device)
X_val = X_val.to(device)
y_val = y_val.to(device)
X_test = X_test.to(device)
y_test = y_test.to(device)

In [None]:
num_epochs = 15
# Validation is run in chunks of this many rows so the embedding activations
# of the whole validation set never have to exist at once.
eval_batch_size = 1024
train_losses = []
val_losses = []
# Best (lowest) validation loss seen so far. Infinity guarantees that the first
# epoch always counts as an improvement.
val_loss_max = float('inf')
# Consecutive epochs without a validation improvement. Initialised here so the
# `count += 1` branch can never hit an undefined name.
count = 0

# Check if GPU is available and move the model to GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# The validation tensors only need to be moved once, not on every epoch
X_val, y_val = X_val.to(device), y_val.to(device)

# Id 0 is the padding id: start training from an all-zero padding vector
with torch.no_grad():
    model.embedding.weight[0].zero_()

# Train the model
for epoch in tqdm(range(num_epochs)):
    train_loss = 0
    val_loss = 0
    model.train()
    for batch in tqdm(train_loader):
        texts, labels = batch
        # Move data to the same device as the model
        texts, labels = texts.to(device), labels.to(device)

        optimizer.zero_grad()
        output = model(texts)
        loss = criterion(output, labels.unsqueeze(1))
        loss.backward()

        optimizer.step()

        # Keep the padding vector at zero. This has to happen AFTER the
        # optimizer step: zeroing the row before the step (as was done
        # previously) lets the optimizer move it away from zero again, so
        # every forward pass would see a non-zero padding embedding.
        with torch.no_grad():
            model.embedding.weight[0].zero_()

        train_loss += loss.item()

    # Average of the per-batch losses for this epoch
    train_losses.append(train_loss / len(train_loader))

    model.eval()
    with torch.no_grad():
        # Chunked forward pass; the per-row outputs are concatenated so the
        # loss is still computed over the full validation set at once.
        output = torch.cat([
            model(X_val[start:start + eval_batch_size])
            for start in range(0, len(X_val), eval_batch_size)
        ])
        loss = criterion(output, y_val.unsqueeze(1))
        val_loss = loss.item()
        val_losses.append(val_loss)

        if val_loss < val_loss_max:
            # New best model: checkpoint it and reset the patience counter
            torch.save(model.state_dict(), 'best_model.pth')
            val_loss_max = val_loss
            count = 0
        else:
            count += 1
            # Stop once more than 5 consecutive epochs brought no improvement
            if count > 5:
                print('Early stopping')
                break

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss / len(train_loader):.4f}, Val Loss: {val_loss:.4f}')

In [None]:

# use the best model to evaluate the test data
# map_location lets a checkpoint that was saved from GPU tensors be restored
# when the current `device` is the CPU.
model.load_state_dict(torch.load('best_model.pth', map_location=device))
model.to(device)
model.eval()

# Make sure the test tensors live on the same device as the model,
# independent of which earlier cells were run.
X_test, y_test = X_test.to(device), y_test.to(device)

with torch.no_grad():
    # Forward pass in chunks of 1024 rows to bound peak memory; the per-row
    # outputs are concatenated so loss and accuracy cover the full test set.
    output = torch.cat([
        model(X_test[start:start + 1024])
        for start in range(0, len(X_test), 1024)
    ])
    loss = criterion(output, y_test.unsqueeze(1))
    print(f'Test Loss: {loss.item():.4f}')

    # Threshold the sigmoid outputs at 0.5 to get hard 0/1 predictions
    preds = (output > 0.5).long()
    accuracy = (preds == y_test.unsqueeze(1)).sum().float() / len(y_test)
    print(f'Test Accuracy: {accuracy.item():.4f}')