In [71]:
import numpy as np
import torch.nn as nn
from torch.functional import F
from string import punctuation
from collections import Counter
import torch
from torch.utils.data import TensorDataset, DataLoader
import torch
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

class SentimentRNN(nn.Module):
    def __init__(self, word2int, embedding_dim, n_hidden, n_layers, output_size=1, drop_prob=0.5):
        super(SentimentRNN, self).__init__()
        self.word2int = word2int
        self.n_hidden = n_hidden
        self.n_layers = n_layers
        self.vocab_size = len(word2int) + 1
        self.embedding = nn.Embedding(self.vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, n_hidden, n_layers, dropout=drop_prob, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc = nn.Linear(n_hidden * 2, output_size)  # n_hidden * 2 because it's bidirectional
        self.sigmoid = nn.Sigmoid()
        # Set the device attribute I use cpu, don't have a gpu....yet
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.to(self.device)  # Move model to appropriate device

    def forward(self, x, hidden):
        x = self.embedding(x)
        x, hidden = self.lstm(x, hidden)
        x = x[:, -1]  # Take the output from the last timestep
        x = self.dropout(x)
        x = self.fc(x)
        x = self.sigmoid(x)
        return x, hidden

    def init_hidden(self, batch_size):
        weight = next(self.parameters()).data
        return (
            weight.new(self.n_layers * 2, batch_size, self.n_hidden).zero_().to(self.device),
            weight.new(self.n_layers * 2, batch_size, self.n_hidden).zero_().to(self.device)
        )


    def init_weights(self):
        # Initialize the weights using uniform distribution, learned this from YT
        self.fc.weight.data.uniform_(-1, 1)
        # Initialize the weights using 0
        self.fc.bias.data.fill_(0)



In [72]:
def pre_process():
    # Read the reviews.txt and labels.txt, then store them in variables, dataset
    with open('/content/reviews.txt', 'r') as f:
        reviews_text = f.read()

    with open('/content/labels.txt', 'r') as f:
        sentiment = f.read()

    # Convert the reviews text to lowercase,
    # then remove all the punctuations
    reviews_text = reviews_text.lower()
    reviews_text = ''.join([c for c in reviews_text if c not in punctuation])
    print(reviews_text)

    # Each line contains one review. Create a list of reviews.
    reviews_list = reviews_text.split('\n')

    # Merge all the text in one string
    # This will be required for creating the char2int dict
    words = (' '.join(reviews_list)).split()

    # Build a dictionary that maps words to integers
    # Using Counter to set the values as number of word occurrences
    # High frequency words will have high occurrences
    counts = Counter(words)

    # Sort the dict based on # of occurrences from most to least
    sorted_dict = sorted(counts, key=counts.get, reverse=True)

    # Create the word2int dict with most frequent word starting from index 1
    # Reserve 0 for padding
    word2int = {word: index for index, word in enumerate(sorted_dict, 1)}

    # Encode the reviews text using word2int
    reviews_encoded = []
    for review in reviews_list:
        reviews_encoded.append([word2int[word] for word in review.split()])

    # Convert the labels to 0/1
    labels_encoded = []
    sentiment_list = sentiment.split('\n')
    for sentiment in sentiment_list:
        if sentiment == 'positive':
            labels_encoded.append(1)
        else:
            labels_encoded.append(0)

    return reviews_encoded, labels_encoded, word2int



In [73]:
def process_reviews(reviews_encoded, labels_encoded):
    # Remove entries with empty reviews, only interested in text so empty str aren't helpful
    count = 0
    for i, review in enumerate(reviews_encoded):
        if len(review) == 0:
            count = 1
            del reviews_encoded[i]
            del labels_encoded[i]

    print("Removed %d reviews" % count)

    # Convert the labels_encoded from list to Numpy Array
    labels_ndarray = np.array(labels_encoded)

    return reviews_encoded, labels_ndarray


In [74]:
def pad_features(reviews_encoded, seq_length):

    features = list()

    # Loop through the reviews
    for i, reviews in enumerate(reviews_encoded):

        if len(reviews) > seq_length:
            # cut off point in reviews
            features.append(reviews[0:seq_length])
        elif len(reviews) < seq_length:
            # Calculate padding and prepend, every input needs to be uniform
            difference = seq_length - len(reviews)
            arr = [0 for i in range(difference)]
            arr.extend(reviews)
            features.append(arr)
        else:
            features.append(reviews)

    return np.array(features)


In [75]:
def create_traing_test_val_set(reviews_ndarray, labels_ndarray, train_frac=0.8):

    # Find the index to split, in this case it 80% to train...
    split = int(len(reviews_ndarray) * train_frac)

    #get training data
    train_x = reviews_ndarray[:split, :]
    train_y = labels_ndarray[:split]

    remaining_x = reviews_ndarray[split:, :]
    remaining_y = labels_ndarray[split:]

    # Split the remaining data in half, rememeber the remaining will be allocated to validation and testing(please refer to the report)
    split = int(len(remaining_x) * 0.5)

    # Use first half for validation
    val_x = remaining_x[:split, :]
    val_y = remaining_y[:split]

    # Use 2nd half for Testing
    test_x = remaining_x[split:, :]
    test_y = remaining_y[split:]

    return train_x, train_y, val_x, val_y, test_x, test_y



In [76]:
def create_batches(train_x, train_y, val_x, val_y, test_x, test_y, batch_size):

    # Create TensorDataset, read this on Pytorch documentation, this is why we used numpy arrays from before
    train_data = TensorDataset(torch.from_numpy(train_x), torch.from_numpy(train_y))
    val_data = TensorDataset(torch.from_numpy(val_x), torch.from_numpy(val_y))
    test_data = TensorDataset(torch.from_numpy(test_x), torch.from_numpy(test_y))

    # Use the DataLoader. Also shuffle the data
    train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
    val_loader = DataLoader(val_data, shuffle=True, batch_size=batch_size)
    test_loader = DataLoader(test_data, shuffle=True, batch_size=batch_size)

    return train_loader, val_loader, test_loader



In [77]:
def train(model: SentimentRNN, train_loader, val_loader, batch_size=25, epochs=60, lr=0.1, clip=5, print_every=100):


    # Set the model to training mode
    model.train()

    # Define optimization process
    optimization = torch.optim.Adam(model.parameters(), lr=lr)

    # Define Loss function as Binary Cross Entropy, geeks for geeks said this would be optimal....:/
    error_func = nn.BCELoss()

    if torch.cuda.is_available():#remember first seeing this in class assignment
        model.cuda()

    training_loss = []
    validation_loss = [] #data struct to store our value

    # Loop through the epochs
    for i in range(epochs):

        # Initialize the hidden layers
        hidden = model.init_hidden(batch_size)

        # Loop though the batches [ Mini-Batch SGD ]
        for counter, (inputs, labels) in enumerate(train_loader):

            # Move the input and targets to GPU ( if available)
            if torch.cuda.is_available():
                inputs, labels = inputs.cuda(), labels.cuda()

            # Creating new variables for the hidden state, otherwise
            # we'd backprop through the entire training history, this took awhile for me to get right....
            hidden = tuple([each.data for each in hidden])

            # Remove the gradients from the model, minimize it!!
            model.zero_grad()

            # Forward pass
            output, hidden = model.forward(inputs, hidden)

            # Calculate the Loss
            loss = error_func(output.squeeze(), labels.float())

            # Back pass
            loss.backward()

            # Gradient clipping ( needed to avoid exploding gradients )
            nn.utils.clip_grad_norm_(model.parameters(), clip)

            optimization.step()

            if counter % print_every == 0:
                # Calculate validation loss

                val_hc = model.init_hidden(batch_size)
                val_losses = []

                # Set the model to evaluation state
                model.eval()

                for inputs, labels in val_loader:
                    # Creating new variables for the hidden state, otherwise
                    # we'd backprop through the entire training history
                    val_hc = tuple([each.data for each in val_hc])

                    # Move the input and targets to GPU ( if available )
                    if torch.cuda.is_available():
                        inputs, labels = inputs.cuda(), labels.cuda()

                    # Forward Propagation
                    output, val_hc = model.forward(inputs, val_hc)

                    val_loss = error_func(output.squeeze(), labels.float())
                    val_losses.append(val_loss.item())

                # Set the model to training mode again
                model.train()
                print("Epoch: {}/{}...".format(i + 1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.6f}...".format(loss.item()),
                      "Val Loss: {:.6f}".format(np.mean(val_losses)))
        validation_loss.append(np.mean(val_losses))
        training_loss.append(loss.item())



In [78]:
def test(model: SentimentRNN, test_loader, batch_size=50):

    if torch.cuda.is_available():
        model.cuda()
        train_on_gpu = True

    # Set the model to training mode
    model.eval()

    # Define Loss function as Binary Cross Entropy
    error_func = nn.BCELoss()

    h = model.init_hidden(batch_size)
    test_losses = []

    # Set the model to evaluation state
    model.eval()

    num_correct = 0

    for inputs, labels in test_loader:
        # we'd backprop through the entire training history
        h = tuple([each.data for each in h])

        # Move the input and targets to GPU ( if available )
        if torch.cuda.is_available():
            inputs, labels = inputs.cuda(), labels.cuda()

        # Forward Propagation
        output, h = model.forward(inputs, h)

        test_loss = error_func(output.squeeze(), labels.float())
        test_losses.append(test_loss.item())

        # convert output probabilities to predicted class (0 or 1)
        predictions = torch.round(output.squeeze())

        # Compare predictions with true labels
        correct_tensor = predictions.eq(labels.float().view_as(predictions))
        correct = np.squeeze(correct_tensor.numpy()) if not train_on_gpu else np.squeeze(correct_tensor.cpu().numpy())
        num_correct += np.sum(correct)

    print("Test loss: {:.3f}".format(np.mean(test_losses)))

    # accuracy over all test data
    test_acc = num_correct / len(test_loader.dataset)
    print("Test accuracy: {:.3f}".format(test_acc))


In [79]:
def prediction(model, input_text):
    model.eval()
    with torch.no_grad():
        input_text = input_text.lower()
        input_text = ''.join([char for char in input_text if char not in punctuation])
        inputs = [model.word2int.get(word, 0) for word in input_text.split()]
        inputs = torch.tensor(inputs, dtype=torch.long).unsqueeze(0)
        inputs = inputs.to(model.device)

        hidden = model.init_hidden(1)
        outputs, _ = model(inputs, hidden)

        if outputs.ndim == 3:  # In case of sequence outputs
            outputs = outputs[:, -1, :]  # Select the last timestep
        outputs = outputs.squeeze()  # Reduce to 1D tensor if needed
        pred_prob = torch.sigmoid(outputs)
        pred_label = torch.round(pred_prob).item()

        print(f'Raw output (pre-sigmoid): {outputs.item()}')
        print(f'Prediction value, pre-rounding: {pred_prob.item():.6f}')

        return "Positive" if pred_label == 1 else "Negative"


In [80]:
if __name__ == '__main__':
    batch_size = 50
    # Pre-process the data, using create_test_val_set
    reviews_encoded, labels_encoded, word2int = pre_process()
    reviews_encoded, labels_ndarray = process_reviews(reviews_encoded, labels_encoded)
    reviews_ndarray = pad_features(reviews_encoded, 200)
    train_x, train_y, val_x, val_y, test_x, test_y = create_traing_test_val_set(reviews_ndarray, labels_ndarray)
    train_loader, val_loader, test_loader = create_batches(train_x, train_y, val_x, val_y, test_x, test_y, batch_size)

    #init mode
    mode = "PREDICTION"  # Choose from TRAIN, TEST, PREDICTION

    #model details
    output_size = 1
    embedding_dim = 1024
    n_hidden = 512
    n_layers = 4
    model = SentimentRNN(word2int, embedding_dim, n_hidden, n_layers)

    if mode == "TRAIN":
        print("Training Mode")
        train(model, train_loader, val_loader, batch_size=batch_size, epochs=25)

    if mode == "PREDICTION":
        print("Prediction Mode")
        positive = "I enjoyed the movie and the seats were comfy"
        negative = "The worst movie I have seen; acting was terrible and I want my money back. This movie had bad acting and the dialogue was slow."
        inputs = [positive, negative]
        for input_text in inputs:
            prediction_result = prediction(model, input_text)
            print(prediction_result)

    if mode == "TEST":
        print("Testing Mode")
        test(model, test_loader)


    #these conditional statments were inspired by my time coding in Java, which was famously known for having switch cases in GE's.


IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.

Current values:
NotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
NotebookApp.rate_limit_window=3.0 (secs)



Removed 1 reviews
Prediction Mode
Raw output (pre-sigmoid): 0.5027230978012085
Prediction value, pre-rounding: 0.623099
Positive
Raw output (pre-sigmoid): 0.5018658638000488
Prediction value, pre-rounding: 0.622898
Positive


In [81]:
#Lowsy results but I did my best, followed online pytorch LSTM implmentation, and watch a few YT videos. Wasnt the easiest thing ITW (in the world)
def evaluate_model(model, test_loader):
    model.eval()
    y_true = []
    y_pred = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(model.device)
            labels = labels.to(model.device)
            predictions, _ = model(inputs, model.init_hidden(inputs.size(0)))

            predicted_labels = torch.round(predictions).cpu().numpy()
            y_pred.extend(predicted_labels)
            y_true.extend(labels.cpu().numpy())

    accuracy = accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)

    print(f'Accuracy: {accuracy*100:.2f}%') #2 decimal places bc why not
    print(f'Precision: {precision*100:.2f}%')
    print(f'Recall: {recall*100:.2f}%')
    print(f'F1-Score: {f1*100:.2f}%')

    return accuracy, precision, recall, f1
evaluate_model(model, test_loader)


Accuracy: 50.00%
Precision: 50.00%
Recall: 100.00%
F1-Score: 66.67%


(0.5, 0.5, 1.0, 0.6666666666666666)