In [1]:
!wget -P ./data/ -nc https://raw.githubusercontent.com/lazyprogrammer/machine_learning_examples/master/hmm_class/robert_frost.txt
!wget -P ./data/ -nc https://raw.githubusercontent.com/lazyprogrammer/machine_learning_examples/master/hmm_class/edgar_allan_poe.txt


File ‘./data/robert_frost.txt’ already there; not retrieving.

File ‘./data/edgar_allan_poe.txt’ already there; not retrieving.



In [2]:
import numpy as np 
import matplotlib.pyplot as plt
import string 
from sklearn.model_selection import train_test_split

In [3]:
input_files = [
    './data/robert_frost.txt',
    './data/edgar_allan_poe.txt',
]

In [4]:
!head -n 10 ./data/robert_frost.txt

Two roads diverged in a yellow wood,
And sorry I could not travel both
And be one traveler, long I stood
And looked down one as far as I could
To where it bent in the undergrowth; 

Then took the other, as just as fair,
And having perhaps the better claim
Because it was grassy and wanted wear,
Though as for that the passing there


In [7]:
!head -n 10 ./data/edgar_allan_poe.txt

LO! Death hath rear'd himself a throne
In a strange city, all alone,
Far down within the dim west
Where the good, and the bad, and the worst, and the best,
Have gone to their eternal rest.
 
There shrines, and palaces, and towers
Are not like any thing of ours
Oh no! O no! ours never loom
To heaven with that ungodly gloom!


In [8]:
# collect data: one cleaned line of poetry per entry, labelled by its source file
input_texts = []
labels = []

# build the punctuation-stripping table once instead of once per line
punctuation_table = str.maketrans('', '', string.punctuation)

for label, f in enumerate(input_files):
    print(f"{f} corresponds to {label}")

    # the context manager closes the file handle once the file has been read
    # (the bare open() used before never closed it)
    with open(f) as infile:
        for line in infile:
            # drop trailing whitespace (including the newline) and lowercase
            line = line.rstrip().lower()

            # blank lines carry no text, skip them
            if line:
                # remove punctuation
                line = line.translate(punctuation_table)

                input_texts.append(line)
                labels.append(label)



./data/robert_frost.txt corresponds to 0
./data/edgar_allan_poe.txt corresponds to 1


In [9]:
# train_test_split shuffles before splitting; a fixed seed makes the split
# identical on every run, so results can be reproduced after a kernel restart
RANDOM_SEED = 42
train_text, test_text, Ytrain, Ytest = train_test_split(
    input_texts, labels, random_state=RANDOM_SEED
)

In [10]:
len(Ytrain) , len(Ytest)

(1615, 539)

In [11]:
train_text[:5]

['more of the salt wherewith theyre to be salted',
 'or creaking with a buggy load of grain',
 'here once through an alley titanic',
 'though once we had journeyed down here',
 'at thy softmurmured words let there be light']

In [12]:
# train test split shuffles the dataset, so random authors here
Ytrain[:5]

[0, 0, 1, 1, 1]

In [13]:
# index 0 is reserved for the unknown-word token
word2idx = {'<unk>': 0}

# next free index to hand out; known words are numbered from 1
idx = 1

In [14]:
# populate word2idx: every training word not seen before gets the next free index
for sentence in train_text:
    for word in sentence.split():
        if word in word2idx:
            # this word already has an index
            continue

        word2idx[word] = idx
        idx += 1

In [15]:
word2idx

{'<unk>': 0,
 'more': 1,
 'of': 2,
 'the': 3,
 'salt': 4,
 'wherewith': 5,
 'theyre': 6,
 'to': 7,
 'be': 8,
 'salted': 9,
 'or': 10,
 'creaking': 11,
 'with': 12,
 'a': 13,
 'buggy': 14,
 'load': 15,
 'grain': 16,
 'here': 17,
 'once': 18,
 'through': 19,
 'an': 20,
 'alley': 21,
 'titanic': 22,
 'though': 23,
 'we': 24,
 'had': 25,
 'journeyed': 26,
 'down': 27,
 'at': 28,
 'thy': 29,
 'softmurmured': 30,
 'words': 31,
 'let': 32,
 'there': 33,
 'light': 34,
 'i': 35,
 'cant': 36,
 'say': 37,
 'hes': 38,
 'much': 39,
 'help': 40,
 'gentlest': 41,
 'all': 42,
 'gentle': 43,
 'names': 44,
 'dost': 45,
 'take': 46,
 'gnaws': 47,
 'in': 48,
 'stable': 49,
 'aint': 50,
 'no': 51,
 'proof': 52,
 'me': 53,
 'but': 54,
 'wont': 55,
 'should': 56,
 'burned': 57,
 'stake': 58,
 'something': 59,
 'ill': 60,
 'tell': 61,
 'you': 62,
 'what': 63,
 'voices': 64,
 'really': 65,
 'own': 66,
 'makes': 67,
 'my': 68,
 'head': 69,
 'spin': 70,
 'not': 71,
 'long': 72,
 'ago': 73,
 'writer': 74,
 'these

In [16]:
len(word2idx)

2510

In [23]:
# convert data into integer format: each line becomes a list of word indices

# the vocabulary was built from train_text, so every training token has an
# entry in word2idx and plain indexing is enough
train_text_int = [
    [word2idx[token] for token in text.split()]
    for text in train_text
]

# a test line may contain words that never appeared in the training lines;
# those fall back to index 0 (<unk>)
test_text_int = [
    [word2idx.get(token, 0) for token in text.split()]
    for text in test_text
]

In [24]:
train_text_int[100:105]

[[129, 371, 372, 373, 2, 374],
 [7, 375, 100, 376, 125, 377],
 [378, 256, 379, 120, 53, 35, 380, 35, 381],
 [303, 382, 175, 7, 62, 84, 25, 303, 383],
 [7, 13, 384, 209, 3, 385, 100, 386, 387]]

In [27]:
# one (A, pi) pair per class: index 0 and index 1 of input_files
# V = vocabulary size, so the rows/columns of A and the entries of pi line up
# with the word indices stored in word2idx
V = len(word2idx)

# A[last_word, current_word] holds transition counts, pi[word] holds counts of
# a word starting a line; every entry starts at 1 instead of 0, so no entry is
# zero when the log is taken later
A0, pi0 = np.ones((V, V)), np.ones(V)
A1, pi1 = np.ones((V, V)), np.ones(V)

In [28]:
# accumulate first-word and word-to-word counts into A and pi
def compute_counts(text_as_int, A, pi):
    """Add the counts found in `text_as_int` to `A` and `pi` in place.

    text_as_int: iterable of sequences of integer word indices (one per line).
    A:  2-D array; A[i, j] is incremented for every occurrence of word j
        directly after word i within a sequence.
    pi: 1-D array; pi[i] is incremented for every sequence that starts with
        word i.

    Nothing is returned. The arrays passed in are modified, so calling this
    twice on the same arrays adds the counts twice.
    """
    for sentence in text_as_int:
        previous = None

        for position, current in enumerate(sentence):
            if position == 0:
                # first word of the sequence: count it as a starting word
                pi[current] += 1
            else:
                # count the transition previous word -> current word
                A[previous, current] += 1

            # remember this word for the next step
            previous = current

# split the training lines by label; zip pairs each line with its label only
# so the filter can look at the label, the function itself receives just the lines
class0_lines = [tokens for tokens, label in zip(train_text_int, Ytrain) if label == 0]
class1_lines = [tokens for tokens, label in zip(train_text_int, Ytrain) if label == 1]

# compute_counts adds to the arrays in place, so re-running these calls without
# re-creating A0/pi0/A1/pi1 first would count every line twice
compute_counts(class0_lines, A0, pi0)
compute_counts(class1_lines, A1, pi1)


In [31]:
# turn the raw counts into probabilities, in place, for both classes
for transition_counts, initial_counts in ((A0, pi0), (A1, pi1)):
    # each row of A (a fixed last word) is divided by its own sum,
    # so every row sums to 1
    transition_counts /= transition_counts.sum(axis=1, keepdims=True)

    # pi is divided by its total, so it sums to 1
    initial_counts /= initial_counts.sum()

In [32]:
# switch to log space: only comparisons between classes matter later, and
# adding logs avoids multiplying many small probabilities together
logA0, logpi0 = np.log(A0), np.log(pi0)
logA1, logpi1 = np.log(A1), np.log(pi1)

In [33]:
# class priors: the fraction of training lines that carry each label
total = len(Ytrain)
count0 = len([y for y in Ytrain if y == 0])
count1 = len([y for y in Ytrain if y == 1])

p0, p1 = count0 / total, count1 / total

# logs, so the prior can be added to the log-likelihood of a line
logp0, logp1 = np.log(p0), np.log(p1)

# show both priors (the classes are imbalanced)
p0, p1

(0.6557275541795665, 0.3442724458204334)

In [34]:
# build classifier
class Classifier:
    """Pick the class whose Markov model plus prior scores a sequence highest.

    All three constructor arguments are indexed by class: position c of each
    one belongs to class c.
    """

    def __init__(self, logAs, logpis, logpriors) -> None:
        """Store the per-class log parameters.

        logAs:     sequence of 2-D arrays, log transition probabilities
                   indexed as [last_word, current_word].
        logpis:    sequence of 1-D arrays, log probabilities of a word
                   starting a sequence.
        logpriors: sequence of log class priors; its length sets the number
                   of classes K.
        """
        self.logAs = logAs
        self.logpis = logpis
        self.logpriors = logpriors

        # number of classes
        self.K = len(logpriors)

    def _compute_log_likelihood(self, input_, class_):
        """Return the log-likelihood of the index sequence `input_` under class `class_`.

        The first word is scored with log pi, every following word with the
        log transition from the word before it. An empty sequence scores 0.
        """
        logA = self.logAs[class_]
        logpi = self.logpis[class_]

        tokens = list(input_)
        logprob = 0
        if not tokens:
            # nothing to score
            return logprob

        # first word: initial-state term
        logprob += logpi[tokens[0]]

        # remaining words: one transition term per (previous, current) pair
        for previous, current in zip(tokens, tokens[1:]):
            logprob += logA[previous, current]

        return logprob

    def predict(self, inputs):
        """Return an array with one predicted class index per sequence in `inputs`."""
        # one slot per input sequence
        predictions = np.zeros(len(inputs))

        for row, sequence in enumerate(inputs):
            # score of the sequence under every class: log-likelihood + log prior
            scores = []
            for k in range(self.K):
                scores.append(self._compute_log_likelihood(sequence, k) + self.logpriors[k])

            # the class with the highest score wins
            predictions[row] = np.argmax(scores)

        return predictions

In [35]:
# position in each list = class index, so all three lists share the same order
clf = Classifier(
    logAs=[logA0, logA1],
    logpis=[logpi0, logpi1],
    logpriors=[logp0, logp1],
)

In [36]:
# No training happens here: predict() only scores each training line with the
# precomputed log A / log pi / log prior values and picks the best class
Ptrain = clf.predict(train_text_int)

# Ptrain is a NumPy array, so == compares it element-wise with the Ytrain labels
print(f"Train accuracy: {np.mean(Ptrain == Ytrain)}")

Train accuracy: 0.9950464396284829


In [37]:
# the test lines were already converted to lists of word indices
# (words not in the vocabulary were mapped to 0, the <unk> index)
Ptest = clf.predict(test_text_int) 

# fraction of test lines whose predicted class equals the true label
print(f"Test acc: {np.mean(Ptest == Ytest)}")

Test acc: 0.8256029684601113


In [38]:
from sklearn.metrics import confusion_matrix, f1_score

cm = confusion_matrix(Ytrain, Ptrain) 
cm

# rows = true author, columns = predicted author:
# [[truly frost & predicted frost, truly frost & predicted poe],
#  [truly poe   & predicted frost, truly poe   & predicted poe]]

array([[1059,    0],
       [   8,  548]])

In [39]:
cm_test = confusion_matrix(Ytest, Ptest)
cm_test

array([[365,  12],
       [ 82,  80]])

In [40]:
f1_score(Ytrain, Ptrain)

0.9927536231884058

In [41]:
f1_score(Ytest, Ptest)

0.6299212598425197