In [1]:
import re
from collections import Counter
from string import punctuation, whitespace
from typing import Optional

import numpy as np
from nltk import sent_tokenize, word_tokenize as tokenize

# Treat guillemets, dashes, the ellipsis and curly quotes as punctuation too,
# in addition to the ASCII punctuation imported from `string`.
punctuation = "".join([punctuation, "«»—…“”–"])
# Set form of the same characters, for membership tests.
punct = {*punctuation}

In [2]:
class MarkovChain:
    """
    Character-level Markov chain built from the words of a text.

    Every word of the corpus is padded with START/END tokens and cut into
    sliding windows of ``sequence_size + 1`` symbols. The number of times each
    window occurs is stored in a dense array ``M`` with ``sequence_size + 1``
    axes, each of length ``len(char_to_id)``; the array therefore grows
    exponentially with ``sequence_size``.
    """
    START_TOKEN = "<START>"
    END_TOKEN = "<END>"

    def __init__(self, text: str, sequence_size: int):
        """
        Markov chain allows us to predict the next element of a sequence based
         on the current state. Here we predict the next character based on a
         given sequence of characters.

        parameters:
            text:          one string that contains the whole text on which the chain
                            is going to be based
            sequence_size: to predict the next element we take into account the last N
                            elements of a given sequence. The size of this sequence is
                            specified by this parameter.
        """
        self.seq_size = sequence_size
        self.text = text
        # List of word tokens with surrounding punctuation stripped.
        self.corpus = MarkovChain.normalize(text)

        # Sorted vocabulary of single characters found in the corpus
        # (whitespace characters are excluded).
        self.all_chars = sorted(set(''.join(self.corpus)) - set(whitespace))

        # Two-way mapping between symbols (characters and START/END) and ids.
        self.char_to_id = self.__char_to_id__()
        self.id_to_char = {v: k for k, v in self.char_to_id.items()}

        # Count array: M[id_1, ..., id_N, id_next] is the number of times
        # `next` followed the context (1..N) in the padded corpus words.
        self.M = self.__coocurrance_matrix__()

    def predict_next_char(self, chars) -> Optional[str]:
        """
        Return the symbol that most often followed ``chars`` in the corpus.

        parameters:
            chars: sequence of exactly ``sequence_size`` symbols; each symbol is
                    a vocabulary character or START_TOKEN / END_TOKEN. A plain
                    string works when every symbol is a single character.

        returns:
            The most frequent follower (ties go to the lowest id), which may be
            END_TOKEN, or None when the context never occurred in the corpus.

        raises:
            AssertionError: ``chars`` does not have ``sequence_size`` elements.
            KeyError:       ``chars`` contains a symbol outside the vocabulary.
        """
        assert len(chars) == self.seq_size, (
            "expected %d symbols, got %d" % (self.seq_size, len(chars))
        )

        context_ids = tuple(self.char_to_id[char] for char in chars)
        next_id = int(np.argmax(self.M[context_ids]))
        if next_id == 0:
            # Id 0 is START_TOKEN, which is only ever padded in front of a word
            # and so never counted as a follower of a non-empty word. argmax
            # picking it means every count for this context is zero.
            return None
        return self.id_to_char[next_id]

    @staticmethod
    def normalize(text):
        """
        Split ``text`` into word tokens and strip punctuation from both ends
        of each token; tokens that become empty are dropped.

        Uses the module-level ``tokenize`` function and ``punctuation`` string.
        Punctuation inside a token is kept.
        """
        stripped = (token.strip(punctuation) for token in tokenize(text))
        return [token for token in stripped if token]

    def __coocurrance_matrix__(self):
        """
        Build the count array over all windows of all corpus words.

        returns:
            float array with ``sequence_size + 1`` axes of length
            ``len(char_to_id)``, indexed by symbol ids.
        """
        vocab_size = len(self.char_to_id)
        M = np.zeros((vocab_size,) * (self.seq_size + 1))
        for word in self.corpus:
            # every group has `self.seq_size` + 1 symbols: context + follower
            for group in self.__char_groups__(word):
                M[tuple(self.char_to_id[symbol] for symbol in group)] += 1
        return M

    def __char_to_id__(self):
        """
        Map every symbol to an integer id: START_TOKEN -> 0, END_TOKEN -> 1,
        then the characters of ``all_chars`` in order, starting at 2.
        """
        char_to_id = {
            MarkovChain.START_TOKEN: 0,
            MarkovChain.END_TOKEN: 1,
        }
        for char_id, char in enumerate(self.all_chars, start=2):
            char_to_id[char] = char_id
        return char_to_id

    def __char_groups__(self, word):
        """
        Yield every sliding window of ``sequence_size + 1`` symbols over
        ``word`` padded with ``sequence_size`` START tokens in front and
        ``sequence_size`` END tokens behind, as tuples.
        """
        size = self.seq_size
        padded = (
            [MarkovChain.START_TOKEN] * size
            + list(word)
            + [MarkovChain.END_TOKEN] * size
        )
        for start in range(len(padded) - size):
            yield tuple(padded[start:start + size + 1])

In [3]:
# Read the whole training text; the context manager closes the file handle
# as soon as the content has been loaded.
# NOTE(review): no encoding is passed, so the platform default is used —
# confirm the file's encoding and pass it explicitly if it can differ.
with open("../dictation_text.txt") as text_file:
    text = text_file.read()

In [4]:
# Build the chain from the loaded text; each prediction is conditioned on the
# previous 3 symbols.
chain = MarkovChain(text=text, sequence_size=3)

In [5]:
# Sanity check: char_to_id and id_to_char must be exact inverses of each other.
# Every entry is checked once, in both directions, instead of drawing random
# ids, so the check is deterministic and cannot miss a symbol.
assert len(chain.id_to_char) == len(chain.char_to_id)
# Ids are expected to be the contiguous range 0..N-1 (they index the axes of chain.M).
assert sorted(chain.id_to_char) == list(range(len(chain.id_to_char)))
for char, char_id in chain.char_to_id.items():
    assert chain.id_to_char[char_id] == char
for char_id, char in chain.id_to_char.items():
    assert chain.char_to_id[char] == char_id

In [6]:
# Cross-check the counts stored in chain.M against n-gram counts taken directly
# from the corpus, for a random sample of n-grams made of characters only.
ngram_len = chain.M.ndim        # seq_size context symbols + 1 predicted symbol
first_char_id = 2               # ids 0 and 1 are the START and END tokens
vocab_size = chain.M.shape[0]

# Count every n-gram of every word once, overlapping occurrences included.
# str.count is not suitable here: it counts non-overlapping matches only, while
# the chain counts every sliding window (e.g. "abab" occurs twice in "ababab").
ngram_counts = Counter(
    word[start:start + ngram_len]
    for word in chain.corpus
    for start in range(len(word) - ngram_len + 1)
)

rng = np.random.default_rng(0)  # fixed seed keeps the sample reproducible
sampled_ids = rng.integers(first_char_id, vocab_size, size=(10000, ngram_len))
for ids in sampled_ids.tolist():
    together = "".join(chain.id_to_char[char_id] for char_id in ids)
    assert int(chain.M[tuple(ids)]) == ngram_counts[together]