In [9]:
# Import necessary libraries
import os
import re
import numpy as np
from string import punctuation
from collections import Counter

# PyTorch libraries
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader

# For word embeddings
from gensim.models import KeyedVectors


In [10]:
def load_reviews_from_directory(directory_path):
    """
    Loads all text files from a given directory and returns a list of their contents.
    """
    reviews = []
    for root, _, files in os.walk(directory_path):
        for file in files:
            if file.endswith(".txt"):  # Only process .txt files
                file_path = os.path.join(root, file)
                with open(file_path, 'r', encoding='utf8') as f:
                    reviews.append(f.read().strip())
    return reviews
    
# Directory paths (adjust these paths as needed)
train_dir = 'train'  # Assumes the 'train' directory is in the current working directory
test_dir = 'test'    # Assumes the 'test' directory is in the current working directory

# Load negative and positive reviews from training data
train_reviews_neg = load_reviews_from_directory(os.path.join(train_dir, 'neg'))
train_reviews_pos = load_reviews_from_directory(os.path.join(train_dir, 'pos'))

# Combine negative and positive training reviews and labels
reviews_train = train_reviews_neg + train_reviews_pos
train_labels = [0]*len(train_reviews_neg) + [1]*len(train_reviews_pos)  # 0: negative, 1: positive

# Load negative and positive reviews from testing data
test_reviews_neg = load_reviews_from_directory(os.path.join(test_dir, 'neg'))
test_reviews_pos = load_reviews_from_directory(os.path.join(test_dir, 'pos'))

# Combine negative and positive testing reviews and labels
reviews_test = test_reviews_neg + test_reviews_pos
test_labels = [0]*len(test_reviews_neg) + [1]*len(test_reviews_pos)  # 0: negative, 1: positive

# Regular expressions for text cleaning
REPLACE_NO_SPACE = re.compile(r"[\.;:!\?',\"()\[\]]")
REPLACE_WITH_SPACE = re.compile(r"(<br\s*/><br\s*/>)|[-/]")

def preprocess_reviews(reviews):
    """
    Cleans the input text by:
    - Converting to lowercase
    - Removing certain punctuation marks
    - Replacing some patterns with space
    """
    reviews = [REPLACE_NO_SPACE.sub("", line.lower()) for line in reviews]
    reviews = [REPLACE_WITH_SPACE.sub(" ", line) for line in reviews]
    return reviews

# Clean training and testing data
reviews_train_clean = preprocess_reviews(reviews_train)
reviews_test_clean = preprocess_reviews(reviews_test)

# Import gensim library
from gensim.models import KeyedVectors

# Load pre-trained Word2Vec model
embed_lookup = KeyedVectors.load_word2vec_format(
    'GoogleNews-vectors-negative300-SLIM.bin', binary=True
)


In [14]:
def tokenize_reviews(embed_lookup, reviews):
    """
    Tokenizes the reviews using the pre-trained Word2Vec model's vocabulary.
    Words not in the vocabulary are mapped to index 0.
    """
    tokenized_reviews = []
    for review in reviews:
        tokens = []
        for word in review.split():
            if word in embed_lookup.key_to_index:
                tokens.append(embed_lookup.key_to_index[word])
            else:
                tokens.append(0)  # Unknown words mapped to 0
        tokenized_reviews.append(tokens)
    return tokenized_reviews

# Tokenize training and testing reviews
tokenized_reviews_train = tokenize_reviews(embed_lookup, reviews_train_clean)
tokenized_reviews_test = tokenize_reviews(embed_lookup, reviews_test_clean)

# Remove zero-length reviews from training data
non_zero_idx_train = [idx for idx, review in enumerate(tokenized_reviews_train) if len(review) != 0]
tokenized_reviews_train = [tokenized_reviews_train[idx] for idx in non_zero_idx_train]
train_labels = [train_labels[idx] for idx in non_zero_idx_train]

# Remove zero-length reviews from testing data
non_zero_idx_test = [idx for idx, review in enumerate(tokenized_reviews_test) if len(review) != 0]
tokenized_reviews_test = [tokenized_reviews_test[idx] for idx in non_zero_idx_test]
test_labels = [test_labels[idx] for idx in non_zero_idx_test]

def pad_features(tokenized_reviews, seq_length):
    """
    Return features of tokenized_reviews, where each review is padded with 0's 
    or truncated to the input seq_length.
    """
    features = np.zeros((len(tokenized_reviews), seq_length), dtype=int)
    
    for i, review in enumerate(tokenized_reviews):
        if len(review) <= seq_length:
            features[i, -len(review):] = np.array(review)
        else:
            features[i, :] = np.array(review[:seq_length])
    return features

# Set sequence length (e.g., 200)
seq_length = 200

# Pad training and testing data
features_train = pad_features(tokenized_reviews_train, seq_length)
features_test = pad_features(tokenized_reviews_test, seq_length)

In [17]:
# Convert labels to NumPy arrays
train_labels = np.array(train_labels)
test_labels = np.array(test_labels)

# Split training data into training and validation sets
split_frac = 0.8
split_idx = int(len(features_train) * split_frac)
train_x, val_x = features_train[:split_idx], features_train[split_idx:]
train_y, val_y = train_labels[:split_idx], train_labels[split_idx:]

# Print the shapes of the datasets
print("\t\t\tFeature Shapes:")
print("Train set: \t\t{}".format(train_x.shape), 
      "\nValidation set: \t{}".format(val_x.shape),
      "\nTest set: \t\t{}".format(features_test.shape))

# Create Tensor datasets
train_data = TensorDataset(torch.from_numpy(train_x), torch.from_numpy(train_y))
valid_data = TensorDataset(torch.from_numpy(val_x), torch.from_numpy(val_y))
test_data = TensorDataset(torch.from_numpy(features_test), torch.from_numpy(test_labels))

# Dataloaders
batch_size = 50

# Create DataLoaders
train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
valid_loader = DataLoader(valid_data, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(test_data, shuffle=True, batch_size=batch_size)


			Feature Shapes:
Train set: 		(20000, 200) 
Validation set: 	(5000, 200) 
Test set: 		(25000, 200)


In [19]:
class SentimentCNN(nn.Module):
    """
    The embedding layer + CNN model used to perform sentiment analysis.
    """

    def __init__(self, embed_model, output_size, num_filters=100, kernel_sizes=[3, 4, 5], 
                 freeze_embeddings=True, drop_prob=0.5):
        """
        Initialize the model by setting up the layers.
        """
        super(SentimentCNN, self).__init__()

        # 1. Embedding layer
        # Get embeddings from the pre-trained model
        embedding_dim = embed_model.vector_size
        self.embedding = nn.Embedding.from_pretrained(
            torch.FloatTensor(embed_model.vectors), freeze=freeze_embeddings
        )
        
        # 2. Convolutional layers
        self.convs_1d = nn.ModuleList([
            nn.Conv2d(in_channels=1, out_channels=num_filters, kernel_size=(k, embedding_dim), padding=(k-2, 0)) 
            for k in kernel_sizes
        ])

        # 3. Fully-connected layer
        self.fc = nn.Linear(len(kernel_sizes) * num_filters, output_size)

        # 4. Dropout and activation
        self.dropout = nn.Dropout(drop_prob)
        self.sigmoid = nn.Sigmoid()
    
    def conv_and_pool(self, x, conv):
        """
        Convolutional layer with ReLU activation and max pooling.
        """
        x = F.relu(conv(x)).squeeze(3)  # Remove last dimension
        x = F.max_pool1d(x, x.size(2)).squeeze(2)  # Max pooling
        return x

    def forward(self, x):
        """
        Defines how the model processes the input data.
        """
        x = self.embedding(x)  # Embedding layer
        x = x.unsqueeze(1)  # Add channel dimension for convolutional layer

        # Convolutional and pooling layers
        x = [self.conv_and_pool(x, conv) for conv in self.convs_1d]

        # Concatenate outputs and apply dropout
        x = torch.cat(x, 1)
        x = self.dropout(x)

        # Fully-connected layer and sigmoid activation
        x = self.fc(x)
        x = self.sigmoid(x)
        return x


In [20]:
# Instantiate the model
output_size = 1  # Binary classification
num_filters = 100
kernel_sizes = [3, 4, 5]
dropout_prob = 0.5
freeze_embeddings = True  # Set to False to fine-tune embeddings

net = SentimentCNN(
    embed_model=embed_lookup, 
    output_size=output_size, 
    num_filters=num_filters, 
    kernel_sizes=kernel_sizes, 
    freeze_embeddings=freeze_embeddings, 
    drop_prob=dropout_prob
)

print(net)


No GPU available, training on CPU.
SentimentCNN(
  (embedding): Embedding(299567, 300)
  (convs_1d): ModuleList(
    (0): Conv2d(1, 100, kernel_size=(3, 300), stride=(1, 1), padding=(1, 0))
    (1): Conv2d(1, 100, kernel_size=(4, 300), stride=(1, 1), padding=(2, 0))
    (2): Conv2d(1, 100, kernel_size=(5, 300), stride=(1, 1), padding=(3, 0))
  )
  (fc): Linear(in_features=300, out_features=1, bias=True)
  (dropout): Dropout(p=0.5, inplace=False)
  (sigmoid): Sigmoid()
)


In [21]:
# Loss and optimization functions
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(net.parameters(), lr=0.001)
def train(net, train_loader, valid_loader, epochs, print_every=100):
    """
    Train the CNN model.
    """
    # Move model to GPU if available
    if train_on_gpu:
        net.cuda()
    
    net.train()
    counter = 0  # For printing

    for e in range(epochs):
        for inputs, labels in train_loader:
            counter += 1

            if train_on_gpu:
                inputs, labels = inputs.cuda(), labels.cuda()

            # Zero accumulated gradients
            net.zero_grad()

            # Get output from the model
            outputs = net(inputs)

            # Calculate loss and perform backpropagation
            loss = criterion(outputs.squeeze(), labels.float())
            loss.backward()
            optimizer.step()

            # Print loss statistics
            if counter % print_every == 0:
                net.eval()
                val_losses = []
                for val_inputs, val_labels in valid_loader:
                    if train_on_gpu:
                        val_inputs, val_labels = val_inputs.cuda(), val_labels.cuda()
                    val_outputs = net(val_inputs)
                    val_loss = criterion(val_outputs.squeeze(), val_labels.float())
                    val_losses.append(val_loss.item())
                net.train()
                print(f"Epoch: {e+1}/{epochs}, Step: {counter}, "
                      f"Loss: {loss.item():.6f}, Val Loss: {np.mean(val_losses):.6f}")


In [22]:
# Training parameters
epochs = 2  # Adjust as needed
print_every = 100

# Train the model
train(net, train_loader, valid_loader, epochs, print_every)


Epoch: 1/2, Step: 100, Loss: 0.525663, Val Loss: 0.850869
Epoch: 1/2, Step: 200, Loss: 0.444582, Val Loss: 0.919653
Epoch: 1/2, Step: 300, Loss: 0.442624, Val Loss: 0.690815
Epoch: 1/2, Step: 400, Loss: 0.367176, Val Loss: 0.428920
Epoch: 2/2, Step: 500, Loss: 0.292224, Val Loss: 0.465606
Epoch: 2/2, Step: 600, Loss: 0.347272, Val Loss: 0.440414
Epoch: 2/2, Step: 700, Loss: 0.365542, Val Loss: 0.553944
Epoch: 2/2, Step: 800, Loss: 0.272099, Val Loss: 0.519187


In [23]:
# Test the model
def test(net, test_loader):
    """
    Evaluate the model on the test data.
    """
    test_losses = []
    num_correct = 0

    net.eval()
    with torch.no_grad():
        for inputs, labels in test_loader:
            if train_on_gpu:
                inputs, labels = inputs.cuda(), labels.cuda()

            outputs = net(inputs)
            test_loss = criterion(outputs.squeeze(), labels.float())
            test_losses.append(test_loss.item())

            # Convert output probabilities to predicted class (0 or 1)
            preds = torch.round(outputs.squeeze())

            # Compare predictions to true labels
            correct_tensor = preds.eq(labels.float())
            correct = np.squeeze(correct_tensor.cpu().numpy()) if not train_on_gpu else np.squeeze(correct_tensor.numpy())
            num_correct += np.sum(correct)

    print(f"Test Loss: {np.mean(test_losses):.3f}")
    print(f"Test Accuracy: {num_correct / len(test_loader.dataset):.3f}")

# Evaluate the model
test(net, test_loader)


Test Loss: 0.349
Test Accuracy: 0.846


In [24]:
def tokenize_review(embed_lookup, review):
    """
    Tokenizes a single review using the pre-trained Word2Vec model.
    """
    review = review.lower()
    review = ''.join([c for c in review if c not in punctuation])
    tokens = []
    for word in review.split():
        if word in embed_lookup.key_to_index:
            tokens.append(embed_lookup.key_to_index[word])
        else:
            tokens.append(0)  # Unknown words mapped to 0
    return tokens

def predict(net, embed_lookup, review, seq_length=200):
    """
    Predicts the sentiment of a single review.
    """
    net.eval()
    tokens = tokenize_review(embed_lookup, review)
    features = pad_features([tokens], seq_length)
    feature_tensor = torch.from_numpy(features)

    if train_on_gpu:
        feature_tensor = feature_tensor.cuda()

    with torch.no_grad():
        output = net(feature_tensor)

    # Convert output probability to predicted class (0 or 1)
    pred = torch.round(output.squeeze())
    print(f"Prediction value (before rounding): {output.item():.6f}")

    if pred.item() == 1:
        print("Positive review detected!")
    else:
        print("Negative review detected.")


In [26]:
# Create Tensor dataset for test data
test_data = TensorDataset(torch.from_numpy(features_test), torch.from_numpy(test_labels))

# Dataloader for test data
batch_size = 50  # Use the same batch size as during training
test_loader = DataLoader(test_data, shuffle=False, batch_size=batch_size)
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def evaluate_model(net, test_loader):
    """
    Evaluate the model on the test data and compute evaluation metrics.
    """
    all_preds = []
    all_labels = []

    net.eval()  # Set the model to evaluation mode

    with torch.no_grad():
        for inputs, labels in test_loader:
            if train_on_gpu:
                inputs = inputs.cuda()
            outputs = net(inputs)
            preds = torch.round(outputs.squeeze())  # Get the predicted class (0 or 1)
            preds = preds.cpu().numpy() if train_on_gpu else preds.numpy()
            labels = labels.numpy()
            all_preds.extend(preds)
            all_labels.extend(labels)

    # Compute metrics
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)

    print(f"Test Accuracy: {accuracy:.3f}")
    print(f"Test Precision: {precision:.3f}")
    print(f"Test Recall: {recall:.3f}")
    print(f"Test F1 Score: {f1:.3f}")

# Call the evaluation function
evaluate_model(net, test_loader)


Test Accuracy: 0.846
Test Precision: 0.906
Test Recall: 0.773
Test F1 Score: 0.834


In [27]:
from sklearn.metrics import classification_report

def classification_report_model(net, test_loader):
    """
    Generate a classification report for the test data.
    """
    all_preds = []
    all_labels = []

    net.eval()  # Set the model to evaluation mode

    with torch.no_grad():
        for inputs, labels in test_loader:
            if train_on_gpu:
                inputs = inputs.cuda()
            outputs = net(inputs)
            preds = torch.round(outputs.squeeze())  # Get the predicted class (0 or 1)
            preds = preds.cpu().numpy() if train_on_gpu else preds.numpy()
            labels = labels.numpy()
            all_preds.extend(preds)
            all_labels.extend(labels)

    # Generate classification report
    target_names = ['Negative', 'Positive']
    report = classification_report(all_labels, all_preds, target_names=target_names)
    print(report)

# Call the classification report function
classification_report_model(net, test_loader)


              precision    recall  f1-score   support

    Negative       0.80      0.92      0.86     12500
    Positive       0.91      0.77      0.83     12500

    accuracy                           0.85     25000
   macro avg       0.85      0.85      0.85     25000
weighted avg       0.85      0.85      0.85     25000

