In [1]:
import requests
raw_poe = requests.get('https://raw.githubusercontent.com/lazyprogrammer/machine_learning_examples/master/hmm_class/edgar_allan_poe.txt').content
raw_frost = requests.get('https://raw.githubusercontent.com/lazyprogrammer/machine_learning_examples/master/hmm_class/robert_frost.txt').content

In [2]:
import numpy as np
import dask.array as da
import spacy
nlp = spacy.load("en_core_web_sm")

def decode_text(text):
    return bytes(text.decode("utf-8"), "utf-8").decode("unicode_escape")

def pad_array(jagged_array):
    width = max((len(row) for row in jagged_array))
    right_pad = lambda row: row + [""]*(width-len(row))
    return [right_pad(row) for row in jagged_array]

def tokenize_line(line):
    return np.array(["$BEGINCHAR$"] + [token.lemma_ for token in nlp(line)] + ["$ENDCHAR$"])

def process_text(text, chunksize=500, pad=False):
    enc_dec = decode_text(text)
    jagged_array = [tokenize_line(line) for line in enc_dec.split("\n")]
    array = pad_array(jagged_array) if pad else jagged_array
    return np.array(array, dtype=str if pad else object)

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
from sklearn.model_selection import train_test_split
def test_split_text(text):
    array = process_text(text)
    return train_test_split(array, random_state=1)
    
poe_train, poe_test = test_split_text(raw_poe)
frost_train, frost_test = test_split_text(raw_frost)

In [4]:
from itertools import chain
def flatten_array(array, jagged=True):
    if jagged:
        return np.array(list(chain.from_iterable(array)), dtype=str)
    return array.ravel()

In [34]:
base = ["$NEWCHAR$"]
vocab_p = np.unique(np.concatenate([base, flatten_array(poe_train)]))
vocab_f = np.unique(np.concatenate([base, flatten_array(frost_train)]))
total_vocab = np.unique(np.concatenate([vocab_p, vocab_f]))
vocab_size = len(total_vocab)
print(f"Vocab counts - poe: {len(vocab_p)}, frost: {len(vocab_f)}, total: {vocab_size}")

Vocab counts - poe: 1047, frost: 1587, total: 2238


In [35]:
idx2word = total_vocab
word2idx = {word:idx for idx, word in enumerate(idx2word)}
del word2idx

idx2word_p = vocab_p
word2idx_p = {word:idx for idx, word in enumerate(idx2word_p)}

idx2word_f = vocab_f
word2idx_f = {word:idx for idx, word in enumerate(idx2word_f)}

In [36]:
def vectorize_arr(array, word2idx, ragged=True):
    w2i_map = lambda word: word2idx.get(word, word2idx.get("$NEWCHAR$"))
    if not ragged:
        return np.vectorize(w2i_map)(array)
    return np.array([
        np.array(
            [w2i_map(word) for word in line], 
            dtype=int
        ) for line in array
    ], dtype=object)

In [37]:
poe_vec_t = vectorize_arr(poe_train, word2idx_p)
frost_vec_t = vectorize_arr(frost_train, word2idx_f)

In [11]:
def count_corpora(corpora, vocab_size, ragged = True):
    count = np.zeros(vocab_size, dtype=int)
    if ragged:
        def count_corpus(corpus):
            for idx, row in enumerate(corpus):
                np.add.at(count, row, 1)
    
        for corpus in corpora:
            count_corpus(corpus)
    else:
        all_values = np.concatenate([corpus.ravel() for corpus in corpora])
        np.add.at(count, all_values, 1)
    return count

In [82]:
corpora = [poe_vec_t, frost_vec_t]
word_counter = count_corpora(corpora, vocab_size)

In [38]:
def get_transition_matrix(vec_arr, vocab_size, epsilon_smoothing=0.2, ragged=True):

    # Account for unknown words
    vocab_size += 1
    
    # Initialize matrices with zeros
    trans_mat = np.zeros((vocab_size, vocab_size))
    init_mat = np.zeros(vocab_size)

    # For the initial states and transitions
    if ragged:
        for sentence in vec_arr:
            if len(sentence) == 0:
                continue
            init_mat[sentence[0]] += 1
            
            for i in range(len(sentence)-1):
                trans_mat[sentence[i], sentence[i+1]] += 1
    else:
        # For the initial states
        starts = vec_arr[:, 0]
        init_counts, _ = np.histogram(starts, bins=np.arange(vocab_size + 1))
        init_mat += init_counts
    
        # For the transitions
        y, x = vec_arr[:, :-1].ravel(), vec_arr[:, 1:].ravel()
        hist, _, _ = np.histogram2d(x, y, bins=(vocab_size, vocab_size))
        trans_mat += hist

    
    
    # Apply epsilon smoothing
    trans_mat += epsilon_smoothing
    init_mat += epsilon_smoothing

    # Normalize
    normalized_trans_mat = trans_mat / trans_mat.sum(axis=1, keepdims=True)
    normalized_init_mat = init_mat / init_mat.sum()
    
    return normalized_trans_mat, normalized_init_mat

In [39]:
poe_t_mat, summy = get_transition_matrix(poe_vec_t, len(idx2word_p)) #vocab_size)
frost_t_mat, _ = get_transition_matrix(frost_vec_t, len(idx2word_f)) #vocab_size)

In [57]:
def verify_probabilities(t_mat, max_err=10**-10):
    return [i for i in np.sum(t_mat, axis=1) if abs(i-1)>max_err] == []

In [58]:
print(verify_probabilities(poe_t_mat))
print(verify_probabilities(frost_t_mat))

True
True


In [42]:
import math
def get_logprob_mat(t_mat):
    vec_log = np.vectorize(math.log)
    return vec_log(t_mat)

In [43]:
poe_logprob_t = get_logprob_mat(poe_t_mat)
frost_logprob_t = get_logprob_mat(frost_t_mat)

In [44]:
def get_argprob(sequence, logprob_mat, word2idx):
    encoded_sequence = vectorize_arr(sequence, word2idx, ragged=False)
    argsum = 0
    for i in range(len(encoded_sequence)-1):
        argsum += logprob_mat[i, i+1]
    return argsum

In [55]:
def estimate_posterior(line, logprobs, word2idx):
    # seq = np.array(tokenize_line(line))
    best = -1*2**32
    best_label = ""
    for label, logprob_tmat in logprobs.items():
        logprob = get_argprob(line, logprob_tmat, word2idx)
        print("Label:", label, " logprob:", logprob)
        if logprob > best:
            best = logprob
            best_label = label
    return best_label

In [52]:
logprobs = {
    "Poe": poe_logprob_t,
    "Frost": frost_logprob_t
}
line = tokenize_line("These were days when my heart was volcanic As the scoriac rivers that roll")
result = estimate_posterior(line, logprobs, word2idx_p)
print(result)

Poe


In [None]:
#   is_poe  is_frost
# |_______|_________| label_poe
# |_______|_________| label_frost
#
results = {
    "Poe": {}, 
    "Frost": {}
}
test_data = {
    "Poe": poe_test,
    "Frost": frost_test
}
labeled_w2i = {
    "Poe": word2idx_p,
    "Frost": word2idx_f
}
for label, data in test_data.items():
    for line in data:
        prediction = estimate_posterior(line, logprobs, labeled_w2i[label])
        results[label][prediction] = results[label].get(prediction, 0) + 1

In [54]:
results

{'Poe': {'Poe': 200}, 'Frost': {'Poe': 396}}