### Text Classifier Exercise Prompt

- You'll be given poems by 2 authors: Edgar Allan Poe and Robert Frost
- Build a classifier that can distinguish between the 2 authors
- Compute train and test accuracy
- Check for class imbalance, compute F1-score if imbalanced

#### Details
- Convert each line of text (the samples) into integer lists
- Train a Markov model for each class (Edgar Allan Poe / Robert Frost)
- Don't forget to use smoothing (e.g. add-one smoothing)
- Consider whether you need A and π (the transition matrix and initial-state distribution), or log(A) and log(π)
- For Bayes' rule, compute the priors: p(class = k)

- Write a function to compute the posterior for each class, given an input
- Take the argmax over the posteriors to get the predicted class
- Make predictions for both train and test sets
- Compute accuracy for train/test
- Check for class imbalance
    - If imbalanced, calculate confusion matrix and f1-score

In [None]:
# Download both corpora into datasets/. `&&` runs wget only after the cd has
# succeeded; a single `&` would background the cd in a POSIX shell and save the
# files in the current directory instead.
!cd datasets && wget -nc https://raw.githubusercontent.com/lazyprogrammer/machine_learning_examples/master/hmm_class/edgar_allan_poe.txt
!cd datasets && wget -nc https://raw.githubusercontent.com/lazyprogrammer/machine_learning_examples/master/hmm_class/robert_frost.txt

In [204]:
import string
import random

import numpy as np
from sklearn.model_selection import train_test_split


# Seed Python's built-in `random` module so any use of it is reproducible.
random.seed(22)


class LoadDataset:
    """Load one author's text file and expose its cleaned lines as labelled samples.

    Attributes:
        file: Raw lines of the text file, without line terminators.
        label: Class label attached to every sample taken from this file.
    """

    # Built once for the whole class: deletes every ASCII punctuation character.
    _PUNCTUATION_TABLE = str.maketrans("", "", string.punctuation)

    def __init__(self, text_path: str, label: int, encoding: str = "utf-8"):
        """Read ``text_path`` and remember ``label``.

        Args:
            text_path: Path of the text file to read.
            label: Class label for every line of this file.
            encoding: Text encoding used to decode the file.
        """
        self.file = self._load(text_path, encoding)
        self.label = label

    def _load(self, path: str, encoding: str = "utf-8") -> list:
        """Return the lines of ``path`` with their line terminators removed."""
        # An explicit encoding keeps decoding independent of the platform default.
        with open(path, mode="r", newline="\n", encoding=encoding) as file:
            return file.read().splitlines()

    def clean_lines(self) -> list:
        """Return the lower-cased, punctuation-free, non-empty lines of the file.

        Lines that are blank, or that contain nothing but punctuation and
        whitespace, are dropped so that every returned line yields at least
        one token when split on whitespace.
        """
        cleaned_lines = []

        for raw_line in self.file:
            line = raw_line.rstrip().lower().translate(self._PUNCTUATION_TABLE)

            # Checked after punctuation removal: a line such as "..." is not
            # empty before translation but would become an empty sample after it.
            if not line.strip():
                continue

            cleaned_lines.append(line)

        return cleaned_lines

    def get_samples(self) -> tuple[list, list]:
        """Return ``(lines, labels)`` where every label equals ``self.label``."""
        input_text = self.clean_lines()
        return input_text, [self.label] * len(input_text)

    @classmethod
    def get_datasets(cls, x1: list, y1: list, x2: list, y2: list, train_ratio: float) -> tuple[list, list, list, list]:
        """Concatenate two labelled sample sets and split them into train/test.

        Args:
            x1, y1: Samples and labels of the first set.
            x2, y2: Samples and labels of the second set.
            train_ratio: Fraction of all samples placed in the training split;
                the absolute training size is ``int(train_ratio * total)``.

        Returns:
            ``(x_train, x_test, y_train, y_test)`` from a shuffled split with a
            fixed ``random_state`` of 22.
        """
        x = x1 + x2
        y = y1 + y2

        train = int(train_ratio * len(x))
        # tuple(...) makes the returned value match the annotated return type.
        return tuple(train_test_split(x, y, train_size=train, shuffle=True, random_state=22))
    
class Word2Idx:
    """Helpers that build a vocabulary and encode text lines as integer ids."""

    @classmethod
    def map_word2idx(cls, dataset: list) -> dict:
        """Build a word -> id mapping from whitespace-separated lines.

        Id 0 is reserved for the "<unk>" token; every other word receives the
        next free id in order of first appearance.
        """
        vocabulary = {"<unk>": 0}

        for sentence in dataset:
            for word in sentence.split():
                # setdefault inserts only unseen words; the next free id is
                # the current vocabulary size.
                vocabulary.setdefault(word, len(vocabulary))

        return vocabulary

    @classmethod
    def map_to_int(cls, dataset: list, mapper: dict) -> list:
        """Encode each line as a list of ids, using 0 for words missing from ``mapper``."""
        return [
            [mapper.get(word, 0) for word in sentence.split()]
            for sentence in dataset
        ]

class MarkovModel:
    """First-order Markov model over integer token ids, estimated from counts.

    After ``train``:
        * ``A[j, i]`` is the estimated p(token j | previous token i); every
          column with at least one count (or any smoothing) sums to one.
        * ``pi[i]`` is the estimated probability that a sequence starts with
          token ``i``.
    """

    def __init__(self, samples: list, mapper: dict):
        """
        Args:
            samples: Sequences of integer token ids.
            mapper: Word-to-index mapping; only its length (the vocabulary
                size) is used.
        """
        self.samples = samples
        self.mapper = mapper

    def train(self, epsilon: float = 1) -> tuple[np.ndarray, np.ndarray]:
        """
        Get the State Transition Matrix (A) and the Initial State (π)

        Args:
            epsilon: Additive smoothing constant added to every count. Values
                that are not positive disable smoothing.

        Returns:
            The pair ``(A, pi)``, also stored on the instance.
        """
        self._init_states(len(self.mapper))

        for tokens in self.samples:
            # An empty sequence has no first token and no transitions.
            if len(tokens) == 0:
                continue

            self.pi[tokens[0]] += 1

            for prev_token, current_token in zip(tokens[:-1], tokens[1:]):
                self.A[current_token, prev_token] += 1

        if epsilon > 0:
            self.A += epsilon
            self.pi += epsilon

        # Normalize. Without smoothing a column (or pi) may have no counts at
        # all; it is left as zeros rather than divided by zero into NaN.
        column_totals = self.A.sum(axis=0, keepdims=True)
        np.divide(self.A, column_totals, out=self.A, where=column_totals > 0)

        pi_total = self.pi.sum()
        if pi_total > 0:
            self.pi /= pi_total

        return self.A, self.pi

    def _init_states(self, size: int) -> None:
        """Allocate zero-filled count arrays for a vocabulary of ``size`` tokens."""
        self.A = np.zeros((size, size))
        self.pi = np.zeros(size)

class MarkovClassifier:
    """Bayes classifier whose per-class likelihoods come from Markov models.

    Attributes:
        class_models: Maps each class label to its ``(A, pi)`` pair.
        class_priors: Maps each class label to its prior probability.
    """

    # Added inside every logarithm so a zero probability gives a large
    # negative number instead of -inf.
    _LOG_EPS = 1e-10

    def __init__(self):
        self.class_models = {}  # Stores {class_label: (A, pi)}
        self.class_priors = {}  # Stores P(class)

    def train(self, X_train: list, y_train: list, mapper: dict, epsilon: float) -> None:
        """
        Train a Markov model for each class.

        Args:
            X_train: List of tokenized sequences (as integers).
            y_train: List of class labels.
            mapper: Word-to-index mapping dictionary.
            epsilon: Smooth factor.
        """
        # Start from a clean slate so models of a previous fit cannot linger.
        self.class_models = {}
        self.class_priors = self._compute_class_priors(y_train)

        self._train_markov_model_per_class(X_train, y_train, mapper, epsilon)

    def _compute_class_priors(self, y_train: list) -> dict:
        """
        Compute the prior probability:
                p(author = k)

        Returns:
            Mapping from each distinct label in ``y_train`` to its relative
            frequency; empty when ``y_train`` is empty.
        """
        total_samples = len(y_train)
        labels = np.asarray(y_train)  # converted once, not once per class

        return {
            class_label: np.sum(labels == class_label) / total_samples
            for class_label in set(y_train)
        }

    def _train_markov_model_per_class(self, X_train: list, y_train: list, mapper: dict, epsilon: float) -> None:
        """Fit one ``MarkovModel`` on the sequences of each class in ``class_priors``."""
        for class_label in self.class_priors:
            class_samples = [seq for seq, label in zip(X_train, y_train) if label == class_label]

            model = MarkovModel(class_samples, mapper)
            A, pi = model.train(epsilon)
            self.class_models[class_label] = (A, pi)

    def predict(self, x_test: list) -> np.ndarray:
        """
        k* = argmax  log p(poem | author = k) + log p(author = k)
                k

        Args:
            x_test: Sequences of integer token ids.

        Returns:
            Array holding the predicted class label of every sequence.
        """
        # Fix the label order once so an argmax position can be mapped back to
        # the label it belongs to.
        class_labels = list(self.class_models)
        log_priors = [
            np.log(self.class_priors[class_label] + self._LOG_EPS)
            for class_label in class_labels
        ]

        predictions = []
        for sequence in x_test:
            log_posteriors = [
                self.log_sequence_probability(sequence, *self.class_models[class_label]) + log_prior
                for class_label, log_prior in zip(class_labels, log_priors)
            ]

            # np.argmax yields a position in `class_labels`, not a label.
            best_position = int(np.argmax(log_posteriors))
            predictions.append(class_labels[best_position])

        return np.asarray(predictions)

    def log_sequence_probability(self, sequence: list, A: np.ndarray, pi: np.ndarray):
        """Return log p(sequence) under the Markov model ``(A, pi)``.

        Args:
            sequence: Integer token ids.
            A: Transition matrix indexed as ``A[current_token, prev_token]``.
            pi: Initial-state distribution.

        Returns:
            The log-probability; 0.0 for an empty sequence, which carries no
            evidence for or against any class.
        """
        if len(sequence) == 0:
            return 0.0

        tokens = np.asarray(sequence, dtype=np.intp)

        # Initial state
        log_prob = np.log(pi[tokens[0]] + self._LOG_EPS)

        # All (current, previous) transitions are looked up in one indexing call.
        log_prob += np.log(A[tokens[1:], tokens[:-1]] + self._LOG_EPS).sum()
        return log_prob


In [198]:
# Load each author's corpus; Poe is class 0 and Frost is class 1.
poe = LoadDataset(
    text_path="../datasets/MarkovModelClassification/edgar_allan_poe.txt",
    label=0,
)
frost = LoadDataset(
    text_path="../datasets/MarkovModelClassification/robert_frost.txt",
    label=1,
)

X_poe, y_poe = poe.get_samples()
X_frost, y_frost = frost.get_samples()

# Pool both authors and keep 90% of the lines for training.
x_train, x_test, y_train, y_test = LoadDataset.get_datasets(
    x1=X_poe,
    y1=y_poe,
    x2=X_frost,
    y2=y_frost,
    train_ratio=0.9,
)

# The vocabulary is built from the training lines only; words that appear
# solely in the test lines are encoded as id 0.
word2idx = Word2Idx.map_word2idx(dataset=x_train)

x_train_mapped, x_test_mapped = (
    Word2Idx.map_to_int(dataset=split, mapper=word2idx)
    for split in (x_train, x_test)
)

In [215]:
# Fit one Markov model per author with a smoothing constant of 0.1.
classifier = MarkovClassifier()
classifier.train(
    X_train=x_train_mapped,
    y_train=y_train,
    mapper=word2idx,
    epsilon=0.1,
)

In [216]:
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix

# Score the classifier on the same lines it was trained on.
x_pred = classifier.predict(x_test=x_train_mapped)
accuracy = accuracy_score(y_true=y_train, y_pred=x_pred)
f1 = f1_score(y_true=y_train, y_pred=x_pred)

for report_line in (
    f"Train Accuracy: {accuracy}",
    f"Train f1_score: {f1}",
):
    print(report_line)

# Kept as the final expression so the notebook displays the matrix.
confusion_matrix(y_true=y_train, y_pred=x_pred)

Train Accuracy: 0.999484004127967
Train f1_score: 0.9996155324875048


array([[ 637,    1],
       [   0, 1300]])

In [217]:
# Score the classifier on the held-out lines.
x_pred = classifier.predict(x_test=x_test_mapped)
accuracy = accuracy_score(y_true=y_test, y_pred=x_pred)
f1 = f1_score(y_true=y_test, y_pred=x_pred)

for report_line in (
    f"Test Accuracy: {accuracy}",
    f"Test f1_score: {f1}",
):
    print(report_line)

# Kept as the final expression so the notebook displays the matrix.
confusion_matrix(y_true=y_test, y_pred=x_pred)

Test Accuracy: 0.8194444444444444
Test f1_score: 0.8602150537634409


array([[ 57,  23],
       [ 16, 120]])