## Part 1: Parsing the dataset

In [141]:
import os.path
import tarfile
from urllib.request import urlretrieve

from spyder.utils.snippets.lexer import tokenize

# Fetch the review-polarity archive once, and unpack it whenever the extracted
# data folder is missing -- not only right after a fresh download, so an
# interrupted or cleaned-up extraction is repaired on the next run.
url = """http://www.cs.cornell.edu/people/pabo/movie-review-data/review_polarity.tar.gz"""
archive_name = url.split("/")[-1]
if not os.path.exists(archive_name):
  urlretrieve(url, archive_name)
  print("Downloaded", url)
# 'txt_sentoken' is the folder the later cells expect to read the reviews from.
if not os.path.isdir('txt_sentoken'):
  with tarfile.open(archive_name, 'r:gz') as tar:
    tar.extractall()
  print("Extracted archive")

In [142]:
import os
import numpy as np

In [143]:
def get_reviews(path):
    """Read every regular file directly inside ``path`` and return their contents.

    Files are read in sorted filename order so that the returned list (and any
    positional train/test split built from it) is the same on every run;
    ``os.listdir`` on its own returns entries in arbitrary order.

    Args:
        path: directory containing the text files to read.

    Returns:
        list of str: the full content of each file, ordered by filename.
    """
    reviews_lst = []
    for filename in sorted(os.listdir(path)):
        path_full = os.path.join(path, filename)
        # sub-directories cannot be opened as text files; skip them
        if not os.path.isfile(path_full):
            continue
        with open(path_full, 'r') as f:
            reviews_lst.append(f.read())
    return reviews_lst

# root folder created by extracting the archive, with one sub-folder per class
folder_name = 'txt_sentoken'
neg_path, pos_path = (os.path.join(folder_name, label) for label in ('neg', 'pos'))

# load the raw text of every review, negative class first
neg_reviews, pos_reviews = get_reviews(neg_path), get_reviews(pos_path)

In [144]:
# sanity check: 1000 negative reviews and just as many positive ones
n_neg = len(neg_reviews)
assert n_neg == 1000
assert len(pos_reviews) == n_neg

In [145]:
# create the full list (all negatives first, then all positives)
X_full = neg_reviews + pos_reviews

# split each class at the same position: first 80% (of the negative count)
# goes to training, the remainder to testing
split_point = int(len(neg_reviews)*0.8)
neg_train, neg_test = neg_reviews[:split_point], neg_reviews[split_point:]
pos_train, pos_test = pos_reviews[:split_point], pos_reviews[split_point:]
X_train = neg_train + pos_train
X_test = neg_test + pos_test

# create the corresponding outcome lists (-1 = negative, 1 = positive);
# label counts come from the actual slices rather than from "half the list",
# so they stay aligned with the texts even if the class sizes ever differ
y_full = [-1] * len(neg_reviews) + [1] * len(pos_reviews)
y_train = [-1] * len(neg_train) + [1] * len(pos_train)
y_test = [-1] * len(neg_test) + [1] * len(pos_test)

In [146]:
# run the same sanity checks on the training and the testing split
for X_part, y_part in ((X_train, y_train), (X_test, y_test)):
    # one label per text
    assert len(X_part) == len(y_part)
    # every sample is still raw text at this point
    assert all(isinstance(doc, str) for doc in X_part)
    # exactly two distinct labels, spanning -1 .. 1
    assert len(np.unique(y_part)) == 2
    assert min(y_part) == -1
    assert max(y_part) == 1

## Part 2: Feature extraction

In [147]:
class Vectorizer:
    """Binary bag-of-words vectorizer over whitespace-separated tokens.

    ``vocabulary`` is ``None`` until ``make_vocabulary`` has been called.
    Afterwards it is a list of unique tokens in first-seen order, and the
    position of a token in that list is its column index in the matrices
    returned by ``transform_to_bow``.
    """

    def __init__(self):
        # the instance only stores a vocabulary once make_vocabulary has run
        self.vocabulary = None

    @staticmethod
    def tokenize(txt):
        """Return the whitespace-separated tokens of ``txt`` in order."""
        # str.split() without arguments already treats newlines as separators,
        # so there is no need to split into lines first.
        return txt.split()

    def make_vocabulary(self, data):
        """Build ``self.vocabulary`` from an iterable of texts.

        Args:
            data: iterable of strings.

        The vocabulary is the list of distinct tokens in order of first
        appearance.
        """
        # dict keys are unique and keep insertion order, which gives constant
        # time de-duplication instead of scanning a growing list per token
        unique_tokens = dict.fromkeys(
            token for review in data for token in self.tokenize(review)
        )
        self.vocabulary = list(unique_tokens)

    def transform_to_bow(self, data_list):
        """Encode texts as a binary bag-of-words matrix.

        Args:
            data_list: sequence of strings.

        Returns:
            numpy array of shape ``(len(data_list), len(self.vocabulary))``
            where entry ``[i, j]`` is 1 if token ``j`` of the vocabulary occurs
            in text ``i`` and 0 otherwise. Tokens that are not in the
            vocabulary are ignored.

        Raises:
            ValueError: if no vocabulary has been built yet.
        """
        if self.vocabulary is None:
            raise ValueError("vocabulary is not set; call make_vocabulary() first")

        # token -> column lookup built once per call; setdefault keeps the
        # first position of a token, matching what list.index would return
        column_of = {}
        for column, token in enumerate(self.vocabulary):
            column_of.setdefault(token, column)

        X = np.zeros((len(data_list), len(self.vocabulary)))
        for i, txt in enumerate(data_list):
            # presence/absence encoding: each distinct token is set once
            for token in set(self.tokenize(txt)):
                column = column_of.get(token)
                if column is not None:
                    X[i, column] = 1
        return X

In [148]:
vectorizer = Vectorizer()

# create the vocabulary
# both train and test sets are included in the vocabulary, to avoid unseen words later
vectorizer.make_vocabulary(X_full)

# turn the lists of texts into bag-of-words matrices (train first, then test)
X_train, X_test = (vectorizer.transform_to_bow(texts) for texts in (X_train, X_test))

# turn the label lists into np arrays for an easier time down the line
y_train, y_test = np.array(y_train), np.array(y_test)

In [149]:
# keep a reference to the vectorizer's vocabulary list; a token's position in
# this list is its column index in the bag-of-words matrices
ordered_vocabulary = vectorizer.vocabulary

In [150]:
# train and test matrices must have the same number of columns (features)
assert X_test.shape[1] == X_train.shape[1]

## Part 3: Learning framework

In [180]:
class Classifier:
    """Linear classifier trained by batch sub-gradient descent.

    The objective is the summed hinge loss plus an L1 penalty on the whole
    parameter vector (intercept included). Labels are expected to be -1 / 1.
    ``param_vector[0]`` is the intercept: ``fit`` prepends a constant column
    of ones to the feature matrix.

    Args:
        vocabulary: sized collection of features; only its length is used.
        n_max_iter: number of gradient steps performed by ``fit``.
        regularizer_dampening: weight of the L1 penalty.
        learning_rate: step size of each gradient update.
        random_state: optional seed for the initial weights. When ``None``
            the weights are drawn from numpy's global random state.
    """

    def __init__(self, vocabulary, n_max_iter=10, regularizer_dampening=0.001,
                 learning_rate=0.1, random_state=None):
        n_params = len(vocabulary) + 1  # one weight per feature + intercept
        if random_state is None:
            self.param_vector = np.random.normal(size=n_params)
        else:
            # dedicated generator: reproducible without touching global state
            self.param_vector = np.random.RandomState(random_state).normal(size=n_params)
        self.n_max_iter = n_max_iter
        self.lambduh = regularizer_dampening
        self.gamma = learning_rate
        # loss value recorded by every call to _loss
        self.loss = []

    def _score(self, X):
        """Return the raw scores ``X @ param_vector``.

        ``X`` must already contain the constant intercept column.
        """
        return np.matmul(X, self.param_vector)

    def _predict(self, X):
        """Return predicted labels as a float array of shape ``(n_samples, 1)``.

        ``X`` may be given with or without the intercept column: when it has
        exactly one column fewer than there are parameters, the column of ones
        is prepended, as ``fit`` does. A score of exactly zero is assigned to
        the positive class so that every prediction is -1 or 1.
        """
        X = np.asarray(X)
        if X.shape[1] == self.param_vector.shape[0] - 1:
            X = np.insert(X, 0, 1, 1)
        z = self._score(X)
        return np.where(z >= 0, 1.0, -1.0).reshape((z.shape[0], 1))

    def _loss(self, X, y):
        """Compute the regularised loss, append it to ``self.loss``, return it.

        The loss is the sum (not the mean) of the per-sample hinge losses plus
        ``lambduh`` times the L1 norm of all parameters, intercept included.
        """
        z = self._score(X)
        loss_vector = np.maximum(0, 1 - np.multiply(y, z))  # hinge function
        regularization = self.lambduh * np.sum(np.absolute(self.param_vector))
        loss = regularization + np.sum(loss_vector)
        self.loss.append(loss)
        return loss

    def _gradient(self, X, y):
        """Return a sub-gradient of the regularised loss w.r.t. the parameters."""
        margins = np.multiply(y, self._score(X))
        # only samples with margin < 1 contribute to the hinge term
        violating = margins < 1
        # sum_i(-y_i * x_i) over violating samples as one vector-matrix
        # product, avoiding a dense (n_samples, n_features) temporary
        hinge_grad = -np.matmul(y[violating], X[violating])
        return self.lambduh * np.sign(self.param_vector) + hinge_grad

    def fit(self, X, y):
        """Run ``n_max_iter`` gradient steps on ``(X, y)`` and return ``self``.

        Args:
            X: feature matrix of shape ``(n_samples, n_features)`` without the
                intercept column.
            y: labels, one per row of ``X``.
        """
        X = np.insert(X, 0, 1, 1)  # add the pseudo-input
        y = np.asarray(y)  # boolean-mask indexing below needs an array
        for _ in range(self.n_max_iter):
            self._loss(X, y)  # recorded in self.loss before each update
            self.param_vector = self.param_vector - self.gamma * self._gradient(X, y)
        return self

In [181]:
# the classifier draws its initial weights from numpy's global random state;
# seed it so that a fresh Restart & Run All reproduces the same model
np.random.seed(42)
model = Classifier(ordered_vocabulary)
model.fit(X_train, y_train)

<__main__.Classifier at 0x169f91550>