In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import string
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, f1_score

# Show full cell contents instead of truncating long poem lines.
# None is the supported "no limit" value; the old -1 sentinel is deprecated/rejected by recent pandas.
pd.set_option('display.max_colwidth', None)

  


# Load data

In [None]:
from google.colab import drive
drive.mount('/content/gdrive/', force_remount=True)

DATA_DIR = '/content/gdrive/MyDrive/Colab Notebooks/lazyprogrammer/data'
# Built once and reused for every line: deletes every character in string.punctuation.
PUNCTUATION_TABLE = str.maketrans('', '', string.punctuation)


def load_poem_lines(path, label):
  """Read one poem file and return its non-blank lines as (text, label) pairs.

  Each line is right-stripped and lower-cased; lines that are empty at that
  point are dropped. Punctuation is removed from the remaining lines.

  Args:
    path: location of the text file to read.
    label: value attached to every line read from this file.

  Returns:
    A list of (cleaned_text, label) tuples in file order.
  """
  labelled_lines = []
  # Explicit encoding so the read does not depend on the platform default.
  with open(path, 'r', encoding='utf-8') as file:
    for line in file:
      txt = line.rstrip().lower()
      if txt:
        labelled_lines.append((txt.translate(PUNCTUATION_TABLE), label))
  return labelled_lines


# Edgar Allan Poe's lines are labeled 0 and Robert Frost's are labeled 1
poem_lines = (
    load_poem_lines(f'{DATA_DIR}/edgar_allan_poe.txt', 0)
    + load_poem_lines(f'{DATA_DIR}/robert_frost.txt', 1)
)

txt_df = pd.DataFrame(poem_lines, columns=['txt', 'author'])


# Largest per-column NA count; computed once and reused in the message below.
na_count = txt_df.isna().sum().max()
if na_count == 0:
  print('There is no NA values')
else:
  print(f'There are {na_count} NAs' )

print(txt_df.shape)
txt_df.head()

Mounted at /content/gdrive/
There is no NA values
(2154, 2)


Unnamed: 0,txt,author
0,lo death hath reard himself a throne,0
1,in a strange city all alone,0
2,far down within the dim west,0
3,where the good and the bad and the worst and the best,0
4,have gone to their eternal rest,0


In [None]:
# Hold out 20% of the lines for evaluation; the fixed random_state keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    txt_df['txt'].values,
    txt_df['author'].values,
    test_size=0.2,
    random_state=42,
)
len(X_train)

1723

# Utils

In [None]:
def build_word2idx(txt_lines, skip_word_filter=None):
  """Assign a unique integer index to every distinct whitespace-separated token.

  Index 0 is reserved for the '<unknown>' placeholder; new tokens are numbered
  1, 2, ... in order of first appearance.

  Args:
    txt_lines: iterable of strings to scan.
    skip_word_filter: optional predicate called on each token not yet in the
      vocabulary; tokens for which it returns a falsy value are left out.

  Returns:
    dict mapping token -> index.
  """
  vocab = {'<unknown>': 0}
  for text in txt_lines:
    for word in text.split():
      if word in vocab:
        continue
      if skip_word_filter is not None and not skip_word_filter(word):
        continue
      # The next free index always equals the current vocabulary size.
      vocab[word] = len(vocab)
  return vocab

def tokenize(txt, word2idx):
  """Convert a string into the list of indices of its whitespace-separated tokens.

  Tokens missing from `word2idx` are encoded as 0.
  """
  return [word2idx.get(word, 0) for word in txt.split()]

## Markov Model class

In [None]:
class IDiscreteProbabilityModel():
  """Interface for sequence models that MAPClassifier uses as per-class likelihoods.

  The base methods do nothing and return None; subclasses are expected to
  override both.
  """

  def fit(self, x):
    """Estimate model parameters from a collection of sequences `x`."""
    pass

  def get_log_joint_prob(self, x):
    """Return the log joint probability of the single sequence `x`."""
    pass


class SimpleMarkovModel(IDiscreteProbabilityModel):
  """First-order Markov chain over integer states with add-one smoothing.

  Attributes set by `fit`:
    log_pi: log of the initial-state distribution, shape (n_states,).
    log_A: log of the transition matrix, shape (n_states, n_states);
      log_A[i, j] is the log probability of moving from state i to state j.
  """

  def __init__(self, n_states):
    self.n_states = n_states
    self.log_pi = None
    self.log_A = None

  def fit(self, x):
    """Estimate initial-state and transition probabilities from sequences.

    Args:
      x: iterable of sequences of integer state indices in [0, n_states).
        Empty sequences carry no information and are skipped.
    """
    # Counts start at one so unseen states and transitions keep non-zero probability.
    pi_count = np.ones(self.n_states) # counts of the initial state of each sequence
    A_count = np.ones((self.n_states, self.n_states)) # A_count[i, j] - transitions from state i to state j
    for seq in x:
      if len(seq) == 0:
        continue
      pi_count[seq[0]] += 1
      for prev_state, cur_state in zip(seq[:-1], seq[1:]):
        A_count[prev_state, cur_state] += 1

    pi = pi_count / pi_count.sum()
    A = A_count / A_count.sum(axis=1, keepdims=True) # normalise each row

    self.log_pi = np.log(pi)
    self.log_A = np.log(A)

  def get_log_joint_prob(self, x):
    """Return log P(x) = log pi[x[0]] + sum_t log A[x[t-1], x[t]].

    Args:
      x: sequence of integer state indices.

    Returns:
      The log probability of the sequence; 0.0 (log of 1) for an empty sequence.

    Raises:
      RuntimeError: if the model has not been fitted yet.
    """
    if self.log_pi is None or self.log_A is None:
      raise RuntimeError('SimpleMarkovModel must be fitted before scoring sequences')
    if len(x) == 0:
      return 0.0
    log_ll = self.log_pi[x[0]]
    for prev_state, cur_state in zip(x[:-1], x[1:]):
      log_ll += self.log_A[prev_state, cur_state]
    return log_ll


class MAPClassifier(): # Maximum a posteriori
  """Classifier that picks the class with the highest log-likelihood plus log-prior.

  Args:
    likelihood_models: one model per class, indexed by class label
      0..n_classes-1. Each must provide `fit(x)` and `get_log_joint_prob(x)`.
    smoothing: pseudo-count added to every class when estimating the prior,
      so that a class absent from the labels does not get log(0) = -inf.
  """

  def __init__(self, likelihood_models, smoothing=1e-08):
    self.n_classes = len(likelihood_models)
    self.models = likelihood_models
    self.smoothing = smoothing
    self.log_prior = None

  def fit(self, x, y):
    """Fit each class model on its own samples, then estimate the class prior.

    Args:
      x: numpy array of samples; it is indexed with a boolean mask per class.
      y: integer class labels aligned with `x`.
    """
    y = np.asarray(y)
    for k in range(self.n_classes):
      self.models[k].fit(x[y == k])
    self.fit_prior(y)

  def fit_prior(self, y):
    """Estimate the log class prior from label frequencies, using `smoothing` as a pseudo-count."""
    y = np.asarray(y)
    counts = np.array([np.sum(y == k) for k in range(self.n_classes)], dtype=float)
    prior = (counts + self.smoothing) / (y.size + self.smoothing * self.n_classes)
    self.log_prior = np.log(prior)

  def predict(self, x):
    """Return the most probable class index for each sample in `x`.

    Raises:
      RuntimeError: if the classifier has not been fitted yet.
    """
    if self.log_prior is None:
      raise RuntimeError('MAPClassifier must be fitted before calling predict')
    n = len(x)
    # Unnormalised log-posterior: log P(x | k) + log P(k); the evidence term is the same for every class.
    log_posterior = np.zeros((n, self.n_classes))
    for i, x_cur in enumerate(x):
      for k in range(self.n_classes):
        log_posterior[i, k] = self.models[k].get_log_joint_prob(x_cur) + self.log_prior[k]
    return log_posterior.argmax(axis=1)

# Preprocessing

In [None]:
# The vocabulary comes from the training lines only, so any word that appears
# only in the test lines is encoded as index 0 ('<unknown>').
word2idx_train = build_word2idx(X_train)
V = len(word2idx_train)
print('vocab size V:', V)

# dtype=object keeps each line as its own variable-length list of token indices.
train_token_ids = [tokenize(line, word2idx_train) for line in X_train]
test_token_ids = [tokenize(line, word2idx_train) for line in X_test]
X_train_vectorized = np.array(train_token_ids, dtype=object)
X_test_vectorized = np.array(test_token_ids, dtype=object)

X_train_vectorized[:3]

vocab size V: 2612


array([list([1, 2, 3, 4, 5, 6, 7]), list([8, 9, 10, 11, 12, 13, 14, 15]),
       list([16, 17, 18, 19, 20, 1, 21])], dtype=object)

In [None]:
# One Markov model per author (class 0 and class 1), each over the full training vocabulary.
author_models = [SimpleMarkovModel(V), SimpleMarkovModel(V)]
mm = MAPClassifier(author_models)
mm.fit(X_train_vectorized, y_train)

In [None]:
# Score the classifier on the same lines it was fitted on.
pred_train = mm.predict(X_train_vectorized)
train_accuracy = (pred_train == y_train).mean()
train_f1 = f1_score(y_train, pred_train)
print(f'train accuracy: {train_accuracy}')
print(f'train F1 score: {train_f1}')
print('train confusion matrix:')
confusion_matrix(y_train, pred_train)

train accuracy: 0.995937318630296
train F1 score: 0.9969736273238219
train confusion matrix:


array([[ 563,    7],
       [   0, 1153]])

In [None]:
# Score the classifier on the held-out lines.
pred_test = mm.predict(X_test_vectorized)
test_accuracy = (pred_test == y_test).mean()
test_f1 = f1_score(y_test, pred_test)
print(f'test accuracy: {test_accuracy}')
print(f'test F1 score: {test_f1}')
print('test confusion matrix:')
confusion_matrix(y_test, pred_test)

test accuracy: 0.8445475638051044
test F1 score: 0.8928
test confusion matrix:


array([[ 85,  63],
       [  4, 279]])