In [None]:
import bz2

In [None]:
!pip install hmmlearn=="0.2.5" evaluate seqeval

In [None]:
import random
import pandas as pd
import numpy as np
from hmmlearn import hmm
import evaluate
# Load the 'seqeval' metric through the `evaluate` library; its compute() method is
# called further down to score the predicted tag sequences against the references.
seqeval = evaluate.load('seqeval')

In [None]:
# Parse the bz2-compressed corpus files into parallel lists: token_docs[k] holds the
# tokens of sentence k and tag_docs[k] the matching tags. Every space-separated piece
# of a line carries three "|"-separated fields, unpacked as (token, pos, tag).
token_docs = []
tag_docs = []
datasets = ["aij-wikiner-en-wp2.bz2", "aij-wikiner-en-wp3.bz2"]

for dataset in datasets:
    with bz2.open(f"../Data/{dataset}", "rb") as bz_file:
        # Stream the file line by line so the whole decompressed corpus is never
        # held in memory at once.
        for raw_doc in bz_file:
            doc = raw_doc.strip().decode()
            if len(doc) <= 1:
                continue

            tokens = []
            tags_l = []

            for seq in doc.split(" "):
                if not seq:
                    # Doubled spaces yield empty pieces that carry no token.
                    continue

                # Split from the right: the last two fields are pos and tag, so a
                # "|" inside the token itself no longer breaks the unpacking.
                token, pos, tag = seq.rsplit("|", 2)

                tokens.append(token)
                tags_l.append(tag)

            token_docs.append(tokens)
            tag_docs.append(tags_l)

In [None]:
# Working aliases for the parsed corpus (same list objects, no copy is made).
texts = token_docs
tags_l = tag_docs

In [None]:
# Draw a reproducible 25% random sample of the parsed sentences.
# All sizes are taken from token_docs (the full corpus) rather than from texts:
# texts is overwritten at the end of this cell, so measuring it here would shrink
# the sample a little more every time the cell is re-run.
total_docs = len(token_docs)
print(f"Total amount of data = {total_docs}")
data_perc = round(total_docs * 0.25)
print(f"Current sample of data = {data_perc}")

random.seed(100)  # fixed seed so the same sentences are picked on every run
random_samples = random.sample(range(total_docs), data_perc)
print(f"First sample index = {random_samples[0]}")  # a previous run reported 76372
texts = [token_docs[i] for i in random_samples]
tags_l = [tag_docs[i] for i in random_samples]

In [None]:
# Longest sentence in the sample, measured in tokens (0 when there are no sentences).
m_len = max(map(len, texts), default=0)
print(f"Largest sentence by length = {m_len}")

In [None]:
# Flatten the sampled corpus into one row per token; the "sentence" column carries
# the index of the sentence the token came from.
whole_data = pd.DataFrame(
    [
        {"sentence": f"Sentence : {sent_no}", "Word": token, "Tag": tags_l[sent_no][tok_no]}
        for sent_no, sent_tokens in enumerate(texts)
        for tok_no, token in enumerate(sent_tokens)
    ]
)

In [None]:
# Distinct tags and distinct words over the whole sample; the cell displays both counts.
tags = list(set(whole_data["Tag"]))
words = list(set(whole_data["Word"]))
len(tags), len(words)

In [None]:
from sklearn.model_selection import train_test_split

# First hold out 10% of the sample for testing ...
train_texts, test_texts, train_tags, test_tags = train_test_split(
    texts, tags_l, test_size=.1, random_state=100
)

# ... then hold out 10% of what remains for validation.
train_texts, val_texts, train_tags, val_tags = train_test_split(
    train_texts, train_tags, test_size=.1, random_state=100
)

# Share of the sampled sentences that ended up in each split, as a rounded percentage.
sample_size = len(texts)
train_perc = round(100 * (len(train_texts) / sample_size))
val_perc = round(100 * (len(val_texts) / sample_size))
test_perc = round(100 * (len(test_texts) / sample_size))

print(f"{train_perc}% of data is TRAINING")
print(f"{val_perc}% of data is VALIDATION")
print(f"{test_perc}% of data is TESTING")

In [None]:
# One row per training token: sentence label, word and gold tag.
train_dataset = pd.DataFrame(
    [
        {"sentence": f"Sentence : {sent_no}", "Word": token, "Tag": train_tags[sent_no][tok_no]}
        for sent_no, sent_tokens in enumerate(train_texts)
        for tok_no, token in enumerate(sent_tokens)
    ]
)

In [None]:
# Tag set and vocabulary restricted to the training split; the cell displays both sizes.
tags = list(set(train_dataset["Tag"]))
words = list(set(train_dataset["Word"]))
len(tags), len(words)

In [None]:
## The held-out splits may contain words that never occur in the training split.
## To give the 'UNKNOWN' placeholder its own training rows, the word of a random 2%
## of the training rows is replaced by it.
dfupdate = train_dataset.sample(frac=.02, replace=False, random_state=42)
dfupdate["Word"] = 'UNKNOWN'
train_dataset.update(dfupdate)  # writes the replaced rows back, aligned on the index

# Vocabulary is rebuilt because it now includes the placeholder.
words = list(set(train_dataset["Word"]))

# Integer encodings for words and tags, plus the reverse lookup for tags.
word2id = dict(zip(words, range(len(words))))
id2tag = dict(enumerate(tags))
tag2id = {tag: idx for idx, tag in id2tag.items()}
len(tags), len(words)

In [None]:
# One row per test token: sentence label, word and gold tag.
test_dataset = pd.DataFrame(
    [
        {"sentence": f"Sentence : {sent_no}", "Word": token, "Tag": test_tags[sent_no][tok_no]}
        for sent_no, sent_tokens in enumerate(test_texts)
        for tok_no, token in enumerate(sent_tokens)
    ]
)

In [None]:
# One row per validation token: sentence label, word and gold tag.
val_dataset = pd.DataFrame(
    [
        {"sentence": f"Sentence : {sent_no}", "Word": token, "Tag": val_tags[sent_no][tok_no]}
        for sent_no, sent_tokens in enumerate(val_texts)
        for tok_no, token in enumerate(sent_tokens)
    ]
)

In [None]:
# Number of training tokens carrying each tag.
tag_frequencies = train_dataset["Tag"].value_counts()
count_tags = dict(tag_frequencies)

In [None]:
# Count of each word given a tag, as a nested dict {tag: {word: count}}.
# A single groupby over (Tag, Word) replaces the nested groupby-inside-apply, which
# read the grouping column "Tag" inside each group (deprecated behaviour in recent
# pandas) and ran one inner groupby per tag.
count_tags_to_words = {}
for (tag_key, word_key), pair_count in train_dataset.groupby(["Tag", "Word"]).size().items():
    count_tags_to_words.setdefault(tag_key, {})[word_key] = int(pair_count)

# Count of tags that come first in a sentence (first row of every sentence group).
first_rows = train_dataset.groupby('sentence').first()
count_init_tags = dict(first_rows.Tag.value_counts())

In [None]:
# Transition counts: entry [p, c] is how often tag id c directly follows tag id p
# inside the same training sentence.
count_tags_to_next_tags = np.zeros((len(tags), len(tags)), dtype=int)
sentences = list(train_dataset.sentence)
ner = list(train_dataset.Tag)
for pos in range(1, len(sentences)):
    # Rows with different sentence labels straddle a sentence boundary: no transition.
    if sentences[pos] != sentences[pos - 1]:
        continue
    count_tags_to_next_tags[tag2id[ner[pos - 1]], tag2id[ner[pos]]] += 1

In [None]:
# Turn the raw counts into the three HMM parameter arrays.
mystartprob = np.zeros((len(tags),))  # P(tag | start of sentence)
mytransmat = np.zeros((len(tags), len(tags)))  # P(tag_i | tag_{i-1})
myemissionprob = np.zeros((len(tags), len(words)))  # P(word | tag)
num_sentences = sum(count_init_tags.values())
sum_tags_to_next_tags = np.sum(count_tags_to_next_tags, axis=1)
for tag, tagid in tag2id.items():
    floatCountTag = float(count_tags.get(tag, 0))
    mystartprob[tagid] = count_init_tags.get(tag, 0) / num_sentences

    # Only (tag, word) pairs that were actually observed are non-zero; every other
    # cell keeps the 0.0 it was initialised with, so only observed pairs are visited.
    for word, word_count in count_tags_to_words.get(tag, {}).items():
        wordid = word2id.get(word)
        if wordid is not None:
            myemissionprob[tagid][wordid] = word_count / floatCountTag

    outgoing = sum_tags_to_next_tags[tagid]
    if outgoing > 0:
        mytransmat[tagid] = count_tags_to_next_tags[tagid] / outgoing
    else:
        # A tag that is never followed by another tag has no transition counts;
        # dividing 0 by 0 would leave a NaN row. Fall back to a uniform row so the
        # matrix stays row-stochastic.
        mytransmat[tagid] = 1.0 / len(tags)

In [None]:
# HMM with one hidden state per tag, decoded with Viterbi. Its parameters are
# assigned directly from the count-based estimates instead of being fitted.
model = hmm.MultinomialHMM(
    n_components=len(tags),
    algorithm='viterbi',
    random_state=42,
)

model.startprob_ = mystartprob        # initial tag distribution
model.transmat_ = mytransmat          # tag-to-tag transition probabilities
model.emissionprob_ = myemissionprob  # word-given-tag probabilities

In [None]:
## As some words may never appear in the training set, we need to transform them into UNKNOWN first.
## Then we split data_test into samples & lengths and send them to HMM.


def gen_data_for_predict(dataset):
    """Encode a token-level DataFrame as HMM input.

    Words missing from the module-level ``words`` vocabulary are overwritten with
    "UNKNOWN" directly in ``dataset`` (the frame is modified in place). Every word
    is then mapped to its id through the module-level ``word2id``.

    Args:
        dataset: DataFrame with a "Word" column and a "sentence" column; rows that
            belong to the same sentence are expected to be adjacent.

    Returns:
        A ``(samples, lengths)`` tuple. ``samples`` is a list of one-element lists
        ``[word_id]``, one per row. ``lengths`` lists the size of each run of
        consecutive rows sharing a sentence label (``[0]`` for an empty frame).
    """
    out_of_vocab = ~dataset["Word"].isin(words)
    dataset.loc[out_of_vocab, "Word"] = "UNKNOWN"

    samples = [[word2id[token]] for token in dataset.Word]

    # Run-length encode the sentence labels to recover the sentence lengths.
    lengths = []
    run = 0
    previous_label = None
    for position, label in enumerate(dataset.sentence):
        if position > 0 and label != previous_label:
            # The label changed, so the previous sentence is complete.
            lengths.append(run)
            run = 0
        run += 1
        previous_label = label
    lengths.append(run)

    return samples, lengths


# Encode the held-out splits (test first, then validation) as HMM input.
test_encoded = gen_data_for_predict(test_dataset)
val_encoded = gen_data_for_predict(val_dataset)
samples_test, lengths_test = test_encoded
samples_val, lengths_val = val_encoded

In [None]:
# Sanity check: the token count and the summed sentence lengths should agree.
n_test_tokens = len(samples_test)
print(f"Length of sample in test= {n_test_tokens}")
test_length_total = np.cumsum(lengths_test)[-1]
print(f"Cumulative sum of lengths in test= {test_length_total}")
n_test_sentences = len(lengths_test)
print(f"Total number of test sentences = {n_test_sentences}")

In [None]:
# Decode the tag ids for each held-out split; `lengths` tells the model where each
# sentence ends inside the concatenated samples.
ner_predict_test = model.predict(samples_test, lengths=lengths_test)
ner_predict_val = model.predict(samples_val, lengths=lengths_val)

In [None]:
# One prediction is produced per test token.
n_test_predictions = len(ner_predict_test)
print(f"Total number of word predictions in test = {n_test_predictions}")

In [None]:
def get_tags_from_predict(pred, lengths, id_to_tag=None):
    """Split a flat sequence of predicted tag ids into per-sentence tag lists.

    Args:
        pred: Flat sequence of predicted tag ids, with the sentences concatenated
            in order.
        lengths: Number of tokens in each sentence, in the same order as ``pred``.
        id_to_tag: Optional mapping from tag id to tag string. When omitted, the
            module-level ``id2tag`` mapping is used.

    Returns:
        A list with one entry per element of ``lengths``; each entry is the list
        of tag strings for that sentence.
    """
    if id_to_tag is None:
        id_to_tag = id2tag

    ner_predict_tags = []
    start = 0
    for length in lengths:
        end = start + length
        ner_predict_tags.append([id_to_tag[tag_code] for tag_code in pred[start:end]])
        # Offsets are cumulative: the next sentence starts where this one ended.
        start = end

    return ner_predict_tags


# Convert the flat id predictions back into per-sentence tag strings.
# Note: ner_predict_test_vals holds the *validation* predictions despite its name.
ner_predict_test_tags = get_tags_from_predict(pred=ner_predict_test, lengths=lengths_test)
ner_predict_test_vals = get_tags_from_predict(pred=ner_predict_val, lengths=lengths_val)

In [None]:
# Spot check one fixed test sentence: its tokens, gold tags and predicted tags.
_random_index = 900
print(_random_index)
for per_sentence in (test_texts, test_tags, ner_predict_test_tags):
    print(per_sentence[_random_index])

In [None]:
def get_op_for_pred(test_t, test_tag, hmm_predict, i):
    """Print sentence ``i`` followed by its gold and predicted entity tokens.

    Only tokens whose tag differs from "O" are listed, first for the reference
    tags (under "ACTUAL"), then for the predictions (under "PREDICTION").

    Args:
        test_t: Sequence of token lists, one per sentence.
        test_tag: Sequence of reference tag lists, parallel to ``test_t``.
        hmm_predict: Sequence of predicted tag lists, parallel to ``test_t``.
        i: Index of the sentence to display.
    """
    tokens = test_t[i]

    def _print_section(title, underline, tag_seq):
        # Heading, underline, blank line, then one line per non-"O" token.
        print(title)
        print(underline)
        print()
        for position, label in enumerate(tag_seq):
            if label == "O":
                continue
            print(f"\t\t{tokens[position]} {label}")

    print(" ".join(tokens))
    _print_section("\tACTUAL", "\t______", test_tag[i])
    print()
    _print_section("\tPREDICTION", "\t__________", hmm_predict[i])

In [None]:
# Show the gold and predicted entities for the spot-check sentence.
get_op_for_pred(
    test_t=test_texts,
    test_tag=test_tags,
    hmm_predict=ner_predict_test_tags,
    i=_random_index,
)

In [None]:
# Score both held-out splits with the same seqeval settings.
seqeval_options = {"mode": "strict", "scheme": "IOB1"}
result_test = seqeval.compute(
    predictions=ner_predict_test_tags, references=test_tags, **seqeval_options
)
result_val = seqeval.compute(
    predictions=ner_predict_test_vals, references=val_tags, **seqeval_options
)

In [None]:
def generate_metric_csv(result, csv_name):
    """Flatten a seqeval result dict and write it to ``Results/<csv_name>.csv``.

    Args:
        result: Output of ``seqeval.compute``. Keys whose value is a dict (one per
            entity type) contribute one row each for "precision", "recall" and
            "f1", named ``<key>_<metric>``; other entries of that dict are
            skipped. Keys with a scalar value contribute a single row named
            after the key.
        csv_name: File name without extension.

    The CSV has two columns, "metric" and "val"; values are rounded to 4 decimals.
    The ``Results`` directory is created if it does not exist.
    """
    import os

    test_metrics = []
    for key, value in result.items():
        if isinstance(value, dict):
            # One row per metric: building a single dict per entity would let each
            # metric overwrite the previous one.
            for _m, score in value.items():
                if _m in ("precision", "recall", "f1"):
                    test_metrics.append({"metric": f"{key}_{_m}", "val": round(score, 4)})
        else:
            test_metrics.append({"metric": key, "val": round(value, 4)})

    os.makedirs("Results", exist_ok=True)
    pd.DataFrame(test_metrics, columns=["metric", "val"]).to_csv(
        f"Results/{csv_name}.csv", index=False
    )

In [None]:
# Write one metrics CSV per held-out split (test first, then validation).
for split_result, csv_stem in ((result_test, "test_hmm"), (result_val, "validation_hmm")):
    generate_metric_csv(split_result, csv_stem)