In [87]:
import pandas as pd
import nltk
from nltk.tag import hmm
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import numpy as np
from collections import defaultdict

In [83]:
# Load data from CSV
data = pd.read_csv('ner.csv', encoding='cp1252')

In [71]:
# Data preparation
sentences = []
pos_tags = []
ner_tags = []

for i, row in data.iterrows():
    sentence = row['Sentence'].split()
    pos = eval(row['POS'])
    ner = eval(row['Tag'])
    sentences.append(sentence)
    pos_tags.append(pos)
    ner_tags.append(ner)

In [81]:
print((sentences[0]))

['"', 'And', 'I', 'think', 'they', "'ll", 'want', 'a', 'one-stop', 'shop', 'in', 'terms', 'of', 'combining', 'security', ',', 'immigration', ',', 'customs', ',', 'and', 'quarantine', 'together', 'Â…', 'just', 'to', 'make', 'sure', 'it', "'s", 'more', 'streamlined', 'and', 'provides', 'more', 'certainty', '.', '"']


In [73]:
training_data_pos = []
training_data_ner = []
len
for sent, pos, ner in zip(sentences, pos_tags, ner_tags):
    if len(sent) != len(pos) or len(sent) != len(ner):
        print(sent)
        raise ValueError(f"Mismatch in lengths: words ({len(sent)}), POS tags ({len(pos)}), NER tags ({len(ner)})\n {sent}")
    
    sent_pos = list(zip(sent, pos))
    sent_ner = list(zip(sent, ner))
    training_data_pos.append(sent_pos)
    training_data_ner.append(sent_ner)

In [88]:
# Split data into train and test sets
train_pos, test_pos = train_test_split(training_data_pos, test_size=0.2, random_state=42)
train_ner, test_ner = train_test_split(training_data_ner, test_size=0.2, random_state=42)

# Build from Scratch

In [None]:
class HMM:
    def __init__(self):
        self.transition_probs = defaultdict(lambda: defaultdict(lambda: 0))
        self.emission_probs = defaultdict(lambda: defaultdict(lambda: 0))
        self.initial_probs = defaultdict(lambda: 0)
        self.states = set()
        self.vocabulary = set()
    
    def train(self, tagged_sentences):
        # Count occurrences for transition, emission, and initial probabilities
        for sentence in tagged_sentences:
            prev_tag = None
            for idx, (word, tag) in enumerate(sentence):
                self.vocabulary.add(word)
                self.states.add(tag)
                
                if idx == 0:
                    self.initial_probs[tag] += 1  # Initial tag
                else:
                    self.transition_probs[prev_tag][tag] += 1  # Transition from prev tag to current tag
                
                self.emission_probs[tag][word] += 1  # Emission probability of word given tag
                
                prev_tag = tag
        
        # Normalize counts to probabilities
        self._normalize_probs()
    
    def _normalize_probs(self):
        # Normalize initial probabilities
        total_initials = sum(self.initial_probs.values())
        for tag in self.initial_probs:
            self.initial_probs[tag] /= total_initials
        
        # Normalize transition probabilities
        for prev_tag in self.transition_probs:
            total_transitions = sum(self.transition_probs[prev_tag].values())
            for tag in self.transition_probs[prev_tag]:
                self.transition_probs[prev_tag][tag] /= total_transitions
        
        # Normalize emission probabilities
        for tag in self.emission_probs:
            total_emissions = sum(self.emission_probs[tag].values())
            for word in self.emission_probs[tag]:
                self.emission_probs[tag][word] /= total_emissions
    
    def viterbi(self, sentence):
        V = [{}] 
        path = {} 
        
        # Initialization step
        for tag in self.states:
            V[0][tag] = self.initial_probs[tag] * self.emission_probs[tag].get(sentence[0], 1e-6)
            path[tag] = [tag]
        
        # Recursion step
        for t in range(1, len(sentence)):
            V.append({})
            new_path = {}
            
            for curr_tag in self.states:
                (prob, best_prev_tag) = max(
                    (V[t-1][prev_tag] * self.transition_probs[prev_tag].get(curr_tag, 1e-6) * self.emission_probs[curr_tag].get(sentence[t], 1e-6), prev_tag)
                    for prev_tag in self.states
                )
                
                V[t][curr_tag] = prob
                new_path[curr_tag] = path[best_prev_tag] + [curr_tag]
            
            path = new_path
        
        # Termination step: Find the best final tag
        n = len(sentence) - 1
        (prob, best_tag) = max((V[n][tag], tag) for tag in self.states)
        
        return path[best_tag], prob

In [90]:
def evaluate_model(hmm, test_data):
    y_true = []
    y_pred = []

    # Iterate over the test sentences
    for sentence in test_data:
        words = [word for word, tag in sentence]
        true_tags = [tag for word, tag in sentence]
        
        # Use the HMM to predict tags for the words in the sentence
        predicted_tags, _ = hmm.viterbi(words)
        
        # Append to the overall lists of true and predicted tags
        y_true.extend(true_tags)
        y_pred.extend(predicted_tags)

    # Calculate metrics
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, average='weighted')
    recall = recall_score(y_true, y_pred, average='weighted')
    f1 = f1_score(y_true, y_pred, average='weighted')

    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    
    return accuracy, precision, recall, f1

In [85]:
hmm_pos = HMM()
hmm_pos.train(train_pos)

# Predict using Viterbi algorithm
predicted_tags, probability = hmm_pos.viterbi(test_pos[0])

# Show the result
print("Test Sentence:", test_pos[0])
print("Predicted Tags:", predicted_tags)
print("Probability of the sequence:", probability)


Test Sentence: [('In', 'IN'), ('Tehran', 'NNP'), (',', ','), ('the', 'DT'), ('chief', 'NN'), ('of', 'IN'), ('Iran', 'NNP'), ("'s", 'POS'), ('Revolutionary', 'NNP'), ('Guards', 'NNPS'), (',', ','), ('General', 'NNP'), ('Yahya', 'NNP'), ('Rahim', 'NNP'), ('Safavi', 'NNP'), (',', ','), ('said', 'VBD'), ('Saturday', 'NNP'), ('his', 'PRP$'), ('country', 'NN'), ('would', 'MD'), ('use', 'VB'), ('ballistic', 'JJ'), ('missiles', 'NNS'), ('to', 'TO'), ('defend', 'VB'), ('itself', 'PRP'), ('if', 'IN'), ('attacked', 'VBN'), ('.', '.')]
Predicted Tags: ['DT', 'NN', 'IN', 'DT', 'NN', 'IN', 'DT', 'NN', 'IN', 'DT', 'NN', 'IN', 'DT', 'NN', 'IN', 'DT', 'NN', 'IN', 'DT', 'NN', 'IN', 'DT', 'NN', 'IN', 'DT', 'NN', 'IN', 'DT', 'NN', 'IN']
Probability of the sequence: 2.1062239456930328e-194


In [91]:
evaluate_model(hmm_pos, test_pos)

Accuracy: 0.9671
Precision: 0.9673
Recall: 0.9671
F1 Score: 0.9671


(0.9670758848445689,
 0.9672677171193271,
 0.9670758848445689,
 0.9670616423048712)

In [86]:
hmm_ner = HMM()
hmm_ner.train(train_ner)

predicted_tags, probability = hmm_ner.viterbi(test_ner[0])

print("Test Sentence:", test_ner[0])
print("Predicted Tags:", predicted_tags)
print("Probability of the sequence:", probability)

Test Sentence: [('In', 'O'), ('Tehran', 'B-tim'), (',', 'O'), ('the', 'O'), ('chief', 'O'), ('of', 'O'), ('Iran', 'B-geo'), ("'s", 'O'), ('Revolutionary', 'B-geo'), ('Guards', 'I-geo'), (',', 'O'), ('General', 'B-org'), ('Yahya', 'I-org'), ('Rahim', 'I-org'), ('Safavi', 'I-org'), (',', 'O'), ('said', 'O'), ('Saturday', 'B-tim'), ('his', 'O'), ('country', 'O'), ('would', 'O'), ('use', 'O'), ('ballistic', 'O'), ('missiles', 'O'), ('to', 'O'), ('defend', 'O'), ('itself', 'O'), ('if', 'O'), ('attacked', 'O'), ('.', 'O')]
Predicted Tags: ['O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O']
Probability of the sequence: 2.42520294864314e-182


In [92]:
evaluate_model(hmm_ner, test_ner)

Accuracy: 0.9587
Precision: 0.9577
Recall: 0.9587
F1 Score: 0.9577


(0.9586569985880776,
 0.9577115666637515,
 0.9586569985880776,
 0.9577431568000045)

# Using hmm of nltk

In [75]:
# Train HMM for POS tagging
trainer_pos = hmm.HiddenMarkovModelTrainer()
hmm_pos = trainer_pos.train(train_pos)

# Train HMM for NER tagging
trainer_ner = hmm.HiddenMarkovModelTrainer()
hmm_ner = trainer_ner.train(train_ner)

In [76]:
def evaluate_model(test_data, hmm_model):
    correct = 0
    total = 0
    for sentence in test_data:
        words = [word for word, tag in sentence]
        true_tags = [tag for word, tag in sentence]
        predicted_tags = hmm_model.tag(words)
        predicted_tags = [tag for word, tag in predicted_tags]

        for true_tag, pred_tag in zip(true_tags, predicted_tags):
            if true_tag == pred_tag:
                correct += 1
            total += 1

    accuracy = correct / total
    return accuracy

In [77]:
# Evaluate POS tagging
pos_accuracy = evaluate_model(test_pos, hmm_pos)
print(f"POS Tagging Accuracy: {pos_accuracy * 100:.2f}%")

# Evaluate NER tagging
ner_accuracy = evaluate_model(test_ner, hmm_ner)
print(f"NER Tagging Accuracy: {ner_accuracy * 100:.2f}%")

POS Tagging Accuracy: 83.23%
NER Tagging Accuracy: 94.47%


In [18]:
words = [word for word, tag in test_pos[0]]
true_tags = [tag for word, tag in test_pos[0]]
predicted_tags = hmm_pos.tag(words)
predicted_tags = [tag for word, tag in predicted_tags]

In [21]:
print(predicted_tags)
print(true_tags)

['JJ', 'NNS', 'VBN', 'DT', 'JJ', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP']
['JJ', 'NNS', 'VBN', 'DT', 'JJ', 'NN', 'IN', 'NNP', 'DT', 'NN', 'IN', 'DT', 'NNP', 'CD', 'NNS', ',', 'WDT', 'VBD', 'DT', 'NNS', 'CC', 'VBD', 'NN', 'TO', 'NNP', 'CD', 'NNS', 'IN', 'NN', 'NNS', 'VBD', 'CD', 'NNS', 'IN', 'DT', 'JJ', 'NN', '.']


# Using hmmlearn

In [1]:
import pandas as pd
import numpy as np
from hmmlearn import hmm
from sklearn.model_selection import train_test_split
from ast import literal_eval

data = pd.read_csv('test.csv')

data['POS'] = data['POS'].apply(literal_eval)
data['Tag'] = data['Tag'].apply(literal_eval)

def encode_sequences(sequences, token_to_idx):
    return [[token_to_idx[token] for token in seq] for seq in sequences]

unique_pos_tags = list(set(tag for tags in data['POS'] for tag in tags))
unique_ner_tags = list(set(tag for tags in data['Tag'] for tag in tags))

In [None]:
pos_to_idx = {tag: i for i, tag in enumerate(unique_pos_tags)}
ner_to_idx = {tag: i for i, tag in enumerate(unique_ner_tags)}

In [None]:
idx_to_pos = {i: tag for tag, i in pos_to_idx.items()}
idx_to_ner = {i: tag for tag, i in ner_to_idx.items()}

In [None]:
encoded_pos = encode_sequences(data['POS'], pos_to_idx)
encoded_ner = encode_sequences(data['Tag'], ner_to_idx)

In [None]:
train_pos, test_pos = train_test_split(encoded_pos, test_size=0.2, random_state=42)
train_ner, test_ner = train_test_split(encoded_ner, test_size=0.2, random_state=42)

In [4]:
n_components_pos = len(pos_to_idx)

pos_hmm = hmm.MultinomialHMM(n_components=n_components_pos, n_iter=100, tol=0.01)

train_lengths_pos = [len(seq) for seq in train_pos]
train_concat_pos = np.concatenate(train_pos)

pos_hmm.fit(train_concat_pos.reshape(-1, 1), lengths=train_lengths_pos)

def decode_hmm(model, test_seq):
    logprob, hidden_states = model.decode(np.array(test_seq).reshape(-1, 1), algorithm="viterbi")
    return hidden_states

test_seq_pos = test_pos[0]
predicted_states_pos = decode_hmm(pos_hmm, test_seq_pos)
decoded_pos_tags = [idx_to_pos[state] for state in predicted_states_pos]
print("Predicted POS tags:", decoded_pos_tags)


MultinomialHMM has undergone major changes. The previous version was implementing a CategoricalHMM (a special case of MultinomialHMM). This new implementation follows the standard definition for a Multinomial distribution (e.g. as in https://en.wikipedia.org/wiki/Multinomial_distribution). See these issues for details:
https://github.com/hmmlearn/hmmlearn/issues/335
https://github.com/hmmlearn/hmmlearn/issues/340


Predicted POS tags: ['PDT', 'VBN', 'VBP', 'WDT', 'NNPS', 'JJR', 'DT', 'PRP$', 'NNP', 'JJR', 'DT', 'PRP$', 'NNP', 'JJR', 'DT', 'PRP$', 'NNP', 'JJR', 'DT', 'PRP$', 'NNP', 'JJR', 'DT', 'PRP$', 'NNP', 'JJR', 'DT', 'PRP$', 'NNP', 'JJR', 'DT', 'PRP$', 'NNP', 'JJR', 'DT', 'PRP$', 'NNP', 'JJR']


In [3]:
n_components_ner = len(ner_to_idx)

ner_hmm = hmm.MultinomialHMM(n_components=n_components_ner, n_iter=100, tol=0.01)

train_lengths_ner = [len(seq) for seq in train_ner]
train_concat_ner = np.concatenate(train_ner)

ner_hmm.fit(train_concat_ner.reshape(-1, 1), lengths=train_lengths_ner)

test_seq_ner = test_ner[0]
predicted_states_ner = decode_hmm(ner_hmm, test_seq_ner)
decoded_ner_tags = [idx_to_ner[state] for state in predicted_states_ner]
print("Predicted NER tags:", decoded_ner_tags)


MultinomialHMM has undergone major changes. The previous version was implementing a CategoricalHMM (a special case of MultinomialHMM). This new implementation follows the standard definition for a Multinomial distribution (e.g. as in https://en.wikipedia.org/wiki/Multinomial_distribution). See these issues for details:
https://github.com/hmmlearn/hmmlearn/issues/335
https://github.com/hmmlearn/hmmlearn/issues/340


Predicted NER tags: ['I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim', 'I-org', 'B-tim']
