In [1]:
%matplotlib inline

In [11]:
import pandas
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import sklearn
import sklearn.linear_model
from datetime import datetime, date
from sklearn_pandas import DataFrameMapper
import warnings
import timeit
from collections import defaultdict, OrderedDict
import tabulate
import time
import GPy
import glob
from functools import reduce
from IPython.display import display

timeit.template = """
def inner(_it, _timer{init}):
    {setup}
    _t0 = _timer()
    for _i in _it:
        retval = {stmt}
    _t1 = _timer()
    return _t1 - _t0, retval
"""

matplotlib.style.use('ggplot')

RANDOM_SEED = 33

## Markov Models
The code below trains a simple Markov model for given data. The transition probabilities are simply the observed transition counts normalized to valid probability simplexes on each row of the matrix. 

In (somewhat) conforming with `sklearn` API, the model implements `fit` and `score`, as well as helper methods with convenience interfaces. 

In [56]:
ALPHABET = ['A', 'o', 'e', 't', 'p', 'g', 'k']
TRAINING_FILE_PATTERN = 'data/markov/symbol/language-training-lang{lang}-*'
TEST_FILE_PATTERN = 'data/markov/symbol/language-test-*'


class MarkovModel:
    def __init__(self, alphabet=ALPHABET):
        self.map = {letter: index for (index, letter) in enumerate(ALPHABET)}
        self.transitions = np.zeros((len(ALPHABET), len(ALPHABET)))
        self.probabilities = np.ones_like(self.transitions)  # default initialization    
    
    def fit(self, sequence):
        for first, second in zip(sequence, sequence[1:]):
                self.transitions[self.map[first], self.map[second]] += 1
                
        self.probabilities = self.transitions / np.sum(self.transitions, axis=1)
    
    def fit_from_path(self, path):
        with open(path) as f:
            sequence = f.read()
            self.fit(sequence)

    def fit_from_glob(self, pattern):
        for path in glob.glob(pattern):
            self.fit_from_path(path)
    
    def score(self, sequence, log=False):
        log_prob = np.sum([np.log(self.probabilities[self.map[first], self.map[second]])
                       for first, second in zip(sequence, sequence[1:])])
        if not log:
            return np.exp(log_prob)

        return log_prob
    

## Genertive Markov Model classifier

The code below implements a simple generative Markov model classifier. For a new input sequence $v_{1:T}^*$, and a given class $c$, the model considers the posterior $P(c | v_{1:T}^*) \propto P(v_{1:T}^* | c) P(c)$, the product of the data's likelihood according to the class multiplied by the prior for that class - Bayes's rule. To normalize, we must divide by the sum of posterior probabilities over all classes: 

$$ P(c | v_{1:T}^*) = \frac{P(v_{1:T}^* | c) P(c)}{\sum_{c' \in C} P(v_{1:T}^* | c') P(c')} $$

To classify, the model selects the argmax class over all class probabilities. In conforming with the `sklearn` API, the model implements `predict_proba`, `predict`, and `score`.

In [63]:
LANGUAGES = ('A', 'B', 'C')


class GenerativeMarkovModelClassifier:
    def __init__(self, prior=None, langauges=LANGUAGES):
        self.n = len(langauges)
        self.classes = langauges
        self.models = [MarkovModel() for i in range(self.n)]
        
        if not prior:
            self.prior = np.ones((self.n,)) / self.n

        elif not type(prior) == dict:
            if not len(prior) == self.n:
                raise ValueError('Prior and languages should have the same length')
                
            if not 1.0 == sum(prior):
                raise ValueError('Prior should sum up to 1')
            
            if any(prior < 0) or any (prior > 1):
                raise ValueError('Prior should be a valid probability simplex')
                
            self.prior = prior
    
    def fit_from_pattern(self, training_file_pattern=TRAINING_FILE_PATTERN):
        for (lang, model) in zip(self.classes, self.models):
            model.fit_from_glob(training_file_pattern.format(lang=lang))

    def predict_proba(self, sequence):
        probs = np.array([model.score(sequence) for model in self.models]) * self.prior
        return probs / np.sum(probs)
            
    def predict(self, sequence):
        probs = self.predict_proba(sequence)
        return self.classes[np.argmax(probs)]
    
    def score(self, sequence):
        return np.max(self.predict_proba(sequence))
    
    def predict_from_pattern(self, test_file_pattern=TEST_FILE_PATTERN, probs=False):
        predictions = []
        
        for path in glob.glob(test_file_pattern):
            with open(path) as f:
                sequence = f.read()
                if probs:
                    predictions.append(self.predict_proba(sequence))
                else:
                    predictions.append(self.predict(sequence))
            
        return predictions
    

In [64]:
gmmc = GenerativeMarkovModelClassifier()
gmmc.fit_from_pattern()
gmmc.predict_from_pattern()



['C', 'C', 'A', 'B', 'A', 'A', 'B', 'A', 'C', 'A']