In [None]:
# This model was trained in Google Colab on a T4 GPU.
!pip install torch==2.2.2 torchtext==0.17.2 torchvision torchaudio
!pip install torchdata==0.7.1 --quiet
!pip install 'portalocker>=2.0.0'
!pip install spacy nltk gensim

In [None]:
from torchtext import _extension
import torch
import torch.nn as nn
import torch.optim as optim
import torchtext
from torchtext.datasets import IMDB
from sklearn.model_selection import train_test_split
import spacy
import re
from nltk.corpus import stopwords as nltk_stopwords
from spacy.lang.en.stop_words import STOP_WORDS as spacy_stopwords
from gensim.models import Word2Vec
import numpy as np
import nltk
nltk.download('stopwords')

In [None]:
# Seed every RNG the notebook draws from so a fresh "Run All" is repeatable.
SEED = 2222
torch.manual_seed(SEED)
# iterator_func shuffles with np.random.permutation, which torch.manual_seed
# does not control, so NumPy's global RNG has to be seeded separately.
np.random.seed(SEED)
torch.backends.cudnn.deterministic = True

# preprocess_text only reads token.text and token.lemma_, so the parser and
# NER components are switched off when the pipeline is loaded.
nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])

# Stop words = NLTK English list plus spaCy English list, minus negations,
# which are kept because they carry the polarity of a review.
stop_words = set(nltk_stopwords.words('english')).union(spacy_stopwords)
negation_words = {
    'not', 'no', 'nor', 'never', 'ain', 'aren', "aren't", 'couldn', "couldn't",
    'didn', "didn't", 'doesn', "doesn't", 'hadn', "hadn't", 'hasn', "hasn't",
    'haven', "haven't", 'isn', "isn't", 'mightn', "mightn't", 'mustn', "mustn't",
    'needn', "needn't", 'shan', "shan't", 'shouldn', "shouldn't", 'wasn',
    "wasn't", 'weren', "weren't", 'won', "won't", 'wouldn', "wouldn't",
    # spaCy's English tokenizer splits contractions ("didn't" -> "did" + "n't")
    # and "n't" is itself in spaCy's stop-word list. Without this entry the
    # negation half of every contraction would still be filtered out.
    "n't",
}
stop_words = stop_words - negation_words

# Compiled once at definition time instead of on every call.
_HTML_TAG_RE = re.compile(r'<.*?>')
# A run of word characters, optionally containing one internal apostrophe.
_TOKEN_RE = re.compile(r"\w+'?\w+|\w+")


def preprocess_text(text):
    """Turn a raw review string into a list of lemmas.

    Steps: remove ``<...>`` tags, lowercase, tokenise with a regex, run the
    re-joined tokens through the spaCy pipeline ``nlp``, and keep the lemma of
    every spaCy token whose surface text is not in ``stop_words``.

    Args:
        text: raw review text, possibly containing HTML tags.

    Returns:
        list[str]: lemmas of the surviving tokens, in document order.
    """
    # Replace each tag with a space rather than nothing, so the words on both
    # sides of e.g. "<br />" are not fused into a single token.
    text = _HTML_TAG_RE.sub(' ', text)
    tokens = _TOKEN_RE.findall(text.lower())
    # The stop-word test uses token.text (surface form); the value kept is the
    # lemma.
    return [
        token.lemma_ for token in nlp(" ".join(tokens)) if token.text not in stop_words
    ]

In [None]:
print("Loading and processing IMDb dataset...")
train_iter, test_iter = IMDB(split=('train', 'test'))

# Materialise both official splits into one list of (label, text) pairs; the
# combined data is re-split into train/val/test further down the notebook.
samples = [*train_iter, *test_iter]

# One preprocessed token list per review, and the raw label shifted down by one.
reviews = [preprocess_text(text) for _, text in samples]
labels = [label - 1 for label, _ in samples]

# The raw texts are no longer needed once they have been preprocessed.
del samples

print(f"Processed {len(reviews)} reviews.")
print("-" * 20)

In [None]:
print("Training Word2Vec model...")
embedding_dimension = 100

# Word2Vec hyper-parameters, gathered in one place.
w2v_params = dict(
    vector_size=embedding_dimension,
    window=5,
    min_count=5,
    workers=4,
)

# Only the trained vectors (.wv) are kept; the full training model is never
# bound to a name, so nothing has to be deleted afterwards.
word_vectors = Word2Vec(sentences=reviews, **w2v_params).wv

print(f"Word2Vec model trained. Vocabulary size: {len(word_vectors.key_to_index)}")
print("-" * 20)

In [None]:
def word2idx(embedding_model, review):
    """Map a tokenised review to a 1-D tensor of vocabulary indices.

    Args:
        embedding_model: object exposing a ``key_to_index`` mapping from token
            to integer index.
        review: iterable of token strings.

    Returns:
        torch.Tensor: ``torch.long`` tensor holding the index of every
        in-vocabulary token, in order. Out-of-vocabulary tokens are dropped,
        so the result may be empty (shape ``(0,)``).
    """
    vocab = embedding_model.key_to_index
    index_review = [vocab[word] for word in review if word in vocab]
    # The dtype is explicit because torch.tensor([]) defaults to float32,
    # which is not a valid index tensor for an embedding lookup.
    return torch.tensor(index_review, dtype=torch.long)

# Register an all-zero vector for the padding token. The membership guard
# keeps a re-run of this cell from trying to add the token a second time.
if '<pad>' not in word_vectors.key_to_index:
    word_vectors.add_vector('<pad>', np.zeros(embedding_dimension))

# Look up the index that was actually assigned to '<pad>' instead of assuming
# 0: index 0 already belongs to a trained vocabulary word, so using it as the
# padding index would alias padding with that word.
padding_value = word_vectors.key_to_index['<pad>']

# Each review becomes a tensor of vocabulary indices (OOV tokens dropped).
index_reviews = [word2idx(word_vectors, review) for review in reviews]
# Embedding matrix handed to the model; its rows include the '<pad>' vector.
embedding_weights = torch.FloatTensor(word_vectors.vectors)

In [None]:
# Two-stage split: hold out 20% of the data, then cut that hold-out in half
# to obtain the validation and test sets. Both stages use the same SEED.
split_options = {"random_state": SEED}

X_train, X_temp, y_train, y_temp = train_test_split(
    index_reviews, labels, test_size=0.2, **split_options
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, **split_options
)

# One summary line per split, followed by a separator line.
print(
    f"Training samples: {len(X_train)}",
    f"Validation samples: {len(X_val)}",
    f"Testing samples: {len(X_test)}",
    "-" * 20,
    sep="\n",
)

def iterator_func(X, y, batch_size=64):
    """Shuffle the samples once and pre-build every padded mini-batch.

    Args:
        X: indexable collection of 1-D index tensors, one per review.
        y: indexable collection of labels aligned with ``X``.
        batch_size: maximum number of samples per batch.

    Returns:
        list[dict]: one dict per batch with keys
            "text"   - sequences padded with the module-level ``padding_value``
                       (``batch_first=False``),
            "label"  - ``torch.FloatTensor`` of the batch labels,
            "length" - ``torch.IntTensor`` of the unpadded sequence lengths.
        Within a batch the samples are ordered by decreasing length.
    """
    total = len(X)
    order = np.random.permutation(total)

    batches = []
    for start in range(0, total, batch_size):
        # Longest sequence first; the sort is stable, so equally long reviews
        # keep their shuffled relative order.
        chunk = sorted(
            order[start:start + batch_size],
            key=lambda idx: len(X[idx]),
            reverse=True,
        )
        sequences = [X[idx] for idx in chunk]
        targets = [y[idx] for idx in chunk]

        batches.append({
            "text": nn.utils.rnn.pad_sequence(
                sequences, padding_value=padding_value, batch_first=False
            ),
            "label": torch.FloatTensor(targets),
            "length": torch.IntTensor([len(seq) for seq in sequences]),
        })
    return batches

In [None]:
class RNN(nn.Module):
    """LSTM sentiment classifier on top of frozen pre-trained embeddings.

    Args:
        input_dim: vocabulary size. Accepted for interface compatibility; the
            embedding table is sized by ``embedding_weights`` instead.
        embedding_dim: width of each embedding vector (the LSTM input size).
        hidden_dim: LSTM hidden size per direction.
        output_dim: number of output logits.
        n_layers: number of stacked LSTM layers.
        bidirectional: whether the LSTM runs in both directions.
        dropout: dropout probability, used between LSTM layers and on the
            final hidden state.
        embedding_weights: 2-D float tensor passed to
            ``nn.Embedding.from_pretrained``.
        padding_idx: index of the padding token. When ``None`` (the default)
            the module-level ``padding_value`` is used, as before.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim, n_layers,
                 bidirectional, dropout, embedding_weights, padding_idx=None):
        super().__init__()
        if padding_idx is None:
            # Backward-compatible default: fall back to the notebook global.
            padding_idx = padding_value
        self.bidirectional = bidirectional
        self.embedding = nn.Embedding.from_pretrained(embedding_weights, padding_idx=padding_idx)
        self.rnn = nn.LSTM(
            embedding_dim,
            hidden_dim,
            num_layers=n_layers,
            bidirectional=bidirectional,
            # Inter-layer dropout has no effect on a single-layer LSTM and only
            # triggers a warning, so it is disabled in that case.
            dropout=dropout if n_layers > 1 else 0.0,
        )
        # The classifier sees one final hidden state per direction.
        num_directions = 2 if bidirectional else 1
        self.fc = nn.Linear(hidden_dim * num_directions, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text, text_lengths):
        """Return one row of logits per sequence in the batch.

        Args:
            text: index tensor laid out (seq_len, batch), since neither the
                LSTM nor the packing call uses ``batch_first``.
            text_lengths: unpadded length of each sequence, in decreasing
                order (packing is called with its default sorted requirement).

        Returns:
            torch.Tensor: logits of shape (batch, output_dim).
        """
        embedded = self.embedding(text)
        # pack_padded_sequence needs the lengths on the CPU.
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths.to('cpu'))
        _, (hidden, _) = self.rnn(packed_embedded)
        if self.bidirectional:
            # Last layer: forward direction at [-2], backward direction at [-1].
            final_hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            # Unidirectional: only the last layer's final state is used, rather
            # than concatenating the states of two different layers.
            final_hidden = hidden[-1, :, :]
        return self.fc(self.dropout(final_hidden))

In [None]:
# Model and training hyper-parameters.
INPUT_DIM = len(word_vectors.key_to_index)
# Read the embedding width from the matrix the model is actually built with,
# instead of repeating a hard-coded 100 that could drift away from
# embedding_dimension and break the LSTM input size.
EMBEDDING_DIM = embedding_weights.shape[1]
HIDDEN_DIM = 256
OUTPUT_DIM = 1
N_LAYERS = 2
BIDIRECTIONAL = True
DROPOUT = 0.5
N_EPOCHS = 5
BATCH_SIZE = 64

model = RNN(
    INPUT_DIM, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS,
    BIDIRECTIONAL, DROPOUT, embedding_weights
)
# Adam with its default settings; the loss takes raw logits.
optimizer = optim.Adam(model.parameters())
criterion = nn.BCEWithLogitsLoss()

# Use the GPU when one is available, otherwise stay on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
criterion = criterion.to(device)

def binary_accuracy(preds, y):
    """Fraction of predictions whose rounded sigmoid equals the target.

    Args:
        preds: tensor of raw logits.
        y: tensor of targets, compared element-wise with the rounded
            probabilities.

    Returns:
        torch.Tensor: number of matches divided by ``len`` of the comparison
        result.
    """
    probabilities = torch.sigmoid(preds)
    hits = (probabilities.round() == y).float()
    return hits.sum() / len(hits)

def train(model, iterator, optimizer, criterion):
    """Run one optimisation pass over every batch in ``iterator``.

    Args:
        model: module called as ``model(text, text_lengths)``.
        iterator: sized iterable of batch dicts with "text", "length" and
            "label" entries.
        optimizer: optimizer stepped once per batch.
        criterion: loss applied to the squeezed predictions and the labels.

    Returns:
        tuple: (summed batch loss, summed batch accuracy), each divided by the
        number of batches.
    """
    model.train()
    n_batches = len(iterator)
    running_loss = 0
    running_acc = 0

    for batch in iterator:
        # Move the whole batch to the module-level device.
        text = batch["text"].to(device)
        text_lengths = batch["length"].to(device)
        labels = batch["label"].to(device)

        optimizer.zero_grad()
        # Drop the trailing output dimension so predictions line up with labels.
        predictions = model(text, text_lengths).squeeze(1)
        loss = criterion(predictions, labels)
        acc = binary_accuracy(predictions, labels)

        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        running_acc += acc.item()

    return running_loss / n_batches, running_acc / n_batches

def evaluate(model, iterator, criterion):
    """Score the model on every batch in ``iterator`` without updating it.

    Args:
        model: module called as ``model(text, text_lengths)``.
        iterator: sized iterable of batch dicts with "text", "length" and
            "label" entries.
        criterion: loss applied to the squeezed predictions and the labels.

    Returns:
        tuple: (summed batch loss, summed batch accuracy), each divided by the
        number of batches.
    """
    model.eval()
    n_batches = len(iterator)
    running_loss = 0
    running_acc = 0

    # Gradients are not needed for scoring.
    with torch.no_grad():
        for batch in iterator:
            text = batch["text"].to(device)
            text_lengths = batch["length"].to(device)
            labels = batch["label"].to(device)

            predictions = model(text, text_lengths).squeeze(1)
            running_loss += criterion(predictions, labels).item()
            running_acc += binary_accuracy(predictions, labels).item()

    return running_loss / n_batches, running_acc / n_batches

In [None]:
# Every split is batched exactly once, in train/validation/test order.
# NOTE(review): iterator_func is not called again below, so every epoch sees
# the same training batches in the same order (no per-epoch reshuffle).
train_iterator, valid_iterator, test_iterator = [
    iterator_func(inputs, targets, BATCH_SIZE)
    for inputs, targets in ((X_train, y_train), (X_val, y_val), (X_test, y_test))
]

print("\nStarting model training...")
for epoch in range(N_EPOCHS):
    # One optimisation pass, then a validation pass, then one log line.
    (train_loss, train_acc), (valid_loss, valid_acc) = (
        train(model, train_iterator, optimizer, criterion),
        evaluate(model, valid_iterator, criterion),
    )
    print(f'Epoch: {epoch+1:02} | Train Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}% | Val. Loss: {valid_loss:.3f} | Val. Acc: {valid_acc*100:.2f}%')

print("\nTraining finished.", "-" * 20, sep="\n")

In [None]:
# Score the trained model on the held-out test batches and report the result.
test_metrics = evaluate(model, test_iterator, criterion)
test_loss, test_acc = test_metrics
print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

def predict_sentiment(sentence, model, word_vectors, device):
    """Score a single raw sentence with the trained model.

    Args:
        sentence: raw text; it goes through ``preprocess_text`` first.
        model: module called as ``model(tensor, length)`` that returns a
            single logit for the one-sequence batch.
        word_vectors: object exposing a ``key_to_index`` token-to-index
            mapping.
        device: device the index tensor is moved to before the forward pass.

    Returns:
        float: sigmoid of the model's logit, or 0.5 when no token of the
        sentence is in the vocabulary.
    """
    model.eval()
    vocab = word_vectors.key_to_index
    # Out-of-vocabulary tokens are skipped.
    indexed = [vocab[token] for token in preprocess_text(sentence) if token in vocab]
    if not indexed:
        # Nothing the model can score: return the neutral midpoint.
        return 0.5

    length = torch.LongTensor([len(indexed)])
    # Shape (seq_len, 1): a batch containing just this sentence.
    tensor = torch.LongTensor(indexed).to(device).unsqueeze(1)
    # Inference only, so no autograd graph should be built for this call.
    with torch.no_grad():
        prediction = torch.sigmoid(model(tensor, length))
    return prediction.item()

In [None]:
print("-" * 20)

# Two hand-written sanity checks: one clearly positive review and one clearly
# negative review.
review1 = "This movie was absolutely fantastic, I loved every minute of it!"
review2 = "It was a complete waste of time, the plot was boring and the acting was terrible."

# A score above 0.5 is reported as Positive, anything else as Negative.
score1 = predict_sentiment(review1, model, word_vectors, device)
print(f"Review: '{review1}'\nSentiment Score: {score1:.4f} ({'Positive' if score1 > 0.5 else 'Negative'})")

score2 = predict_sentiment(review2, model, word_vectors, device)
print(f"Review: '{review2}'\nSentiment Score: {score2:.4f} ({'Positive' if score2 > 0.5 else 'Negative'})")

In [None]:
# Persist the two artifacts: the model's state dict and the word vectors
# (which include the '<pad>' entry added earlier in the notebook).
model_state = model.state_dict()
torch.save(model_state, 'sentiment_model.pth')
word_vectors.save('word_vectors.kv')

print("Model and word vectors have been saved successfully!")