In [36]:
# !wget -nc https://raw.githubusercontent.com/lazyprogrammer/machine_learning_examples/master/hmm_class/edgar_allan_poe.txt
# !wget -nc https://raw.githubusercontent.com/lazyprogrammer/machine_learning_examples/master/hmm_class/robert_frost.txt

In [13]:
import pandas as pd
import unicodedata
import random
from sklearn.model_selection import train_test_split
import re
import nltk
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
import numpy as np
from sklearn.metrics import confusion_matrix, f1_score


In [2]:
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('omw-1.4')
nltk.download('stopwords')

[nltk_data] Downloading package punkt to
[nltk_data]     /Users/sabyrkabylbek/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/sabyrkabylbek/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     /Users/sabyrkabylbek/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/sabyrkabylbek/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [7]:
with open("edgar_allan_poe.txt", "r", encoding="utf-8") as f:
    lines_edg = [line.strip() for line in f]
with open("robert_frost.txt", "r", encoding="utf-8") as f:
    lines_rob = [line.strip() for line in f]

# Data preparation

In [9]:
# remove blank or white-space lines
lines_edg = [i for i in lines_edg if i.strip() != '']
lines_rob = [i for i in lines_rob if i.strip() != '']

# make all lower case
lines_edg = [i.lower() for i in lines_edg]
lines_rob = [i.lower() for i in lines_rob]

# unicode normalization
lines_edg = [unicodedata.normalize("NFKC", i) for i in lines_edg]
lines_rob = [unicodedata.normalize("NFKC", i) for i in lines_rob]

In [11]:
lines_edg

["lo! death hath rear'd himself a throne",
 'in a strange city, all alone,',
 'far down within the dim west',
 'where the good, and the bad, and the worst, and the best,',
 'have gone to their eternal rest.',
 'there shrines, and palaces, and towers',
 'are not like any thing of ours',
 'oh no! o no! ours never loom',
 'to heaven with that ungodly gloom!',
 'time-eaten towers that tremble not!',
 'resemble nothing that is ours.',
 'around, by lifting winds forgot,',
 'resignedly beneath the sky',
 'the melancholy waters lie.',
 'no holy rays from heaven come down',
 'on the long night-time of that town,',
 'but light from out the lurid sea',
 'streams up the turrets silently',
 'up thrones up long-forgotten bowers',
 "of scultur'd ivy and stone flowers",
 'up domes up spires up kingly halls',
 'up fanes up babylon-like walls',
 'up many a melancholy shrine',
 'whose entablatures intertwine',
 'the mask the viol and the vine.',
 'there open temples open graves',
 'are on a level with th

In [7]:
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))

def tokenize(line):
    return re.findall(r'\w+|[^\w\s]', line, re.UNICODE)

def lemmatize_tokens(tokens):
    return [lemmatizer.lemmatize(token) if token.isalpha() else token for token in tokens]

def remove_stopwords(tokens):
    return [token for token in tokens if not (token.lower() in stop_words and token.isalpha())]
    

lines_edg = [tokenize(i) for i in lines_edg]
lines_rob = [tokenize(i) for i in lines_rob]

lines_edg = [lemmatize_tokens(i) for i in lines_edg]
lines_rob = [lemmatize_tokens(i) for i in lines_rob]

lines_edg = [remove_stopwords(i) for i in lines_edg]
lines_rob = [remove_stopwords(i) for i in lines_rob]

lines_edg = [i for i in lines_edg if len(i) != 0]
lines_rob = [i for i in lines_rob if len(i) != 0]

In [11]:
# set labels
lines_edg_lb = [[i, 0] for i in lines_edg] # for edgar - 0
lines_rob_lb = [[i, 1] for i in lines_rob] # for rob - 1
all_lb_lines = lines_edg_lb + lines_rob_lb

# separate all lines and labels
all_lines = [i[0] for i in all_lb_lines]
all_labels = [i[1] for i in all_lb_lines]

# split training and test data
X_train, X_test, Y_train, Y_test = train_test_split(
    all_lines, all_labels, 
    test_size = 0.2, random_state = 42, stratify=all_labels
)

In [14]:
# Map words to index
def word_idx_map(line_token_list: list()) -> dict:
    word_idx = {'<unk>': 0}
    idx = 1
    for line_token in line_token_list:
        for token in line_token:
            if token not in word_idx:
                word_idx[token] = idx
                idx += 1
    return word_idx
    
# Convert words list to its' index list
def word_idx_convert(line_token_list: list(), word2idx: dict()) -> list:
    text_int = []
    for line_token in line_token_list:
        line_int = [word2idx.get(token, 0) for token in line_token]
        text_int.append(line_int)
    return text_int

In [16]:
word2idx = word_idx_map(X_train)
X_train_int = word_idx_convert(X_train, word2idx)
X_test_int = word_idx_convert(X_test, word2idx)

In [18]:
# Prepare the matrices A and P for each class
V = len(word2idx)
A0 = np.ones((V, V))
A1 = np.ones((V, V))

pi0 = np.ones(V)
pi1 = np.ones(V)

In [20]:
def count_transitions(text_int, A, pi):
    for line_int in text_int:
        last_idx = None
        for idx in line_int:
            if last_idx is None:
                pi[idx] += 1
            else:
                A[last_idx, idx] += 1
            last_idx = idx 

In [22]:
count_transitions([t for t, y in zip(X_train_int, Y_train) if y == 0], A0, pi0)
count_transitions([t for t, y in zip(X_train_int, Y_train) if y == 1], A1, pi1)

In [24]:
A0 /= A0.sum(axis = 1, keepdims = True)
A1 /= A1.sum(axis = 1, keepdims = True)

pi0 /= pi0.sum()
pi1 /= pi1.sum()

In [26]:
logA0 = np.log(A0)
logA1 = np.log(A1)
logpi0 = np.log(pi0)
logpi1 = np.log(pi1)

In [28]:
# compute priors
count0 = sum(y == 0 for y in Y_train)
count1 = sum(y == 1 for y in Y_train)
total = len(Y_train)
p0 = count0 / total
p1 = count1 / total
logp0 = np.log(p0)
logp1 = np.log(p1)
p0, p1

(0.3335270191748983, 0.6664729808251016)

In [30]:
# build a classifier
class Classifier:
  def __init__(self, logAs, logpis, logpriors):
    self.logAs = logAs
    self.logpis = logpis
    self.logpriors = logpriors
    self.K = len(logpriors) # number of classes

  def _compute_log_likelihood(self, input_, class_):
    logA = self.logAs[class_]
    logpi = self.logpis[class_]

    last_idx = None
    logprob = 0
    for idx in input_:
      if last_idx is None:
        # it's the first token
        logprob += logpi[idx]
      else:
        logprob += logA[last_idx, idx]
      
      # update last_idx
      last_idx = idx
    
    return logprob
  
  def predict(self, inputs):
    predictions = np.zeros(len(inputs))
    for i, input_ in enumerate(inputs):
      posteriors = [self._compute_log_likelihood(input_, c) + self.logpriors[c] \
             for c in range(self.K)]
      pred = np.argmax(posteriors)
      predictions[i] = pred
    return predictions

In [32]:
# each array must be in order since classes are assumed to index these lists
clf = Classifier([logA0, logA1], [logpi0, logpi1], [logp0, logp1])

In [34]:
Ptrain = clf.predict(X_train_int)
print(f"Train acc: {np.mean(Ptrain == Y_train)}")

Train acc: 0.9953515398024404


In [36]:
Ptest = clf.predict(X_test_int)
print(f"Test acc: {np.mean(Ptest == Y_test)}")

Test acc: 0.8190255220417634


In [44]:
cm = confusion_matrix(Y_train, Ptrain)
cm


array([[ 566,    8],
       [   0, 1147]])

In [46]:
cm_test = confusion_matrix(Y_test, Ptest)
cm_test

array([[ 77,  67],
       [ 11, 276]])

In [48]:
# F1 score for train data
f1_score(Y_train, Ptrain)

0.996524761077324

In [50]:
# F1 score for test data
f1_score(Y_test, Ptest)

0.8761904761904762