In [None]:
from google.colab import drive
drive.mount('/content/drive/')
path = "/content/drive/My Drive/COMP_551_Machine_Learning/Miniproject3/data"

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from sklearn import feature_extraction
from sklearn.feature_extraction.text import CountVectorizer
import sys
import random as rn
import torch
import os
import re

#Seed to stabilize outcomes
rn.seed(321)
np.random.seed(321)
torch.manual_seed(321)
torch.cuda.manual_seed(321)

In [None]:
rawTrain_x = []
rawTrain_y = []

rawTest_x = []
rawTest_y = []

# Old way to load files on Kaggle, you can safely ignore this.
# for dirname, _, filenames in os.walk(path):
#     for filename in filenames:
#         if ("train" in dirname) and os.path.isfile(os.path.join(dirname, filename)):  
#             f = open(os.path.join(dirname, filename), 'r')
#             rawTrain_x.append(f.read().lower())
#             f.close()
#             if ("pos" in dirname):
#                 rawTrain_y.append(int(1))
#             else:
#                 rawTrain_y.append(int(0))
#             #rawTrain_y.append(int(re.search("(?<=_)(.*?)(?=\.)",filename).group(1)))
            
#         if ("test" in dirname) and os.path.isfile(os.path.join(dirname, filename)):  
#             f = open(os.path.join(dirname, filename), 'r')
#             rawTest_x.append(f.read().lower())
#             f.close()
#             if ("pos" in dirname):
#                 rawTest_y.append(int(1))
#             else:
#                 rawTest_y.append(int(0))
#             #rawTest_y.append(int(re.search("(?<=_)(.*?)(?=\.)",filename).group(1)))
            
# rawTrain_x = np.asarray(rawTrain_x)
# rawTrain_y = np.asarray(rawTrain_y)
# rawTest_x = np.asarray(rawTest_x)
# rawTest_y = np.asarray(rawTest_y)

# print(rawTrain_x.shape)
# print(rawTrain_y.shape)
# print(rawTest_x.shape)
# print(rawTest_y.shape)

# print(np.max(rawTest_y))
# print(np.min(rawTest_y))

# Load data using tensorflow
import tensorflow as tf
import tensorflow_datasets as tfds

data = tfds.load('imdb_reviews', as_supervised=True)

In [None]:
train_data, test_data = data['train'], data['test']

# Reset all four accumulators in this cell so that re-running it does not
# append a second copy of the test split to lists created in an earlier cell.
rawTrain_x, rawTrain_y = [], []
rawTest_x, rawTest_y = [], []

for sentence, label in train_data:
    # Decode the bytes payload; str(bytes) would keep the b'...' wrapper and
    # turn every non-ASCII byte into a literal "\\x.." escape sequence.
    rawTrain_x.append(sentence.numpy().decode('utf-8'))
    rawTrain_y.append(int(label.numpy()))

for sentence, label in test_data:
    rawTest_x.append(sentence.numpy().decode('utf-8'))
    rawTest_y.append(int(label.numpy()))

rawTrain_x = np.asarray(rawTrain_x)
rawTrain_y = np.asarray(rawTrain_y)
rawTest_x = np.asarray(rawTest_x)
rawTest_y = np.asarray(rawTest_y)

print(rawTrain_x.shape)
print(rawTest_x.shape)

# Sanity check on the label range of the test split.
print(np.min(rawTest_y))
print(np.max(rawTest_y))

In [None]:
#Save data for later use
np.savez('zipData.npz',Train_X=rawTrain_x,Train_Y=rawTrain_y, Test_X=rawTest_x, Test_Y=rawTest_y)

In [None]:
Data_load = np.load('zipData.npz')
rawTrain_x = Data_load['Train_X']
rawTrain_y = Data_load['Train_Y']
rawTest_x = Data_load['Test_X']
rawTest_y = Data_load['Test_Y']

In [None]:
# Countvectorizer is a method to convert text to numerical data
# Each input is preprocessed, tokenized, and represented as a sparse matrix
# Bag-Of-Words representation: transforms the text into fixed-length vectors
# Bag-of-Words: doesn’t take into account the order and the structure of the words, but only number of times a word appears in the sentence

# Preprocess Data using NLTK for punctuation, stop words removal, lower case, stemming etc.
import nltk # Natural Language Toolkit 
from nltk.corpus import stopwords, movie_reviews, wordnet
from nltk import word_tokenize, wordnet
from nltk.stem import WordNetLemmatizer 

nltk.download(['stopwords', 'movie_reviews', 'punkt', 'wordnet'])

# Form Bag of Words Representation using CountVectorizer()
vectorizer = CountVectorizer()
vector = vectorizer.fit_transform(rawTrain_x)  # fit + transform in one pass

words_raw = np.asarray(sorted(vectorizer.vocabulary_)) # List of vocabulary present in Training Data
words_raw = np.char.lower(words_raw)
print(words_raw.shape)

# A set gives O(1) membership tests; the original list was scanned once per
# vocabulary word.
# NOTE(review): words() with no argument returns the stop words of every
# language shipped with NLTK, not just English -- confirm this is intended.
stopwords = set(nltk.corpus.stopwords.words())
words = [w for w in words_raw if w not in stopwords]  # words_raw is already lower-cased
print(np.asarray(words).shape)

# print(sorted(vectorizer.vocabulary_))
# print(vector.shape)

# Note: filter "out of vocabulary" (OOV) words out of test data before applying the model
# Also can filter out rare words from the training set, replace using an "UNKNOWN" token.

In [None]:
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import wordnet

nltk.download('wordnet')
nltk.download('punkt')
from nltk.stem import WordNetLemmatizer

wordnet_lemmatizer = WordNetLemmatizer()

sentence = "He was running and eating at same time. He has bad habit of swimming after playing long hours in the Sun."
# A set of single characters: membership is an exact token match, whereas
# `token in "?:!.,;"` is a substring test.
punctuations = set("?:!.,;")

# Build a filtered list instead of calling .remove() while iterating, which
# skips the element that follows every removed token.
sentence_words = [word for word in nltk.word_tokenize(sentence) if word not in punctuations]

print("{0:20}{1:20}".format("Word","Lemma"))
for word in sentence_words:
    print ("{0:20}{1:20}".format(word,wordnet_lemmatizer.lemmatize(word)))

Task 2 - Naive Bayes model

In [None]:
# 0 is negative and 1 is positive
class NBC:
    """Multinomial Naive Bayes classifier for binary sentiment on bag-of-words counts.

    Index 0 of ``prior`` and ``likelihood`` refers to the positive class
    (label 1) and index 1 to the negative class (label 0).
    """

    def __init__(self):
        # Create an instance of the CountVectorizer class
        self.vectorizer = CountVectorizer(strip_accents = 'unicode',        # Clean the data
                                 stop_words = 'english',
                                 lowercase = True,
                                 max_df = 0.5, #May want to tweak these last two
                                 min_df = 10)
        self.prior = [0,0] # positive, negative                            # Initialize Prior
        self.likelihood = [{},{}] # positive, negative                     # Initialize Likelihood
        # Log-space parameters used by predict(); filled in by fit().
        self._log_prior = None
        self._log_likelihood = None

    def fit(self, X, Y):
        """Estimate class priors and Laplace-smoothed word likelihoods.

        The vectorizer is fitted once on all of ``X`` and the counts stay
        sparse, so the data does not need to be processed in chunks and no
        trailing rows are dropped.

        Args:
            X: iterable of raw text documents.
            Y: array-like of labels, 1 for positive and 0 for negative.

        Raises:
            ValueError: if ``X`` and ``Y`` contain different numbers of samples.
        """
        Y = np.asarray(Y)
        counts = self.vectorizer.fit_transform(X)
        n_docs = counts.shape[0]
        if n_docs != Y.shape[0]:
            raise ValueError(f"X has {n_docs} samples but Y has {Y.shape[0]}")

        positive_rows = np.flatnonzero(Y == 1)
        negative_rows = np.flatnonzero(Y == 0)

        # Per-class total count of every vocabulary word, shape (2, n_words).
        class_counts = np.vstack([
            np.asarray(counts[positive_rows].sum(axis=0)).ravel(),
            np.asarray(counts[negative_rows].sum(axis=0)).ravel(),
        ]).astype(np.float64)

        # Laplace smoothing: (count + 1) / (total words in class + vocabulary size).
        smoothed = class_counts + 1.0
        probabilities = smoothed / smoothed.sum(axis=1, keepdims=True)

        self.prior = [len(positive_rows) / n_docs, len(negative_rows) / n_docs]
        self.likelihood = [{}, {}]
        for word, column in self.vectorizer.vocabulary_.items():
            self.likelihood[0][word] = float(probabilities[0, column])
            self.likelihood[1][word] = float(probabilities[1, column])

        # An absent class has prior 0; log(0) = -inf is the intended score.
        with np.errstate(divide="ignore"):
            self._log_prior = np.log(np.asarray(self.prior, dtype=np.float64))
        self._log_likelihood = np.log(probabilities)

    def predict(self, X):
        """Return an array of predicted labels (1 positive, 0 negative) for ``X``.

        Documents are encoded with the vocabulary learned in ``fit`` (the
        vectorizer is not re-fitted) and scored in log space, which avoids
        underflow on long documents: log P(c) + sum_w count(w) * log P(w | c).
        Ties are resolved in favour of the positive class.

        Raises:
            RuntimeError: if called before ``fit``.
        """
        if getattr(self, "_log_likelihood", None) is None:
            raise RuntimeError("NBC.predict called before fit")

        counts = self.vectorizer.transform(X)
        scores = np.asarray(counts @ self._log_likelihood.T) + self._log_prior
        return np.where(scores[:, 0] >= scores[:, 1], 1, 0)

    def evaluate_accuracy(self, true_y, pred_y):
        """Return the fraction of positions where ``true_y`` equals ``pred_y``."""
        true_y = np.asarray(true_y)
        pred_y = np.asarray(pred_y)
        if true_y.shape != pred_y.shape:
            raise ValueError(f"shape mismatch: {true_y.shape} vs {pred_y.shape}")
        return np.sum(true_y == pred_y) / true_y.shape[0]

In [None]:
#Train the model and get the test accuracy
NaiveBayesClassifier = NBC()
NaiveBayesClassifier.fit(rawTrain_x, rawTrain_y)
predictions = NaiveBayesClassifier.predict(rawTest_x)
print(f"Naive Bayes Classifier Accuracy = {NaiveBayesClassifier.evaluate_accuracy(rawTest_y, predictions)*100}%")

In [None]:
def confusion_matrix_values(predicted, true):
    """Count the four confusion-matrix cells for binary 0/1 labels.

    Args:
        predicted: array-like of predicted labels (0 or 1).
        true: array-like of ground-truth labels (0 or 1).

    Returns:
        Tuple ``(TP, FP, TN, FN)`` of integer counts.
    """
    predicted = np.asarray(predicted)
    true = np.asarray(true)

    predicted_pos, predicted_neg = predicted == 1, predicted == 0
    actual_pos, actual_neg = true == 1, true == 0

    # All four cells use the same explicit mask logic; TP previously compared
    # against the raw label values instead of a `true == 1` mask.
    TP = np.sum(predicted_pos & actual_pos)
    FP = np.sum(predicted_pos & actual_neg)
    TN = np.sum(predicted_neg & actual_neg)
    FN = np.sum(predicted_neg & actual_pos)
    return TP, FP, TN, FN

TP, FP, TN, FN = confusion_matrix_values(predictions, rawTest_y)
print("TP: ",TP, "\tFP: ", FP, "\tTN: ", TN, "\tFN: ", FN)

In [None]:
#Get the train accuracy
NaiveBayesClassifier = NBC()
NaiveBayesClassifier.fit(rawTrain_x, rawTrain_y)
predictions = NaiveBayesClassifier.predict(rawTrain_x)
print(f"Naive Bayes Classifier Accuracy = {NaiveBayesClassifier.evaluate_accuracy(rawTrain_y, predictions)*100}%")
TP, FP, TN, FN = confusion_matrix_values(predictions, rawTrain_y)
print("TP: ",TP, "\tFP: ", FP, "\tTN: ", TN, "\tFN: ", FN)

Logistic Regression Model using Sklearn

In [None]:
#Trying out Logistic Regression on this problem. 

from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, confusion_matrix

# Convert the text to a bag-of-words representation
vectorizer = CountVectorizer()
X_train = vectorizer.fit_transform(rawTrain_x)
X_test = vectorizer.transform(rawTest_x)

# Train the logistic regression model with the SAG solver.
# (This previously passed solver='saga', making it identical to the SAGA run below.)
LRSAGModel = LogisticRegression(penalty='l2', solver='sag', verbose=False)
LRSAGModel.fit(X_train, rawTrain_y)

# Make predictions on the test set
LRSAG_predTest = LRSAGModel.predict(X_test)

# Evaluate the accuracy of the classifier on test
accuracy = accuracy_score(rawTest_y, LRSAG_predTest)
print("Logistic Regression SAG Test Accuracy: ", accuracy * 100 , "%")

# Make predictions on the train set
LRSAG_pred = LRSAGModel.predict(X_train)

# Evaluate the accuracy of the classifier on train
accuracy = accuracy_score(rawTrain_y, LRSAG_pred)
print("Logistic Regression SAG Train Accuracy: ", accuracy * 100 , "%")

#Try again with the SAGA model
LRSAGAModel = LogisticRegression(penalty='l2', solver='saga', verbose=False)
LRSAGAModel.fit(X_train, rawTrain_y)

# Make predictions on the test set
LRSAGA_predTest = LRSAGAModel.predict(X_test)

# Evaluate the accuracy of the classifier on test
accuracy = accuracy_score(rawTest_y, LRSAGA_predTest)
print("Logistic Regression SAGA Test Accuracy: ", accuracy * 100 , "%")

# Make predictions on the train set
LRSAGA_pred = LRSAGAModel.predict(X_train)

# Evaluate the accuracy of the classifier on train
accuracy = accuracy_score(rawTrain_y, LRSAGA_pred)
print("Logistic Regression SAGA Train Accuracy: ", accuracy * 100 , "%")

In [None]:
def get_confusion_matrix_values(cm):
    """Unpack a 2x2 confusion matrix into ``(TP, FP, FN, TN)``.

    ``cm`` is indexed as ``cm[true_label][predicted_label]`` with labels
    ordered 0, 1 (the layout produced by ``sklearn.metrics.confusion_matrix``),
    so ``cm[0][0]`` is TN and ``cm[1][1]`` is TP. The values are returned in
    the order callers in this notebook unpack them: ``TP, FP, FN, TN``.
    """
    true_negative, false_positive = cm[0][0], cm[0][1]
    false_negative, true_positive = cm[1][0], cm[1][1]
    return (true_positive, false_positive, false_negative, true_negative)

confMatrixSAG = confusion_matrix(rawTest_y, LRSAG_predTest)
TP, FP, FN, TN = get_confusion_matrix_values(confMatrixSAG)
print("TP: ",TP, "\tFP: ", FP, "\tTN: ", TN, "\tFN: ", FN)
confMatrixSAGA = confusion_matrix(rawTest_y, LRSAGA_predTest)
TP, FP, FN, TN = get_confusion_matrix_values(confMatrixSAGA)
print("TP: ",TP, "\tFP: ", FP, "\tTN: ", TN, "\tFN: ", FN)

Now we can test an SVM model, followed by a modified SVM model, NBSVM, which is described by Wang and Manning's paper. Uses Sklearn as above.

In [None]:
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
from sklearn.svm import LinearSVC

SVMModel = LinearSVC(C=0.5, random_state=42)
SVMModel.fit(X_train, rawTrain_y)

SVM_pred = SVMModel.predict(X_train)
print("SVM Model Train Accuracy: " , accuracy_score(rawTrain_y, SVM_pred) * 100 , "%")

SVM_pred = SVMModel.predict(X_test)
print("SVM Model Test Accuracy: " , accuracy_score(rawTest_y, SVM_pred) * 100 , "%")

confMatrixSVM = confusion_matrix(rawTest_y, SVM_pred)
TP, FP, FN, TN = get_confusion_matrix_values(confMatrixSVM)
print("TP: ",TP, "\tFP: ", FP, "\tTN: ", TN, "\tFN: ", FN)

Now for the NBSVM. Inspired by this tutorial: https://medium.com/@asmaiya/a-neural-implementation-of-nbsvm-in-keras-d4ef8c96cb7c which is based on the paper here: https://nlp.stanford.edu/pubs/sidaw12_simple_sentiment.pdf

In [None]:
from keras import backend as K
from keras.models import Model
from keras.layers.core import Activation
from keras.layers import Input, Embedding, Flatten, dot
from keras.optimizers import Adam

#NBSVM needs sequences of word IDs to work (just like we saw in class with the animal example). Do that here.
def matrixToWordIds(matrix, maxlen):
    """Convert a sparse document-term count matrix into padded word-id sequences.

    Each row becomes a sequence in which word id ``column + 1`` is repeated
    ``count`` times (id 0 is reserved for padding). Sequences shorter than
    ``maxlen`` are left-padded with zeros; longer ones keep only their last
    ``maxlen`` ids.

    Args:
        matrix: sparse count matrix whose rows expose ``.indices`` and
            ``.data`` when iterated (as rows of a scipy CSR matrix do).
        maxlen: length of every output sequence.

    Returns:
        ``np.ndarray`` of shape ``(n_rows, maxlen)`` with dtype ``int64``.
    """
    # Preallocating fixes the dtype: padding an empty row separately yields a
    # float array, which would upcast the whole result.
    sequences = np.zeros((matrix.shape[0], maxlen), dtype=np.int64)
    for row_idx, row in enumerate(matrix):
        word_ids = (row.indices + 1).astype(np.int64)  # shift by one so 0 stays the padding id
        counts = row.data.astype(np.int64)
        seq = np.repeat(word_ids, counts)
        # Keep at most the last `maxlen` ids and right-align them.
        tail = seq[max(len(seq) - maxlen, 0):]
        if len(tail):
            sequences[row_idx, maxlen - len(tail):] = tail
    return sequences

maxlen = 2000
x_train_NBSVM = matrixToWordIds(X_train, maxlen)
x_test_NBSVM = matrixToWordIds(X_test, maxlen)

In [None]:
#Need to get NB Log-count ratios. Use matrix form, not sequence form.
def getRatios(matrix, y, yi):
    """Return add-one-smoothed per-feature totals for the rows labelled ``yi``.

    The feature counts of all rows where ``y == yi`` are summed column-wise,
    incremented by one, and divided by (number of such rows + 1).
    """
    in_class = (y == yi)
    feature_totals = matrix[in_class].sum(0)
    n_in_class = in_class.sum()
    return (feature_totals + 1) / (n_in_class + 1)

NBLogCountRatios = np.log(getRatios(X_train, rawTrain_y, 1)/getRatios(X_train, rawTrain_y, 0))
NBLogCountRatios = np.squeeze(np.asarray(NBLogCountRatios))

In [None]:
#Now we create the model. We'll need to get the amount of words in our vectorizer's dictionary.
numWords = len([v for k,v in vectorizer.vocabulary_.items()]) + 1

# Create the first embedding matrix which holds NB LogCount ratios
embedding_matrix = np.zeros((numWords, 1))
for i in range(1, numWords): # skip 0, the padding value
  embedding_matrix[i] = NBLogCountRatios[i-1]
# Setup the model and its parameters
inp = Input(shape=(maxlen,))
r = Embedding(numWords, 1, input_length=maxlen, weights=[embedding_matrix], trainable=False)(inp)
x = Embedding(numWords, 1, input_length=maxlen, embeddings_initializer='glorot_normal')(inp) #The second embedded matrix which is for the learned weights as the model runs
x = dot([r,x], axes=1)
x = Flatten()(x)
x = Activation('sigmoid')(x)
NBSVMModel = Model(inputs=inp, outputs=x)
NBSVMModel.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.001), metrics=['accuracy'])

In [None]:
NBSVMModel.fit(x_train_NBSVM, rawTrain_y,batch_size=32,epochs=3, validation_data=(x_test_NBSVM, rawTest_y))

In [None]:
NBSVM_pred= NBSVMModel.predict(x_test_NBSVM)
confMatrixNBSVM = confusion_matrix(rawTest_y, np.rint(NBSVM_pred))
TP, FP, FN, TN = get_confusion_matrix_values(confMatrixNBSVM)
print("TP: ",TP, "\tFP: ", FP, "\tTN: ", TN, "\tFN: ", FN)

Load data for transformer models. Note: For the below code and models, the intention is to run it on Kaggle with GPU acceleration due to memory constraints.

In [None]:
!pip install -q simpletransformers
from simpletransformers.classification import ClassificationModel
import sklearn
from datasets import load_dataset
import torch
cuda_available = torch.cuda.is_available()
print(cuda_available)

# Dataset.rename_column returns a new Dataset rather than renaming in place,
# so the result has to be kept; otherwise the column stays named 'label'.
pandas_train = load_dataset('imdb',split='train').rename_column('label', 'labels')
pandas_train = pd.DataFrame(pandas_train)

pandas_test = load_dataset('imdb',split='test').rename_column('label', 'labels')
pandas_test = pd.DataFrame(pandas_test)

In [None]:
pandas_train

BERT Model using simpletransformers.

In [None]:
# Define the pretrained BERT transformer model
BERTModel = ClassificationModel('bert', 'bert-base-cased', use_cuda=True, num_labels=2, args={
    'reprocess_input_data': False,
    'overwrite_output_dir': True,
    'sliding_window': True,
    'max_seq_length': 64,
    'num_train_epochs': 1,
    'learning_rate': 0.00001,
    'weight_decay': 0.01,
    'train_batch_size': 128,
    'fp16': True,
    'output_dir': '/outputs/',
}) 

#Train and evaluate the model
BERTModel.train_model(pandas_train)

In [None]:
resultBertTrain, output, badPredictions = BERTModel.eval_model(pandas_train, acc=sklearn.metrics.accuracy_score)
print("Accuracy of BERT model on train: " , resultBertTrain, "%")

In [None]:
resultBertTest, output, badPredictions = BERTModel.eval_model(pandas_test, acc=sklearn.metrics.accuracy_score)
print("Accuracy of BERT model on test: " , resultBertTest, "%")

Getting the attention matrix results. Loaded from files saved previously (when running on Kaggle). We trained using Simpletransformers, but use the weights trained there in a tensorflow model to access its output_attentions feature. So, we have to re-load and clean the data a bit.

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification

tokenizer = AutoTokenizer.from_pretrained("/outputs/")
model = AutoModelForSequenceClassification.from_pretrained("/outputs/", output_attentions=True)

In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds

data = tfds.load('imdb_reviews', as_supervised=True)

In [None]:
def clean_data(sentence):
    """Clean the ``str()`` of a bytes review (e.g. ``"b'Great film<br />...'"``).

    Steps:
      1. Strip the ``b'...'`` / ``b"..."`` wrapper left by ``str(bytes)``.
      2. Normalise double quotes to single quotes and drop backslashes.
      3. Remove the residue of escaped UTF-8 byte pairs (``xc2x96`` etc.),
         longest residues first so a short prefix such as ``xc2xa`` cannot
         clip ``xc2xa0`` and leave a stray digit behind.
      4. Replace ``<br />`` tags with a space so neighbouring words stay apart.

    Args:
        sentence: text of one review, normally the ``str()`` of a bytes object.

    Returns:
        The cleaned review text.
    """
    # [1:-1] alone would leave the opening quote in place, so remove both
    # wrapper characters, and only when the wrapper is really there.
    if (len(sentence) >= 3 and sentence[0] == "b"
            and sentence[1] in "'\"" and sentence[-1] == sentence[1]):
        sentence = sentence[2:-1]

    sentence = sentence.replace('"', "'").replace("\\", "")

    # Full six-character residues come before the five-character prefixes.
    for residue in ("xc2x96", "xc2x97", "xc2x85", "xc2x91", "xc2x84",
                    "xc2xa0", "xc2xb4", "xc3xa9", "xc2x9", "xc2xa"):
        sentence = sentence.replace(residue, "")

    return sentence.replace("<br />", " ")

In [None]:
import csv
def write_attention(filename, tensor, out_dir="/kaggle/working"):
    """Write a 2-D attention matrix to ``<out_dir>/<filename>`` as header-less CSV.

    Args:
        filename: name of the CSV file to create.
        tensor: iterable of rows, each exposing ``.tolist()``.
        out_dir: directory to write into; the default is the location this
            notebook originally hard-coded.
    """
    out_path = f"{out_dir.rstrip('/')}/{filename}"
    with open(out_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        # One CSV row per row of the matrix; no header row is written.
        writer.writerows(row.tolist() for row in tensor)

In [None]:
train_x, train_y = [], []
test_x, test_y = [], []

train_x_len = {}
test_x_len = {}

for line in data["test"]:
    sentence, val = clean_data(str(line[0].numpy())), int(line[1].numpy())
    length = len(sentence.split())
    if length <= 500:
        test_x.append(sentence)
        test_y.append(val)

for line in data["train"]:
    sentence, val = clean_data(str(line[0].numpy())), int(line[1].numpy())
    length = len(sentence.split())
    if length <= 500:
        train_x.append(sentence)
        train_y.append(val)


test_x1 = test_x[:12500]
test_y1 = test_y[:12500]
train_x1 = train_x[:5000]
train_y1 = train_y[:5000]

In [None]:
# Accuracy-measuring cells follow; they can be ignored.

In [None]:
# Process the sentences. This section is just to test accuracy, it can be safely ignored.
outputs = []

n_eval = min(8000, len(test_x)) #set to len(test_x) if you want full set

# Inference only: skip building the autograd graph.
with torch.no_grad():
    for i in range(n_eval):
        sentence = test_x[i]
        inputs = tokenizer(sentence, add_special_tokens=True, return_tensors='pt', padding=True, truncation=True) #sentences[i]
        batch_outputs = model(inputs['input_ids'], attention_mask=inputs['attention_mask'])[0]
        predicted_classes = torch.argmax(batch_outputs, dim=1)
        for value in predicted_classes:
            outputs.append(int(value.item()))
        if (i+1)%1000 ==0:
            print(i+1)

print(len(outputs))
# Compare against only the labels of the sentences that were scored;
# comparing with the full test_y mismatches in length.
evaluated_labels = np.array(test_y[:len(outputs)])
print(f"BERT accuracy = {np.sum(np.array(outputs) == evaluated_labels)/len(outputs) * 100}")

In [None]:
i = 0
true_not_done, false_not_done = True, True
for pred, true in zip(np.array(outputs),np.array(test_y)[:5386]):
    if pred == true and true_not_done:
        print(i)
        true_not_done = False
    elif pred != true and false_not_done:
        print(i)
        false_not_done = False
    i += 1
    if false_not_done == False and true_not_done == False:
        break

Try to save the pretrained model. For some reason, doesn't work.

In [None]:
# Absolute path, consistent with the other "/kaggle/working/" outputs in this
# notebook; the relative "kaggle/working/..." form resolves against the
# current working directory instead.
save_dir = "/kaggle/working/pretrainedmodel/"
tokenizer.save_pretrained(save_dir)
model.save_pretrained(save_dir)

Now we can extract the attention matrices. We test on two sentences: one that is classified correctly and one that is classified incorrectly.

In [None]:
text = 'This was a great movie! Matt Damon was in it, he was terrific.'
tokens = tokenizer.tokenize(text)
print(tokens)
inputs = tokenizer(text, return_tensors="pt")
print(inputs)
outputs = model(**inputs)
predictions = outputs.logits.argmax(dim=1)

# Print the predicted label
print(predictions.item(), 1)

attentions = outputs.attentions
write_attention("attention_Correct_last_layer.csv", attentions[-1][-1][-1])
write_attention("attention_Correct_first_layer.csv", attentions[0][0][0])

def write_tokens(filename, tokensList, out_dir="/kaggle/working"):
    """Write a list of tokens as a single CSV row to ``<out_dir>/<filename>``.

    Args:
        filename: name of the CSV file to create.
        tokensList: sequence of token strings, written as one row.
        out_dir: directory to write into; the default is the location this
            notebook originally hard-coded.
    """
    out_path = f"{out_dir.rstrip('/')}/{filename}"
    with open(out_path, 'w', newline='') as csvfile:
        csv.writer(csvfile).writerow(tokensList)
        
tokens.append('SEP')
tokens.insert(0, 'CLS')
print(tokens)

write_tokens("correctTokens.csv", tokens)

In [None]:
text = 'This was a wonderful movie! But it has problems. The lighting, props and sets were bad, terrible and horrible. I still like it.'
tokens = tokenizer.tokenize(text)
print(tokens)
inputs = tokenizer(text, return_tensors="pt")
print(inputs)
outputs = model(**inputs)
predictions = outputs.logits.argmax(dim=1)

# Print the predicted label
print(predictions.item(), 1)

attentions = outputs.attentions
write_attention("attention_Wrong_last_layer.csv", attentions[-1][-1][-1])
write_attention("attention_Wrong_first_layer.csv", attentions[0][0][0])

tokens.append('SEP')
tokens.insert(0, 'CLS')

write_tokens("wrongTokens.csv", tokens)

Since we had to run some parts on Kaggle, we load the results from files here (everything was consolidated into this one notebook for simplicity). This reloading step is not needed if the whole notebook is run in a single session.

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

tokens_correct = pd.read_csv("/kaggle/input/imdb-attention-max-bert/correctTokens.csv")
correct_tokens = tokens_correct.columns.values.tolist()
tokens_wrong = pd.read_csv("/kaggle/input/imdb-attention-max-bert/wrongTokens.csv")
wrong_tokens = tokens_wrong.columns.values.tolist()
print(correct_tokens)
print(wrong_tokens)

In [None]:
filename_attention_correct_fl = "/kaggle/input/imdb-attention-max-bert/attention_Correct_first_layer.csv"
# header=None: write_attention emits data rows only, so the default header
# inference would consume the first attention row as column names.
# NOTE(review): assumes the uploaded CSV is the unmodified write_attention output.
df_correct_fl = pd.read_csv(filename_attention_correct_fl, header=None)

plt.subplots(figsize=(8, 6))
array_attention_correct_fl = df_correct_fl.to_numpy()
s1 = sns.heatmap(array_attention_correct_fl)
s1.set_xticks(np.arange(len(correct_tokens)), labels = correct_tokens)
s1.set_yticks(np.arange(len(correct_tokens)), labels = correct_tokens)
plt.title("First Layer Attention Matrix of Correctly Classified Sentence")
plt.xticks(rotation=90) 
plt.yticks(rotation=0) 
plt.show()

In [None]:
filename_attention_correct_ll = "/kaggle/input/imdb-attention-max-bert/attention_Correct_last_layer.csv"
# header=None: write_attention emits data rows only, so the default header
# inference would consume the first attention row as column names.
# NOTE(review): assumes the uploaded CSV is the unmodified write_attention output.
df_correct_ll = pd.read_csv(filename_attention_correct_ll, header=None)

plt.subplots(figsize=(8, 6))
array_attention_correct_ll = df_correct_ll.to_numpy()
s2 = sns.heatmap(array_attention_correct_ll)
s2.set_xticks(np.arange(len(correct_tokens)), labels = correct_tokens)
s2.set_yticks(np.arange(len(correct_tokens)), labels = correct_tokens)
plt.title("Last Layer Attention Matrix of Correctly Classified Sentence")
plt.xticks(rotation=90) 
plt.yticks(rotation=0) 
plt.show()

In [None]:
filename_attention_wrong_fl = "/kaggle/input/imdb-attention-max-bert/attention_Wrong_first_layer.csv"
# header=None: write_attention emits data rows only, so the default header
# inference would consume the first attention row as column names.
# NOTE(review): assumes the uploaded CSV is the unmodified write_attention output.
df_wrong_fl = pd.read_csv(filename_attention_wrong_fl, header=None)
array_attention_wrong_fl = df_wrong_fl.to_numpy()

plt.subplots(figsize=(8, 6))
s3 = sns.heatmap(array_attention_wrong_fl)
s3.set_xticks(np.arange(len(wrong_tokens)), labels = wrong_tokens)
s3.set_yticks(np.arange(len(wrong_tokens)), labels = wrong_tokens)
plt.title("First Layer Attention Matrix of Incorrect Classified Sentence")
plt.xticks(rotation=90) 
plt.yticks(rotation=0) 
plt.show()

In [None]:
filename_attention_wrong_ll = "/kaggle/input/imdb-attention-max-bert/attention_Wrong_last_layer.csv"
# header=None: write_attention emits data rows only, so the default header
# inference would consume the first attention row as column names.
# NOTE(review): assumes the uploaded CSV is the unmodified write_attention output.
df_wrong_ll = pd.read_csv(filename_attention_wrong_ll, header=None)
array_attention_wrong_ll = df_wrong_ll.to_numpy()

plt.subplots(figsize=(8, 6))
s4 = sns.heatmap(array_attention_wrong_ll)
s4.set_xticks(np.arange(len(wrong_tokens)), labels = wrong_tokens)
s4.set_yticks(np.arange(len(wrong_tokens)), labels = wrong_tokens)
plt.title("Last Layer Attention Matrix of Incorrect Classified Sentence")
plt.xticks(rotation=90) 
plt.yticks(rotation=0) 
plt.show()

In [None]:
# Scratch layout test: three heatmaps side by side with one shared colorbar
# axis. The original cell did not parse (`plt.subplots(,2,` and a stray `)`)
# and carried an unused seaborn "flights" example; both are repaired/removed.
# NOTE(review): all three panels plot array_attention_correct_fl, as in the
# original cell -- confirm whether different matrices were intended.
f,(ax1,ax2,ax3, axcb) = plt.subplots(1,4,
            gridspec_kw={'width_ratios':[1,1,1,0.08]})
g1 = sns.heatmap(array_attention_correct_fl,cbar=False,ax=ax1)
g1.set_ylabel('')
g1.set_xlabel('')
g2 = sns.heatmap(array_attention_correct_fl,cbar=False,ax=ax2)
g2.set_ylabel('')
g2.set_xlabel('')
g2.set_yticks([])
# Only the last panel draws a colorbar, into the narrow dedicated axis.
g3 = sns.heatmap(array_attention_correct_fl,ax=ax3,cbar_ax=axcb)
g3.set_ylabel('')
g3.set_xlabel('')
g3.set_yticks([])

# may be needed to rotate the ticklabels correctly:
for ax in [g1,g2,g3]:
    tl = ax.get_xticklabels()
    ax.set_xticklabels(tl, rotation=90)
    tly = ax.get_yticklabels()
    ax.set_yticklabels(tly, rotation=0)

In [None]:
f,(ax1,ax2) = plt.subplots(1,2,figsize=(11, 9),layout='constrained',
            gridspec_kw={'width_ratios':[1,1]})
f.suptitle("Attention Matrices of Correctly Classified Sentence", y=0.8)
ax1.get_shared_y_axes().join(ax2)
g1 = sns.heatmap(array_attention_correct_fl,cbar=False,ax=ax1)
g1.set_title('First Layer')
g1.set_ylabel('')
g1.set_xlabel('')
g1.set_yticks(np.arange(len(correct_tokens)), labels = correct_tokens, rotation=0)
g2 = sns.heatmap(array_attention_correct_ll,ax=ax2, cbar=True, cbar_kws={"shrink": .5})
g2.set_title('Last Layer')
g2.set_ylabel('')
g2.set_xlabel('')
g2.set_yticks([])

# may be needed to rotate the ticklabels correctly:
for ax in [g1,g2]:
    ax.set_xticks(np.arange(len(correct_tokens)), labels = correct_tokens, rotation=90)
    ax.set_aspect('equal')
    
plt.show()

In [None]:
f,(ax1,ax2) = plt.subplots(1,2, figsize=(11, 9),layout='constrained',
            gridspec_kw={'width_ratios':[1,1]})
f.suptitle("Attention Matrices of Incorrectly Classified Sentence", y=0.8)
ax1.get_shared_y_axes().join(ax2)
g1 = sns.heatmap(array_attention_wrong_fl,cbar=False,ax=ax1)
g1.set_title('First Layer')
g1.set_ylabel('')
g1.set_xlabel('')
g1.set_yticks(np.arange(len(wrong_tokens)), labels = wrong_tokens, rotation=0)
g2 = sns.heatmap(array_attention_wrong_ll,ax=ax2, cbar=True, cbar_kws={"shrink": .5})
g2.set_title('Last Layer')
g2.set_ylabel('')
g2.set_xlabel('')
g2.set_yticks([])

# may be needed to rotate the ticklabels correctly:
for ax in [g1,g2]:
    ax.set_xticks(np.arange(len(wrong_tokens)), labels = wrong_tokens, rotation=90)
    ax.set_aspect('equal')
    
plt.show()

ALBERT Model (also using simpletransformers)

In [None]:
# Define the pretrained ALBERT transformer model
ALBERTModel = ClassificationModel('albert', 'albert-base-v2', use_cuda=True, num_labels=2, args={
    'reprocess_input_data': False,
    'overwrite_output_dir': True,
    'sliding_window': True,
    'max_seq_length': 64,
    'num_train_epochs': 1,
    'learning_rate': 0.00001,
    'weight_decay': 0.01,
    'train_batch_size': 128,
    'fp16': True,
    # Own sub-directory: the BERT model is trained into '/outputs/' and the
    # attention-analysis cell loads from there, so writing ALBERT into the
    # same directory with overwrite_output_dir=True would replace those files.
    'output_dir': '/outputs/albert/',
}) 

#Train the model
ALBERTModel.train_model(pandas_train)

In [None]:
# Evaluate on the training split; the label previously said "test".
ALBERTresult, ALBERToutput, ALBERTbadPredictions = ALBERTModel.eval_model(pandas_train, acc=sklearn.metrics.accuracy_score)
print("Accuracy of ALBERT model on train: " , ALBERTresult, "%")

In [None]:
ALBERTresult, ALBERToutput, ALBERTbadPredictions = ALBERTModel.eval_model(pandas_test, acc=sklearn.metrics.accuracy_score)
print("Accuracy of ALBERT model on test: " , ALBERTresult, "%")

XLNet Model (also using simpletransformers)

In [None]:
# Define the pretrained XLNet transformer model
XLNetModel = ClassificationModel('xlnet', 'xlnet-base-cased', use_cuda=True, num_labels=2, args={
    'reprocess_input_data': False,
    'overwrite_output_dir': True,
    'sliding_window': True,
    'max_seq_length': 64,
    'num_train_epochs': 1,
    'learning_rate': 0.00001,
    'weight_decay': 0.01,
    'train_batch_size': 128,
    'fp16': True,
    # Own sub-directory: the BERT model is trained into '/outputs/' and the
    # attention-analysis cell loads from there, so writing XLNet into the
    # same directory with overwrite_output_dir=True would replace those files.
    'output_dir': '/outputs/xlnet/',
    # NOTE(review): 'output_attention' may not be an option simpletransformers
    # recognises -- confirm it has any effect.
    'output_attention' : True
}) 

#Train the model
XLNetModel.train_model(pandas_train)

In [None]:
XLNetResult, XLNetOutput, XLNetBadPredictions = XLNetModel.eval_model(pandas_train, acc=sklearn.metrics.accuracy_score)
print("Accuracy of XLNet model: " , XLNetResult, "%")

In [None]:
XLNetResult, XLNetOutput, XLNetBadPredictions = XLNetModel.eval_model(pandas_test, acc=sklearn.metrics.accuracy_score)
print("Accuracy of XLNet model: " , XLNetResult, "%")

MLP model

In [None]:
# Imports
import pickle 
import sys
import os
from os import listdir
from os.path import isfile, join
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import tensorflow as tf 
import tensorflow.keras as k
from tqdm import tqdm

In [None]:
import tensorflow as tf
import tensorflow_datasets as tfds

data = tfds.load('imdb_reviews', as_supervised=True)

In [None]:
def clean_data(sentence):
    """Turn the ``str()`` form of a raw review (e.g. ``"b'some text'"``) into plain text.

    Steps, in order:
      1. Double quotes become single quotes and backslashes are dropped. This
         turns escape sequences such as ``\\xc2\\x96`` into residue like
         ``xc2x96``.
      2. Known escape residue is removed.
      3. ``<br />`` tags are replaced by a space so neighbouring words stay separate.
      4. The ``b'...'`` wrapper is stripped.

    Args:
        sentence: String to clean, normally ``str(bytes_object)``.

    Returns:
        The cleaned text as a ``str``.
    """
    text = sentence.replace('"', "'").replace("\\", "")

    # Longer markers must be removed before their shorter prefixes ("xc2x9",
    # "xc2xa"); otherwise "xc2x91" / "xc2xa0" can never match and leave a
    # stray "1" / "0" behind in the text.
    residue_markers = (
        "xc2x96", "xc2x97", "xc2x85", "xc2x91", "xc2x84",
        "xc2xa0", "xc2xb4", "xc3xa9",
        "xc2x9", "xc2xa",
    )
    for marker in residue_markers:
        text = text.replace(marker, "")

    # A line break separates words, so substitute a space rather than nothing.
    text = text.replace("<br />", " ")

    # Strip the bytes-literal wrapper: both the leading "b'" and the closing quote.
    if len(text) >= 3 and text.startswith("b'") and text.endswith("'"):
        return text[2:-1]
    # Fallback for input without the wrapper: drop first and last character as before.
    return text[1:-1]

In [None]:
MAX_REVIEW_WORDS = 500  # reviews with more whitespace-separated words than this are dropped
SUBSET_SIZE = 15000     # number of reviews kept from each split


def _collect_reviews(split, max_words=MAX_REVIEW_WORDS):
    """Clean every (text, label) pair of ``split`` and keep the short reviews.

    Args:
        split: Iterable of (text, label) pairs whose items expose ``.numpy()``.
        max_words: Maximum number of whitespace-separated words a cleaned
            review may have to be kept.

    Returns:
        Tuple ``(texts, labels)`` of two equally long lists.
    """
    texts, labels = [], []
    for line in split:
        sentence, val = clean_data(str(line[0].numpy())), int(line[1].numpy())
        if len(sentence.split()) <= max_words:
            texts.append(sentence)
            labels.append(val)
    return texts, labels


# Not used in this cell; kept in case other cells reference them.
train_x_len = {}
test_x_len = {}

test_x, test_y = _collect_reviews(data["test"])
train_x, train_y = _collect_reviews(data["train"])

# Use the same cut-off for texts and labels so they stay aligned
# (the test labels were previously sliced with 150000 instead of 15000).
test_x1 = test_x[:SUBSET_SIZE]
test_y1 = test_y[:SUBSET_SIZE]
train_x1 = train_x[:SUBSET_SIZE]
train_y1 = train_y[:SUBSET_SIZE]

assert len(test_x1) == len(test_y1), "test texts and labels are misaligned"
assert len(train_x1) == len(train_y1), "train texts and labels are misaligned"

In [None]:
# Number of test reviews. The vectorised matrix is built from test_x1 + train_x1,
# so this is also the row index where the training reviews start.
test_len = len(test_x1)
test_len

In [None]:
from sklearn.feature_extraction.text import CountVectorizer

In [None]:
# Bag-of-words counts over the test reviews followed by the training reviews,
# converted to a dense array in one step.
vectorizer = CountVectorizer()
values = vectorizer.fit_transform(test_x1 + train_x1).toarray()

In [None]:
# Number of columns of the bag-of-words matrix; the MLP reads this global as its input width.
input_layer_len = len(values[0])
input_layer_len

In [None]:
def evaluate_accuracy(true_y, pred_y):
    """Return the fraction of positions where ``pred_y`` equals ``true_y``.

    Args:
        true_y: Ground-truth labels (list or array).
        pred_y: Predicted labels, same shape as ``true_y``.

    Returns:
        Accuracy as a fraction in [0, 1].

    Raises:
        ValueError: If the inputs are empty or their shapes differ.
    """
    # Convert up front so plain Python lists are compared element-wise too.
    true_arr = np.asarray(true_y)
    pred_arr = np.asarray(pred_y)
    if true_arr.shape != pred_arr.shape:
        raise ValueError("true_y and pred_y must have the same shape")
    if true_arr.shape[0] == 0:
        raise ValueError("cannot compute accuracy of empty input")
    return np.sum(true_arr == pred_arr, axis=0) / true_arr.shape[0]

In [None]:
class LinearLayer():
    """Fully connected layer computing ``x @ w.T + b`` with optional L1/L2 regularisation.

    Attributes (set by the methods below):
        w:  weights, shape (y_size, x_size)
        b:  biases, shape (y_size, 1)
        dw: gradient w.r.t. ``w`` from the last ``backward`` call, same shape as ``w``
        db: gradient w.r.t. ``b`` from the last ``backward`` call, same shape as ``b``
    """

    def __init__(self,x_size, y_size, reg=None, reg_strength=0.01):
        """Create a layer mapping ``x_size`` inputs to ``y_size`` outputs.

        Args:
            x_size: Number of input features.
            y_size: Number of output units.
            reg: ``"L1"``, ``"L2"`` or ``None`` (no regularisation).
            reg_strength: Multiplier applied to the regularisation gradient.
        """
        # Small random initial values drawn from N(0, 0.001).
        self.w = np.random.normal(0 , 0.001, size = (y_size, x_size))
        self.b = np.random.normal(0 , 0.001, size = (y_size,1))
        self.dw = None
        self.db = None
        self.reg = reg
        self.reg_strength = reg_strength
        self.input = None

    def forward(self, x):
        """Return the layer output for a batch ``x`` of shape (n, x_size); result is (n, y_size)."""
        self.input = x  # cached for the weight gradient in backward()
        return (self.w @ x.T + self.b).T

    def backward(self, gradient):
        """Store ``dw``/``db`` and return the gradient w.r.t. the layer input.

        Args:
            gradient: Gradient of the loss w.r.t. this layer's output, shape (n, y_size).

        Returns:
            Gradient w.r.t. the layer input, shape (n, x_size).
        """
        self.dw = (self.input.T @ gradient).T
        # Sum over the batch axis only, so every output unit gets its own bias
        # gradient with the same shape as ``b``.
        self.db = np.sum(gradient, axis=0).reshape(-1, 1)

        # Regularisation: add the derivative of the penalty, not the penalty itself.
        if self.reg == "L1":
            # d/dw of reg_strength * |w|
            self.dw += self.reg_strength * np.sign(self.w)
        elif self.reg == "L2":
            # d/dw of reg_strength * w**2
            self.dw += self.reg_strength * 2 * self.w

        return gradient.dot(self.w)

        
class ReLULayer():
    """ReLU activation: passes positive inputs through and zeroes the rest."""

    def __init__(self):
        # Local derivative cached by the most recent forward() call.
        self.gradient = None

    def forward(self, x):
        """Return ``max(0, x)`` element-wise and cache the 0/1 derivative mask."""
        is_positive = x > 0
        self.gradient = np.where(is_positive, 1.0, 0.0)
        return np.maximum(0, x)

    def backward(self, gradient):
        """Scale the incoming gradient by the cached derivative mask."""
        upstream = gradient
        return upstream * self.gradient
    

class LeakyReLULayer():
    """Leaky ReLU activation: identity for positive inputs, slope ``alpha`` otherwise."""

    def __init__(self, alpha=0.01):
        self.gradient = None  # local derivative cached by forward()
        self.alpha = alpha    # slope applied to non-positive inputs

    def forward(self, x):
        """Apply the activation element-wise and cache its derivative."""
        is_positive = x > 0
        self.gradient = np.where(is_positive, 1.0, self.alpha)
        return np.where(is_positive, x, x * self.alpha)

    def backward(self, gradient):
        """Scale the incoming gradient by the cached derivative."""
        upstream = gradient
        return upstream * self.gradient
    
class TanHLayer():
    """Hyperbolic-tangent activation layer."""

    def __init__(self):
        # Local derivative (1 - tanh(x)^2) cached by the most recent forward() call.
        self.gradient = None

    def forward(self, x):
        """Return ``tanh(x)`` element-wise and cache its derivative."""
        # Compute tanh once and reuse it for both the output and the derivative.
        activated = np.tanh(x)
        self.gradient = 1 - activated**2
        return activated

    def backward(self, gradient):
        """Scale the incoming gradient by the cached derivative."""
        return gradient * self.gradient
    
    
class SoftmaxLayer():
    """Softmax output layer whose backward pass is fused with the cross-entropy loss."""

    def __init__(self):
        # Probabilities produced by the most recent forward() call.
        self.probabilities = None

    def forward(self, x):
        """Return softmax probabilities computed over the last axis of ``x``."""
        # Subtracting the per-row maximum leaves the result unchanged but keeps
        # exp() from overflowing (and producing NaN) for large logits.
        shifted = x - np.max(x, axis=-1, keepdims=True)
        exps = np.exp(shifted)
        probs = exps / np.sum(exps, axis=-1, keepdims=True)
        self.probabilities = probs
        return probs

    def backward(self, target):
        """Return ``probabilities - target``.

        Args:
            target: Array with the same shape as the cached probabilities
                (the MLP passes one-hot labels here).
        """
        return self.probabilities - target
    

def activation_func(f):
    """Create the activation layer named by ``f``.

    Args:
        f: Activation name, matched case-insensitively: ``"Relu"``,
            ``"LeakyReLU"`` or ``"Tanh"``.

    Returns:
        A new ``ReLULayer``, ``LeakyReLULayer`` or ``TanHLayer`` instance.

    Raises:
        ValueError: If ``f`` is not a known activation name. Returning ``None``
            here would only fail later, when the layer is first used.
    """
    key = f.lower() if isinstance(f, str) else f
    if key == "relu":
        return ReLULayer()
    if key == "leakyrelu":
        return LeakyReLULayer()
    if key == "tanh":
        return TanHLayer()
    raise ValueError(
        "Unknown activation %r; expected 'Relu', 'LeakyReLU' or 'Tanh'" % (f,)
    )
        
class MLP:
    """Multi-layer perceptron for two-class classification, trained by mini-batch gradient descent.

    The network is a stack of (LinearLayer, activation) pairs followed by a
    final LinearLayer with 2 outputs and a SoftmaxLayer.
    """

    def __init__(self, functions, layer_units, learning_rate, reg=None, reg_strength=0.01, input_size=None):
        """Build the layer stack.

        Args:
            functions: Activation names, one per hidden layer (extra names are ignored).
            layer_units: Number of units of each hidden layer; may be empty,
                in which case the model is a single linear layer plus softmax.
            learning_rate: Step size used by the optimizer.
            reg: Regularisation mode forwarded to every LinearLayer.
            reg_strength: Regularisation strength forwarded to every LinearLayer.
            input_size: Number of input features. Defaults to the notebook-level
                ``input_layer_len`` global.

        Raises:
            ValueError: If fewer activation names than hidden layers are given.
        """
        if len(functions) < len(layer_units):
            raise ValueError("need one activation name per hidden layer")
        if input_size is None:
            input_size = input_layer_len

        self.layers = []
        self.learning_rate = learning_rate

        # Hidden layers: linear transform followed by its activation.
        prev_units = input_size
        for units, name in zip(layer_units, functions):
            self.layers.append(LinearLayer(prev_units, units, reg, reg_strength))
            self.layers.append(activation_func(name))
            prev_units = units

        # Output layer: two logits turned into probabilities. Built even when
        # there are no hidden layers, so the model is never empty.
        self.layers.append(LinearLayer(prev_units, 2, reg, reg_strength))
        self.layers.append(SoftmaxLayer())

        self.loss_array = []      # one cross-entropy value per training iteration
        self.accuracy_array = []  # not populated by fit()

    def forward(self, x):
        """Run ``x`` through every layer in order and return the final output."""
        for layer in self.layers:
            x = layer.forward(x)
        return x

    def backward(self, target):
        """Back-propagate through the layers in reverse order, starting from ``target``."""
        for layer in self.layers[::-1]:
            target = layer.backward(target)

    def fit(self, data_x, data_y, iterations, num_batches=200):
        """Train for ``iterations`` steps, cycling through the mini-batches.

        Args:
            data_x: Feature matrix, one row per sample.
            data_y: Integer class labels (0 or 1), one per row of ``data_x``.
            iterations: Number of gradient steps (one mini-batch each).
            num_batches: Number of mini-batches the data is split into.
        """
        labels = np.eye(2)[np.array(data_y)]
        # Never create more batches than samples, which would yield empty batches.
        num_batches = max(1, min(num_batches, len(labels)))
        # array_split also accepts sample counts that are not a multiple of num_batches.
        x = np.array_split(data_x, num_batches)
        y = np.array_split(labels, num_batches)
        opt = GradientDescentOptimizer(self)
        for i in tqdm(range(iterations)):
            n = i % num_batches  # wrap around after the last batch
            predictions = self.forward(x[n])
            # Clip so a probability of exactly 0 cannot turn the loss into NaN/inf.
            log_probs = np.log(np.clip(predictions, 1e-12, 1.0))
            loss = -(y[n] * log_probs).sum(axis=-1).mean()
            self.loss_array.append(loss)
            self.backward(y[n])
            opt.update()

    def predict(self,data_x):
        """Return the index of the highest-scoring class for each row of ``data_x``."""
        predictions = self.forward(data_x)
        return np.argmax(predictions, axis = 1)

        
class GradientDescentOptimizer:
    """Plain gradient-descent step for the trainable layers of an MLP."""

    def __init__(self, mlp):
        # Network whose layers and learning rate are read on every update.
        self.mlp = mlp

    def update(self):
        """Subtract ``learning_rate * gradient`` from the bias and weights of each trainable layer.

        Starts at the second-to-last layer and visits every second layer going
        backwards, i.e. the positions where the MLP places its linear layers.
        """
        for layer in self.mlp.layers[-2::-2]:
            layer.b -= self.mlp.learning_rate * layer.db
            layer.w -= self.mlp.learning_rate * layer.dw

In [None]:
# Sanity check: number of feature rows after the test block vs. number of training labels.
print(len(values[test_len:]), len(train_y1))

In [None]:
# Two hidden ReLU layers (6860 and 256 units) with a learning rate of 1e-5
mlp = MLP(
    functions=["Relu", "Relu"],
    layer_units=[6860, 256],
    learning_rate=0.00001,
)
# Rows from test_len onwards are the training reviews; run 1000 gradient steps on them
mlp.fit(data_x=values[test_len:], data_y=train_y1, iterations=1000)

In [None]:
# Predict classes for the first 10000 rows of `values`; these are test reviews,
# because the matrix was built from test_x1 + train_x1.
predictions = mlp.predict(values[:10000])
print(predictions)

In [None]:
# `predictions` were computed on values[:10000], which are test reviews (the matrix
# is test_x1 followed by train_x1), so they must be scored against the test labels.
print(np.mean(predictions == np.array(test_y1[:10000])) * 100)