# Imports and Configurations

In [None]:
# Feature Generation
import os
from sklearn.feature_extraction.text import CountVectorizer

# Feature Selection
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.stem import PorterStemmer
from nltk.tokenize import word_tokenize

# Data Split
from sklearn.model_selection import train_test_split

# Naive Bayes
import numpy as np
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import accuracy_score

# Logistic Regression
from sklearn.linear_model import LogisticRegression

# SVM
from sklearn.svm import SVC

# BERT done on Colab
from transformers import DistilBertTokenizerFast, DistilBertForSequenceClassification, Trainer, TrainingArguments
import torch
import evaluate


In [None]:
# # COLAB
# from google.colab import drive
# drive.mount('/content/drive')

# Download necessary packages
# import nltk
# nltk.download('stopwords')
# nltk.download('wordnet')
# nltk.download('omw-1.4')
# %pip install transformers[torch]
# %pip install accelerate -U
# %pip install evaluate

# Load data

In [None]:
# USE CORRECT DATA FOLDER PATH

# COLAB
# data_folder_path = '/content/drive/MyDrive/Colab Notebooks/data'
# LAPTOP
data_folder_path = 'data'


def _read_reviews(folder):
    """Return the text of every file in `folder`, one string per file.

    File names are sorted because os.listdir returns entries in arbitrary
    order; a stable order keeps the seeded train/dev/test split reproducible.
    Each file is opened in a `with` block so its handle is closed as soon as
    it has been read.
    """
    reviews = []
    for file_name in sorted(os.listdir(folder)):
        with open(os.path.join(folder, file_name), 'r', encoding='utf-8') as review_file:
            reviews.append(review_file.read())
    return reviews


# Reading positive reviews
pos_folder = os.path.join(data_folder_path, 'pos')
pos_reviews = _read_reviews(pos_folder)

# Reading negative reviews
neg_folder = os.path.join(data_folder_path, 'neg')
neg_reviews = _read_reviews(neg_folder)

# Positive reviews first, then negative ones (split_data's default labels rely on this order)
combined_reviews = pos_reviews + neg_reviews

# Data Splits

In [None]:
def split_data(reviews, labels=None):
    """Split reviews and their labels into train, development and test sets (70:15:15).

    Args:
        reviews: Sequence of review texts to split.
        labels: Optional sequence of class labels aligned with `reviews`.
            When omitted, labels are built from the module-level `pos_reviews`
            and `neg_reviews`, so `reviews` must then contain one entry per
            positive review followed by one entry per negative review.

    Returns:
        Tuple of (train_data, dev_data, test_data,
        train_labels, dev_labels, test_labels).

    Raises:
        ValueError: If `reviews` and the labels differ in length.
    """
    if labels is None:
        # Create list of labels for the combined list of positive and negative reviews
        labels = ['positive'] * len(pos_reviews) + ['negative'] * len(neg_reviews)

    if len(labels) != len(reviews):
        raise ValueError(
            f"Got {len(reviews)} reviews but {len(labels)} labels; they must be aligned one-to-one."
        )

    # Splitting into training (70%) and temp (30%)
    train_data, temp_data, train_labels, temp_labels = train_test_split(reviews, labels, test_size=0.3, random_state=1)

    # Splitting temp into development (50%) and test (50%)
    dev_data, test_data, dev_labels, test_labels = train_test_split(temp_data, temp_labels, test_size=0.5, random_state=1)

    # Train_data, dev data and test data in ratio of 70:15:15
    return train_data, dev_data, test_data, train_labels, dev_labels, test_labels


# Feature Selection

In [None]:
# Cache for the English stop-word set, filled on first use so the set is built
# once instead of once per preprocessed document.
_STOP_WORDS_CACHE = {}


def _english_stop_words():
    """Return the NLTK English stop words as a frozenset, building it only once."""
    if 'english' not in _STOP_WORDS_CACHE:
        _STOP_WORDS_CACHE['english'] = frozenset(stopwords.words('english'))
    return _STOP_WORDS_CACHE['english']


# Preprocesses text according to parameters
def preprocess_text(doc, remove_stopwords=False, lemmatize=False, stem=False):
    """Lower-case a document, split it on whitespace and optionally normalise its words.

    Args:
        doc: Raw document text.
        remove_stopwords: Drop words found in NLTK's English stop-word list.
        lemmatize: Lemmatize each word with WordNetLemmatizer.
        stem: Stem each word with PorterStemmer. Ignored when `lemmatize` is
            also set, because only one of the two normalisations is applied.

    Returns:
        The remaining words joined back together with single spaces.
    """
    words = doc.lower().split()
    if remove_stopwords:
        stop_words = _english_stop_words()
        words = [word for word in words if word not in stop_words]
    # (Can't have both lemmatization and stemming): lemmatization wins if both are requested
    if lemmatize:
        lemmatizer = WordNetLemmatizer()
        words = [lemmatizer.lemmatize(word) for word in words]
    elif stem:
        porter = PorterStemmer()
        words = [porter.stem(word) for word in words]

    # Joins the words again to create a preprocessed doc
    return ' '.join(words)

# Re-weights a document-by-feature count matrix with inverse document frequency
def tfidf(feature_vec, N):
    """Scale each feature column of a count matrix by its inverse document frequency.

    Args:
        feature_vec: Document-by-feature matrix of counts (one row per document).
        N: Document count used as the numerator of the IDF ratio.

    Returns:
        `feature_vec` with every column multiplied by log(N / (1 + df)), where
        df is the number of rows in which that column is greater than zero.
        The weight is negative whenever 1 + df exceeds N.
    """
    # Number of documents (rows) in which each feature occurs at least once
    docs_containing_term = np.sum(feature_vec > 0, axis=0)

    # The +1 keeps the ratio finite for features that occur in no document
    inverse_doc_freq = np.log(N / (1 + docs_containing_term))

    # Multiply every row element-wise by the per-feature weights
    return feature_vec * inverse_doc_freq


# Preprocess the text data
# Feature set 1 has stopwords removed
feature_set1_data = [preprocess_text(doc, remove_stopwords=True) for doc in combined_reviews]
# Feature set 2 has stemming
feature_set2_data = [preprocess_text(doc, stem=True) for doc in combined_reviews]
# Feature set 3 has lemmatization
feature_set3_data = [preprocess_text(doc, lemmatize=True) for doc in combined_reviews]
# Feature set 4 has stopwords removed and lemmatization
feature_set4_data = [preprocess_text(doc, remove_stopwords=True, lemmatize=True) for doc in combined_reviews]
# Feature set 5 has stopwords removed, lemmatization and TFIDF (after vectorization).
# Its text preprocessing is identical to feature set 4, so the already
# preprocessed documents are copied instead of being lemmatized a second time.
feature_set5_data = list(feature_set4_data)


# Split every preprocessed feature set into train / dev / test data and labels
(
    feature_set1_train_data, feature_set1_dev_data, feature_set1_test_data,
    feature_set1_train_labels, feature_set1_dev_labels, feature_set1_test_labels,
) = split_data(feature_set1_data)
(
    feature_set2_train_data, feature_set2_dev_data, feature_set2_test_data,
    feature_set2_train_labels, feature_set2_dev_labels, feature_set2_test_labels,
) = split_data(feature_set2_data)
(
    feature_set3_train_data, feature_set3_dev_data, feature_set3_test_data,
    feature_set3_train_labels, feature_set3_dev_labels, feature_set3_test_labels,
) = split_data(feature_set3_data)
(
    feature_set4_train_data, feature_set4_dev_data, feature_set4_test_data,
    feature_set4_train_labels, feature_set4_dev_labels, feature_set4_test_labels,
) = split_data(feature_set4_data)
(
    feature_set5_train_data, feature_set5_dev_data, feature_set5_test_data,
    feature_set5_train_labels, feature_set5_dev_labels, feature_set5_test_labels,
) = split_data(feature_set5_data)

# Creates a vectorizer to convert preprocessed data to a sparse matrix
# where each row represents a document (review), and each column represents
# a unique n-gram (unigrams, bigrams and trigrams with ngram_range=(1, 3))
vectorizer = CountVectorizer(ngram_range=(1, 3))

# fit_transform assigns an index to each n-gram of the training split and builds its matrix;
# transform reuses that n-gram to index mapping for the dev and test splits.
# The same vectorizer is re-fitted for every feature set, so each set's dev and test
# matrices must be built before the next fit_transform call.
# .toarray() converts the sparse result to a dense array.
# REMOVE TOARRAY FOR FEATURE SET 1-4 FOR FASTER LR & SVM.
X_feature_set1_train = vectorizer.fit_transform(feature_set1_train_data).toarray()
X_feature_set1_dev = vectorizer.transform(feature_set1_dev_data).toarray()
# Was `.array()`, which is not a sparse-matrix method and raised AttributeError
X_feature_set1_test = vectorizer.transform(feature_set1_test_data).toarray()

X_feature_set2_train = vectorizer.fit_transform(feature_set2_train_data).toarray()
X_feature_set2_dev = vectorizer.transform(feature_set2_dev_data).toarray()
X_feature_set2_test = vectorizer.transform(feature_set2_test_data).toarray()

X_feature_set3_train = vectorizer.fit_transform(feature_set3_train_data).toarray()
X_feature_set3_dev = vectorizer.transform(feature_set3_dev_data).toarray()
X_feature_set3_test = vectorizer.transform(feature_set3_test_data).toarray()

X_feature_set4_train = vectorizer.fit_transform(feature_set4_train_data).toarray()
X_feature_set4_dev = vectorizer.transform(feature_set4_dev_data).toarray()
X_feature_set4_test = vectorizer.transform(feature_set4_test_data).toarray()

X_feature_set5_train = vectorizer.fit_transform(feature_set5_train_data).toarray()
X_feature_set5_dev = vectorizer.transform(feature_set5_dev_data).toarray()
X_feature_set5_test = vectorizer.transform(feature_set5_test_data).toarray()

# Feature selection using tfidf
# NOTE(review): each split is weighted with document frequencies computed from
# that same split, so dev/test weights differ from the training weights —
# confirm whether the training IDF should be reused instead.
X_feature_set5_train = tfidf(X_feature_set5_train, len(feature_set5_train_data))
X_feature_set5_dev = tfidf(X_feature_set5_dev, len(feature_set5_dev_data))
X_feature_set5_test = tfidf(X_feature_set5_test, len(feature_set5_test_data))

# Naive Bayes

## My Naive Bayes

In [None]:
# P (class | word) = P(word | class) * P(class) / P(word)
class NaiveBayesClassifier:
    """Binary "positive"/"negative" classifier over a dense document-by-feature matrix.

    For every feature, `fit` stores the share of that feature's total training
    count that came from each class. `predict` scores a document as the class
    prior plus the sum of those shares over the features present in it, and
    returns the higher-scoring class.

    NOTE(review): the stored per-feature values are count(word, class) /
    count(word) and the scores are added rather than multiplied, so this is
    not the textbook formula quoted above. The scoring rule is deliberately
    left as it was — confirm whether that is intended.
    """

    def __init__(self):
        # P(class): maps each class label to its fraction of the training documents
        self.class_probabilities = {}
        # Per-feature class shares, one [pos, neg] row per feature
        self.word_probabilities_given_class = []  #[[pos, neg]]
        # P(word): each feature's share of the total count in the training matrix
        self.word_probabilities = []

    @staticmethod
    def _positive_count_sums(rows):
        """Sum each column of `rows`, including only entries greater than zero."""
        return np.sum(rows, axis=0, where=rows > 0)

    # X is the feature matrix and y are the class labels
    def fit(self, X, y):
        """Estimate class priors and per-feature class shares from training data.

        Every call recomputes all statistics from scratch, so refitting the
        same instance replaces the previous model instead of appending to it.

        Args:
            X: Dense 2-D array-like of shape (num_docs, num_features).
            y: Sequence of "positive"/"negative" labels, one per row of X.
        """
        X = np.asarray(X)
        y = np.asarray(y)

        # Calculate probability of a random document belonging to a class: P(class)
        # np.unique gets the count of each label
        classes, counts = np.unique(y, return_counts=True)
        self.class_probabilities = {
            label: count / len(y) for label, count in zip(classes, counts)
        }

        # Calculate probability of a random word appearing in a document: P(word)
        total_word_count = np.sum(X)
        # axis=0 means the sum should be taken vertically down the columns
        self.word_probabilities = np.sum(X, axis=0) / total_word_count

        # Per-feature totals within each class, computed column-wise instead of
        # with a Python loop over every (feature, document) pair
        pos_counts = self._positive_count_sums(X[y == "positive"])
        neg_counts = self._positive_count_sums(X[y == "negative"])
        totals = pos_counts + neg_counts

        # A feature with no positive count in either class carries no evidence;
        # give it an even 0.5/0.5 split instead of dividing by zero
        seen = totals > 0
        safe_totals = np.where(seen, totals, 1)
        self.word_probabilities_given_class = np.column_stack([
            np.where(seen, pos_counts / safe_totals, 0.5),
            np.where(seen, neg_counts / safe_totals, 0.5),
        ])

    # Returns the class with the highest score for each document
    def predict(self, X):
        """Predict "positive" or "negative" for every row of X.

        Args:
            X: Dense 2-D array-like with the same feature columns used in `fit`.

        Returns:
            List of predicted labels, one per document. Ties go to "negative".

        Raises:
            KeyError: If either class was absent from the training labels
                (including when `fit` has not been called).
        """
        prior_pos = self.class_probabilities["positive"]
        prior_neg = self.class_probabilities["negative"]
        shares = np.asarray(self.word_probabilities_given_class)

        predictions = []
        for doc in np.asarray(X):
            # Only features that occur in the document contribute to its score
            present = doc > 0
            # Can ignore dividing by P(word) since this is the same for both pos and neg and so won't change ratio.
            pos_score = prior_pos + shares[present, 0].sum()
            neg_score = prior_neg + shares[present, 1].sum()

            # Choose the class with the higher score
            predictions.append("positive" if pos_score > neg_score else "negative")

        return predictions


In [None]:
# Fit the hand-written Naive Bayes classifier on feature set 1
clf = NaiveBayesClassifier()
clf.fit(X_feature_set1_train, feature_set1_train_labels)

# Predict both held-out splits, then score each against its labels
dev_predictions = clf.predict(X_feature_set1_dev)
test_predictions = clf.predict(X_feature_set1_test)
dev_accuracy = accuracy_score(feature_set1_dev_labels, dev_predictions)
test_accuracy = accuracy_score(feature_set1_test_labels, test_predictions)

# Report both accuracies to two decimal places, one per line
print(
    f"Development Set Accuracy: {dev_accuracy:.2f}",
    f"Test Set Accuracy: {test_accuracy:.2f}",
    sep="\n",
)

## Scikit-learn Naive Bayes

In [None]:
# Fit scikit-learn's multinomial Naive Bayes on feature set 4
clf = MultinomialNB()
clf.fit(X_feature_set4_train, feature_set4_train_labels)

# Predict both held-out splits, then score each against its labels
dev_predictions = clf.predict(X_feature_set4_dev)
test_predictions = clf.predict(X_feature_set4_test)
dev_accuracy = accuracy_score(feature_set4_dev_labels, dev_predictions)
test_accuracy = accuracy_score(feature_set4_test_labels, test_predictions)

# Report both accuracies to two decimal places, one per line
print(
    f"Development Set Accuracy: {dev_accuracy:.2f}",
    f"Test Set Accuracy: {test_accuracy:.2f}",
    sep="\n",
)

# Logistic Regression

In [None]:
# Logistic regression classifier configuration
lr_model = LogisticRegression(
    # Regularisation type: {'none', 'l1', 'l2', 'elasticnet'}
    penalty='l2',
    # Inverse of regularisation strength.
    C=0.1,
    # Max iterations for optimisation algorithm
    max_iter=1000,
    # Optimisation algorithm: {‘lbfgs’, ‘liblinear’, ‘newton-cg’, ‘newton-cholesky’, ‘sag’, ‘saga’}
    solver='lbfgs',
)

# Train on feature set 1
lr_model.fit(X_feature_set1_train, feature_set1_train_labels)

# Predict both held-out splits, then score each against its labels
dev_predictions = lr_model.predict(X_feature_set1_dev)
test_predictions = lr_model.predict(X_feature_set1_test)
dev_accuracy = accuracy_score(feature_set1_dev_labels, dev_predictions)
test_accuracy = accuracy_score(feature_set1_test_labels, test_predictions)

# Report both accuracies to two decimal places, one per line
print(
    f"Development Set Accuracy: {dev_accuracy:.2f}",
    f"Test Set Accuracy: {test_accuracy:.2f}",
    sep="\n",
)

# Average:
# 1 0.84 0.84
# 2 0.82 0.83
# 3 0.82 0.83
# 4 0.82 0.83
# 5 0.85 0.85

# SVM

In [None]:
# Support vector classifier configuration
svm_model = SVC(
    # Regularisation parameter
    C=5.0,
    # Kernel type: {‘linear’, ‘poly’, ‘rbf’, ‘sigmoid’}
    kernel='rbf',
)

# Train on feature set 1
svm_model.fit(X_feature_set1_train, feature_set1_train_labels)

# Predict both held-out splits, then score each against its labels
dev_predictions = svm_model.predict(X_feature_set1_dev)
test_predictions = svm_model.predict(X_feature_set1_test)
dev_accuracy = accuracy_score(feature_set1_dev_labels, dev_predictions)
test_accuracy = accuracy_score(feature_set1_test_labels, test_predictions)

# Report both accuracies to two decimal places, one per line
print(
    f"Development Set Accuracy: {dev_accuracy:.2f}",
    f"Test Set Accuracy: {test_accuracy:.2f}",
    sep="\n",
)

# Average:
# 1 0.83 0.82
# 2 0.81 0.82
# 3 0.82 0.82
# 4 0.82 0.83
# 5 0.84 0.84

# BERT

## Dataset

In [None]:
# Data split (raw, unpreprocessed reviews; the tokenizer does its own normalisation)
train_data, dev_data, test_data, train_labels, dev_labels, test_labels = split_data(combined_reviews)
# Change labels to 0 if negative and 1 for positive
train_labels = [0 if label == "negative" else 1 for label in train_labels]
dev_labels = [0 if label == "negative" else 1 for label in dev_labels]
test_labels = [0 if label == "negative" else 1 for label in test_labels]

# Initialise DistilBert tokenizer
tokenizer = DistilBertTokenizerFast.from_pretrained('distilbert-base-uncased')

# Tokenize data
# All three splits are encoded the same way, without return_tensors, because
# the dataset wrapper converts each item to a tensor itself. Previously only
# the training split was returned as tensors, so its items were wrapped in
# torch.tensor a second time.
train_encodings = tokenizer(train_data, truncation=True, padding=True)
dev_encodings = tokenizer(dev_data, truncation=True, padding=True)
test_encodings = tokenizer(test_data, truncation=True, padding=True)

# Accuracy metric used to score the model's predictions
metric = evaluate.load("accuracy")
def compute_metrics(eval_pred):
    """Return the accuracy of the arg-max class predictions against the reference labels.

    Args:
        eval_pred: Indexable object whose item 0 holds the logits and whose
            item 1 holds the reference labels.

    Returns:
        Whatever `metric.compute` returns for those predictions and labels.
    """
    # Items are indexed rather than unpacked: this is also called with the
    # object returned by trainer.predict, which may not be a plain pair
    predicted_classes = np.argmax(eval_pred[0], axis=-1)
    return metric.compute(predictions=predicted_classes, references=eval_pred[1])

# Turn labels and encodings into a PyTorch Dataset object
class IMDbDataset(torch.utils.data.Dataset):
    """Dataset pairing tokenizer encodings with their class labels.

    Args:
        encodings: Mapping from field name to an indexable collection holding
            one entry per example; it must provide `.items()`.
        labels: Indexable collection of labels, one per example.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        """Return example `idx` as a dict of tensors, with its label under 'labels'."""
        # as_tensor accepts plain Python lists and existing tensors alike;
        # torch.tensor would copy-construct (and warn) when given a tensor
        item = {key: torch.as_tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.as_tensor(self.labels[idx])
        return item

    def __len__(self):
        """Return the number of examples, taken from the number of labels."""
        return len(self.labels)

# Wrap each split's encodings and labels in an IMDbDataset
train_dataset, dev_dataset, test_dataset = (
    IMDbDataset(split_encodings, split_labels)
    for split_encodings, split_labels in (
        (train_encodings, train_labels),
        (dev_encodings, dev_labels),
        (test_encodings, test_labels),
    )
)

## Training

In [None]:
# Training configuration passed to the Trainer
training_args = TrainingArguments(
    # output directory
    output_dir='./results',
    # total number of training epochs
    num_train_epochs=3,
    # batch size per device during training
    per_device_train_batch_size=16,
    # batch size for evaluation
    per_device_eval_batch_size=64,
    # number of warmup steps for learning rate scheduler
    warmup_steps=500,
    # strength of weight decay
    weight_decay=0.01,
    # directory for storing logs
    logging_dir='./logs',
    logging_steps=10,
)

# Load the pre-trained DistilBERT checkpoint as a sequence-classification model
model = DistilBertForSequenceClassification.from_pretrained("distilbert-base-uncased")

trainer = Trainer(
    # the instantiated model to be trained
    model=model,
    # training arguments, defined above
    args=training_args,
    # training dataset
    train_dataset=train_dataset,
    # evaluation dataset
    eval_dataset=dev_dataset,
    # computes accuracy
    compute_metrics=compute_metrics,
)

# Train, then predict on the test set and compute its accuracy
trainer.train()
results = trainer.predict(test_dataset)
compute_metrics(results)