In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import random
import re
from collections import Counter
import time

In [None]:


# Seed every random number generator this notebook draws from, so that the
# synthetic data and the weight initialisation repeat from run to run.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)
torch.manual_seed(SEED)

# Define the RNN model
class SentimentRNN(nn.Module):
    """Bidirectional GRU classifier over padded batches of token ids.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Width of each token embedding.
        hidden_dim: Hidden size of the GRU, per direction.
        output_dim: Number of values the final linear layer emits per sequence.
        n_layers: Number of stacked GRU layers.
        dropout: Dropout probability used on the embeddings, on the final
            hidden state and, when ``n_layers > 1``, between GRU layers.
        pad_idx: Token id handed to the embedding as ``padding_idx``.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout, pad_idx):
        super().__init__()

        # Inter-layer dropout is only passed to the GRU when it is stacked.
        inter_layer_dropout = dropout if n_layers > 1 else 0

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
        self.rnn = nn.GRU(
            embedding_dim,
            hidden_dim,
            num_layers=n_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
            bidirectional=True,
        )
        # Forward and backward final states are concatenated, hence 2 * hidden_dim.
        self.fc = nn.Linear(2 * hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text, text_lengths):
        """Return one row of ``output_dim`` logits per sequence in the batch.

        Args:
            text: Batch-first tensor of token ids, padded to a common length.
            text_lengths: Unpadded length of every sequence; moved to the CPU
                before packing. The batch does not need to be length-sorted.
        """
        token_vectors = self.dropout(self.embedding(text))

        # Packing lets the GRU skip the padded positions.
        packed = nn.utils.rnn.pack_padded_sequence(
            token_vectors, text_lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        _, final_states = self.rnn(packed)

        # The last two entries are the top layer's forward and backward states.
        last_forward, last_backward = final_states[-2], final_states[-1]
        summary = torch.cat([last_forward, last_backward], dim=1)

        return self.fc(self.dropout(summary))

# Custom dataset class
class TextDataset(Dataset):
    """Map-style dataset pairing each example with its label.

    Args:
        texts: Sized, indexable collection of examples (in this notebook:
            lists of token ids).
        labels: Sized, indexable collection of labels, aligned with ``texts``.

    Raises:
        ValueError: If ``texts`` and ``labels`` differ in length.
    """

    def __init__(self, texts, labels):
        # Fail fast: the dataset length is taken from `labels`, so a mismatch
        # would otherwise show up as an IndexError in the middle of an epoch or
        # as examples that are silently never visited.
        if len(texts) != len(labels):
            raise ValueError(
                f"texts and labels must have the same length, "
                f"got {len(texts)} and {len(labels)}"
            )
        self.texts = texts
        self.labels = labels

    def __len__(self):
        """Return the number of (text, label) pairs."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``(text, label)`` pair stored at position ``idx``."""
        return self.texts[idx], self.labels[idx]

# Function to preprocess text
def preprocess_text(text):
    """Normalise raw text and split it into tokens.

    The text is lower-cased, every character that is neither a word character
    nor whitespace is deleted, digit runs are deleted, and the remainder is
    split on whitespace. Underscores count as word characters and are kept.

    Returns:
        list[str]: The tokens, possibly empty.
    """
    cleaned = text.lower()
    # First strip punctuation/symbols, then digits; both are plain deletions.
    for pattern in (r'[^\w\s]', r'\d+'):
        cleaned = re.sub(pattern, '', cleaned)
    return cleaned.split()

# Function to create batches
def collate_batch(batch, pad_idx=None):
    """Turn a list of ``(token_ids, label)`` pairs into padded batch tensors.

    Args:
        batch: Sequence of ``(token_ids, label)`` pairs, as a DataLoader
            passes to its ``collate_fn``.
        pad_idx: Value used to fill the shorter sequences. When omitted, the
            module-level ``PAD_IDX`` is looked up at call time, so the function
            can still be handed to a DataLoader as-is.

    Returns:
        tuple: ``(padded_texts, labels, lengths)``. ``padded_texts`` is
        batch-first and as wide as the longest sequence, ``labels`` is a float
        tensor and ``lengths`` holds the unpadded sequence lengths. All three
        are ordered from the longest to the shortest sequence.
    """
    if pad_idx is None:
        pad_idx = PAD_IDX

    texts, labels = zip(*batch)
    text_lengths = torch.tensor([len(text) for text in texts])

    # Longest first. Texts, labels and lengths are permuted with the same
    # order so every row keeps its own label.
    order = torch.argsort(text_lengths, descending=True)
    sorted_lengths = text_lengths[order]
    positions = order.tolist()  # plain ints for indexing the Python tuples
    sorted_texts = [torch.tensor(texts[i]) for i in positions]
    sorted_labels = [labels[i] for i in positions]

    padded_texts = nn.utils.rnn.pad_sequence(sorted_texts,
                                             batch_first=True,
                                             padding_value=pad_idx)

    return padded_texts, torch.tensor(sorted_labels, dtype=torch.float), sorted_lengths

# Training function
def train(model, iterator, optimizer, criterion):
    """Run one optimisation pass over ``iterator`` and report epoch metrics.

    Args:
        model: Module called as ``model(texts, text_lengths)`` that returns
            one logit per example with a trailing dimension of size 1.
        iterator: Iterable of ``(texts, labels, text_lengths)`` batches.
        optimizer: Optimizer that updates the model's parameters.
        criterion: Loss called as ``criterion(logits, labels)``. The epoch loss
            assumes it returns the mean over the batch (the default
            reduction of ``BCEWithLogitsLoss``).

    Returns:
        tuple[float, float]: Loss and accuracy averaged over every example
        seen, so a smaller final batch is not over-weighted.

    Raises:
        ZeroDivisionError: If ``iterator`` yields no examples.
    """
    model.train()

    # Use the device the model lives on rather than a notebook-level global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    loss_sum = 0.0
    correct_count = 0
    n_examples = 0

    for texts, labels, text_lengths in iterator:
        texts, labels = texts.to(run_device), labels.to(run_device)

        optimizer.zero_grad()
        logits = model(texts, text_lengths).squeeze(1)
        loss = criterion(logits, labels)

        loss.backward()
        optimizer.step()

        # Threshold the sigmoid at 0.5 to obtain hard 0/1 predictions.
        predicted = torch.round(torch.sigmoid(logits.detach()))
        batch_size = labels.size(0)

        # Undo the per-batch mean so the epoch average is per example.
        loss_sum += loss.item() * batch_size
        correct_count += (predicted == labels).sum().item()
        n_examples += batch_size

    return loss_sum / n_examples, correct_count / n_examples

# Evaluation function
def evaluate(model, iterator, criterion):
    """Score ``model`` on ``iterator`` without updating any weights.

    Args:
        model: Module called as ``model(texts, text_lengths)`` that returns
            one logit per example with a trailing dimension of size 1.
        iterator: Iterable of ``(texts, labels, text_lengths)`` batches.
        criterion: Loss called as ``criterion(logits, labels)``. The reported
            loss assumes it returns the mean over the batch (the default
            reduction of ``BCEWithLogitsLoss``).

    Returns:
        tuple[float, float]: Loss and accuracy averaged over every example
        seen, so a smaller final batch is not over-weighted.

    Raises:
        ZeroDivisionError: If ``iterator`` yields no examples.
    """
    model.eval()

    # Use the device the model lives on rather than a notebook-level global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    loss_sum = 0.0
    correct_count = 0
    n_examples = 0

    with torch.no_grad():
        for texts, labels, text_lengths in iterator:
            texts, labels = texts.to(run_device), labels.to(run_device)

            logits = model(texts, text_lengths).squeeze(1)
            loss = criterion(logits, labels)

            # Threshold the sigmoid at 0.5 to obtain hard 0/1 predictions.
            predicted = torch.round(torch.sigmoid(logits))
            batch_size = labels.size(0)

            # Undo the per-batch mean so the average is per example.
            loss_sum += loss.item() * batch_size
            correct_count += (predicted == labels).sum().item()
            n_examples += batch_size

    return loss_sum / n_examples, correct_count / n_examples

# Main execution
if __name__ == "__main__":
    # Run on the GPU when PyTorch can see one, otherwise on the CPU
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    device = torch.device(device_name)
    print(f"Using device: {device}")

    # Hyperparameters, grouped by what they configure.
    # Data (MAX_VOCAB_SIZE and MAX_SEQ_LENGTH are not referenced by the code
    # visible in this cell)
    MAX_VOCAB_SIZE = 10000
    MAX_SEQ_LENGTH = 100
    BATCH_SIZE = 64
    PAD_IDX, UNK_IDX = 0, 1
    # Model
    EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM = 100, 256, 1
    N_LAYERS = 2
    DROPOUT = 0.5
    # Optimisation
    LR = 0.001
    N_EPOCHS = 5

    # Create a simple synthetic dataset
    print("Creating synthetic dataset...")

    # Words that signal a positive review
    positive_words = [
        'good', 'great', 'excellent', 'fantastic', 'wonderful', 'amazing', 'love',
        'best', 'happy', 'enjoyed', 'favorite', 'recommend', 'brilliant', 'perfect',
    ]

    # Words that signal a negative review
    negative_words = [
        'bad', 'terrible', 'awful', 'horrible', 'disappointing', 'waste', 'hate',
        'worst', 'boring', 'poor', 'dislike', 'avoid', 'failed', 'annoying',
    ]

    # Sentiment-neutral filler words
    common_words = [
        'the', 'a', 'an', 'and', 'but', 'or', 'because', 'as', 'of', 'for', 'in',
        'to', 'with', 'on', 'at', 'by', 'this', 'that', 'these', 'those', 'is', 'was',
    ]

    # Vocabulary: the two special tokens first, then every word in list order
    vocab = {'<pad>': PAD_IDX, '<unk>': UNK_IDX}
    all_words = common_words + positive_words + negative_words
    for i, word in enumerate(all_words, start=2):
        vocab[word] = i

    vocab_size = len(vocab)
    print(f"Vocabulary size: {vocab_size}")
    
    # Function to generate a synthetic review
    def generate_review(is_positive, min_length=5, max_length=50):
        """Build one synthetic review as a shuffled list of vocabulary indices.

        Args:
            is_positive: Draw sentiment words from ``positive_words`` when
                true, from ``negative_words`` otherwise.
            min_length: Smallest number of words in the review (inclusive).
                Should be at least 1; a review without words cannot be packed
                by the model.
            max_length: Largest number of words in the review (inclusive).

        Returns:
            list[int]: Vocabulary indices of the words, in shuffled order.
        """
        length = random.randint(min_length, max_length)
        sentiment_words = positive_words if is_positive else negative_words

        # Roughly 1/8 to 1/4 of the words carry sentiment, with a floor of 1
        # and a ceiling of at least 3. Both bounds are clamped to `length`:
        # without that, a review shorter than three words could draw more
        # sentiment words than it has room for and come out too long. For
        # length >= 3 the bounds, and therefore the random draws, are unchanged.
        upper = min(length, max(3, length // 4))
        lower = min(upper, max(1, length // 8))
        sentiment_word_count = random.randint(lower, upper)
        common_word_count = length - sentiment_word_count

        # Select random words
        selected_sentiment = [random.choice(sentiment_words) for _ in range(sentiment_word_count)]
        selected_common = [random.choice(common_words) for _ in range(common_word_count)]

        # Mix them together
        review = selected_common + selected_sentiment
        random.shuffle(review)

        # Convert to indices
        return [vocab.get(word, UNK_IDX) for word in review]
    
    # Generate dataset
    # Every loop iteration adds one positive and one negative review to each
    # split, so both the train and the test split end up with `num_samples`
    # reviews, half of them positive.
    num_samples = 10000
    train_texts, train_labels = [], []
    test_texts, test_labels = [], []

    for i in range(num_samples // 2):
        # Positive pair first, then negative; within a pair the train review
        # is generated before the test review.
        for is_positive, label in ((True, 1), (False, 0)):
            train_texts.append(generate_review(is_positive))
            train_labels.append(label)
            test_texts.append(generate_review(is_positive))
            test_labels.append(label)

    # Wrap the raw lists so a DataLoader can index them
    train_dataset = TextDataset(train_texts, train_labels)
    test_dataset = TextDataset(test_texts, test_labels)

    # Only the training loader shuffles; collate_batch pads each batch
    train_loader = DataLoader(
        train_dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        collate_fn=collate_batch,
    )
    test_loader = DataLoader(
        test_dataset,
        batch_size=BATCH_SIZE,
        collate_fn=collate_batch,
    )
    
    # Initialize model
    print("Initializing model...")
    model = SentimentRNN(vocab_size, EMBEDDING_DIM, HIDDEN_DIM, OUTPUT_DIM, N_LAYERS, DROPOUT, PAD_IDX).to(device)

    # Initialize optimizer and criterion
    optimizer = optim.Adam(model.parameters(), lr=LR)
    # BCEWithLogitsLoss applies the sigmoid itself, so the model emits raw logits
    criterion = nn.BCEWithLogitsLoss()

    # Training loop
    # NOTE(review): the test split doubles as the validation split here, so the
    # final "Test" numbers come from data that also selected the checkpoint
    # and are therefore optimistic. Use a separate held-out split if the test
    # figure needs to be unbiased.
    print("Starting training...")
    best_val_loss = float('inf')

    for epoch in range(N_EPOCHS):
        start_time = time.time()

        train_loss, train_acc = train(model, train_loader, optimizer, criterion)
        val_loss, val_acc = evaluate(model, test_loader, criterion)

        end_time = time.time()
        # divmod on a float returns floats; cast the minutes so they print as
        # "0m" rather than "0.0m".
        epoch_mins, epoch_secs = divmod(end_time - start_time, 60)
        epoch_mins = int(epoch_mins)

        print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs:.2f}s')
        print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
        print(f'\t Val. Loss: {val_loss:.3f} |  Val. Acc: {val_acc*100:.2f}%')

        # Save best model (lowest validation loss seen so far)
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best-model.pt')
            print("\tBest model saved!")

    # Load best model and evaluate
    # map_location keeps the load working when the checkpoint was written on a
    # different device than the one in use now.
    model.load_state_dict(torch.load('best-model.pt', map_location=device))
    test_loss, test_acc = evaluate(model, test_loader, criterion)
    print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')
    
    # Function to predict sentiment of a new text
    def predict_sentiment(model, text):
        """Score a raw text with ``model`` and return the positive-class probability.

        The text goes through ``preprocess_text``; tokens missing from
        ``vocab`` are mapped to ``UNK_IDX``.

        Args:
            model: Module called as ``model(token_ids, lengths)`` that returns
                a single logit for a batch of one sequence.
            text: Raw input string.

        Returns:
            float: Sigmoid of the model's logit, in ``[0, 1]``. A text with no
            tokens left after preprocessing yields the neutral score ``0.5``.
        """
        model.eval()
        tokens = preprocess_text(text)
        indices = [vocab.get(token, UNK_IDX) for token in tokens]

        # The model packs its input, and packing rejects zero-length
        # sequences; with nothing to score, report "no evidence either way".
        if not indices:
            return 0.5

        tensor = torch.tensor(indices).unsqueeze(0).to(device)  # batch of one
        length_tensor = torch.tensor([len(indices)])

        with torch.no_grad():
            prediction = torch.sigmoid(model(tensor, length_tensor))

        return prediction.item()
    
    # Example usage
    # Hand-written sentences to spot-check the trained model. Words that are
    # not in the synthetic vocabulary (e.g. 'movie', 'film', 'acting') are
    # mapped to <unk> inside predict_sentiment.
    sample_texts = [
        "this movie was good and amazing i enjoyed it",
        "this film was terrible boring and a waste of time",
        "the acting was brilliant perfect and wonderful",
        "i hate this awful movie because it was horrible"
    ]
    
    print("\nTesting on sample texts:")
    for text in sample_texts:
        # predict_sentiment returns a score in [0, 1]; above 0.5 is reported as positive
        sentiment = predict_sentiment(model, text)
        print(f"Text: '{text}'")
        print(f"Sentiment: {'Positive' if sentiment > 0.5 else 'Negative'} (Score: {sentiment:.4f})")
        print()

Using device: cpu
Creating synthetic dataset...
Vocabulary size: 52
Initializing model...
Starting training...
Epoch: 01 | Time: 0.0m 43.45s
	Train Loss: 0.149 | Train Acc: 93.83%
	 Val. Loss: 0.005 |  Val. Acc: 99.99%
	Best model saved!
Epoch: 02 | Time: 0.0m 43.78s
	Train Loss: 0.025 | Train Acc: 99.07%
	 Val. Loss: 0.001 |  Val. Acc: 100.00%
	Best model saved!
Epoch: 03 | Time: 0.0m 43.24s
	Train Loss: 0.010 | Train Acc: 99.60%
	 Val. Loss: 0.001 |  Val. Acc: 99.98%
Epoch: 04 | Time: 0.0m 39.32s
	Train Loss: 0.009 | Train Acc: 99.68%
	 Val. Loss: 0.000 |  Val. Acc: 100.00%
	Best model saved!
Epoch: 05 | Time: 0.0m 37.70s
	Train Loss: 0.004 | Train Acc: 99.90%
	 Val. Loss: 0.000 |  Val. Acc: 100.00%
	Best model saved!
Test Loss: 0.000 | Test Acc: 100.00%

Testing on sample texts:
Text: 'this movie was good and amazing i enjoyed it'
Sentiment: Positive (Score: 1.0000)

Text: 'this film was terrible boring and a waste of time'
Sentiment: Negative (Score: 0.0000)

Text: 'the acting was 