# In [None]:
# Sentiment analysis on the IMDB dataset using an LSTM model

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader

from sklearn.model_selection import train_test_split
import tensorflow_datasets as tfds

# Pick the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Fetch the IMDB reviews together with their metadata; the loop below unpacks
# each element as a (review, sentiment) pair.
dataset, info = tfds.load('imdb_reviews', with_info=True, as_supervised=True)

# Turn the TensorFlow tensors of the 'train' split into Python strings and
# NumPy integers, accumulating both columns in one pass.
reviews, sentiments = [], []
for review, sentiment in dataset['train']:
    decoded_review = review.numpy().decode('utf-8')
    reviews.append(decoded_review)
    sentiments.append(sentiment.numpy())

# Write the two columns to disk as a CSV file (no index column).
columns = {'review': reviews, 'sentiment': sentiments}
df = pd.DataFrame(columns)
df.to_csv('IMDB_dataset.csv', index=False)

# Read the CSV back, then separate features from labels and hold out 20% of
# the rows with a fixed seed so the split is reproducible.
df = pd.read_csv("IMDB_dataset.csv")
X = df['review'].values
y = df['sentiment'].values
train_sentences, test_sentences, train_labels, test_labels = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=0,
)

# Text preprocessing: Keras Tokenizer for word -> index, pad_sequences for
# fixed-length integer arrays.
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Passed to the tokenizer as `num_words`.
vocab_size = 10000

# The vocabulary is fitted on the training sentences only; '<OOV>' is the
# token configured for out-of-vocabulary words.
tokenizer = Tokenizer(num_words=vocab_size, oov_token='<OOV>')
tokenizer.fit_on_texts(train_sentences)

train_sequences = tokenizer.texts_to_sequences(train_sentences)
test_sequences = tokenizer.texts_to_sequences(test_sentences)

# Histogram of sequence lengths over the train and test sequences combined.
sequence_lengths = [len(seq) for seq in train_sequences]
sequence_lengths.extend(len(seq) for seq in test_sequences)
plt.hist(sequence_lengths, bins=50)
plt.show()

# Every sequence is forced to `max_length` entries; both truncation and
# padding are configured as 'post'.
max_length = 300
padding_options = {'maxlen': max_length, 'truncating': 'post', 'padding': 'post'}
train_padded = pad_sequences(train_sequences, **padding_options)
test_padded = pad_sequences(test_sequences, **padding_options)

# Sanity check: map the first padded sequence back to words (indices missing
# from `index_word` are shown as '<pad>') and print it above the raw sentence.
text = train_padded[0]
recovered_words = [tokenizer.index_word.get(i, '<pad>') for i in text]
print(' '.join(recovered_words))
print(train_sentences[0])

# Wrap the padded index arrays (as long tensors) and the labels (as float
# tensors) into datasets, then into mini-batch loaders.
batch_size = 50

train_features = torch.LongTensor(train_padded)
train_targets = torch.FloatTensor(train_labels)
train_data = TensorDataset(train_features, train_targets)

valid_features = torch.LongTensor(test_padded)
valid_targets = torch.FloatTensor(test_labels)
valid_data = TensorDataset(valid_features, valid_targets)

# Only the training loader shuffles; the validation loader keeps dataset order.
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_data, batch_size=batch_size, shuffle=False)

# Define LSTM model class
class SentimentRNN(nn.Module):
    """LSTM classifier that maps a batch of token-index sequences to scores.

    Pipeline: embedding -> LSTM (``batch_first=True``) -> take the LSTM output
    at the final time step -> dropout -> linear -> sigmoid.

    Args:
        no_layers: Number of stacked LSTM layers.
        vocab_size: Number of rows in the embedding table; every token index
            fed to ``forward`` must be smaller than this.
        embedding_dim: Size of each embedding vector (LSTM input size).
        hidden_dim: Size of the LSTM hidden state.
        output_dim: Number of sigmoid outputs per sequence.
        drop_prob: Dropout probability applied before the linear layer.
    """

    def __init__(self, no_layers, vocab_size, embedding_dim, hidden_dim, output_dim, drop_prob=0.3):
        super().__init__()
        self.output_dim = output_dim
        self.hidden_dim = hidden_dim
        self.no_layers = no_layers

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim,
                            num_layers=no_layers, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sig = nn.Sigmoid()

    def forward(self, x, hidden):
        """Score a batch of sequences.

        Args:
            x: Integer token indices; the first dimension is the batch and
                the second the time steps (the LSTM is ``batch_first``).
            hidden: ``(h0, c0)`` initial LSTM state, e.g. from ``init_hidden``.

        Returns:
            A tuple ``(scores, hidden)`` where ``scores`` has shape
            ``(batch, output_dim)`` with values in [0, 1] and ``hidden`` is
            the LSTM state after the last time step.
        """
        embeds = self.embedding(x)
        lstm_out, hidden = self.lstm(embeds, hidden)
        # Only the final time step is returned, so slice first and run the
        # dropout/linear/sigmoid head on (batch, hidden_dim) instead of on
        # every time step. In eval mode this is numerically the same as
        # applying the head to all steps and then indexing [:, -1].
        last_step = lstm_out[:, -1, :]
        out = self.dropout(last_step)
        out = self.fc(out)
        return self.sig(out), hidden

    def init_hidden(self, batch_size):
        """Return zero-filled ``(h0, c0)`` for a batch of ``batch_size``.

        The tensors are created with the dtype and device of the model's own
        parameters, so the method does not rely on any module-level ``device``
        variable and keeps working after ``model.to(...)``.
        """
        reference = self.embedding.weight
        state_shape = (self.no_layers, batch_size, self.hidden_dim)
        h0 = reference.new_zeros(state_shape)
        c0 = reference.new_zeros(state_shape)
        return (h0, c0)

# Model hyperparameters.
no_layers = 1
embedding_dim = 64
hidden_dim = 256
output_dim = 1

# Build the network (embedding table sized vocab_size + 1) and place it on the
# selected device in one expression.
model = SentimentRNN(
    no_layers,
    vocab_size + 1,
    embedding_dim,
    hidden_dim,
    output_dim,
    drop_prob=0.3,
).to(device)

# Optimisation setup: Adam over all model parameters, binary cross-entropy on
# the model's sigmoid outputs.
lr = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
criterion = nn.BCELoss()

def acc(pred, label):
    """Count how many rounded predictions equal their labels.

    Both tensors are squeezed before comparison, so a ``(batch, 1)``
    prediction lines up with a ``(batch,)`` label. Returns a Python int
    (number of matches), not a ratio.
    """
    rounded = pred.squeeze().round()
    targets = label.squeeze()
    matches = rounded == targets
    return matches.sum().item()

# Training loop: for each epoch, one pass over train_loader with gradient
# updates, then one pass over valid_loader without gradients. Per-epoch loss
# and accuracy are appended to LOSS/ACC (train) and VAL_LOSS/VAL_ACC
# (validation); the weights are saved whenever validation loss does not get
# worse.
import time
s = time.time()          # start time; elapsed minutes are printed at the end
clip = 5                 # max gradient norm passed to clip_grad_norm_
epochs = 10
valid_loss_min = np.inf  # best (lowest) validation loss seen so far

LOSS, VAL_LOSS, ACC, VAL_ACC = [], [], [], []

for epoch in range(epochs):
    model.train()
    train_loss, train_acc = 0, 0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        inputs, labels = inputs.to(device), labels.to(device)
        # Size the initial state from the batch actually received, so a
        # shorter final batch does not mismatch the LSTM input.
        n_samples = inputs.size(0)
        h = model.init_hidden(n_samples)
        output, _ = model(inputs, h)
        # squeeze(-1) drops only the trailing output dimension; a bare
        # squeeze() would also collapse a batch of one to a 0-d tensor and
        # no longer match the shape of `labels`.
        loss = criterion(output.squeeze(-1), labels)
        # The criterion returns the batch mean, so weight it by the batch
        # size; dividing the total by the dataset size below then yields a
        # true per-sample average.
        train_loss += loss.item() * n_samples
        train_acc += acc(output, labels)
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

    train_loss /= len(train_data)
    train_acc /= len(train_data)
    LOSS.append(train_loss)
    ACC.append(train_acc)

    model.eval()
    val_loss, val_acc = 0, 0
    with torch.inference_mode():
        for inputs, labels in valid_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            n_samples = inputs.size(0)
            h = model.init_hidden(n_samples)
            output, _ = model(inputs, h)
            loss = criterion(output.squeeze(-1), labels)
            val_loss += loss.item() * n_samples
            val_acc += acc(output, labels)

    val_loss /= len(valid_data)
    val_acc /= len(valid_data)
    VAL_LOSS.append(val_loss)
    VAL_ACC.append(val_acc)

    print(f'epoch {epoch} ==> train loss: {train_loss:.5f},  validation loss: {val_loss:.5f}',
          f'train acc: {train_acc:.5f}, validation acc: {val_acc:.5f}')

    # Checkpoint on every epoch whose validation loss ties or beats the best.
    if val_loss <= valid_loss_min:
        torch.save(model.state_dict(), 'state_dict.pt')
        print('model saved.............')
        valid_loss_min = val_loss

# Total wall-clock training time in minutes.
print((time.time() - s)/60)

# Plot the per-epoch curves side by side: accuracy in the left panel, loss in
# the right panel, each with a train and a validation line.
fig = plt.figure(figsize=(20, 6))

panels = (
    ("Accuracy", ACC, VAL_ACC),
    ("Loss", LOSS, VAL_LOSS),
)
for position, (panel_title, train_curve, valid_curve) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(train_curve, label='Train')
    plt.plot(valid_curve, label='Validation')
    plt.title(panel_title)
    plt.legend()
    plt.grid()

plt.show()

# Inference function
def predict_text(text):
    """Return the model's sigmoid score for one raw text string.

    The text is tokenized with the module-level ``tokenizer``, padded or
    truncated to ``max_length`` with the same 'post' settings used for the
    training data, and scored by the module-level ``model``.

    Args:
        text: The review text to score.

    Returns:
        The model output as a Python float in [0, 1].
    """
    sequences = tokenizer.texts_to_sequences([text])
    padded = pad_sequences(sequences, maxlen=max_length, truncating='post', padding='post')
    # Use long indices, matching the LongTensor inputs the model was trained on.
    inputs = torch.as_tensor(padded, dtype=torch.long, device=device)

    # Score in eval mode (dropout off) without building an autograd graph,
    # then restore whatever mode the model was in before the call.
    was_training = model.training
    model.eval()
    try:
        with torch.inference_mode():
            h = model.init_hidden(1)
            output, _ = model(inputs, h)
    finally:
        model.train(was_training)
    return output.item()

# Score a few hand-written example sentences and print the verdict for each.
texts = [
    "The storyline was predictable, but the acting saved the film.",
    "Absolutely terrible. I want my two hours back.",
    "One of the best movies I’ve seen this year – touching and inspiring."
]

for t in texts:
    pred = predict_text(t)
    # Scores strictly above 0.5 are reported as positive.
    if pred > 0.5:
        verdict = 'positive'
    else:
        verdict = 'negative'
    print(f"Text: {t}")
    print(f"Predicted: {verdict} (score: {pred:.4f})\n")