# 🧬 Named Entity Recognition using HMM & Viterbi

This notebook implements a **Hidden Markov Model (HMM)** from scratch for the task of **Named Entity Recognition (NER)** on the CoNLL-2003 dataset. 

While modern NLP often relies on Transformers, this project explores the mathematical efficiency of probabilistic graphical models. By calculating **Transition** and **Emission** probabilities, the model learns the underlying structure of entity sequences (e.g., how a `B-PER` tag is statistically likely to be followed by an `I-PER` tag).

### 🚀 Key Features:
* **Supervised Training:** Computing probability matrices from the CoNLL-2003 training split.
* **Viterbi Algorithm:** Implementation of the dynamic programming approach to decode the most likely sequence of hidden states.
* **Evaluation:** Detailed classification report covering Precision, Recall, and F1-Score across 9 entity classes.
* **Statistical Insights:** Analysis of how probabilistic models handle unseen words and state transitions.

### 🔗 Source Code & Full Project
The complete implementation, including the Word2Vec embedding training and neural network comparisons, can be found on GitHub:

👉 [**View Project on GitHub**](https://github.com/Mohamed-Mohamed-Ibrahim/Named-Entity-Recognition)

In [1]:
from datasets import load_dataset, load_from_disk
import numpy as np
from sklearn.preprocessing import LabelEncoder


# Model

In [2]:

class HMMCustom:
    """Supervised discrete Hidden Markov Model with Viterbi or greedy decoding.

    Parameters are estimated by counting labelled sequences and applying
    add-one (Laplace) smoothing.

    Matrix layout (all values are plain probabilities, not logs):

    * ``startprob_[s]``        -- P(first state = s); sums to 1.
    * ``transmat_[cur, prev]`` -- P(state = cur | previous state = prev);
      every *column* sums to 1.
    * ``emissionprob_[s, o]``  -- P(observation = o | state = s); every row
      sums to 1.

    Args:
        n_components: Number of hidden states.
        n_observations: Size of the observation vocabulary.
        startprob: Optional initial start counts/probabilities.
        transmat: Optional initial transition matrix in ``[cur, prev]`` layout.
        emissionprob: Optional initial emission matrix.
        strategy: ``"viterbi"`` or ``"greedy"``; selects the decoder used by
            :meth:`decode`.
    """

    def __init__(self, n_components, n_observations, startprob=None, transmat=None, emissionprob=None, strategy="viterbi"):
        self.n_components_ = n_components
        self.n_observations_ = n_observations
        self.strategy = strategy

        # Always keep private float copies: fit() updates these in place, so
        # aliasing the caller's arrays would silently modify them, and
        # in-place true division fails on integer arrays and lists.
        self.startprob_ = self._as_float_array(startprob, (n_components,))
        self.transmat_ = self._as_float_array(transmat, (n_components, n_components))
        self.emissionprob_ = self._as_float_array(emissionprob, (n_components, n_observations))

    @staticmethod
    def _as_float_array(values, shape):
        """Return a float copy of *values*, or zeros of *shape* when it is None."""
        if values is None:
            return np.zeros(shape)
        return np.array(values, dtype=float)

    def fit(self, X, y):
        """Estimate start, transition and emission probabilities by counting.

        Counts are added on top of the current matrices (zeros, or whatever was
        passed to the constructor), so call this once on a fresh model.

        Args:
            X: Sequence of sentences, each a sequence of observation indices.
            y: Sequence of state-index sequences aligned with ``X``.
        """
        for idx, sentence in enumerate(X):
            tags = y[idx]
            for i, word in enumerate(sentence):
                tag = tags[i]
                if i == 0:
                    self.startprob_[tag] += 1
                else:
                    # Row = current state, column = previous state.
                    self.transmat_[tag, tags[i - 1]] += 1
                self.emissionprob_[tag, word] += 1

        # Add-one smoothing so unseen events keep a non-zero probability.
        self.startprob_ += 1
        self.transmat_ += 1
        self.emissionprob_ += 1

        with np.errstate(divide='ignore', invalid='ignore'):
            self.startprob_ /= self.startprob_.sum()
            self.emissionprob_ /= self.emissionprob_.sum(axis=1, keepdims=True)
            # Normalise over the *current* state (axis 0) for each previous
            # state, so that transmat_[cur, prev] is P(cur | prev).
            # Normalising rows here would yield P(prev | cur) instead.
            self.transmat_ /= self.transmat_.sum(axis=0, keepdims=True)
        self.transmat_ = np.nan_to_num(self.transmat_)
        self.emissionprob_ = np.nan_to_num(self.emissionprob_)

    def _greedy(self, X):
        """Pick the locally best state at every step.

        Returns:
            ``(log_prob, states)`` where ``log_prob`` is the log joint
            probability of the chosen path and ``states`` is a list of ints.
        """
        log_likelihood, hidden_states = 0, []
        prev_state = None

        for i, word in enumerate(X):
            if i == 0:
                scores = self.startprob_ * self.emissionprob_[:, word]
            else:
                scores = self.transmat_[:, prev_state] * self.emissionprob_[:, word]

            # argmax returns the first maximum, matching a strict ">" scan.
            prev_state = int(np.argmax(scores))
            hidden_states.append(prev_state)
            with np.errstate(divide='ignore'):
                log_likelihood += float(np.log(scores[prev_state]))

        return log_likelihood, hidden_states

    def _viterbi(self, X):
        """Decode the most probable state sequence with the Viterbi algorithm.

        The recursion runs in log space so that long sequences do not
        underflow to zero.

        Returns:
            ``(log_prob, states)`` where ``log_prob`` is the log joint
            probability of the best path and ``states`` is a list of ints.
            An empty input yields ``(0, [])``.
        """
        n_steps = len(X)
        if n_steps == 0:
            return 0, []

        n_states = self.n_components_
        obs = np.asarray(X, dtype=int)
        with np.errstate(divide='ignore'):
            log_start = np.log(self.startprob_)
            log_trans = np.log(self.transmat_)              # [cur, prev]
            # Only the columns for the observed symbols are needed.
            log_emit = np.log(self.emissionprob_[:, obs])   # (n_states, n_steps)

        score = np.empty((n_states, n_steps))
        parent = np.zeros((n_states, n_steps), dtype=int)
        state_ids = np.arange(n_states)

        score[:, 0] = log_start + log_emit[:, 0]
        for t in range(1, n_steps):
            # candidates[cur, prev] = best score ending in prev, then prev -> cur
            candidates = log_trans + score[:, t - 1]
            parent[:, t] = np.argmax(candidates, axis=1)
            score[:, t] = candidates[state_ids, parent[:, t]] + log_emit[:, t]

        last_state = int(np.argmax(score[:, -1]))
        hidden_states = [last_state]
        for t in range(n_steps - 1, 0, -1):
            hidden_states.append(int(parent[hidden_states[-1], t]))
        hidden_states.reverse()

        return float(score[last_state, -1]), hidden_states

    def decode(self, X):
        """Decode one observation sequence using the configured strategy.

        Raises:
            ValueError: If ``self.strategy`` is neither ``"viterbi"`` nor
                ``"greedy"``.
        """
        if self.strategy == "viterbi":
            return self._viterbi(X)
        if self.strategy == "greedy":
            return self._greedy(X)
        raise ValueError(f"Unknown decoding strategy: {self.strategy!r}")

## Final Version

In [3]:
import numpy as np

class HMMCustom:
    """Supervised discrete Hidden Markov Model decoded with Viterbi in log space.

    After :meth:`fit`, all parameters are stored as natural-log probabilities:

    * ``startprob_[s]``        -- log P(first state = s)
    * ``transmat_[prev, cur]`` -- log P(state = cur | previous state = prev)
    * ``emissionprob_[s, o]``  -- log P(observation = o | state = s)

    Args:
        n_components: Number of hidden states.
        n_observations: Size of the observation vocabulary.
        strategy: Decoding strategy; only ``"viterbi"`` is implemented.
    """

    def __init__(self, n_components, n_observations, strategy="viterbi"):
        self.n_components_ = n_components
        self.n_observations_ = n_observations
        self.strategy = strategy

        self.startprob_ = np.zeros(n_components)
        self.transmat_ = np.zeros((n_components, n_components))
        self.emissionprob_ = np.zeros((n_components, n_observations))

    def fit(self, X, y):
        """Estimate the model from labelled sequences.

        Every call trains from scratch. Counts are gathered in local arrays,
        so calling ``fit`` again never mixes raw counts with the
        log-probabilities stored by a previous call.

        Args:
            X: List of sentences, each a list of observation indices.
            y: List of state-index lists aligned with ``X``.
        """
        n_states = self.n_components_
        start_counts = np.zeros(n_states)
        trans_counts = np.zeros((n_states, n_states))
        emit_counts = np.zeros((n_states, self.n_observations_))

        # 1. Count occurrences.
        for idx, sentence in enumerate(X):
            tags = y[idx]
            for i, word in enumerate(sentence):
                state = tags[i]
                emit_counts[state, word] += 1
                if i == 0:
                    start_counts[state] += 1
                else:
                    # Row = previous state, column = current state.
                    trans_counts[tags[i - 1], state] += 1

        # 2. Add-one (Laplace) smoothing: unseen events keep non-zero mass.
        start_counts += 1
        trans_counts += 1
        emit_counts += 1

        # 3. Normalise so each distribution sums to 1, then 4. move to log
        #    space so the decoder adds instead of multiplying.
        with np.errstate(divide='ignore'):
            self.startprob_ = np.log(start_counts / start_counts.sum())
            self.transmat_ = np.log(trans_counts / trans_counts.sum(axis=1, keepdims=True))
            self.emissionprob_ = np.log(emit_counts / emit_counts.sum(axis=1, keepdims=True))

    def _viterbi(self, X):
        """Return ``(best_log_prob, best_path)`` for one observation sequence.

        ``best_path`` is a list of state indices with one entry per
        observation. An empty input yields ``(-inf, [])``.
        """
        n_steps = len(X)
        if n_steps == 0:
            return -np.inf, []

        n_states = self.n_components_
        # m[s, t]: best log-probability of any path ending in state s at time t.
        m = np.zeros((n_states, n_steps))
        # parent[s, t]: previous state on that best path.
        parent = np.zeros((n_states, n_steps), dtype=int)

        # --- Initialisation ---
        m[:, 0] = self.startprob_ + self.emissionprob_[:, X[0]]

        # --- Recursion ---
        for t in range(1, n_steps):
            # candidates[prev, cur] = m[prev, t-1] + log P(cur | prev)
            candidates = m[:, t - 1][:, np.newaxis] + self.transmat_
            parent[:, t] = np.argmax(candidates, axis=0)
            m[:, t] = np.max(candidates, axis=0) + self.emissionprob_[:, X[t]]

        # --- Termination ---
        last_state = int(np.argmax(m[:, -1]))
        best_path_log_prob = float(m[last_state, -1])

        # --- Backtracking: follow parent pointers from the last step to step 1 ---
        best_path = [last_state]
        for t in range(n_steps - 1, 0, -1):
            best_path.append(int(parent[best_path[-1], t]))
        best_path.reverse()

        return best_path_log_prob, best_path

    def decode(self, X):
        """Decode one observation sequence with the configured strategy.

        Raises:
            ValueError: If ``self.strategy`` is not ``"viterbi"``. Returning
                ``None`` here would only fail later, when the caller unpacks
                the result.
        """
        if self.strategy == "viterbi":
            return self._viterbi(X)
        raise ValueError(f"Unknown decoding strategy: {self.strategy!r}")

# Data preparation

In [4]:
# Load the "lhoestq/conll2003" dataset (train / validation / test splits).
dataset = load_dataset("lhoestq/conll2003")
# Write a copy to the local "conll2003" directory.
# NOTE(review): presumably meant for reuse via `load_from_disk`, which is imported but never called in the visible cells.
dataset.save_to_disk("conll2003")

dataset_infos.json: 0.00B [00:00, ?B/s]

data/train-00000-of-00001.parquet:   0%|          | 0.00/1.07M [00:00<?, ?B/s]

data/validation-00000-of-00001.parquet:   0%|          | 0.00/281k [00:00<?, ?B/s]

data/test-00000-of-00001.parquet:   0%|          | 0.00/259k [00:00<?, ?B/s]

Generating train split:   0%|          | 0/14041 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/3250 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/3453 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/14041 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/3250 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/3453 [00:00<?, ? examples/s]

In [5]:
# Notebook display expression: shows the splits, their features and row counts.
dataset

DatasetDict({
    train: Dataset({
        features: ['id', 'tokens', 'pos_tags', 'chunk_tags', 'ner_tags'],
        num_rows: 14041
    })
    validation: Dataset({
        features: ['id', 'tokens', 'pos_tags', 'chunk_tags', 'ner_tags'],
        num_rows: 3250
    })
    test: Dataset({
        features: ['id', 'tokens', 'pos_tags', 'chunk_tags', 'ner_tags'],
        num_rows: 3453
    })
})

# Training

In [6]:
random_state = 42

# Use the complete training split: gold NER tag ids and the raw tokens.
nerTags = dataset["train"][:]['ner_tags']
tokens = dataset["train"][:]['tokens']

# Hidden states. The trailing "UNK" entry is an extra label used at
# prediction time for words that are missing from the training vocabulary.
states = ["Other", "B-PER", "I-PER", "B-ORG", "I-ORG", "B-LOC", "I-LOC", "B-MISC", "I-MISC", "UNK"]

# Longest training sentence (in tokens).
maxLen = max((len(sentence) for sentence in tokens), default=0)

# Observation vocabulary: every lower-cased training word plus a single
# blank-string entry, sorted for a deterministic order.
observations = sorted(
    {" "} | {word.lower() for sentence in tokens for word in sentence}
)

# Map each vocabulary word to an integer id.
le = LabelEncoder()
encoded_data = le.fit_transform(observations)

# Encode every training sentence as a list of observation ids.
X = [
    le.transform([word.lower() for word in sentence]).tolist()
    for sentence in tokens
]

n_components = len(states)
n_observations = len(observations)

model = HMMCustom(n_components=n_components, n_observations=n_observations)

model.fit(X, nerTags)

In [7]:
def insert_zeros_at_indices_numpy(original_list, excluded_indices, fill_value=None):
    """Re-insert placeholder labels at the positions that were dropped.

    Despite the name, the inserted value is not zero: it is ``fill_value``,
    which defaults to ``n_components - 1`` (the module-level state count
    minus one).

    Args:
        original_list: Labels for the kept positions, in order.
        excluded_indices: Positions in the *output* that must receive the
            placeholder. They are expected to be unique.
        fill_value: Label written at the excluded positions. When ``None``
            the module-level ``n_components - 1`` is used.

    Returns:
        A list of ``len(original_list) + len(excluded_indices)`` ints.
    """
    if fill_value is None:
        fill_value = n_components - 1

    # Force integer dtypes: an empty list would otherwise become a float
    # array, which numpy rejects as an index.
    kept_values = np.asarray(original_list, dtype=np.int64)
    skipped = np.asarray(excluded_indices, dtype=np.int64)

    total = len(kept_values) + len(skipped)
    keep_mask = np.ones(total, dtype=bool)
    keep_mask[skipped] = False

    # int64 rather than int8, so labels above 127 are not wrapped.
    merged = np.full(total, fill_value, dtype=np.int64)
    merged[keep_mask] = kept_values
    return merged.tolist()

# Validation

In [8]:
n_samples = 10

# Gold tags and raw tokens of the complete validation split.
y_val = dataset["validation"][:]['ner_tags']
X_val_tmp = dataset["validation"][:]['tokens']

# Build the vocabulary lookup once. Testing `word in le.classes_` against the
# numpy array compares the word with every vocabulary entry on each token.
known_words = set(le.classes_.tolist())

# Encode each sentence, skipping words the encoder has never seen.
# dropped_words maps sentence index -> [(position in sentence, word), ...]
# so the skipped positions can be restored after decoding.
X_val = []
dropped_words = {}
for i, token in enumerate(X_val_tmp):
    lowercaseToken = []
    for j, word in enumerate(token):
        word = word.lower()
        if word in known_words:
            lowercaseToken.append(word)
        else:
            dropped_words.setdefault(i, []).append((j, word))
    encoded_tokens = le.transform(lowercaseToken)
    X_val.append(encoded_tokens.tolist())

# Flatten the gold tags into one list aligned with y_pred.
y_test = [tag for sentence_tags in y_val for tag in sentence_tags]
y_pred = []

for i, sample in enumerate(X_val):
    log_likelihood, hidden_states = model.decode(sample)
    hidden_states = list(hidden_states)
    if i in dropped_words:
        # Put a placeholder label back at every skipped position so the
        # prediction has one label per original token.
        indices = [position for position, _ in dropped_words[i]]
        hidden_states = insert_zeros_at_indices_numpy(hidden_states, indices)
    y_pred.extend(hidden_states)

In [9]:
from sklearn.metrics import precision_score, recall_score, f1_score
# y_test = y_test[:772]
# Macro-averaged scores: every label present in y_test or y_pred counts
# equally, regardless of how many tokens carry it.
precision, recall, f1 = (
    scorer(y_test, y_pred, average='macro')
    for scorer in (precision_score, recall_score, f1_score)
)

print(f"Precision: {precision:.8f}")
print(f"Recall: {recall:.8f}")
print(f"F1-score: {f1:.8f}")

Precision: 0.75570854
Recall: 0.49246153
F1-score: 0.58535537


  _warn_prf(average, modifier, msg_start, len(result))


In [10]:
from sklearn.metrics import classification_report

# Per-label precision / recall / F1 / support, printed with four decimals.
print(classification_report(y_test, y_pred, digits=4))

              precision    recall  f1-score   support

           0     0.9581    0.9415    0.9497     42759
           1     0.9377    0.4902    0.6439      1842
           2     0.8717    0.5096    0.6432      1307
           3     0.7500    0.5235    0.6166      1341
           4     0.7343    0.3276    0.4530       751
           5     0.8881    0.7692    0.8244      1837
           6     0.7844    0.5097    0.6179       257
           7     0.9350    0.5933    0.7259       922
           8     0.6977    0.2601    0.3789       346
           9     0.0000    0.0000    0.0000         0

    accuracy                         0.8752     51362
   macro avg     0.7557    0.4925    0.5854     51362
weighted avg     0.9409    0.8752    0.9010     51362



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


# Testing

In [11]:
n_samples = 5

# Gold tags and raw tokens of the complete test split. The variable names are
# shared with the validation cell.
y_val = dataset["test"][:]['ner_tags']
X_val_tmp = dataset["test"][:]['tokens']

# Build the vocabulary lookup once. Testing `word in le.classes_` against the
# numpy array compares the word with every vocabulary entry on each token.
known_words = set(le.classes_.tolist())

# Encode each sentence, skipping words the encoder has never seen.
# dropped_words maps sentence index -> [(position in sentence, word), ...]
# so the skipped positions can be restored after decoding.
X_val = []
dropped_words = {}
for i, token in enumerate(X_val_tmp):
    lowercaseToken = []
    for j, word in enumerate(token):
        word = word.lower()
        if word in known_words:
            lowercaseToken.append(word)
        else:
            dropped_words.setdefault(i, []).append((j, word))
    encoded_tokens = le.transform(lowercaseToken)
    X_val.append(encoded_tokens.tolist())

# Flatten the gold tags into one list aligned with y_pred.
y_test = [tag for sentence_tags in y_val for tag in sentence_tags]
y_pred = []

for i, sample in enumerate(X_val):
    log_likelihood, hidden_states = model.decode(sample)
    hidden_states = list(hidden_states)
    if i in dropped_words:
        # Put a placeholder label back at every skipped position so the
        # prediction has one label per original token.
        indices = [position for position, _ in dropped_words[i]]
        hidden_states = insert_zeros_at_indices_numpy(hidden_states, indices)
    y_pred.extend(hidden_states)

In [12]:
from sklearn.metrics import precision_score, recall_score, f1_score

# Macro-averaged scores: every label present in y_test or y_pred counts
# equally, regardless of how many tokens carry it.
precision, recall, f1 = (
    scorer(y_test, y_pred, average='macro')
    for scorer in (precision_score, recall_score, f1_score)
)

print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-score: {f1:.4f}")

Precision: 0.7236
Recall: 0.4488
F1-score: 0.5376


  _warn_prf(average, modifier, msg_start, len(result))


In [13]:
from sklearn.metrics import classification_report

# Per-label precision / recall / F1 / support, printed with four decimals.
print(classification_report(y_test, y_pred, digits=4))

              precision    recall  f1-score   support

           0     0.9599    0.9226    0.9409     38323
           1     0.8802    0.2863    0.4321      1617
           2     0.8011    0.2474    0.3781      1156
           3     0.7750    0.4521    0.5711      1661
           4     0.6954    0.3772    0.4891       835
           5     0.8350    0.7584    0.7948      1668
           6     0.7888    0.4942    0.6077       257
           7     0.8771    0.5285    0.6596       702
           8     0.6233    0.4213    0.5028       216
           9     0.0000    0.0000    0.0000         0

    accuracy                         0.8405     46435
   macro avg     0.7236    0.4488    0.5376     46435
weighted avg     0.9335    0.8405    0.8744     46435



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
