In [128]:
import unittest
import re
import os
import numpy as np

from fractions import Fraction
from io import StringIO

from sklearn import svm
from scipy.sparse import csr_matrix, vstack

In [None]:
class TestCorpusSet(unittest.TestCase):
    def setUp(self):
        self.negative = StringIO('I hated that so much')
        self.negative_corpus = Corpus(self.negative, 'negative')
        self.positive = StringIO('loved movie!! loved')
        self.positive_corpus = Corpus(self.positive, 'positive')
    
    def test_trivial(self):
        """Consumes multiple files and turns it into sparse vector."""
        self.assertEqual('negative', self.negative_corpus.sentiment)
        
    def test_tokenize(self):
        """Downcases all the word tokens."""
        self.assertListEqual(['quick', 'brown', 'fox'],
                             Corpus.tokenize('Quick Brown Fox'))
    
    def test_tokenize2(self):
        """Ignores all stop symbols."""
        self.assertListEqual(['hello'], 
                             Corpus.tokenize('"\'hello!?!?!.\'"   '))
    
    def test_tokenize3(self):
        """Ignores all unicode space."""
        self.assertListEqual(['hello', 'bob'], 
                             Corpus.tokenize(u'hello\u00A0bob'))
    
    def test_positives(self):
        """Consumes a positive training set."""
        self.assertEqual('positive', 
                         self.positive_corpus.sentiment)
    
    def test_words(self):
        """Consumes a positive training set and unique set of words."""
        self.assertEqual({'loved', 'movie'}, 
                         self.positive_corpus.get_words())
    
    def test_sentiment_code_1(self):
        """Defines a sentiment_code of 1 for positives."""
        self.assertEqual(1, Corpus(StringIO(''), 'positive').sentiment_code)
    
    def test_sentiment_code_0(self):
        """Defines a sentiment code of -1 for negatives."""
        self.assertEqual(-1, Corpus(StringIO(''), 'negative').sentiment_code)

In [None]:
class Corpus:
    skip_regex = re.compile(r'[\'"\.\?\!]+')
    space_regex = re.compile(r'\s', re.UNICODE)
    stop_words = []
    stop_words = [x.strip() for x in open('./data/support_vector_machines/stopwords.txt').readlines()]
    sentiment_to_number = {'positive': 1, 'negative': -1}
    
    @classmethod
    def tokenize(cls, text):
        cleared_text = cls.skip_regex.sub('', text)
        parts = cls.space_regex.split(cleared_text)
        parts = [part.lower() for part in parts]
        return [p for p in parts if len(p) > 0 and p not in cls.stop_words]
    
    def __init__(self, io, sentiment):
#         with open('./data/support_vector_machines/stopwords.txt') as f:
#             self.stop_words = [x.strip() for x in f.readlines()]
#         print(self.stop_words)
        self._io = io
        self._sentiment = sentiment
        self._words = None
    
    @property
    def sentiment(self):
        return self._sentiment

    @property
    def sentiment_code(self):
        return self.sentiment_to_number[self._sentiment]
    
    def get_words(self):
        if self._words is None:
            self._words = set()
            for line in self._io:
                for word in Corpus.tokenize(line):
                    self._words.add(word)
            self._io.seek(0)
        return self._words
    
    def get_sentences(self):
        for line in self._io:
            yield line

In [None]:
class TestCorpusSet(unittest.TestCase):
    def setUp(self):
        self.positive = StringIO('I love this country')
        self.negative = StringIO('I hate this man')
        
        self.positive_corp = Corpus(self.positive, 'positive')
        self.negative_corp = Corpus(self.negative, 'negative')
        
        self.corpus_set = CorpusSet([self.positive_corp, self.negative_corp])
        
    def test_compose(self):
        """Composes two corpuses together."""
        self.assertEqual({'love', 'country', 'hate', 'man'},
                         self.corpus_set.words)
    
    def test_sparse(self):
        """Returns a set of sparse vectors to train on."""
        expected_ys = [1, -1]
        expected_xes = csr_matrix(np.array([
            [1, 1, 0, 0],
            [0, 0, 1, 1]
        ]))
        
        self.corpus_set.calculate_sparse_vectors()
        ys = self.corpus_set.yes
        xes = self.corpus_set.xes
        
        self.assertListEqual(expected_ys, ys)
        self.assertListEqual(list(expected_xes.data),
                             list(xes.data))
        self.assertListEqual(list(expected_xes.indices),
                             list(xes.indices))
        self.assertListEqual(list(expected_xes.indptr),
                             list(xes.indptr))

In [None]:
class CorpusSet:
    def __init__(self, corpora):
        self._yes = None
        self._xes = None
        self._corpora = corpora
        self._words = set()
        for corpus in self._corpora:
            self._words.update(corpus.get_words())
        
    @property
    def words(self):
        return self._words
    
    @property
    def xes(self):
        return self._xes
    
    @property
    def yes(self):
        return self._yes
    
    def calculate_sparse_vectors(self):
        self._yes = []
        self._xes = None
        for corpus in self._corpora:
            vectors = self.feature_matrix(corpus)
            if self._xes is None:
                self._xes = vectors
            else:
                self._xes = vstack((self._xes, vectors))
            self._yes.extend([corpus.sentiment_code] * vectors.shape[0])
        
    def feature_matrix(self, corpus):
        data = []
        indices = []
        indptr = [0]
        
        for sentence in corpus.get_sentences():
            sentence_indices = self._get_indices(sentence)
            indices.extend(sentence_indices)
            data.extend([1] * len(sentence_indices))
            indptr.append(len(indices))
            
        feature_matrix = csr_matrix((data, indices, indptr), 
                                    shape=(len(indptr) - 1,
                                           len(self._words)),
                                    dtype=np.float64)
        feature_matrix.sort_indices()
        return feature_matrix
    
    def feature_vector(self, sentence):
        indices = self._get_indices(sentence)
        data = [1] * len(indices)
        indptr = [0, len(indices)]
        vector = csr_matrix((data, indices, indptr),
                            shape=(1, len(self._words)),
                            dtype=np.float64)
        return vector
    
    def _get_indices(self, sentence):
        word_list = list(self._words)
        indices = []
        for token in Corpus.tokenize(sentence):
            if token in self._words:
                index = word_list.index(token)
                indices.append(index)
        return indices

In [None]:
POS_DATA = './data/support_vector_machines/rt-polaritydata/rt-polarity.pos'
NEG_DATA = './data/support_vector_machines/rt-polaritydata/rt-polarity.neg'

class TestSentimentClassifier(unittest.TestCase):
    def setUp(self): 
        pass
    
    def test_validate(self):
        """Cross validate with an error of 35% or less."""
        neg = self.split_file(POS_DATA)
        pos = self.split_file(NET_DATA)
        
        classifier = SentimentClassifier.build([
            neg['training'],
            pos['training']
        ])
        c = 2 ** 7
        classifier.c = c
        classifier.reset_model()
        
        n_er = self.validate(classifier, neg['validation'], 'negative')
        p_er = self.validate(classifier, pos['validation'], 'positive')
        total = Fraction(n_er.numerator + p_er.numerator,
                         n_er.denominator + p_er.denominator)
        print(total)
        self.assertLess(total, 0.35)
    
    def test_validate_itself(self):
        """Yields a zero error when it uses itself."""
        classifier = SentimentClassifier.build([
            POS_DATA, NEG_DATA
        ])
        
        c = 2 ** 7
        classifier.c = c
        classifier.reset_model()
        
        n_er = self.validate(classifier, NEG_DATA, 'negative')
        p_er = self.validate(classifier, POS_DATA, 'positive')
        total = Fraction(n_er.numerator + p_er.numerator,
                         n_er.denominator + p_er.denominator)
        
        print(total)
        self.assertEqual(total, 0)
        
    def validate(self, classifier, file, sentiment):
        total = 0
        misses = 0
        
        with open(file, 'r', encoding='latin-1') as f:
            for line in f:
                if classifier.classify(line) != sentiment:
                    misses += 1
                total += 1
        return Fraction(misses, total)

    def split_file(self, filepath):
        ext = os.path.splitext(filepath)[1]
        
        counter = 0
        training_filename = './data/support_vector_machines/training%s' % ext
        validation_filename = './data/support_vector_machines/validation%s' % ext
        
        with open(filepath, 'r', encoding='latin-1') as input_file:
            with open(validation_filename, 'w') as val_file:
                with open(training_filename, 'w') as train_file:
                    for line in input_file:
                        if counter % 2 == 0:
                            val_file.write(line)
                        else:
                            train_file.write(line)
                        counter += 1
        return {'training': training_filename,
                'validation': validation_filename}

In [None]:
class SentimentClassifier(object):
    ext_to_sentiment = {'.pos': 'positive',
                        '.neg': 'negative'}
    number_to_sentiment = {-1: 'negative',
                           1: 'positive'}
    
    @classmethod
    def present_answer(cls, answer):
        if isinstance(answer, ndarray):
            answer = answer[0]
        return cls.number_to_sentiment[answer]
    
    @classmethod
    def build(cls, files):
        corpora = []
        for file in files:
            ext = os.path.splitext(file)[1]
            corpus = Corpus(open(file, 'r', encoding='latin-1'), cls.ext_to_sentiment[ext])
            corpora.append(corpus)
        corpus_set = CorpusSet(corpora)
        return SentimentClassifier(corpus_set)

    def __init__(self, corpus_set):
        self._trained = False
        self._corpus_set = corpus_set
        self._c = 2 ** 7
        self._model = None
    
    @property
    def c(self):
        return self._c
    
    @c.setter
    def c(self, cc):
        self._c = cc
        
    def reset_model(self):
        self._model = None
        
    def words(self):
        return self._corpus_set.words
    
    def classify(self, string):
        if self._model is None:
            self._model = self.fit_model()
        prediction = self._model.predict(self._corpus_set.feature_vector(string))
        return self.present_answer(prediction)

    def fit_model(self):
        self._corpus_set.calculate_sparse_vectors()
        y_vec = self._corpus_set.yes
        x_mat = self._corpus_set.xes
        clf = svm.SVC(C=self.c,
                      cache_size=1000,
                      gamma=1.0/len(y_vec),
                      kernel='linear',
                      total=0.001)
        clf.fit(x_mat, y_vec)
        return clf

In [None]:
if __name__ == '__main__':
    unittest.main(argv=['excluded'], exit=False)

.FE