In [40]:
import numpy as np
import pandas as pd
def word_tokenizer(input: str) -> list:
  '''
  Break a string into word tokens.

  Tokens are the maximal runs of non-whitespace characters; consecutive
  whitespace is treated as a single separator and leading/trailing
  whitespace produces no empty tokens.

  Parameters:
    input (str): text whose words are separated by whitespace
  Returns:
    tokens (list of str): the words in order of appearance (empty list if
      the input is empty or all whitespace)
  '''
  tokens = input.split()
  return tokens
class MarkovChain:
  '''
  First-order Markov chain over tokens.

  `train` estimates a row-stochastic transition matrix from consecutive token
  pairs; `predict` samples a continuation from it. The train/predict split is
  more ceremony than a Markov chain needs, but it is meant to stay relatively
  unmodified as the model complexity and training sample size grow.

  Attributes (set by `train`):
    transition_matrix (np.ndarray): [v, v] array; entry [i, j] is the
      estimated probability that token j follows token i
    token_to_idx (dict): maps each vocabulary token to its row/column index
  '''
  def __init__(self):
    '''Creates an untrained model; call `train` before `predict`.'''
    return

  def train(self, X : np.ndarray, pseudocount : float=0, *, verbose : bool=True):
    '''
    Trains markov chain (populates transition probability matrix)
    Parameters:
      X (np.ndarray): 1-D sequence of tokens. This should already be of a reasonable vocabulary size (i.e. you should replace extra words with <unk> in the tokenizer!)
      pseudocount (float): amount to add to each element of count matrix (as a form of regularization)
      verbose (bool): if True (the default), print the token index map and the transition matrix
    Raises:
      ValueError: if pseudocount is negative
    '''
    if pseudocount < 0:
      raise ValueError("pseudocount must be non-negative")
    # return_inverse gives, for every position in X, the index of its token in
    # the sorted vocabulary, so no per-token dictionary lookups are needed.
    unique, token_ids = np.unique(np.asarray(X), return_inverse=True)
    token_ids = token_ids.reshape(-1)
    vocab_size = len(unique)
    token_to_idx = {token: idx for idx, token in enumerate(unique)}
    if verbose:
      print(token_to_idx)
    # Dense O(v^2) matrix: a sparse one would only help when pseudocount is 0.
    # It's a toy model!
    counts = np.full([vocab_size, vocab_size], pseudocount, dtype=float)
    # np.add.at accumulates repeated (prev, cur) pairs; a plain fancy-indexed
    # `+=` would count each distinct pair only once.
    np.add.at(counts, (token_ids[:-1], token_ids[1:]), 1)
    row_totals = counts.sum(axis=1)
    has_successor = row_totals > 0
    transition_matrix = np.zeros_like(counts)
    transition_matrix[has_successor] = counts[has_successor] / row_totals[has_successor, None]
    if not has_successor.all():
      # A token with no observed successor (e.g. one seen only as the final
      # token) would otherwise give a 0/0 row of NaNs. Fall back to uniform,
      # which is also what any positive pseudocount yields for such a row.
      transition_matrix[~has_successor] = 1.0 / vocab_size
    if verbose:
      print(transition_matrix)
    self.transition_matrix = transition_matrix
    self.token_to_idx = token_to_idx

  def predict(self, X : np.ndarray, n : int=1):
    '''
    Continues input token array for next n tokens (cannot specify empty X)
    Only the last token of X conditions the continuation; each new token is
    sampled from the transition row of the token before it.
    Parameters:
      X (np.ndarray): tokens to continue
      n (int): number of tokens to output
    Returns:
      continuation (np.ndarray): tokens to output
    Raises:
      IndexError: if X is empty
      KeyError: if the last token of X was not seen during training
    '''
    token_to_idx = self.token_to_idx
    if len(X) == 0:
      raise IndexError("cannot continue an empty token sequence")
    last_token = X[-1]
    try:
      prev_idx = token_to_idx[last_token]
    except KeyError:
      raise KeyError(f"token {last_token!r} was not seen during training") from None
    idx_to_token = {idx: token for token, idx in token_to_idx.items()}
    vocab_size = len(token_to_idx)
    res_tokens = []
    for _ in range(n):
      transition_probs = self.transition_matrix[prev_idx]
      next_idx = np.random.choice(vocab_size, size=1, p=transition_probs)[0]
      res_tokens.append(idx_to_token[next_idx])
      prev_idx = next_idx
    return np.array(res_tokens)
  # TODO maybe worth adding a get_log_likelihood thing here to compare loss with other models

In [47]:
# Smoke test on a tiny three-symbol corpus ('a', 'b', 'c').
testStr = 'a a a b b b a a b b b b b b a a a a b b b c c a c c c a b b b c a a a c'
tokens = word_tokenizer(testStr)
model = MarkovChain()
# No pseudocount is passed, so the default of 0 is used.
model.train(tokens)
# Sample 50 tokens continuing from the last token of the training sequence ('c').
# Left as a bare expression so the notebook displays the returned array.
model.predict(tokens, 50)

{'a': 0, 'b': 1, 'c': 2}
[[0.57142857 0.28571429 0.14285714]
 [0.13333333 0.73333333 0.13333333]
 [0.5        0.         0.5       ]]


array(['c', 'a', 'a', 'a', 'a', 'a', 'a', 'b', 'b', 'c', 'a', 'a', 'a',
       'b', 'b', 'a', 'a', 'c', 'a', 'a', 'a', 'a', 'a', 'b', 'b', 'b',
       'b', 'c', 'c', 'c', 'a', 'a', 'b', 'a', 'b', 'b', 'b', 'c', 'c',
       'a', 'c', 'c', 'c', 'c', 'c', 'a', 'b', 'b', 'b', 'c'], dtype='<U1')