In [1]:
import glob
import math
import nltk
import numpy as np
import os
import re
import string
from pathlib import Path

# 1. Text Classification using Naive Bayes

## 1.1 Build a Naive Bayes Classifier

In [42]:
# Get the paths of all the class sub-directories in the Training Data directory
training_data_directories = [directory.path for directory in os.scandir(os.path.join(os.getcwd(), 'Reuters Training Data')) if directory.is_dir()]

In [43]:
# For each class, get the paths of all training files
training_data_files = {}

for directory in training_data_directories:
    class_name = directory.rsplit('\\', 1)[1]
    file_names = glob.glob(os.path.join(directory, '*'))
    training_data_files[class_name] = [file_name for file_name in file_names]

In [10]:
def textFormatter(file_path):
    """
    Remove all newlines and duplicate spaces from the text in the file corresponding
    to the specified file path, and convert all characters to lower case.
    """
    formatted_text = ''
    
    with open(file_path, 'r') as text:
        print(file_path)
        formatted_text = ' '.join([line.strip() for line in text.read().split('\n') if line.strip])
    
    formatted_text = re.sub(' +', ' ', formatted_text)
    
    return formatted_text.lower()

In [45]:
class NaiveBayesMB:
    """
    Multinomial Naive Bayes classifier that works with text data.
    """
    def __init__(self):
        self.classes_ = []
        self.log_priors_ = {}
        self.log_likelihoods_ = {}
        self.vocabulary_ = []
        self.vocabulary_size_ = 0
    
    def fit(self, X, y):
        """
        Compute log likelihood and log prior values from the specified X and y lists,
        where X is a list of (pre-processed) texts and y is a list of class labels.
        """
        self.classes_ = set(y)
        total_document_count = len(X)
        training_class_word_counts = NaiveBayesMB.trainClassWordCounts(X, y)
        self.createVocabulary(training_class_word_counts)
        
        for train_class in training_class_word_counts.keys():
            log_prior = math.log2(y.count(train_class) / total_document_count)
            self.log_priors_[train_class] = log_prior
            self.log_likelihoods_[train_class] = {}
            for word in self.vocabulary_:
                # Use Laplace add-1 smoothing
                word_count = 0 if word not in training_class_word_counts[train_class] else training_class_word_counts[train_class][word]
                smoothed_class_word_count = word_count + 1
                smoothed_class_vocabulary_count = len(training_class_word_counts[train_class]) + self.vocabulary_size_
                log_likelihood = math.log2(smoothed_class_word_count / smoothed_class_vocabulary_count)
                self.log_likelihoods_[train_class][word] = log_likelihood
                
    def predict(self, X):
        """
        Perform classification on a list of texts.
        """
        predictions = []
        
        for text in X:
            sums = {}
            probs = {}
            for train_class in self.classes_:
                sums[train_class] = self.log_priors_[train_class]
                tokenized_text = NaiveBayesMB.textTokenizer(text)
                for word in tokenized_text:
                    if word in self.vocabulary_:
                        log_likelihood = 0 if word not in self.log_likelihoods_[train_class] else self.log_likelihoods_[train_class][word]
                        sums[train_class] = sums[train_class] + log_likelihood
                probs[train_class] = 2**sums[train_class]
            predicted_class = max(probs, key=probs.get)
            predictions.append(predicted_class)
            
        return predictions
    
    def createVocabulary(self, class_word_counts):
        """
        Generate a vocabulary of all unique words in all of the input texts.
        """
        for train_class in class_word_counts.keys():
            for word in class_word_counts[train_class].keys():
                if not word in self.vocabulary_:
                    self.vocabulary_.append(word)
        
        self.vocabulary_size_ = len(self.vocabulary_)
    
    def trainClassDocumentUniqueWords(X, y):
        """
        Return a dictionary of unique words for each document in each
        class.
        """
        training_class_document_unique_words = {}
        
        for index, text in enumerate(X):
            train_class = y[index]
            if not train_class in training_class_document_unique_words.keys():
                training_class_document_unique_words[train_class] = {}
            tokenized_text = NaiveBayesMB.textTokenizer(text)
            unique_words = list(set(tokenized_text))
            training_class_document_unique_words[train_class][index] = unique_words
            
        return training_class_document_unique_words
    
    def trainClassWordCounts(X, y):
        """
        Return the counts for each unique word that occurs in each class.
        The counts refer to the number of documents each word occurs in.
        """
        training_class_word_counts = {}
        training_class_document_unique_words = NaiveBayesMB.trainClassDocumentUniqueWords(X, y)
        
        for train_class in training_class_document_unique_words.keys():
            training_class_word_counts[train_class] = {}
            for file_name in training_class_document_unique_words[train_class]:
                for word in training_class_document_unique_words[train_class][file_name]:
                    if not word in training_class_word_counts[train_class].keys():
                        training_class_word_counts[train_class][word] = 1
                    else:
                        training_class_word_counts[train_class][word] = training_class_word_counts[train_class][word] + 1
        
        return training_class_word_counts
    
    def textTokenizer(text):
        """
        Tokenize and return the specified text into a list of words.
        """
        non_alpha_numeric_matcher = re.compile('\W')
        tokens = nltk.word_tokenize(text)
        tokens = [token for token in tokens if non_alpha_numeric_matcher.match(token) == None]
        
        return tokens
        

In [46]:
# Create list of 'earn' class file names
earn_class_files_names = []

for file in training_data_files['earn']:
    file_name = file.rsplit('\\', 1)[1]
    earn_class_files_names.append(file_name)

In [47]:
# Create inputs for Multinomial Naive Bayes model
input_texts = []
classes = []

for train_class in training_data_files.keys():
    for file in training_data_files[train_class]:
        file_name = file.rsplit('\\', 1)[1]
        class_name = 'earn' if file_name in earn_class_files_names else 'not_earn'
        formatted_text = textFormatter(file)
        input_texts.append(formatted_text)
        classes.append(class_name)

In [48]:
# Create the Multinomial Naive Bayes classifier and fit to the training data
naive_bayes_MB = NaiveBayesMB()
naive_bayes_MB.fit(input_texts, classes)

In [49]:
# Run the Multinomial Naive Bayes classifier on the training data 
train_predictions = naive_bayes_MB.predict(input_texts)

## 1.2 Implement the Evaluation Metrics

In [50]:
class EvaluationMetrics:
    """
    Calculate precision, recall, and F1 Measure for a list of classes, a list of
    predictions, and a target category.
    """
    def __init__(self, classes, predictions, target):
        self.classes_ = classes
        self.predictions_ = predictions
        self.target_ = target
        
    def precision(self):
        """
        Calculate the precision.
        """
        true_positives = 0
        false_positives = 0
        
        for index,prediction in enumerate(self.predictions_):
            if self.classes_[index] == self.target_ and prediction == self.target_:
                true_positives = true_positives + 1
            elif self.classes_[index] != self.target_ and prediction == self.target_:
                false_positives = false_positives + 1
                
        precision = true_positives / (true_positives + false_positives)
        
        return precision
    
    def recall(self):
        """
        Calculate the recall.
        """
        true_positives = 0
        false_negatives = 0
        
        for index,prediction in enumerate(self.predictions_):
            if self.classes_[index] == self.target_ and prediction == self.target_:
                true_positives = true_positives + 1
            if self.classes_[index] == self.target_ and prediction != self.target_:
                false_negatives = false_negatives + 1
                
        recall = true_positives / (true_positives + false_negatives)
        
        return recall
    
    def F1(self):
        """
        Calculate F1 Measure.
        """
        precision = self.precision()
        recall = self.recall()
        
        f1 = (2 * precision * recall) / (precision + recall)
        
        return f1
    
    def printSummary(self):
        """
        Print a summary of the precision, recall, and F1 Measure scores.
        """
        precision = self.precision()
        recall = self.recall()
        f1 = self.F1()
        
        print("precision: {}".format(precision))
        print("recall: {}".format(recall))
        print("f1: {}".format(f1))

In [51]:
# Calculate precision, recall, and F1 measure for the training predictions and print a summary
metrics = EvaluationMetrics(classes, train_predictions, 'earn')
metrics.printSummary()

precision: 0.9995194617972128
recall: 0.7155142758857929
f1: 0.834001603849238


## 1.3 Run the Classifier on Test Set

In [13]:
# Load in the test data
test_files = glob.glob(os.path.join(os.getcwd(), 'Reuters Test Data', '*'))
test_texts = [textFormatter(file) for file in test_files]

D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0009603
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0009604
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0009605
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0009606
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0009609
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0009611
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0009613
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0009616
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0009618
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0009622
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0009623
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0009624
D:\School\NLP\Class Repo\CS-

D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0010517
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0010518
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0010526
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0010527
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0010528
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0010529
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0010530
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0010531
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0010533
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0010534
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0010535
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0010536
D:\School\NLP\Class Repo\CS-

D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011204
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011206
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011207
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011208
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011209
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011210
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011211
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011212
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011213
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011215
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011216
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011218
D:\School\NLP\Class Repo\CS-

D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011962
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011963
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011965
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011966
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011967
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011968
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011969
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011970
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011971
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011972
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011973
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0011974
D:\School\NLP\Class Repo\CS-

D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0012551
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0012552
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0012553
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0012554
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0012555
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0012556
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0012557
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0012558
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0012559
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0012560
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0012561
D:\School\NLP\Class Repo\CS-6120_saulniern\Assignment2\Reuters Test Data\0012562
D:\School\NLP\Class Repo\CS-

In [53]:
# Run the classifier on the test set
test_predictions = naive_bayes_MB.predict(test_texts)

In [54]:
# Get the list of files classified as 'earn'
test_earn_files = []

for index,prediction in enumerate(test_predictions):
    if prediction == 'earn':
        file_name = test_files[index].rsplit('\\', 1)[1]
        test_earn_files.append(file_name)

In [55]:
# Print the file names of test files classified as 'earn' to an output file
test_predictions_out = os.path.join(os.getcwd(), 'Naive Bayes Classifier Output', 'files_classified_as_earn.txt')

with open(test_predictions_out, 'w') as file:
    for file_name in test_earn_files:
        line = file_name + '\n'
        file.write(line)

# 2. Part-of-Speech (POS) Tagging using Hidden Markov Models (HMM)

## 2.1 Frequency Counts

In [56]:
def writeDictionaryToFile(dictionary, out_file):
    """
    Write the specified dictionary to the specified output file.
    """
    with open(out_file, 'w') as file:
        for key in dictionary.keys():
            line = str(key) + ' : ' + str(dictionary[key]) + '\n'
            file.write(line)

In [57]:
# Get all Brown Corpus training file names
brown_corpus_train_files = glob.glob(os.path.join(os.path.join(os.getcwd(), 'Brown Corpus Training Data'), '*'))

In [58]:
# Create a string containing the formatted text from all Brown Corpus training files
training_file_texts = []

for file in brown_corpus_train_files:
    formatted_text = textFormatter(file)
    training_file_texts.append(formatted_text)

brown_corpus_train_text = ' '.join(training_file_texts)

In [59]:
sentence_begin = '<s>'
sentence_end = '<EOS>'

In [60]:
# Create a regex for parsing word/tag pairs
word_tag_pair_regex_string = '\s?(([^\s]+)\/([^\s]+))\s{1}'
word_tag_pair_regex = re.compile(word_tag_pair_regex_string)

In [61]:
# Extract all word/tag pairs in the Brown Corpus training text
word_tag_pairs = [(match.group(2),match.group(3)) for match in word_tag_pair_regex.finditer(brown_corpus_train_text)]

In [62]:
# Insert sentence end markers
sentence_end_insertions = []

end_count = 0
for index, pair in enumerate(word_tag_pairs):
    if pair[0] == '.' and pair[1] == '.':
        sentence_end_insertions.append(index + end_count)
        end_count += 1
    
for index in sentence_end_insertions:
    word_tag_pairs.insert(index, (sentence_end, sentence_end))

In [63]:
# Remove all punctuation pairs
word_tag_pairs = [pair for pair in word_tag_pairs if (not any(punct in pair[0][0] for punct in string.punctuation)) or pair[0] == sentence_end]

In [64]:
# Insert sentence begin markers
sentence_begin_insertions = []

begin_count = 1
sentence_begin_insertions.append(0)
for index, pair in enumerate(word_tag_pairs):
    if pair[0] == sentence_end and pair[1] == sentence_end:
        sentence_begin_insertions.append((begin_count + index) + 1)
        begin_count += 1
        
for index in sentence_begin_insertions:
    word_tag_pairs.insert(index, (sentence_begin, sentence_begin))

In [65]:
# Count word-tag pair occurrences
word_tag_pair_counts = {}

for pair in word_tag_pairs:
    if pair[0] != sentence_begin and pair[0] != sentence_end:
        if pair in word_tag_pair_counts:
            word_tag_pair_counts[pair] += 1
        else:
            word_tag_pair_counts[pair] = 1

In [66]:
# Count tag unigram occurrences
tag_unigram_counts = {}

for pair in word_tag_pairs:
    tag = pair[1]
    if tag in tag_unigram_counts:
        tag_unigram_counts[tag] += 1
    else:
        tag_unigram_counts[tag] = 1

In [67]:
# Count tag bigram occurences
tag_bigram_counts = {}

for index in range(1, len(word_tag_pairs) - 1):
    tag1 = word_tag_pairs[index - 1][1]
    tag2 = word_tag_pairs[index][1]
    tag_bigram = (tag1, tag2)
    if tag_bigram in tag_bigram_counts:
        tag_bigram_counts[tag_bigram] += 1
    else:
        tag_bigram_counts[tag_bigram] = 1

In [68]:
# Sort the count dictionarys in decreasing order by occurrence
word_tag_pair_counts = {word:count for word,count in sorted(word_tag_pair_counts.items(), key=lambda item: item[1], reverse=True)}
tag_unigram_counts = {unigram:count for unigram,count in sorted(tag_unigram_counts.items(), key=lambda item: item[1], reverse=True)}
tag_bigram_counts = {bigram:count for bigram,count in sorted(tag_bigram_counts.items(), key=lambda item: item[1], reverse=True)}

In [69]:
# Write the word/tag pair, tag/unigram, and tag/bigram counts to output files
word_tag_pair_out = os.path.join(os.getcwd(), 'Brown Corpus Training Data Counts Output', 'word_tag_pair_counts.txt')
tag_unigram_out = os.path.join(os.getcwd(), 'Brown Corpus Training Data Counts Output', 'tag_unigram_counts.txt')
tag_bigram_out = os.path.join(os.getcwd(), 'Brown Corpus Training Data Counts Output', 'tag_bigram_counts.txt')

writeDictionaryToFile(word_tag_pair_counts, word_tag_pair_out)
writeDictionaryToFile(tag_unigram_counts, tag_unigram_out)
writeDictionaryToFile(tag_bigram_counts, tag_bigram_out)

## 2.2 Transition Probabilities

In [70]:
# Calculate transition probabilities for each possibile bigram
transition_probabilities = {}

for tag_unigram_1 in tag_unigram_counts.keys():
    for tag_unigram_2 in tag_unigram_counts.keys():
        tag_bigram = (tag_unigram_1, tag_unigram_2)
        bigram_count = 0 if not tag_bigram in tag_bigram_counts.keys() else tag_bigram_counts[tag_bigram]
        transition_probability = bigram_count / tag_unigram_counts[tag_unigram_1]
        transition_probabilities[tag_bigram] = transition_probability

## 2.3 Emission Probabilities

In [71]:
# Get the list of unique words in the training data
unique_words = list(set([pair[0] for pair in word_tag_pair_counts.keys()]))

In [72]:
# Calculate the emission probability for each possible word/tag pair
emission_probabilities = {}

for word in unique_words:
    for tag in tag_unigram_counts.keys():
        word_tag_pair = (word, tag)
        word_tag_count = 0 if not word_tag_pair in word_tag_pair_counts else word_tag_pair_counts[word_tag_pair]
        emission_probability = word_tag_count / tag_unigram_counts[tag]
        emission_probabilities[word_tag_pair] = emission_probability

## 2.4 Viterbi Algorithm

In [73]:
def cleanBrownTestSentence(sentence):
    """
    Remove punctuation from the tokenized sentence and return as a string with all tokens
    separated with a single space.
    """
    lines = sentence.split()
    lines = [line.lower() for line in lines if (not all([char in string.punctuation for char in line]))]
    return ' '.join(lines)

In [74]:
# Read the Brown Corpus tokenized test data file
brown_corpus_test_text = Path(os.path.join(os.getcwd(), 'Brown Corpus Test Data', 'BrownCorpusTokenizedDataTest_File.txt')).read_text()
brown_corpus_test_text_lines = brown_corpus_test_text.splitlines()

In [75]:
# Get the indexes for sentence start and sentence end tags
sentence_start_indexes = []
sentence_end_indexes = []
sentence_start_substring = '< sentence ID'
sentence_end_string = '<EOS>'

for index,line in enumerate(brown_corpus_test_text_lines):
    if sentence_start_substring in line:
        sentence_start_indexes.append(index)
    elif line == sentence_end_string:
        sentence_end_indexes.append(index)

In [76]:
# Parse and clean the test data sentences
test_data_sentences = {}
sentence_id_regex = re.compile('(\d+)')

for index in range(0, len(sentence_start_indexes)):
    sentence_start_index = sentence_start_indexes[index]
    sentence_end_index = sentence_end_indexes[index]
    sentence_tokens = brown_corpus_test_text_lines[sentence_start_index:sentence_end_index]
    sentence_tokens.insert(1, sentence_begin)
    sentence_tokens.insert(len(sentence_tokens) - 1, sentence_end)
    sentence_id = int(re.search(sentence_id_regex, sentence_tokens[0]).group(0))
    test_data_sentences[sentence_id] = cleanBrownTestSentence(' '.join(sentence_tokens[1:-1]))

In [77]:
def maxState(word_state_matrix, current_state, transition_probabilities, emission_probability):
    """
    Find and return the max previous state and the probability of the max state occurring before 
    the previous state given the emission probability of the current state and the current word.
    
    Inputs:
        word_state_matrix: matrix of probability values for all states for an observation. the
            probability values represent the maximum probability of each state considering all 
            previous observations.
        current_state: the state being transitioned to; the max state is the previous state that
            gives the highest probability for the current state
        transition_probabilities: dictionary of probabilities for moving from one state to another
        emission_probability: probability of the word associated with the given word_state_matrix
            occurring the current state
    Outputs:
        A pair: the first element is the max previous state given the current state and current 
        word, and the second element is the probability of the most optimal path to the max
        previous state considering all observations preceeding the previous observation
    """
    max_state = list(word_state_matrix.keys())[0]
    
    for state,probability in word_state_matrix.items():
        tag_bigram = (state, current_state)
        transition_probability = 0 if not tag_bigram in transition_probabilities else transition_probabilities[tag_bigram]
        state_conditional_probability = probability * transition_probability * emission_probability
        if state_conditional_probability > word_state_matrix[max_state]:
            max_state = state
            
    return (max_state, word_state_matrix[max_state])

In [78]:
def viterbi(observations, states, start_probabilities, transition_probabilities, emission_probabilities):
    """
    POS tagging for a single sentence using the viterbi algorithm.
    
    inputs:
        observations: list of word tokens, representing all words in a single sentence
        states: list of POS tags
        start_probabilities: probability of each state occurring at the start of a sentence
        transition_probabilities: dictionary of probabilities of one tag being preceeded by another; key
            is a pair of POS tags in the order being evaluated
        emission_probabilities: dictionary of probabilites of words being associated with a tag; key is
            a word/tag pair
    outputs:
        list of most likely POS tag for each observation
    """
    word_state_matrix = {}
    
    # Instantiate state values for the first word in the sentence
    word_state_matrix[observations[0]] = {}
    for state in states:
        word_tag_pair = (observations[0], state)
        start_probability = start_probabilities[(sentence_begin, state)]
        emission_probability = 0 if not word_tag_pair in emission_probabilities else emission_probabilities[word_tag_pair]
        word_state_matrix[observations[0]][state] = start_probability * emission_probability

    # For each observation/state pair, find the optimal path to that observation/state
    for observation_index in range(1, len(observations)):
        current_word = observations[observation_index]
        word_state_matrix[current_word] = {}
        previous_word = observations[observation_index - 1]
        for state in states:
            word_tag_pair = (current_word, state)
            emission_probability = 0 if not word_tag_pair in emission_probabilities else emission_probabilities[word_tag_pair]
            max_previous_state = maxState(word_state_matrix[previous_word], state, transition_probabilities, emission_probability)
            tag_bigram = (max_previous_state[0], state)
            transition_probability = 0 if not tag_bigram in transition_probabilities else transition_probabilities[tag_bigram]   
            word_state_matrix[current_word][state] = max_previous_state[1] * transition_probability * emission_probability

    # Backtrack through the word/state matrix, choosing the most optimal state for
    # each observation
    pos_tags = []
    for observation_index in range(len(observations)-1, -1, -1):
        current_word = observations[observation_index]
        current_word_state_matrix = word_state_matrix[current_word]
        best_tag_current_word = max(current_word_state_matrix, key=current_word_state_matrix.get)
        pos_tags.append(best_tag_current_word)
        
    return list(reversed(pos_tags))

In [79]:
# Create start probabilities matrix
start_probabilities = {}

for tag in tag_unigram_counts.keys():
    tag_bigram = (sentence_begin, tag)
    start_probability = 0 if not tag_bigram in transition_probabilities else transition_probabilities[tag_bigram]
    start_probabilities[tag_bigram] = start_probability

In [81]:
# Tag each sentence in the test data using the viterbi implementation and
# write results to output file
viterbi_out = os.path.join(os.getcwd(), 'Viterbi Output', 'POS_test_data_output.txt')

with open(viterbi_out, 'w') as file:
    for sentence_id, sentence in test_data_sentences.items():
        observations = sentence.split()[1:-1]
        pos_tags = viterbi(observations, tag_unigram_counts.keys(), start_probabilities, transition_probabilities, emission_probabilities)
        sentence_id_tag = '<sentence ID=' + str(sentence_id) + '>\n'
        file.write(sentence_id_tag)
        for index in range(0, len(pos_tags)):
            line = str(observations[index]) + ',' + str(pos_tags[index]) + '\n'
            file.write(line)
        file.write(sentence_end + '\n')