In [463]:
import pandas as pd
import numpy as np
import json

In [464]:
import numpy as np
import pandas as pd

def load_text_data(file_path, get_vocabulary=False, separate_sentences=False, replace_unknown=False, vocab=None, tags_present=True):
    text_file = open(file_path, 'r')
    lines = text_file.readlines()
    lines.append("\n")
    data = []
    final_data = []
    temp = [['0', '<START>', '<START_TAG>']]
    if replace_unknown:
        known_words = set(vocab.unique_tokens.tolist())
    if get_vocabulary:
        vocabulary = []
    for line in lines:
        if line == '\n':
            data.append(temp)
            temp = [['0', '<START>', '<START_TAG>']]
        else:
            line = line.split('\t')
            if get_vocabulary:
                vocabulary.append(line[1])
            line[-1] = line[-1].strip('\n')
            if replace_unknown and line[1] not in known_words:
                line[1] = "<UNK>"
            temp.append(line)
    if get_vocabulary:
        vocabulary = pd.DataFrame(vocabulary, columns=["token"])
    for data_sample in data:
        final_data.append(pd.DataFrame(data_sample, columns=['index', 'token', 'tag']))
    if not tags_present:
        final_data[-1] = final_data[-1].drop(columns='tag')
    combined_data = pd.concat(final_data)
    if get_vocabulary and not separate_sentences:
        return combined_data, vocabulary
    elif get_vocabulary and separate_sentences:
        return final_data, vocabulary
    elif not get_vocabulary and not separate_sentences:
        return combined_data
    else:
        return final_data
    

In [465]:
##VOCAB Creation

In [466]:
train_data, train_vocab = load_text_data('data/train', get_vocabulary=True)

In [467]:
def build_vocabulary(text_data, vocab_data, threshold=2):
    vocab_no_dups = vocab_data['token'].value_counts()
    vocab_no_dups = pd.DataFrame(list(zip(vocab_no_dups.index.tolist(), vocab_no_dups.tolist())), columns=['unique_tokens', 'count'])
    total_before = len(vocab_no_dups)
    to_remove = vocab_no_dups[vocab_no_dups['count'] < threshold]
    total_removed = len(to_remove)
    vocabulary = vocab_no_dups.drop(vocab_no_dups[vocab_no_dups['count'] < threshold].index)
    vocabulary.loc[-1] = ['<UNK>', total_removed]
    vocabulary.index = vocabulary.index + 1
    vocabulary.sort_index(inplace=True)
    vocab_size = len(vocabulary)
    token_counts = text_data['token'].value_counts()
    text_data['token'] = np.where(text_data['token'].isin(token_counts.index[token_counts < threshold]), '<UNK>', text_data['token'])
    print(f"Threshold: {threshold}\nVocabulary Size: {vocab_size}\n<UNK> Occurrences: {total_removed}")
    vocabulary['index'] = vocabulary.index
    return text_data, vocabulary, vocab_size, total_before

In [468]:
train_data, vocab, vocab_size, total_words_before = build_vocabulary(train_data, train_vocab, threshold=2)
vocab.to_csv("data/vocab.txt", index=None, header=None, sep='\t', columns=['unique_tokens', 'index', 'count'])


Threshold: 2
Vocabulary Size: 23183
<UNK> Occurrences: 20011


In [None]:
##Model Creation

In [469]:
class HiddenMarkovModel:
    def __init__(self, lexicon=None, data=None) -> None:
        self.lexicon = lexicon
        self.data = data

    def generate_helpers(self):
        self.data_records = self.data.to_dict('records')
        #print(self.data_records[:100])
        self.unique_tokens = self.lexicon['unique_tokens'].to_numpy()
        self.data['tag'].value_counts()
        self.label_freqs = dict(zip(self.data['tag'].value_counts().index.tolist(), self.data['tag'].value_counts().tolist()))

        self.labels = self.data['tag'].unique().tolist()
        self.tokens = self.lexicon['unique_tokens'].tolist()
        self.tokens.append("<START>")

    def train(self):
        self.generate_helpers()

        self.transition_freqs = {key:{key2:0 for key2 in self.labels} for key in self.labels}
        self.emission_freqs = {key:{key2:0 for key2 in self.tokens} for key in self.labels}

        prev_label = '<START_TAG>'
        temp_record = {'index': '0', 'token': '<START>', 'tag': '<START_TAG>'}

        for record in self.data_records:
            self.emission_freqs[str(record['tag'])][str(record['token'])] += 1
            self.transition_freqs[str(prev_label)][str(record['tag'])] += 1

            prev_label = record['tag']
            temp_record = record

        self.emission_freqs['<START_TAG>']['<UNK>'] = self.label_freqs['<START_TAG>']
        self.transition_freqs['<START_TAG>']['<START_TAG>'] -= 1

        self.compute_probabilities()
        self.convert_probabilities_for_output()

        print(f"No. of Transition Parameters: {len(self.transition_probs_output)} \nNo. of Emission Parameters: {len(self.emission_probs_output)}")

    def compute_probabilities(self):
        self.transition_totals = {key: sum(self.transition_freqs[str(key)].values()) for key in self.labels}
        self.emission_totals = {key: sum(self.emission_freqs[str(key)].values()) for key in self.labels}

        transition_temp = {}
        emission_temp = {}

        for key in self.labels:
            transition_temp[str(key)] = (self.label_freqs[str(key)] - self.transition_totals[str(key)])
            emission_temp[str(key)] = (self.label_freqs[str(key)] - self.emission_totals[str(key)])

        self.label_freqs_new = {key: val - emission_temp[str(key)] for key, val in self.label_freqs.items()}

        self.transition_probs = {key: {key2: val / self.label_freqs[str(key)] for key2, val in self.transition_freqs[str(key)].items()} for key in self.labels}
        self.emission_probs = {key: {key2: val / self.label_freqs_new[str(key)] for key2, val in self.emission_freqs[str(key)].items()} for key in self.labels}

    def convert_probabilities_for_output(self):
        self.transition_probs_output = {}
        self.emission_probs_output = {}

        for label1 in self.transition_probs.keys():
            for label2 in self.transition_probs[label1].keys():
                prob = self.transition_probs[label1][label2]
                if prob != 0:
                    self.transition_probs_output[f"({label1}, {label2})"] = prob

            for token in self.emission_probs[label1].keys():
                prob = self.emission_probs[label1][token]
                if prob != 0:
                    self.emission_probs_output[f"({label1}, {token})"] = prob

    def convert_output_to_probabilities(self):
        self.transition_probs = {key:{key2:0 for key2 in self.labels} for key in self.labels}
        self.emission_probs = {key:{key2:0 for key2 in self.tokens} for key in self.labels}

        for pair, prob in self.transition_probs_output.items():
            label1, label2 = pair[1:-1].split(" ")
            self.transition_probs[label1[:-1]][label2] = prob

        for pair, prob in self.emission_probs_output.items():
            label, token = pair[1:-1].split(" ")
            self.emission_probs[label[0:-1]][token] = prob

    def load_model(self, filepath):
        with open(filepath) as json_file:
            model = json.load(json_file)

        self.transition_probs_output = model['transition']
        self.emission_probs_output = model['emission']
        self.convert_output_to_probabilities()
        self.labels = list(self.transition_probs.keys())

        return self.transition_probs, self.emission_probs

    def save_model(self, filepath):
        model = {'transition': self.transition_probs_output, 'emission': self.emission_probs_output}

        with open(filepath, "w") as json_file:
            json.dump(model, json_file, indent=4)

In [470]:
hmm = HiddenMarkovModel(vocab, train_data)
hmm.train()

No. of Transition Parameters: 1416 
No. of Emission Parameters: 30305


In [471]:
hmm.save_model("data/hmm.json")
transition_prob, emission_prob = hmm.load_model("data/hmm.json")
#trans_prob  = hmm.transition_probs_output
#emis_prob   = hmm.emission_probs_output

In [472]:
## Greedy 

In [473]:
class GreedyDecoder:
    def __init__(self, data, labels, transition_probs, emission_probs, original_data):
        """
        Initializes a GreedyDecoder instance.
        
        Parameters:
            data (list): List of sentences to be decoded.
            labels (list): List of tags for each sentence.
            transition_probs (dict): Transition probabilities between labels.
            emission_probs (dict): Emission probabilities for each label.
            original_data (list): Original data used to create the model.
        """
        self.data = data
        self.original_data = original_data
        self.transition_probs = transition_probs
        self.emission_probs = emission_probs
        self.labels = labels

    def decode_sentence(self, sentence_df):
        """
        Decodes a single sentence using the greedy algorithm.
        
        Parameters:
            sentence_df (pandas.DataFrame): Sentence to be decoded.
        
        Returns:
            tuple: Tuple containing the predicted labels and the original sentence.
        """
        previous_label = '<START_TAG>'
        #print(sentence_df)
        sentence = sentence_df['token'].values.tolist()[1:]
        self.sent_predictions = []
        
        
        
        probability=1
        for token in sentence:
            max_probability = -1
            for label in self.labels:
                #print(self.transition_probs[previous_label][label])
                probability = self.transition_probs[previous_label][label] * self.emission_probs[label][token]
                
                if probability > max_probability:
                    max_probability = probability
                    predicted_label = label
            
            previous_label = predicted_label
            self.sent_predictions.append(predicted_label)
        
        return self.sent_predictions, sentence
    
    def decode(self):
        """
        Decodes all sentences in the data using the greedy algorithm.
        
        Returns:
            tuple: Tuple containing the predicted labels, original tokens, and the output text.
        """
        self.predictions = []
        original_tokens = []
        self.output_text = []
        
        counter=0
        for sentence in self.data:
            original_sentence = self.original_data[counter]['token'].values.tolist()[1:]
            #print(original_sentence)
            #print(sentence)
            predictions, sentence = self.decode_sentence(sentence)
            
            position = 1
            
            for predicted_label, token in zip(predictions, original_sentence):
                self.output_text.append(f"{position}\t{token}\t{predicted_label}\n")
                position += 1
                
            original_tokens.extend(original_sentence)
            self.predictions.extend(predictions)
            self.output_text.append("\n")
        
        self.output_text = "".join(self.output_text[:-1])
        return self.predictions
    
    def calculate_accuracy(self, targets):
        """
        Calculates the accuracy of the model on the given targets.
        
        Parameters:
            targets (list): List of true labels for each sentence.
        
        Returns:
            float: Accuracy of the model on the given targets.
        """
        match_count = 0
        
        for prediction, target in zip(self.predictions, targets):
            if prediction == target:
                match_count += 1
        
        accuracy = match_count / len(self.predictions)
        return accuracy
    
    def get_targets(self):
        """
        Gets the true labels for each sentence in the data.
        
        Returns:
            list: List of true labels for each sentence.
        """
        targets = []
        
        for sentence in self.data:
            targets.extend(sentence['tag'].values.tolist()[1:])
        
        return targets
    
    def save_predictions(self, filepath):
        """
        Saves the predicted labels and original tokens to a file.
        
        Parameters:
            filepath (str): Path to the file where the predictions will be saved.
        """
        with open(filepath, "w") as output_file:
            output_file.write(self.output_text)

In [474]:
dev_data = load_text_data('data/dev', get_vocabulary=False,separate_sentences=True, replace_unknown=True, vocab=hmm.lexicon)
dev_data_orig = load_text_data('data/dev', get_vocabulary=False, separate_sentences=True,  vocab=hmm.lexicon)


In [475]:
greedy_dev = GreedyDecoder(dev_data, hmm.labels, transition_prob, emission_prob, dev_data_orig)
preds = greedy_dev.decode()

In [476]:
transition_prob, emission_prob = hmm.load_model("data/hmm.json")

In [477]:
acc = greedy_dev.calculate_accuracy(greedy_dev.get_targets())
print(f"Greedy Decoding Accuracy on dev_data: {acc*100}")

Greedy Decoding Accuracy on dev_data: 93.4870378240544


In [None]:
##greedy test 

In [479]:
test_data = load_text_data('data/test', get_vocabulary=False,separate_sentences=True, replace_unknown=True, vocab=hmm.lexicon)
test_data_orig = load_text_data('data/test', get_vocabulary=False, separate_sentences=True,  vocab=hmm.lexicon)


In [480]:
transition_prob, emission_prob = hmm.load_model("data/hmm.json")

In [484]:
greedy_test= GreedyDecoder(test_data, hmm.labels, transition_prob, emission_prob, test_data_orig)
preds = greedy_test.decode()

In [483]:
greedy_test.save_predictions("data/greedy.out")

In [406]:
##Viterbi

In [407]:
class ViterbiDecoding:
    def __init__(self, data, tag_list, transition_prob, emission_prob, data_orig) -> None:
        self.input_data = data
        self.original_data = data_orig
        self.transition_probs = transition_prob
        self.emission_probs = emission_prob
        self.tag_list = tag_list[1:]
        self.map_tag_to_index()
        self.map_index_to_tag()
        pass

    def map_tag_to_index(self):
        self.tag_to_index = {}
        i = 0
        for tag in self.tag_list:
            self.tag_to_index[tag] = i
            i += 1

    def map_index_to_tag(self):
        self.index_to_tag = {v: k for k, v in self.tag_to_index.items()}

    def predict(self):
        self.predictions = []
        self.targets = []
        self.output_string = ""

        count = 0
        for sentence in self.input_data:
            sentence_orig = self.original_data[count]['token'].values.tolist()[1:]
            
            predictions, sentence = self.predict_sentence(sentence)
            pos = 1
            for pred, word in zip(predictions, sentence_orig):
                self.output_string += f"{pos}\t{word}\t{pred}\n"
                pos += 1
            self.predictions.extend(predictions)
            
            self.output_string += "\n"
            
            count += 1
            # if count % 500 == 0:
            #     print(f"Completed {count} sentences.")
        
        self.output_string = "".join(self.output_string[:-1])
        
        return self.predictions

    def calculate_accuracy(self, targets):
        count_of_matches = 0
        for pred, target in zip(self.predictions, targets):
            if pred == target:
                count_of_matches += 1

        self.accuracy = count_of_matches / len(self.predictions)
        return self.accuracy
    
    def get_targets(self):
        self.targets = []
        for sentence_df in self.input_data:
            self.targets.extend(sentence_df['tag'].values.tolist()[1:])
        return self.targets

    
    def predict_sentence(self, sentence_df):
        self.sentence = sentence_df['token'].values.tolist()[1:]

        sentence_length = len(self.sentence)
        no_of_tags = len(self.tag_list)

        self.option_probabilities = np.zeros((no_of_tags, sentence_length))
        self.backtrack_matrix = np.zeros((no_of_tags, sentence_length))

        index = 0
        for tag in self.tag_list:
            self.option_probabilities[self.tag_to_index[tag], index] = self.transition_probs['<START_TAG>'][tag] * self.emission_probs[tag][self.sentence[index]]

        for j in range(1, sentence_length):
            for current_tag in self.tag_list:
                temp_prob = []
                for previous_tag in self.tag_list:
                    temp_prob.append(self.option_probabilities[self.tag_to_index[previous_tag], j-1] * self.transition_probs[previous_tag][current_tag] * self.emission_probs[current_tag][self.sentence[j]])
                
                max_tag_index = np.argmax(temp_prob)

                self.option_probabilities[self.tag_to_index[current_tag], j] = temp_prob[max_tag_index]
                self.backtrack_matrix[self.tag_to_index[current_tag], j ] = max_tag_index

        predicted_tags = self.backtrack(self.option_probabilities, self.backtrack_matrix)

        return (predicted_tags, self.sentence)

    def backtrack(self, option_probabilities, backtrack_matrix):
        predicted_tag = []
        sentence_length = len(self.sentence)
        no_of_tags = len(self.tag_list)
        
        j = sentence_length - 1
        index = np.argmax(option_probabilities[:,j])
        pointer = backtrack_matrix[index, j]
        predicted_tag.append(self.index_to_tag[index])

        for j in range(sentence_length-2, -1, -1):
            predicted_tag.append(self.index_to_tag[pointer])
            pointer = backtrack_matrix[int(pointer), j]

        predicted_tag.reverse()

        return predicted_tag
    
    def write_prediction_into_file(self, filepath):
        with open(filepath, "w") as output_file:
            output_file.write(self.output_string)

In [408]:
viterbi_dev = ViterbiDecoding(dev_data, hmm.labels, transition_prob, emission_prob, dev_data_orig)

In [409]:
preds = viterbi_dev.predict()

In [410]:
acc = viterbi_dev.calculate_accuracy(viterbi_dev.get_targets())
print(f"Viterbi Decoding Accuracy on dev_data: {acc*100}")

Viterbi Decoding Accuracy on dev_data: 94.76883613623946


In [None]:
##viterbi test data

In [411]:
viterbi_test = ViterbiDecoding(test_data, hmm.labels, transition_prob, emission_prob, test_data_orig)

In [412]:
preds = viterbi_test.predict()

In [413]:
viterbi_test.write_prediction_into_file("data/viterbi.out")
