# The Markov Model: Text Classifier

In [2]:
import string
import typing as t
from collections import defaultdict

import numpy as np
import pandas as pd
import requests
from sklearn.model_selection import train_test_split

In [3]:
EDGAR_ALLAN_POE = 'https://raw.githubusercontent.com/lazyprogrammer/machine_learning_examples/master/hmm_class/edgar_allan_poe.txt'
ROBERT_FROST = 'https://raw.githubusercontent.com/lazyprogrammer/machine_learning_examples/master/hmm_class/robert_frost.txt'

In [4]:
# eap = list(filter(None, requests.get(EDGAR_ALLAN_POE).text.splitlines()))
# rf = list(filter(None, requests.get(ROBERT_FROST).text.splitlines()))

eap_input = []
rf_input = []
for line in filter(str.strip, open('files/edgar_allan_poe.txt').read().splitlines()):
    eap_input.append(line.lower().translate(str.maketrans('', '', string.punctuation)))
for line in filter(str.strip, open('files/robert_frost.txt').read().splitlines()):
    rf_input.append(line.lower().translate(str.maketrans('', '', string.punctuation)))
eap_input[:5], rf_input[:5]

(['lo death hath reard himself a throne',
  'in a strange city all alone',
  'far down within the dim west',
  'where the good and the bad and the worst and the best',
  'have gone to their eternal rest'],
 ['two roads diverged in a yellow wood',
  'and sorry i could not travel both',
  'and be one traveler long i stood',
  'and looked down one as far as i could',
  'to where it bent in the undergrowth '])

In [5]:
eap_train, eap_test = train_test_split(eap_input, random_state=1234)
rf_train, rf_test = train_test_split(rf_input, random_state=1234)
eap_train[:5], rf_train[:5]

(['i replied  this is nothing but dreaming',
  'at sight of thee and thine at once awake',
  'lying down to die have suddenly arisen',
  'was it not fate that on this july midnight ',
  'in the fever of a minute'],
 ['the cellar windows were banked up with sawdust',
  'but you see dont you we take care of him ',
  'yes but he should have married her ',
  'except always johnjoe',
  'come out here if you want to hear me talk '])

In [6]:
total_train = len(eap_train) + len(rf_train)
eap_prior = len(eap_train) / total_train
rf_prior = len(rf_train) / total_train
eap_prior, rf_prior

(0.3331269349845201, 0.6668730650154798)

In [7]:
class MarkovModel:
    def __init__(self):
        self.A: 'np.ndarray[t.Any, np.dtype[np.ndarray[t.Any, np.float64]]]' = None
        self.pi: 'np.ndarray[t.Any, np.float64]' = None
        self.vocab: defaultdict[str, int] = defaultdict(lambda: len(self.vocab))
        self.reverse: t.Dict[int, str] = {}

    def train(self, documents: t.List[str]) -> 'MarkovModel':
        """
        Train the Markov model with the documents.
        :param documents: The documents to learn the A and pi matrices from.
        :returns: self
        """
        self.vocab.clear()
        self.reverse.clear()
        for document in documents:
            for token in filter(None, document.split()):
                self.reverse[self.vocab[token]] = token
        M = len(self.vocab) + 1
        self.A = np.zeros((M, M), np.float64)
        self.pi = np.zeros((M,), np.float64)
        count = 0
        for document in documents:
            generator = filter(None, document.split())
            first = next(generator)
            last = self.vocab[first]
            self.pi[last] += 1
            for token in generator:
                idx = self.vocab[token]
                self.A[last][idx] += 1
                count += 1
                last = idx
        self.A = np.log((self.A + 1) / (np.array([np.sum(self.A, axis=1)]).T + M))
        # self.A = np.log(self.A)
        self.pi = np.log((self.pi + 1) / (len(documents) + M))
        # self.pi = np.log(self.pi)
        return self

    def probability(self, document: str) -> float:
        """
        Calculate the relative probability of document existing.
        :param document: The document to calculate.
        :returns: The log probability of the document.
        """
        generator = filter(None, document.split())
        first = next(generator)
        last = self.vocab.get(first, len(self.vocab) - 1)
        result = self.pi[last]
        for token in generator:
            idx = self.vocab.get(token, len(self.vocab) - 1)
            result += self.A[last, idx]
            last = idx
        return result

In [8]:
eap = MarkovModel().train(eap_train)
rf = MarkovModel().train(rf_train)

In [9]:
eap.probability(eap_train[0])

-42.98739406076299

In [10]:
def predict(data):
    if eap.probability(data) + eap_prior > rf.probability(data) + rf_prior:
        return 'eap'
    return 'rf'

In [11]:
def run_test(dataset, expected):
    count = 0
    for data in dataset:
        if predict(data) == expected:
            count += 1
    return count / len(dataset)

In [12]:
eap_train_score = run_test(eap_train, 'eap')
rf_train_score = run_test(rf_train, 'rf')
eap_train_score, rf_train_score

(1.0, 0.9628597957288765)

In [13]:
eap_test_score = run_test(eap_test, 'eap')
rf_test_score = run_test(rf_test, 'rf')
eap_test_score, rf_test_score

(0.9722222222222222, 0.2590529247910863)

In [14]:
(
    eap_train_score * rf_prior + rf_train_score * eap_prior,
    eap_test_score * rf_prior + rf_test_score * eap_prior
)

(0.9876275975864616, 0.7346463200439242)