In [21]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import re
from collections import Counter
import random

In [22]:
# ====================================
# Step 1: Create a Simple Dataset Class
# ====================================
class MovieReviewDataset(Dataset):
    """
    A custom Dataset class for handling movie reviews.
    This helps PyTorch efficiently load and batch our data.
    """
    def __init__(self, texts, labels, vocab, max_length=100):
        self.texts = texts          # List of review texts
        self.labels = labels        # List of labels (0 for negative, 1 for positive)
        self.vocab = vocab          # Dictionary mapping words to unique indices
        self.max_length = max_length # Maximum length of each review

    def __len__(self):
        """Returns the total number of reviews"""
        return len(self.texts)

    def __getitem__(self, idx):
        """
        Converts a text review into a sequence of numbers and returns it with its label
        idx: Index of the review to fetch
        """
        # Get the review text and its label
        text = self.texts[idx]
        label = self.labels[idx]

        # Convert text to list of tokens (words)
        tokens = self.tokenize(text)

        # Convert tokens to numerical indices using vocabulary
        indices = [self.vocab.get(token, self.vocab['<unk>']) for token in tokens]

        # Pad or truncate to max_length
        if len(indices) < self.max_length:
            indices = indices + [self.vocab['<pad>']] * (self.max_length - len(indices))
        else:
            indices = indices[:self.max_length]

        return torch.tensor(indices), torch.tensor(label, dtype=torch.float32)

    @staticmethod
    def tokenize(text):
        """
        Converts text to lowercase and splits it into words
        Removes punctuation for simplicity
        """
        text = re.sub(r'[^\w\s]', '', text.lower())
        return text.split()

In [23]:
# ====================================
# Step 2: Define the LSTM Model
# ====================================
class SentimentLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim=100, hidden_dim=128, n_layers=2, dropout=0.3):
        super(SentimentLSTM, self).__init__()

        # Word embedding layer with uniform initialization
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        nn.init.uniform_(self.embedding.weight, -0.1, 0.1)

        # Batch normalization for embeddings
        self.embed_bn = nn.BatchNorm1d(embedding_dim)

        # Bidirectional LSTM
        self.lstm = nn.LSTM(
            embedding_dim,
            hidden_dim,
            n_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout if n_layers > 1 else 0
        )

        # Dropout layer
        self.dropout = nn.Dropout(dropout)

        # Additional linear layers
        self.fc1 = nn.Linear(hidden_dim * 2, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 1)

        # Activation functions
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

        # Initialize weights for linear layers only
        self._init_weights()

    def _init_weights(self):
        """Initialize weights for linear layers only"""
        for layer in [self.fc1, self.fc2]:
            if isinstance(layer, nn.Linear):
                nn.init.xavier_uniform_(layer.weight)
                nn.init.zeros_(layer.bias)

    def forward(self, text):
        # Embed and normalize
        embedded = self.embedding(text)

        # Reshape for batch norm
        batch_size, seq_len, embed_dim = embedded.shape
        embedded = embedded.view(-1, embed_dim, seq_len)
        embedded = self.embed_bn(embedded)
        embedded = embedded.view(batch_size, seq_len, embed_dim)

        # LSTM layers
        lstm_out, _ = self.lstm(embedded)

        # Get final output using both directions
        lstm_out = lstm_out[:, -1, :]

        # Dense layers with activations
        out = self.dropout(lstm_out)
        out = self.relu(self.fc1(out))
        out = self.dropout(out)
        out = self.sigmoid(self.fc2(out))

        return out

In [24]:
# ====================================
# Step 3: Training Function
# ====================================
def train_model(model, train_loader, criterion, optimizer, device):
    """
    Performs one epoch of training
    """
    model.train()  # Set model to training mode
    total_loss = 0
    correct_predictions = 0
    total_predictions = 0

    for batch_idx, (text, labels) in enumerate(train_loader):
        # Move data to device (CPU/GPU)
        text, labels = text.to(device), labels.to(device)

        # Clear previous gradients
        optimizer.zero_grad()

        # Forward pass
        predictions = model(text)

        # Calculate loss
        loss = criterion(predictions, labels.unsqueeze(1))

        # Backward pass
        loss.backward()

        # Update weights
        optimizer.step()

        # Accumulate statistics
        total_loss += loss.item()
        rounded_preds = (predictions > 0.5).float()
        correct_predictions += (rounded_preds == labels.unsqueeze(1)).sum().item()
        total_predictions += labels.size(0)

        # Print progress
        if batch_idx % 10 == 0:
            print(f'Batch {batch_idx}: Loss = {loss.item():.4f}')

    # Calculate epoch statistics
    avg_loss = total_loss / len(train_loader)
    accuracy = correct_predictions / total_predictions
    return avg_loss, accuracy

def evaluate_model(model, data_loader, criterion, device):
    """
    Evaluate the model on given data
    """
    model.eval()  # Set model to evaluation mode
    total_loss = 0
    correct_predictions = 0
    total_predictions = 0

    with torch.no_grad():  # Disable gradient calculation
        for text, labels in data_loader:
            text, labels = text.to(device), labels.to(device)
            predictions = model(text)
            loss = criterion(predictions, labels.unsqueeze(1))

            total_loss += loss.item()
            rounded_preds = (predictions > 0.5).float()
            correct_predictions += (rounded_preds == labels.unsqueeze(1)).sum().item()
            total_predictions += labels.size(0)

    return total_loss / len(data_loader), correct_predictions / total_predictions

In [25]:
# ====================================
# Step 4: Example Usage
# ====================================
def predict_sentiment(model, text, vocab, max_length=100, device='cpu'):
    """
    Predict sentiment for a single text review
    Returns: probability of positive sentiment and predicted class
    """
    # Prepare model for evaluation
    model.eval()

    # Tokenize and convert to indices
    tokens = MovieReviewDataset.tokenize(text)
    indices = [vocab.get(token, vocab['<unk>']) for token in tokens]

    # Pad sequence
    if len(indices) < max_length:
        indices = indices + [vocab['<pad>']] * (max_length - len(indices))
    else:
        indices = indices[:max_length]

    # Convert to tensor and add batch dimension
    with torch.no_grad():
        input_tensor = torch.tensor([indices]).to(device)
        prediction = model(input_tensor)
        probability = prediction.item()
        predicted_class = 1 if probability >= 0.5 else 0

    return probability, predicted_class

def test_model_predictions(model, vocab, device):
    """Test the model with some sample reviews"""
    test_reviews = [
        "This movie was absolutely fantastic and I loved every minute of it!",
        "What a terrible waste of time, one of the worst movies ever.",
        "The acting was good but the plot was a bit confusing.",
        "A masterpiece of modern cinema, brilliant in every way.",
        "I fell asleep during this boring and predictable film."
    ]

    print("\nTesting model predictions:")
    print("-" * 60)

    for review in test_reviews:
        prob, pred = predict_sentiment(model, review, vocab, device=device)
        sentiment = "Positive" if pred == 1 else "Negative"
        confidence = prob if pred == 1 else (1 - prob)

        print(f"\nReview: {review}")
        print(f"Sentiment: {sentiment} (confidence: {confidence:.2%})")

def main():
    torch.manual_seed(42)
    random.seed(42)

    # Expanded sample data
    sample_reviews = [
        "this movie was amazing and I loved it",
        "terrible waste of time, awful movie",
        "great acting and wonderful story",
        "boring and predictable plot",
        "fantastic performances by all actors",
        "waste of money and time",
        "incredible cinematography and effects",
        "disappointing and confusing plot",
        "brilliant direction and storytelling",
        "poorly written and badly acted",
        "masterpiece of modern cinema",
        "complete disaster of a film",
        "outstanding performance by the cast",
        "worst movie I've ever seen",
        "beautiful cinematography and score",
        "awful dialogue and weak plot"
    ] * 2  # Duplicate data for more training samples

    sample_labels = [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0] * 2

    # Split data with larger training set
    split_idx = int(len(sample_reviews) * 0.8)
    train_reviews = sample_reviews[:split_idx]
    train_labels = sample_labels[:split_idx]
    val_reviews = sample_reviews[split_idx:]
    val_labels = sample_labels[split_idx:]

    # Build vocabulary
    vocab = {'<pad>': 0, '<unk>': 1}
    words = set(' '.join(sample_reviews).lower().split())
    for idx, word in enumerate(words, start=len(vocab)):
        vocab[word] = idx

    # Create datasets and dataloaders with smaller batch size
    train_dataset = MovieReviewDataset(train_reviews, train_labels, vocab)
    val_dataset = MovieReviewDataset(val_reviews, val_labels, vocab)
    train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=4)

    # Model initialization with smaller dimensions
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = SentimentLSTM(
        vocab_size=len(vocab),
        embedding_dim=32,  # Reduced from 64
        hidden_dim=32,    # Reduced from 64
        n_layers=1,       # Reduced from 2
        dropout=0.2       # Reduced from 0.3
    ).to(device)

    # Loss and optimizer
    criterion = nn.BCELoss()
    optimizer = optim.AdamW(model.parameters(), lr=0.005, weight_decay=0.001)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', factor=0.7, patience=3, verbose=True
    )

    # Training loop
    num_epochs = 20
    best_val_accuracy = 0

    print("Starting training...")
    for epoch in range(num_epochs):
        train_loss, train_acc = train_model(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc = evaluate_model(model, val_loader, criterion, device)

        # Learning rate scheduling
        scheduler.step(val_loss)

        # Print statistics
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        print(f"Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.2%}")
        print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.2%}")

        if val_acc > best_val_accuracy:
            best_val_accuracy = val_acc
            torch.save(model.state_dict(), 'best_model.pth')

    print("\nTraining completed!")
    print(f"Best validation accuracy: {best_val_accuracy:.2%}")

    # After training, test the model
    print("\nTesting the trained model:")
    test_model_predictions(model, vocab, device)

    # Save the model and vocabulary for later use
    torch.save({
        'model_state_dict': model.state_dict(),
        'vocab': vocab,
        'model_config': {
            'vocab_size': len(vocab),
            'embedding_dim': 32,
            'hidden_dim': 32,
            'n_layers': 1,
            'dropout': 0.2
        }
    }, 'sentiment_model.pth')

# Add standalone prediction script
if __name__ == "__main__":
    if torch.cuda.is_available():
        print("Using GPU")
    else:
        print("Using CPU")

    main()

# Example of loading and using the saved model:
def load_and_predict(text, model_path='sentiment_model.pth'):
    """Load the saved model and make a prediction"""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Load the saved model and vocabulary
    checkpoint = torch.load(model_path, map_location=device)
    vocab = checkpoint['vocab']
    config = checkpoint['model_config']

    # Initialize model with saved configuration
    model = SentimentLSTM(**config).to(device)
    model.load_state_dict(checkpoint['model_state_dict'])

    # Make prediction
    prob, pred = predict_sentiment(model, text, vocab, device=device)
    sentiment = "Positive" if pred == 1 else "Negative"
    confidence = prob if pred == 1 else (1 - prob)

    return sentiment, confidence


Using GPU
Starting training...
Batch 0: Loss = 0.6641

Epoch 1/20
Train Loss: 0.7961 | Train Acc: 48.00%
Val Loss: 0.6928 | Val Acc: 57.14%
Batch 0: Loss = 0.6762

Epoch 2/20
Train Loss: 0.6534 | Train Acc: 56.00%
Val Loss: 0.7147 | Val Acc: 42.86%
Batch 0: Loss = 0.8833

Epoch 3/20
Train Loss: 0.7376 | Train Acc: 52.00%
Val Loss: 0.7024 | Val Acc: 42.86%
Batch 0: Loss = 0.7421

Epoch 4/20
Train Loss: 0.7335 | Train Acc: 44.00%
Val Loss: 0.7017 | Val Acc: 42.86%
Batch 0: Loss = 0.7045

Epoch 5/20
Train Loss: 0.6789 | Train Acc: 64.00%
Val Loss: 0.6908 | Val Acc: 57.14%
Batch 0: Loss = 0.6321

Epoch 6/20
Train Loss: 0.7038 | Train Acc: 60.00%
Val Loss: 0.6882 | Val Acc: 57.14%
Batch 0: Loss = 0.7695

Epoch 7/20
Train Loss: 0.6919 | Train Acc: 52.00%
Val Loss: 0.6852 | Val Acc: 57.14%
Batch 0: Loss = 0.6616

Epoch 8/20
Train Loss: 0.6777 | Train Acc: 72.00%
Val Loss: 0.6885 | Val Acc: 57.14%
Batch 0: Loss = 0.7265

Epoch 9/20
Train Loss: 0.7036 | Train Acc: 44.00%
Val Loss: 0.7101 | Val 