In [1]:
import os
import pandas as pd
from nltk import pos_tag, word_tokenize
import numpy as np
from sklearn.svm import SVC

from IPython.core.debugger import set_trace

In [2]:
negative_reviews_train = []
to_search = './Datasets/aclImdb/train/neg/'
for f in os.listdir(to_search):
    path = to_search + f
    with open(path, 'r', encoding='latin-1') as fin:
            negative_reviews_train.append(fin.read().strip())
            
positive_reviews_train = []
to_search = './Datasets/aclImdb/train/pos/'
for f in os.listdir(to_search):
    path = to_search + f
    with open(path, 'r', encoding='latin-1') as fin:
            positive_reviews_train.append(fin.read().strip())
            
negative_reviews_test = []
to_search = './Datasets/aclImdb/test/neg/'
for f in os.listdir(to_search):
    path = to_search + f
    with open(path, 'r', encoding='latin-1') as fin:
            negative_reviews_test.append(fin.read().strip())
            
positive_reviews_test= []
to_search = './Datasets/aclImdb/test/pos/'
for f in os.listdir(to_search):
    path = to_search + f
    with open(path, 'r', encoding='latin-1') as fin:
            positive_reviews_test.append(fin.read().strip())

In [3]:
len(negative_reviews_train)

12500

In [11]:
def tokenise_document(review, add_not_tag=False, unigram=False, bigram=False, pos=False, position=False):
    """
    Given a review (string) we return a list of tokens
    """
    
    review_split = word_tokenize(review)
    tokens = []
    
    if add_not_tag:
        negation_word = {'not': True, "isn't": True, "doesn't": True, "wasn't":True, "couldn't": True, "wouldn't": True, 
                         "didn't": True}
        punctuation = {'?': True, '!': True, '.': True, ',': True, ':': True, ';':True}

        convert_word = False
        for word in review_split:
            if word in punctuation:
                convert_word = False
                tokens.append(word)
                continue

            if convert_word:
                tokens.append('NOT_'+word)
                continue

            if word in negation_word:
                convert_word = True
                tokens.append(word)
                continue
            tokens.append(word)
                
                
    if unigram:
        tokens.extend(review_split)
        
        
    if pos:
        tags = pos_tag(review_split) # tags is a list of tuples: [(token, pos tag)]
        # Append the POS tag to each word in 'tokens' to form a string. E.g. 'Peter' and 'NN' becomes 'Peter_NN'
        # tokens have NOT tag added to them
        tokens = [token+'_'+tag[1] for token, tag in zip(tokens, tags)]

        
    if bigram:
        # Since one of the above two conditions will be fulfilled, 'tokens' will always have entries
        for index in range(len(tokens) - 1):
            word_1 = tokens[index]
            word_2 = tokens[index + 1]
            tokens[index] = word_1 + ' ' + word_2
        tokens.pop()  # Take the last unigram word at end of tokens list

        
    if position:
        # 'tokens' will already have been filled by this point by either 'unigram' or 'add_not_tag'
        first_quarter_end = np.ceil(len(tokens) * 0.25)
        middle_end = np.ceil(len(tokens) * 0.75)
        # last_quarter = len(tokens)
        
        for index in range(len(tokens)):
            if index < first_quarter_end:
                to_append = 1
            elif index < middle_end:
                to_append = 2
            else:
                to_append = 3
            tokens[index] = tokens[index]+'_'+str(to_append)
        
        
    return tokens

In [16]:
def get_vocabulary(add_not_tag=False, unigram=False, bigram=False, pos=False, adjectives=False, position=False, length=16162):
    neg_flatlist = negative_reviews_train + negative_reviews_test
    pos_flatlist = positive_reviews_train + positive_reviews_test
    data = neg_flatlist + pos_flatlist  # We have a list of textual reviews (strings)

    freq = {}  # Since we are using dict, vocabulary will be unique
    for review in data:
        review_tokens = tokenise_document(review, unigram=unigram, add_not_tag=add_not_tag, bigram=bigram)

        for token in review_tokens:
            if token in freq:
                freq[token] += 1
            else:
                freq[token] = 1
                
    if length is None:
        cutoff = 7 if bigram else 4
        vocabulary = set([token for token, count in freq.items() if count >= cutoff])
    else:
        sorted_freq = sorted([(count, token) for token, count in freq.items()], reverse=True)
        vocabulary = set([token for _, token in sorted_freq[:length]])  # We use set to exploit O(1) lookup time
    
    
    if pos or position:
#         print("Entering POS/Position")
        vocabulary_pos = []
        for i, review in enumerate(data):
            
#             if i % 100 == 0:
#                 print(i)
            
            unigram_tokens = np.array(tokenise_document(review, unigram=unigram, add_not_tag=add_not_tag))
            pos_tokens = np.array(tokenise_document(review, unigram=unigram, add_not_tag=add_not_tag, pos=pos, position=position))
            mask = list(map(lambda token: token in vocabulary, unigram_tokens))
            vocabulary_pos.extend(list(pos_tokens[mask]))
        
        freq = {}
        for token in vocabulary_pos:
            if token in freq:
                freq[token] += 1
            else:
                freq[token] = 1

        if length is None:
            vocabulary = set([token for token, count in freq.items() if count >= 4])
        else:
            sorted_freq = sorted([(count, token) for token, count in freq.items()], reverse=True)
            vocabulary = set([token for _, token in sorted_freq[:length]])
            
        
    if adjectives:
        # Filter the 'sored_freq' list to only contains words with tags that end in JJ/JJR/JJS
        freq = {}
        for review in data:
            review_tokens = tokenise_document(review, unigram=unigram, add_not_tag=add_not_tag, pos=True)

            for pos_token in review_tokens:
                
                if pos_token.endswith('JJ'):
                    token = pos_token[:-3]
                elif pos_token.endswith('JJR'):
                    token = pos_token[:-4]
                elif pos_token.endswith('JJS'):
                    token = pos_token[:-4]
                
                if token in freq:
                    freq[token] += 1
                else:
                    freq[token] = 1

        sorted_freq = sorted([(count, token) for token, count in freq.items()], reverse=True)
        vocabulary = set([token for _, token in sorted_freq[:length]])
    
    return vocabulary

In [22]:
class NaiveBayes:
    
    def __init__(self, vocabulary, tokenisation):
        self.vocabulary = vocabulary
        self.tokenisation = tokenisation

        
    def get_document_vector(self, document_tokens):
        freq = {v:0 for v in self.vocabulary}
        for word in document_tokens:
            if word in self.vocabulary:
                freq[word] = 1
        return freq
    
    
    def get_class_probabilities(self, class_data):
        """
        class_data: A list containing all the tokens in a training data document. P(f_i|c) from training documents
        """
        # We start with the add-one smoothing
        log_prob_dict = {v:1 for v in self.vocabulary}  # We encode the add one smoothing count at start
        
        total_class_tokens = 0  # Tokens contained in vocabulary
        for word in class_data:
            if word in self.vocabulary:
                log_prob_dict[word] += 1
                total_class_tokens += 1
                
        # We divide by the denominator and log the values
        for key in log_prob_dict.keys():
            log_prob_dict[key] = np.log(log_prob_dict[key] / (total_class_tokens + len(self.vocabulary)))

        return log_prob_dict

    
    def train_probabilities(self):
        neg_train_tokens = []
        for neg_review in negative_reviews_train:
            neg_review = self.tokenisation(neg_review)
            neg_train_tokens.extend(neg_review)
        negative_log_probs = self.get_class_probabilities(neg_train_tokens)
        
        pos_train_tokens = []
        for pos_review in positive_reviews_train:
            pos_review = self.tokenisation(pos_review)
            pos_train_tokens.extend(pos_review)
        positive_log_probs = self.get_class_probabilities(pos_train_tokens)
    
        return negative_log_probs, positive_log_probs
    
    
    def test_documents(self, negative_log_probs, positive_log_probs):
        correct = 0
        for i, pos_review in enumerate(positive_reviews_test):
            
            if i%1000 == 0:
                print(i)
            
            pos_review = self.tokenisation(pos_review)
            document_vector = self.get_document_vector(pos_review)
            
            neg_sum = 0   # For negative class
            pos_sum = 0   # For positive class
            for word in document_vector.keys():
                neg_sum += document_vector[word] * negative_log_probs[word]
                pos_sum += document_vector[word] * positive_log_probs[word]

            if pos_sum > neg_sum:
                correct += 1
                
        print("Calculated test positives")

        for i, neg_review in enumerate(negative_reviews_test):
            
            if i%1000 == 0:
                print(i)
            
            neg_review = self.tokenisation(neg_review)
            document_vector = self.get_document_vector(neg_review)
            
            neg_sum = 0   # For negative class
            pos_sum = 0   # For positive class
            for word in document_vector.keys():
                neg_sum += document_vector[word] * negative_log_probs[word]
                pos_sum += document_vector[word] * positive_log_probs[word]
                
            if neg_sum > pos_sum:
                correct += 1
        
        print("Calculated test negatives")

        return correct / (len(positive_reviews_test) + len(negative_reviews_test))
    
    
    def get_statistics(self):
        neg_log_prob, pos_log_prob = self.train_probabilities()
        print("Calculated train probabilities")
        accuracy = self.test_documents(neg_log_prob, pos_log_prob)
        return accuracy

In [None]:
vocabulary = get_vocabulary(add_not_tag=True)
print(len(vocabulary))
tokenisation = lambda x: tokenise_document(x, add_not_tag=True)

In [24]:
NaiveBayes(vocabulary, tokenisation).get_statistics()

Calculated train probabilities
0
1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
11000
12000
Calculated test positives
0
1000
2000
3000
4000
5000
6000
7000
8000
9000
10000
11000
12000
Calculated test negatives


0.83112

In [27]:
class SVM:
    
    def __init__(self, vocabulary, tokenisation):
        self.dimensions = sorted(list(vocabulary))
        self.tokenisation = tokenisation
        
    
    def get_feature_vector(self, review_tokens):
        review_tokens = set(review_tokens)
        feature_vector = []
        for word in self.dimensions:
            if word in review_tokens:
                feature_vector.append(1)
            else:
                feature_vector.append(0)
        xs = np.array(feature_vector)

        # Normalisation
        denom = np.linalg.norm(xs)

        return xs / denom
    
    
    def get_train_test_data(self):
        train_xs = []
        train_ys = []

        test_xs = []
        test_ys = []

        for neg_review in negative_reviews_train:
            neg_review_tokens = self.tokenisation(neg_review)
            xs = self.get_feature_vector(neg_review_tokens)
            train_xs.append(xs)
            train_ys.append(-1)  # Label -1 is for negative sentiment
            
        print("DONE 1")

        for pos_review in positive_reviews_train:
            pos_review_tokens = self.tokenisation(pos_review)
            xs = self.get_feature_vector(pos_review_tokens)
            train_xs.append(xs)
            train_ys.append(1)  # Label 1 for positive sentiment
            
        print("DONE 2")

        for pos_review in positive_reviews_test:
            pos_review_tokens = self.tokenisation(pos_review)
            xs = self.get_feature_vector(pos_review_tokens)
            test_xs.append(xs)
            test_ys.append(1)  # Label 1 for positive sentiment
            
        print("DONE 3")

        for neg_review in negative_reviews_test:
            neg_review_tokens = self.tokenisation(neg_review)
            xs = self.get_feature_vector(neg_review_tokens)
            test_xs.append(xs)
            test_ys.append(-1)  # Label -1 for negatuve sentiment
            
        print("DONE 4")

        return train_xs, train_ys, test_xs, test_ys
    
    
    def get_statistics(self):
        classifier = SVC(kernel='linear')
        train_xs, train_ys, test_xs, test_ys = self.get_train_test_data()
        print("Gotten train/test split")
        classifier.fit(train_xs, train_ys)
        print("Fitted data")
        accuracy = classifier.score(test_xs, test_ys)

        return accuracy
    
    
    def use_svm_light(self, train_xs, train_ys, test_xs, test_ys):
        with open('./train.txt', 'w') as fout:
            for vector, label in zip(train_xs, train_ys):
                vector_ls = [str(label)]
                for index, value in enumerate(vector):
                    # Model needs feature numbers to start from 1
                    vector_ls.append(str(index+1)+':'+str(value))
                # NEED NEWLINE CHARACTER AT END. PYTHON AUTOMATICALLY CONVERTS THIS TO APPROPRIATE ENDING
                line = ' '.join(vector_ls)+'\n'
                fout.write(line)

        with open('./test.txt', 'w') as fout:
            for vector, label in zip(test_xs, test_ys):
                vector_ls = [str(label)]
                for index, value in enumerate(vector):
                    vector_ls.append(str(index+1)+':'+str(value))
                line = ' '.join(vector_ls)+'\n'
                fout.write(line)

    
    def get_statistics_svmlight(self):
        train_xs, train_ys, test_xs, test_ys = self.get_train_test_data()
        self.use_svm_light(train_xs, train_ys, test_xs, test_ys)
        # Manually call from bash after writing training and testing document using 
        # ./svm_learn train.txt model.txt
        # ./svm_classify test.txt model.txt output.txt

In [None]:
# We use same vocabulary as NB above, so no need to recalculate
SVM(vocabulary, tokenisation).get_statistics()

DONE 1
DONE 2
DONE 3
DONE 4
Gotten train/test split
