In [1]:
# Standard imports
import numpy as np
from pprint import pprint
import pandas as pd

import spacy

# Sklearn
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, TransformerMixin


# Built-in library
import itertools
import re
import json
from typing import Union, Optional, Any
from pathlib import Path
import logging
import warnings

# NOTE(review): this silences every warning category for the whole kernel,
# including deprecation warnings from pandas / sklearn.
warnings.filterwarnings("ignore")

# pandas settings
# Raise the display limits so wide frames and long text lines are not truncated.
pd.options.display.max_rows = 1_000
pd.options.display.max_columns = 1_000
pd.options.display.max_colwidth = 600

# Black code formatter (Optional)
%load_ext lab_black
# auto reload imports
%load_ext autoreload
%autoreload 2

In [2]:
def extract_text_data(*, filepath: Path, label: int) -> pd.DataFrame:
    """Read a text file line by line and tag every kept line with ``label``.

    Lines whose raw length (trailing newline included) is 5 characters or
    fewer are treated as empty and skipped. Kept lines are stripped of
    leading/trailing whitespace.

    Params:
        filepath (Path): Location of the text file to read.
        label (int): Class label assigned to every row.

    Returns:
        pd.DataFrame: Columns ``text`` and ``label``, one row per kept line.
    """
    rows = []
    with open(filepath, "r") as handle:
        for raw_line in handle.readlines():
            # Blank or near-blank lines carry no usable text.
            if len(raw_line) <= 5:
                continue
            rows.append((raw_line.strip(), label))

    return pd.DataFrame(data=rows, columns=["text", "label"])


def remove_punctuations(text: str) -> str:
    """Return ``text`` with every ASCII punctuation character removed.

    The input is converted with ``str()`` first, so non-string values
    (e.g. numbers) are accepted and returned as their string form.

    Params:
        text (str): The text to clean.

    Returns:
        str: ``text`` without any character from ``string.punctuation``.
    """
    # Imported locally so the function stays self-contained.
    import string

    # A translation table that deletes the punctuation characters; this avoids
    # building and compiling a regular expression on every call.
    return str(text).translate(str.maketrans("", "", string.punctuation))

In [3]:
# Source files, one per poet. The paths are relative, so they are resolved
# against the notebook's current working directory.
fp = "edgar_allan_poe.txt"
fp1 = "robert_frost.txt"
# Class labels: 0 for the Edgar Allan Poe lines, 1 for the Robert Frost lines.
label_0, label_1 = 0, 1

# Load each file into a DataFrame with `text` and `label` columns.
edgar_allan_poe = extract_text_data(filepath=fp, label=label_0)
robert_frost = extract_text_data(filepath=fp1, label=label_1)

# Preview the first rows of the Poe lines.
edgar_allan_poe.head()

Unnamed: 0,text,label
0,LO! Death hath rear'd himself a throne,0
1,"In a strange city, all alone,",0
2,Far down within the dim west,0
3,"Where the good, and the bad, and the worst, and the best,",0
4,Have gone to their eternal rest.,0


In [4]:
# Preview the first rows of the Robert Frost lines (label 1).
robert_frost.head()

Unnamed: 0,text,label
0,"Two roads diverged in a yellow wood,",1
1,And sorry I could not travel both,1
2,"And be one traveler, long I stood",1
3,And looked down one as far as I could,1
4,To where it bent in the undergrowth;,1


In [5]:
# Create the data

# RANDOM_STATE seeds the sample shown below and the train/validation split;
# TEST_SIZE is the fraction of rows held out by that split.
RANDOM_STATE = 2
TEST_SIZE = 0.1

# Stack both authors' lines into one frame and renumber the index from 0.
data = pd.concat([edgar_allan_poe, robert_frost], axis="rows").reset_index(drop=True)
# Remove punctuations
data = data.assign(text=data["text"].apply(remove_punctuations))

# Inspect a reproducible random sample of the cleaned lines.
data.sample(n=10, random_state=RANDOM_STATE)

Unnamed: 0,text,label
1703,Not bluebells gracing a tunnel mouth,1
1275,Of door and headboard Where it wants to get,1
294,Hath ever told or is it of a thought,0
551,To the Lethean peace of the skies,0
426,Then desolately fall,0
662,That from new fountains overflow,0
2055,Whats this,1
611,Of its own fervour what had oer it power,0
1303,The way a man with one leg and a crutch,1
1641,How shall we,1


### Split The Data Into Train And Validation Sets

In [6]:
# Features are the cleaned text lines; targets are the author labels.
X = data["text"]
y = data["label"]

# Hold out TEST_SIZE of the rows for validation; RANDOM_STATE makes the
# shuffle reproducible.
# NOTE(review): the split is not stratified although the classes are
# imbalanced -- consider `stratify=y`.
X_train, X_valid, y_train, y_valid = train_test_split(
    X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE
)

# Show the number of rows in each split.
X_train.shape, X_valid.shape

((1936,), (216,))

In [7]:
from abc import ABC, abstractmethod


class BaseModel(ABC):
    """Blueprint used for building the ML models in this notebook.

    Subclasses must implement ``__repr__``. ``fit`` and ``predict`` are
    overridable hooks with no-op defaults.
    """

    @abstractmethod
    def __repr__(self) -> str:
        """Return the string representation of the model."""
        pass

    def fit(
        self, X: Union[np.ndarray, pd.DataFrame], y: Union[np.ndarray, pd.Series]
    ) -> "BaseModel":
        """Train the model.

        The default implementation learns nothing.

        Params:
            X: The training inputs.
            y: The training labels.

        Returns:
            The model itself, so that calls can be chained.
        """
        return self

    def predict(self, X: Union[np.ndarray, pd.DataFrame]) -> Optional[np.ndarray]:
        """Make predictions using new data.

        The default implementation makes no prediction and returns ``None``;
        subclasses are expected to override it and return their predictions.

        Params:
            X: The inputs to predict on.
        """
        return None

In [8]:
# Check the text column after punctuation removal.
data["text"].head()

0                     LO Death hath reard himself a throne
1                              In a strange city all alone
2                             Far down within the dim west
3    Where the good and the bad and the worst and the best
4                          Have gone to their eternal rest
Name: text, dtype: object

In [9]:
class PreprocessText:
    """Convert a corpus into a list of tokenized documents.

    Each document is split on whitespace, lower-cased and mapped to integer
    ids through a vocabulary learned in ``fit``. Id 0 is reserved for the
    custom ``unk`` token, which stands in for any term that was not present
    in the data passed to ``fit``.
    """

    def __init__(self) -> None:
        self.X = None
        self._vocab = {}

    def __repr__(self) -> str:
        # `self.X` stays None until `fit` is called; report an empty corpus
        # instead of raising on `len(None)`.
        corpus_size = 0 if self.X is None else len(self.X)
        return (
            f"{__class__.__name__}(corpus_size: {corpus_size}, "
            f"vocab_size: {len(self._vocab)})"
        )

    @staticmethod
    def _tokenize(doc: str) -> list[str]:
        """Split a document on whitespace and lower-case every token."""
        return [token.lower() for token in doc.split()]

    def fit(self, X: pd.Series) -> "PreprocessText":
        """Create the vocabulary of the corpus.

        Params:
            X (pd.Series): The documents (one string per entry).

        Returns:
            The fitted preprocessor itself.
        """
        # Unique terms are mapped to consecutive integers in order of first
        # appearance. `unk` takes id 0, so the next free id is always the
        # current size of the vocabulary.
        vocab = {"unk": 0}

        for doc in X:
            for term in self._tokenize(doc):
                if term not in vocab:
                    vocab[term] = len(vocab)

        self.X, self._vocab = X, vocab
        return self

    def transform(self, X: pd.Series) -> list[list[int]]:
        """Tokenize the documents.

        Params:
            X (pd.Series): The documents (one string per entry).

        Returns:
            One list of integer ids per document; terms missing from the
            vocabulary are encoded as 0 (the ``unk`` id).
        """
        return [
            [self._vocab.get(term, 0) for term in self._tokenize(doc)] for doc in X
        ]

    def fit_transform(self, X: pd.Series) -> list[list[int]]:
        """Create the vocabulary from ``X`` and return ``X`` tokenized."""
        self.fit(X=X)
        return self.transform(X)

### Multi-Nomial Naive Bayes

* **Transition Matrix** $\mathbf{A_{ij}}$: This is the probability of being in state `j` at time `t`, given that the state at the previous time step, `t-1`, was `i`. This is a matrix (2-D array).
  
$$
\mathbf{A_{ij}} = p(s_{t} = j | s_{t-1} = i)
$$

* Estimated Transition Matrix

$$
\mathbf{\hat{A}_{ij}} = \frac{count(i \rightarrow j)}{count(i)}
$$

* **Initial State Distribution** $\mathbf{\pi_{i}} $: This is the probability of an initial state in a sequence. This is a vector (1-D array).

$$
\mathbf{\pi_{i}} = p(s_{1} = i)
$$

* Estimated Initial State Distribution

$$
\mathbf{\hat{\pi_{i}}} = \frac{count(s_{1} = i)}{N}
$$


```text
where
N = Number of sequences.
```

$$
\mathbf{p(y|X)} = \frac{p(X|y).p(y)}{p(X)}
$$

```text
where
p(y|X) = Posterior probability
p(X|y) = Class conditional probability or likelihood
p(y) = Prior probability of y
p(X) = Marginal probability of X
```

* Since $p(X)$ does not depend on `y`, we can ignore it.

$$
\mathbf{p(y|X)} \propto {p(X|y).p(y)}
$$

* If we have `n` features that are assumed to be conditionally independent given `y`, it becomes:

$$
\mathbf{p(y|X)} \propto {p(x_{1}|y).p(x_{2}|y)...p(x_{n}|y).p(y)}
$$

* Taking the log (the ignored $p(X)$ term only contributes a constant that does not depend on `y`), we have:

$$
log(\mathbf{p(y|X)}) = log(p(x_{1}|y))+log(p(x_{2}|y))+...+log(p(x_{n}|y))+log(p(y)) + const
$$


**Note**: $\hat{A}_{ij}$ and $\hat{\pi_{i}}$ will be used to model the `class conditional probability`.


<hr>

## Steps Required To Build The Model From Scratch

### Training:

1. Determine the vocabulary and tokenize the documents.
2. Initialize the parameters: $\hat{A}_{ij}$, $\hat{\pi_{i}}$, and $p(y)$ for each class label. 
   * i.e. we need to initialize two variables per parameter since we have two class labels, e.g. $\hat{A}0_{ij}$, $\hat{A}1_{ij}$, etc.
3. Compute the count of A and Pi. i.e A_hat and Pi_hat.
4. Calculate the log probabilities of A_hat and Pi_hat.


### Making Predictions

$$
\mathbf{\hat{y}} = \underset{y}{argmax}\left(log(p(x_{1}|y))+log(p(x_{2}|y))+...+log(p(x_{n}|y))+log(p(y))\right)
$$

1. Calculate the posteriors. i.e (log_likelihood + log_priors) for each class.
2. Find `y` by calculating the `argmax` of the posteriors given the input over all classes. 

In [10]:
class MultiNomial_NB(BaseModel):
    """Two-class Bayes classifier that uses Markov models for the likelihoods.

    One first-order Markov model (an initial state distribution ``Pi`` and a
    state transition matrix ``A``, both with add-one smoothing) is estimated
    for each of the 2 class labels. A document is assigned to the class that
    maximises ``log p(X|y) + log p(y)``.

    Params:
        vocab (dict): A dictionary containing the vocabulary (term -> integer
            id). Its size fixes the dimensions of ``A`` (V x V) and ``Pi`` (V).

    Attributes set by ``fit`` (all aligned with ``K``):
        K: The two class labels, in sorted order.
        priors / log_priors: The class priors and their logs.
        transition_matrix: ``log(A_hat)`` for each class.
        initial_state_distr: ``log(Pi_hat)`` for each class.
    """

    def __init__(self, *, vocab: dict) -> None:
        self.vocab = vocab
        self.transition_matrix = None
        self.initial_state_distr = None
        self.log_priors = [None, None]
        self.K = [None, None]
        self.priors = [None, None]

    def __repr__(self) -> str:
        # Before `fit` the priors are still None and cannot be rounded.
        if self.transition_matrix is None:
            return f"{__class__.__name__}(unfitted)"

        priors_dict = {
            self.K[0]: round(self.priors[0], 2),
            self.K[1]: round(self.priors[1], 2),
        }
        return (
            f"{__class__.__name__}(log_priors={self.log_priors[0], self.log_priors[1]}, "
            f"priors={priors_dict})"
        )

    def fit(
        self, X: list[list[int]], y: Union[np.ndarray, pd.Series]
    ) -> "MultiNomial_NB":
        """Estimate the priors and one Markov model per class label.

        Params:
            X (list[list[int]]): The tokenized documents (corpus).
            y (np.ndarray | pd.Series): The labels for the data. Exactly two
                distinct numeric labels are supported.

        Returns:
            The fitted model itself.

        Raises:
            ValueError: If ``y`` does not contain exactly 2 distinct labels.
        """
        classes = np.unique(y)
        if len(classes) != 2:
            raise ValueError(
                f"{__class__.__name__} supports exactly 2 class labels, "
                f"got {len(classes)}."
            )
        k_0, k_1 = classes
        log_y0, log_y1 = self._log_priors(y=y)

        # Since we have 2 class labels, we need 2 models: for each label,
        # initialize A and Pi, count the transitions observed in that label's
        # documents and convert the counts to log probabilities.
        log_A_hats, log_Pi_hats = [], []
        for k_ in (k_0, k_1):
            A, Pi = self._init_A_and_Pi()
            class_docs = self._get_input(tokenized_doc=X, y=y, k_=k_)
            A_hat, Pi_hat = self._count_state_transitions(
                X=class_docs, A_hat=A, Pi_hat=Pi
            )
            log_A_hat, log_Pi_hat = self._convert_counts_to_log_prob(
                X=class_docs, A_hat=A_hat, Pi_hat=Pi_hat
            )
            log_A_hats.append(log_A_hat)
            log_Pi_hats.append(log_Pi_hat)

        self.transition_matrix = tuple(log_A_hats)
        self.initial_state_distr = tuple(log_Pi_hats)
        self.log_priors = (log_y0, log_y1)
        self.K = k_0, k_1

        return self

    def _init_A_and_Pi(self) -> tuple:
        """This is used to initialize the state transition matrix
        and the initial state distribution."""
        V = len(self.vocab)
        # Add-one smoothing: every count starts at 1 so that unseen
        # transitions / initial states never get a probability of 0.
        # A is a matrix and Pi is a vector
        A = np.ones(shape=(V, V), dtype=float)
        Pi = np.ones(shape=(V,), dtype=float)
        return (A, Pi)

    @staticmethod
    def _get_input(
        *, tokenized_doc: list[list[int]], y: np.ndarray, k_: int
    ) -> list[list[int]]:
        """This returns the input given a specific class label.

        Params:
            tokenized_doc (list[list[int]]): The tokenized documents (corpus).
            y (np.ndarray): The labels for the data.
            k_ (int): The class label.

        Returns:
            tokenized_data: The tokenized documents belonging to the
                specified class label.
        """
        tokenized_data = [txt for txt, label in zip(tokenized_doc, y) if label == k_]
        return tokenized_data

    def _log_priors(self, *, y: np.ndarray) -> list[float]:
        """This returns the log priors of y and stores the priors.

        Params:
            y (np.ndarray): The labels for the data.

        Returns:
            log_probs: A list containing the log probabilities
            of the class labels, ordered by sorted class label.
        """
        # Count each distinct label directly; unlike `np.bincount` this does
        # not require the labels to be non-negative integers.
        _, counts = np.unique(y, return_counts=True)
        probs = counts / len(y)
        self.priors = list(probs)
        log_probs = [np.log(p_i) for p_i in self.priors]
        return log_probs

    def _count_state_transitions(
        self, *, X: list[list[int]], A_hat: np.ndarray, Pi_hat: np.ndarray
    ) -> tuple[np.ndarray, np.ndarray]:
        """This is used to count the occurrences of transitions.
        i.e calculate the counts of A_hat and Pi_hat.

        The arrays are updated in place and also returned.

        Returns:
            (A_hat, Pi_hat)
        """
        # Pi_hat[i]: number of documents whose first token is `i`.
        # A_hat[i, j]: number of times token `j` directly follows token `i`.
        # The division that turns these counts into probabilities is done in
        # `_convert_counts_to_log_prob`.
        for tokenized_doc in X:
            prev_token = None
            for curr_token in tokenized_doc:
                if prev_token is None:
                    Pi_hat[curr_token] += 1
                else:
                    A_hat[prev_token, curr_token] += 1
                # Update the prev_token
                prev_token = curr_token
        return (A_hat, Pi_hat)

    def _convert_counts_to_log_prob(
        self,
        *,
        X: Union[np.ndarray, pd.Series],
        A_hat: np.ndarray,
        Pi_hat: np.ndarray,
    ) -> tuple[np.ndarray, np.ndarray]:
        """This is used to calculate the log of the class conditional
        probability given a specific class label. It returns a tuple
        of arrays containing the log probabilities.

        ``X`` is accepted for interface compatibility but is not used.
        ``A_hat`` and ``Pi_hat`` are normalised in place.

        Returns:
            (log(A_hat), log(Pi_hat))
        """
        # Each row of A_hat is divided by its own total so that the
        # transitions out of a state sum to 1; Pi_hat is divided by its total.
        A_hat /= A_hat.sum(axis=1, keepdims=True)
        Pi_hat /= Pi_hat.sum(axis=0)
        return (np.log(A_hat), np.log(Pi_hat))

    def _class_index(self, k_: int) -> int:
        """Return the position of class label ``k_`` in ``self.K``."""
        for class_idx, label in enumerate(self.K):
            if label == k_:
                return class_idx
        raise ValueError(f"Unknown class label: {k_!r}")

    def _calculate_log_likelihoods(self, *, x: list[int], k_: int) -> float:
        """This is used to calculate the log likelihood of a tokenized
        document under the Markov model of class label ``k_``.

        Params:
            x (list[int]): A tokenized document.
            k_ (int): The class label.

        Returns:
            The sum of the log initial state probability and the log
            transition probabilities (0 for an empty document).
        """
        # The per-class parameters are stored by position, so translate the
        # class label into its position first.
        class_idx = self._class_index(k_)
        log_A_hat = self.transition_matrix[class_idx]
        log_Pi_hat = self.initial_state_distr[class_idx]

        # If it's an initial state (prev_idx is None), use the initial state
        # distribution; otherwise use the transition from prev_idx to curr_idx.
        prev_idx, log_prob = None, 0

        for curr_idx in x:
            if prev_idx is None:
                log_prob += log_Pi_hat[curr_idx]
            else:
                log_prob += log_A_hat[prev_idx, curr_idx]

            # Update the value (for the next iteration)
            prev_idx = curr_idx
        return log_prob

    def predict(self, X: list[list[int]]) -> np.ndarray:
        """Predict the class label of each tokenized document.

        Params:
            X (list[list[int]]): The tokenized documents.

        Returns:
            A float array holding one predicted class label per document.

        Raises:
            RuntimeError: If the model has not been fitted.
        """
        if self.transition_matrix is None:
            raise RuntimeError(
                f"This {__class__.__name__} instance is not fitted yet; "
                "call `fit` first."
            )

        predictions = np.zeros(shape=(len(X)))

        # For each sentence/tokenized_doc, compute the log posterior
        # (log likelihood + log prior) of every class. The argmax gives the
        # POSITION of the best class, which is mapped back to its label so
        # that labels other than 0/1 are predicted correctly.
        for idx, sentence in enumerate(X):
            posteriors = [
                (
                    self._calculate_log_likelihoods(x=sentence, k_=k_)
                    + self.log_priors[class_idx]
                )
                for class_idx, k_ in enumerate(self.K)
            ]
            predictions[idx] = self.K[np.argmax(posteriors)]
        return predictions

In [11]:
# Build the vocabulary from the training split only and encode that split
# as lists of integer ids.
preprocessor = PreprocessText()
X_vec = preprocessor.fit_transform(X=X_train)
# The vocabulary is handed to the model, which sizes its parameters from it.
vocab = preprocessor._vocab

# Display the fitted preprocessor (corpus and vocabulary sizes).
preprocessor

PreprocessText(corpus_size: 1936, vocab_size: 2812)

In [12]:
# Test the preprocessor
new = pd.Series(data=["Adonai", "we worship you", "neidu"])
preprocessor.transform(new)

[[0], [181, 505, 21], [0]]

In [13]:
# Instantiate
m_nb = MultiNomial_NB(vocab=vocab)

# Train the model
m_nb.fit(X=X_vec, y=y_train)

MultiNomial_NB(log_priors=(-1.0944885714842476, -0.4075333611722234), priors={0: 0.33, 1: 0.67})

In [14]:
# Make predictions
y_pred = m_nb.predict(X=X_vec)

# Calculate the accuracy
# Note: Since the class labels are imbalanced, we'll need to use
# a better metric e.g f1 score
np.mean(y_pred == y_train)

0.9974173553719008

In [15]:
# Preprocess the validation data
X_tr_vec = preprocessor.transform(X=X_valid)

y_pred = m_nb.predict(X=X_tr_vec)

np.mean(y_pred == y_valid)

0.8611111111111112