<a href="https://colab.research.google.com/github/Mozzer2310/COMP34711-Deep-Learning/blob/main/task2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Colab generated code to mount drive, remove if not needed
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [8]:
from os import listdir
import glob
import random
import string
import nltk
from nltk.tokenize import word_tokenize
import tensorflow as tf
import numpy as np

# Download nltk punkt for tokenizer
nltk.download('punkt')


class NeuralNetwork:

    def __init__(self) -> None:
        self.vocab = set()
        self.reviews = []
        self.classification = []

    def read_data(self, path: str) -> list:
        # Find all the .txt files at the path, remove the README from the list
        file_paths = glob.glob(path + "/*.txt")
        file_paths.remove(path + "/README.txt")

        corpora = []
        # Read each file in the list of files
        for file_path in file_paths:
            f = open(file_path, "r")
            # Add the data to an array of corpora
            corpora.append(f.read())

        return corpora

    def preprocess(self, corpora: list):
        self.reviews = []
        self.classification = []
        # process the raw data of each corpus in the list
        for corpus in corpora:
            self.process_raw(corpus)

        # Flatten the processed reviews, to get a single list, convert to a set to get the vocab list
        self.vocab = set(
            [item for sublist in self.reviews for item in sublist.split(" ")])

    def process_raw(self, raw: str):
        # split over the lines (## defines a line and is on each new line as defined by README)
        lines = raw.splitlines()
        # remove '[t]' tags
        lines = [ele for ele in lines if ele != "[t]"]

        # process each line in the text, add the result to an array and add review class to an array
        for line in lines:
            # Check that the line isn't empty
            if len(line) != 0:
                # Process the line, get returned processed line and its review info for classifying
                processed_review, review_info = self.process_line(line)
                # Only consider reviews which can be classified, i.e. have been classified in text file
                if len(review_info) != 0:
                    # Consider weights of reviews, in the case that a review is part positive and part negative
                    # the weights will help when classifying a review if it is 'more' postive than negative, and vice versa
                    num_pos = 3 * review_info.count("+3") + 2 * review_info.count(
                        "+2") + review_info.count("+1") + review_info.count("+")
                    num_neg = 3 * review_info.count("-3") + 2 * review_info.count(
                        "-2") + review_info.count("-1") + review_info.count("-")
                    # 1 for postive and 0 for negative review, add to list
                    if num_pos > num_neg:
                        self.classification.append(1)
                        # add the review to an array
                        self.reviews.append(processed_review)
                    elif num_pos < num_neg:
                        self.classification.append(0)
                        # add the review to an array
                        self.reviews.append(processed_review)

    def process_line(self, line: str):
        # Get the substring before the ## delimiter, if not present return empty values for error handling
        try:
            delim_index = line.index("##")
        except ValueError:
            delim_index = None
        if delim_index == None:
            return [], ""
        # sub-string before the delimiter is the information about the class of review
        review_info = line[:delim_index]
        # sub-string after the delimiter is the review
        line = line[delim_index+2:]

        # Convert to lower case
        line_lwr = line.lower()
        # # Remove everything except punctuation
        line_clean = line_lwr.translate(
            str.maketrans('', '', string.punctuation))
        # Tokenize the review and rejoin to remove whitespace
        line_tokens = " ".join(word_tokenize(line_clean))

        return line_tokens, review_info

    def test(self, train_data: list, train_class: list, test_data: list, test_class: list):
        ######### USING ENCODER ######################
        # convert the train and test data and classes to a tensor flow dataset
        train_dataset = tf.data.Dataset.from_tensor_slices(
            (train_data, train_class))
        test_dataset = tf.data.Dataset.from_tensor_slices(
            (test_data, test_class))

        # Create batches of the train and test data
        # to be passed to the model
        BUFFER_SIZE = 10000
        BATCH_SIZE = 32
        train_dataset = train_dataset.shuffle(BUFFER_SIZE).batch(
            BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
        test_dataset = test_dataset.batch(
            BATCH_SIZE).prefetch(tf.data.AUTOTUNE)

        # Set a vocab size
        VOCAB_SIZE = 5000
        # set the encoding layer to keras TextVectorization
        encoder = tf.keras.layers.TextVectorization(
            max_tokens=VOCAB_SIZE)
        # Set the layers vocab from the dataset text data
        encoder.adapt(train_dataset.map(lambda text, label: text))

        # Create the model
        # encoding layer
        # embedding layer, input dimension the vocab size
        # bidirection LSTM layer with dropout 0.2
        # Converts the output of the LSTM to a single value, activation RELU
        model = tf.keras.Sequential([
            encoder,
            tf.keras.layers.Embedding(
                input_dim=len(encoder.get_vocabulary()),
                output_dim=32,
                # Use masking to handle the variable sequence lengths
                mask_zero=True),
            tf.keras.layers.Bidirectional(
                tf.keras.layers.LSTM(32, recurrent_dropout=0.2)),
            # tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(1, activation='relu')
        ])

        # Compile the model to configure the training process
        model.compile(loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
                      optimizer=tf.keras.optimizers.Adam(1e-4),
                      metrics=['accuracy'])

        # train the model on the train data set with 10 epochs and no output
        history = model.fit(
            train_dataset,
            epochs=10,
            batch_size=BATCH_SIZE,
            verbose=0)

        # Evaluate the model on the test dataset
        test_loss, test_acc = model.evaluate(test_dataset, verbose=0)

        return test_acc

    def nfold_cv(self, n: int = 5) -> list:
        # get the indices of the positve and negative reviews seperately
        pos_inds = np.where(np.array(self.classification) == 1)
        neg_inds = np.where(np.array(self.classification) == 0)

        # get the postive and negative reviews in separate lists
        pos_reviews = list(np.array(self.reviews)[pos_inds])
        neg_reviews = list(np.array(self.reviews)[neg_inds])

        # shuffle the lists randomly
        shuffled_pos = pos_reviews.copy()
        shuffled_neg = neg_reviews.copy()
        random.shuffle(shuffled_pos)
        random.shuffle(shuffled_neg)

        # instantiate empty lists in a dict
        groups = {"test": [], "test_class": [], "train": [], "train_class": []}
        # for the number of folds
        for split in range(n):
            # get the lower position of the positive and negative reviews
            lower_pos = (len(shuffled_pos)//n) * split
            lower_neg = (len(shuffled_neg)//n) * split
            # edge case, for when upper is not needed (at end of lists)
            if split == n-1:
                # split the two lists into testing (from lower bound to end of list)
                # and training (the rest of the lists)
                test_pos = shuffled_pos[lower_pos:]
                test_neg = shuffled_neg[lower_neg:]
                train_pos = shuffled_pos[:lower_pos]
                train_neg = shuffled_neg[:lower_neg]
            else:
                # split the two lists into testing (between the two bounds)
                # and training (the rest of the lists)
                upper_pos = (len(shuffled_pos)//n) * (split+1)
                upper_neg = (len(shuffled_neg)//n) * (split+1)
                test_pos = shuffled_pos[lower_pos:upper_pos]
                test_neg = shuffled_neg[lower_neg:upper_neg]
                train_pos = shuffled_pos[:lower_pos] + shuffled_pos[upper_pos:]
                train_neg = shuffled_neg[:lower_neg] + shuffled_neg[upper_neg:]
            # add the relevant sections of the lists to the dict
            groups["test"].append(test_pos + test_neg)
            groups["train"].append(train_pos + train_neg)
            # create two class list the with 1s for positive and 0s for negative
            groups["test_class"].append(
                [1] * len(test_pos) + [0] * len(test_neg))
            groups["train_class"].append(
                [1] * len(train_pos) + [0] * len(train_neg))

        accuracies = []
        # pass the data for each fold to the model for training and testing
        for fold in range(n):
            accuracies.append(self.test(
                groups["train"][fold], groups["train_class"][fold], groups["test"][fold], groups["test_class"][fold]))
        
        return accuracies


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [9]:
def main():
    neural = NeuralNetwork()
    # specify the directory path to the review files
    corpora = neural.read_data("/content/drive/MyDrive/COMP34711-Deep-Learning/product_reviews")

    neural.preprocess(corpora)
    accuracies = neural.nfold_cv()
    print(f"Accuracies: {accuracies}")
    print(f"Mean of Accuracies: {np.mean(accuracies)}")
    print(f"Standard Deviation of Accuracies: {np.std(accuracies)}")
    # TODO: hyper-parameter selection: batch size, recurrent drop out parameter


test = main()


Accuracies: [0.7129186391830444, 0.7416267991065979, 0.720095694065094, 0.7535884976387024, 0.7037914395332336]
Mean of Accuracies: 0.7264042139053345
Standard Deviation of Accuracies: 0.018459187930538502
