In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [10]:
###################################
# CS B551 Fall 2022, Assignment #4
#
# Scoring code by D. Crandall
#
# PLEASE DON'T MODIFY THIS FILE.
# Edit pos_solver.py instead!
#

class Score:
    def __init__(self):
        self.word_scorecard = {}
        self.sentence_scorecard = {}
        self.word_count = 0
        self.sentence_count = 0

    def score(self, algo_outputs, gt):
        self.word_count += len(gt)
        self.sentence_count += 1

        for algo, labels in algo_outputs.items():
            correct = 0
            for j in range(0, len(gt)):
                correct += 1 if gt[j] == labels[j] else 0

            self.word_scorecard[algo] = self.word_scorecard.get(algo, 0) + correct
            self.sentence_scorecard[algo] = self.sentence_scorecard.get(algo, 0) + (correct == len(gt))

    def print_scores(self):
        print("\n==> So far scored %d sentences with %d words." % (self.sentence_count, self.word_count))
        print("                   Words correct:     Sentences correct: ")

        for i in sorted(self.word_scorecard):
            print("%18s:     %7.2f%%             %7.2f%%" % (i, self.word_scorecard[i] * 100 / float(self.word_count),
                                                             self.sentence_scorecard[i] * 100 / float(
                                                                 self.sentence_count)))

    @staticmethod
    def print_helper(description, list, sentence):
        print(("%40s" % description) + " " + " ".join(
            [(("%-" + str(max(5, len(sentence[i]))) + "s") % list[i]) for i in range(0, len(list))]))

    @staticmethod
    def print_results(sentence, outputs, posteriors, models):
        Score.print_helper(" ".join([("%7s" % model) for model in models]), sentence, sentence)
        for algo in sorted(outputs.keys()):
            Score.print_helper(algo + "  " + " ".join(
                [("%7.2f" % posteriors[algo][model]) if algo in posteriors else " " * 7 for model in models]),
                               outputs[algo], sentence)

In [93]:
import math
from collections import Counter, defaultdict
from itertools import product
import random
from tqdm import trange


class Solver:
    """
    This class predicts parts-of-speech for words in a sentence
    in three ways using different bayesian network architectures:
        1. a Simple model using only emission probabilities
            for parts-of-speech given a word
        2. a Hidden Markov model implemented by Viterbi algorithm
        3. a Complex bayesian model by Monte Carlo simulations
            implemented by Gibbs sampling
    """

    def __init__(self, default_prob=1e-6):
        """
        These dictionaries store the log probabilities
        corresponding to different edges in the bayesian models:
            1. Emissions: POS --> Word
            2. Transitions: POS --> next POS
            3. Emission2s: POS --> next Word
            4. Transition2s: POs --> next next POS
        The default part-of-speech probability is also defined here
        for words which are not in the training data.
        """
        self.emissions = {}  # simple
        self.transitions = {}  # hmm
        self.emission2s = {}  # complex
        self.transition2s = {}  # complex
        self.default_prob = math.log(1e-6)

    def train(self, data: list, verbose=False):
        """
        This method fills the log probability dictionaries for the
        emissions, transition, emission2s, and transition2s
        bayesian network edges.
        It takes a list of tuples composed of two objects each:
            1. a tuple of words, representing the sentence
            2. a tuple of parts-of-speech, corresponding to
                each word in the sentence
        The occurence of each edge is counted and normalized
        and LaPlace smoothing is applied for non-existent edges.
        """
        if verbose:
            print("training...")
            print("counting emissions and transitions")
        # Initialize counters for emissions and transitions
        emission_counts = defaultdict(int)
        transition_counts = defaultdict(int)
        tag_counts = defaultdict(int)
        start_tag_counts = defaultdict(int)
        emission2_counts = defaultdict(int)
        transition2_counts = defaultdict(int)

        # Count emissions and transitions from the training data
        for sentence, tags in data:
            start_tag_counts[tags[0]] += 1  # Increment the start tag count
            # count emission from current tag
            for i in range(len(tags)):
                tag = tags[i]
                word = sentence[i].lower()
                tag_counts[tag] += 1
                emission_counts[(tag, word)] += 1  # simple
                # count transition and emission from previous tag
                if i > 0:
                    prev_tag = tags[i - 1]
                    transition_counts[(prev_tag, tag)] += 1  # hmm
                    emission2_counts[(prev_tag, word)] += 1  # complex
                # count transition from previous previous tag
                if i > 1:
                    prev2_tag = tags[i - 2]
                    transition2_counts[(prev2_tag, tag)] += 1  # complex

        # Get sets of all words and tags to determine vocab and number of tags
        self.word_list = list(set(word for sentence, _ in data
                                  for word in sentence))
        self.tag_list = list(set(tag for _, tags in data
                                 for tag in tags))

        # Laplace smoothing - add one to all counts
        if verbose:
            print("Laplace smoothing - Add one to all counts")
        for tag in self.tag_list:
            start_tag_counts[tag] += 1
        for tag in self.tag_list:
            for word in self.word_list:
                emission_counts[(tag, word)] += 1
                emission2_counts[(tag, word)] += 1
        for prev_tag in self.tag_list:
            for tag in self.tag_list:
                transition_counts[(prev_tag, tag)] += 1
                transition2_counts[(prev_tag, tag)] += 1

        # Convert counts to probabilities with smoothing
        # Use total_words for emission probabilities
        if verbose:
            print("computing probabilities...")
        self.initial = {
            k: math.log(v / (tag_counts[k] + len(self.tag_list)))
            for k, v in start_tag_counts.items()
        }
        self.emissions = {
            k: math.log(v / (tag_counts[k[0]] + len(self.word_list)))
            for k, v in emission_counts.items()
        }
        self.transitions = {
            k: math.log(v / (tag_counts[k[0]] + len(self.tag_list)))
            for k, v in transition_counts.items()
        }
        self.emission2s = {
            k: math.log(v / (tag_counts[k[0]] + len(self.word_list)))
            for k, v in emission2_counts.items()
        }
        self.transition2s = {
            k: math.log(v / (tag_counts[k[0]] + len(self.tag_list)))
            for k, v in transition2_counts.items()
        }
        if verbose:
            print("training complete!")

    def posterior(self, model: str, sentence: tuple, label: tuple):
        """
        This method computes the posterior probabilities of a given sentence
        as labeled by a corresponding set of parts-of-speech.
        The method is implemented for each different model,
        Simple, HMM, and MCMC.
        """
        if model == "Simple":
            # calculate the sum of emission log probabilities
            probability_sum = 0
            for word, tag in zip(sentence, label):
                probability_sum += self.emissions.get((tag, word.lower()),
                                                      self.default_prob)
            return probability_sum

        elif model == "HMM":
            # For the HMM model, calculate the total log probability
            total_log_prob = 0
            for i in range(len(sentence)):
                word = sentence[i].lower()
                tag = label[i]
                # Emission probability
                emission_prob = self.emissions.get((tag, word),
                                                   self.default_prob)
                total_log_prob += emission_prob
                # transition probability
                if i == 0:
                    # Transition from start tag
                    start_transition_prob = self.initial.get(tag,
                                                             self.default_prob)
                    total_log_prob += start_transition_prob
                else:
                    # Transition from previous tag
                    prev_tag = label[i-1]
                    transition_prob = self.transitions.get((prev_tag, tag),
                                                           self.default_prob)
                    total_log_prob += transition_prob
            return total_log_prob

        elif model == "Complex":
            # For the complex model, calculate the total log probability
            total_log_prob = 0
            for i in range(len(sentence)):
                word = sentence[i].lower()
                tag = label[i]
                # emission probability to current word
                emission_prob = self.emissions.get((tag, word),
                                                   self.default_prob)
                total_log_prob += emission_prob
                # transition from start tag
                if i == 0:
                    start_transition_prob = self.initial.get(tag,
                                                             self.default_prob)
                    total_log_prob += start_transition_prob
                # transition from previous tag
                else:
                    prev_tag = label[i-1]
                    transition_prob = self.transitions.get((prev_tag, tag),
                                                           self.default_prob)
                    total_log_prob += transition_prob
                # emission to next word
                if i != range(len(sentence))[-1]:
                    next_word = sentence[i+1].lower()
                    emission2_prob = self.emission2s.get((tag, next_word),
                                                         self.default_prob)
                    total_log_prob += emission2_prob
                # transition from previous previous tag
                if i > 1:
                    prev2_tag = label[i-2]
                    transition2_prob = self.transition2s.get((prev2_tag, tag),
                                                             self.default_prob)
                    total_log_prob += transition2_prob
            return total_log_prob
        else:
            print("Unknown algorithm!")

    def simplified(self, sentence: tuple):
        """
        Simplified POS tagging method assigns the most probable tag
        to each word as the argmax of the emission log probability
        """
        argmax_list = [
            max(self.tag_list,
                key=lambda tag: self.emissions.get((tag, word.lower()), 0))
            for word in sentence
        ]
        print("Simple Model complete!")
        return argmax_list

    def hmm_viterbi(self, sentence: tuple):
        """
        Implementation of the Viterbi algorithm for HMM mode
        """
        # Number of tags
        n_tags = len(self.tag_list)
        # Initialize the Viterbi matrix with negative infinity
        viterbi = [[-math.inf for _ in range(n_tags)]
                   for _ in range(len(sentence))]
        # Backpointer matrix to track the best path
        backpointer = [[0 for _ in range(n_tags)]
                       for _ in range(len(sentence))]

        # Initialization step using log probabilities
        first_word = sentence[0].lower()
        for tag_idx, tag in enumerate(self.tag_list):
            # Check for the presence of (word, tag) in emission probabilities
            if (tag, first_word) in self.emissions:
                viterbi[0][tag_idx] = self.initial.get(tag,
                                                       self.default_prob) + \
                                        self.emissions.get((tag, first_word),
                                                           self.default_prob)
            else:
                # Handle unseen (word, tag) pairs
                viterbi[0][tag_idx] = self.initial.get(tag,
                                                       self.default_prob) + \
                                        self.default_prob

        # Iterate through the rest of the sentence
        for t in range(1, len(sentence)):
            word = sentence[t].lower()
            for tag_idx, tag in enumerate(self.tag_list):
                # Find the max log probability and corresponding tag index
                # from the previous step
                max_log_prob, best_tag_idx = max([
                    (
                        viterbi[t-1][prevtag_idx] +
                        self.transitions.get((self.tag_list[prevtag_idx], tag),
                                             self.default_prob),
                        prevtag_idx
                    )
                    for prevtag_idx in range(n_tags)
                ], key=lambda x: x[0])

                # Update the Viterbi matrix
                if (tag, word) in self.emissions:
                    viterbi[t][tag_idx] = max_log_prob + \
                                          self.emissions.get((tag, word),
                                                             self.default_prob)
                # Handle unseen pairs
                else:
                    viterbi[t][tag_idx] = max_log_prob + self.default_prob

                # Update the backpointer matrix
                backpointer[t][tag_idx] = best_tag_idx

        # Termination step
        max_prob, best_tag_idx = max(
            [(viterbi[len(sentence)-1][tag_idx], tag_idx)
             for tag_idx in range(n_tags)],
            key=lambda x: x[0]
        )

        # Backtrack to find the best path
        best_path = [best_tag_idx]
        for t in range(len(sentence)-1, 0, -1):
            best_path.insert(0, backpointer[t][best_path[0]])

        print("Hidden Markov Model complete!")
        # Convert numeric indices to tags
        return [self.tag_list[i] for i in best_path]

    def SampleKeys(self, edge_probs: dict, n: int):
        """
        This method gets random keys (bayesian network edges)
        as given by their values (probabilities).
        This is the mechanism that implements the Monte Carlo simulations.
        """
        edges = list(edge_probs.keys())
        probabilities = [math.exp(v) for v in edge_probs.values()]
        sampled_edges = random.choices(edges, probabilities, k=n)
        return sampled_edges

    def mcmc_word_probs(self, sentence: tuple, predicted_pos: list, i: int):
        """
        This method gets all possible POS log probabilities
        for the observed word given its context in the sentence.
        """
        word_probs = {}
        word_probs['emission'] = {
            k: v for k, v in self.emissions.items()
            if k[1] == sentence[i]
        }
        if i == 0:
            word_probs['initial'] = {
                k: v for k, v in self.initial.items()
            }
        if i > 0:
            word_probs['transition'] = {
                k: v for k, v in self.transitions.items()
                if k[0] == predicted_pos[-1]
            }
        if i != range(len(sentence))[-1]:
            word_probs['emission2'] = {
                k: v for k, v in self.emission2s.items()
                if k[1] == sentence[i+1]
            }
        if i > 1:
            word_probs['transition2'] = {
                k: v for k, v in self.transition2s.items()
                if k[0] == predicted_pos[-2]
            }
        # if its a new word, fill probabilities with default log probability
        for k, v in word_probs.items():
            if k == 'initial' and len(v) == 0:
                word_probs[k] = {
                    tag: self.default_prob for tag in self.tag_list
                }
            elif k == 'emission' and len(v) == 0:
                word_probs[k] = {
                    (tag, sentence[i]): self.default_prob
                    for tag in self.tag_list
                }
            elif k == 'transition' and len(v) == 0:
                word_probs[k] = {
                    (predicted_pos[-1], tag): self.default_prob
                    for tag in self.tag_list
                }
            elif k == 'emission2' and len(v) == 0:
                word_probs[k] = {
                    (tag, sentence[i+1]): self.default_prob
                    for tag in self.tag_list
                }
            elif k == 'transition2' and len(v) == 0:
                word_probs[k] = {
                    (predicted_pos[-2], tag): self.default_prob
                    for tag in self.tag_list
                }
        return word_probs

    def complex_mcmc(self, sentence: tuple, n=100):
        """
        This method applies Gibbs sampling to perform Markov Chain Monte Carlo
        simulations to estimate the distribution of the Complex model
        and predict the parts of speech.
        """
        print(f"Monte Carlo Markov Chain with {n} simulations per word...")
        # iterate through each word in the sentence to get predicted POS
        predicted_pos = []
        for i in trange(len(sentence)):
            # first get the relevant possible probabilities of POS for the word
            word_probs = self.mcmc_word_probs(sentence=sentence,
                                              predicted_pos=predicted_pos,
                                              i=i)
            # then sample from the probabilities to get the most frequent POS
            pos_samples = []
            # special case for 1st character in 1-character sentence
            if i == 0 and i == range(len(sentence))[-1]:
                combinations = product(word_probs['initial'].keys(),
                                       word_probs['emission'].keys())
                initial_probs = {
                    emk: sum([word_probs['initial'][ink],
                              word_probs['emission'][emk]])
                    for ink, emk in combinations
                    if ink == emk[0]
                }
                pos_samples.extend([pos for pos, word
                                    in self.SampleKeys(initial_probs, n=n)])

            # draw the first POS from the initial probability distribution
            elif i == 0:
                combinations = product(word_probs['initial'].keys(),
                                       word_probs['emission'].keys(),
                                       word_probs['emission2'].keys())
                initial_probs = {
                    emk: sum([word_probs['initial'][ink],
                              word_probs['emission'][emk],
                              word_probs['emission2'][em2k]])
                    for ink, emk, em2k in combinations
                    if ink == emk[0] == em2k[0]
                }
                pos_samples.extend([pos for pos, word
                                    in self.SampleKeys(initial_probs, n=n)])

            # special case for 2nd character in 2-character sentence
            elif i == 1 and i == range(len(sentence))[-1]:
                combinations = product(word_probs['transition'].keys(),
                                       word_probs['emission'].keys())
                final_probs = {
                    emk: sum([word_probs['transition'][trk],
                              word_probs['emission'][emk]])
                    for trk, emk in combinations
                    if trk[1] == emk[0]
                }
                pos_samples.extend([pos for pos, word
                                    in self.SampleKeys(final_probs, n=n)])

            # draw the second POS from emission, transition, and emission2
            elif i == 1:
                combinations = product(word_probs['transition'].keys(),
                                       word_probs['emission'].keys(),
                                       word_probs['emission2'].keys())
                next_probs = {
                    emk: sum([word_probs['transition'][trk],
                              word_probs['emission'][emk],
                              word_probs['emission2'][em2k]])
                    for trk, emk, em2k in combinations
                    if trk[1] == emk[0] == em2k[0]
                }
                pos_samples.extend([pos for pos, word
                                    in self.SampleKeys(next_probs, n=n)])

            # draw POS from emission, transition, emission2, and transition2
            elif i != range(len(sentence))[-1]:
                combinations = product(word_probs['transition'].keys(),
                                       word_probs['emission'].keys(),
                                       word_probs['transition2'].keys(),
                                       word_probs['emission2'].keys())
                next_probs = {
                    emk: sum([word_probs['transition'][trk],
                              word_probs['emission'][emk],
                              word_probs['transition2'][tr2k],
                              word_probs['emission2'][em2k]])
                    for trk, emk, tr2k, em2k in combinations
                    if trk[1] == emk[0] == tr2k[1] == em2k[0]
                }
                pos_samples.extend([pos for pos, word
                                    in self.SampleKeys(next_probs, n=n)])

            # draw the final POS from emission, transition, and transition2
            else:
                combinations = product(word_probs['transition'].keys(),
                                       word_probs['emission'].keys(),
                                       word_probs['transition2'].keys())
                final_probs = {
                    emk: sum([word_probs['transition'][trk],
                              word_probs['emission'][emk],
                              word_probs['transition2'][tr2k]])
                    for trk, emk, tr2k in combinations
                    if trk[1] == emk[0] == tr2k[1]
                }
                pos_samples.extend([pos for pos, word
                                    in self.SampleKeys(final_probs, n=n)])

            # assign part-of-speech as most commonly sampled POS occurence
            best_pos = Counter(pos_samples).most_common(1)[0][0]
            predicted_pos.append(best_pos)
        return predicted_pos

    def solve(self, model: str, sentence: tuple):
        """
        This solve() method is called by label.py,
        so you should keep the interface the same,
        but you can change the code itself.
        It should return a list of part-of-speech labelings of the sentence,
        one part of speech per word.
        """
        if model == "Simple":
            return self.simplified(sentence)
        elif model == "HMM":
            return self.hmm_viterbi(sentence)
        elif model == "Complex":
            return self.complex_mcmc(sentence)
        else:
            print("Unknown algorithm!")


In [94]:
import sys

path = '/home/mark/Nextcloud/Local/Mark/GradSchool/Coursework/CSCI_551/margree-jonjhans-a4/Part1/'

def read_data(fname):
    exemplars = []
    file = open(fname, 'r');
    for line in file:
        data = tuple([w.lower() for w in line.split()])
        exemplars += [(data[0::2], data[1::2]), ]
    return exemplars


(train_file, test_file) = (path+'bc.train', path+'bc.test')

print("Learning model...")
solver = Solver()

train_data = read_data(train_file)
solver.train(train_data)

class Score:
    def __init__(self):
        self.word_scorecard = {}
        self.sentence_scorecard = {}
        self.word_count = 0
        self.sentence_count = 0

    def score(self, algo_outputs, gt):
        self.word_count += len(gt)
        self.sentence_count += 1

        for algo, labels in algo_outputs.items():
            correct = 0
            for j in range(0, len(gt)):
                correct += 1 if gt[j] == labels[j] else 0

            self.word_scorecard[algo] = self.word_scorecard.get(algo, 0) + correct
            self.sentence_scorecard[algo] = self.sentence_scorecard.get(algo, 0) + (correct == len(gt))

    def print_scores(self):
        print("\n==> So far scored %d sentences with %d words." % (self.sentence_count, self.word_count))
        print("                   Words correct:     Sentences correct: ")

        for i in sorted(self.word_scorecard):
            print("%18s:     %7.2f%%             %7.2f%%" % (i, self.word_scorecard[i] * 100 / float(self.word_count),
                                                             self.sentence_scorecard[i] * 100 / float(
                                                                 self.sentence_count)))

    @staticmethod
    def print_helper(description, list, sentence):
        print(("%40s" % description) + " " + " ".join(
            [(("%-" + str(max(5, len(sentence[i]))) + "s") % list[i]) for i in range(0, len(list))]))

    @staticmethod
    def print_results(sentence, outputs, posteriors, models):
        Score.print_helper(" ".join([("%7s" % model) for model in models]), sentence, sentence)
        for algo in sorted(outputs.keys()):
            Score.print_helper(algo + "  " + " ".join(
                [("%7.2f" % posteriors[algo][model]) if algo in posteriors else " " * 7 for model in models]),
                               outputs[algo], sentence)

print("Loading test data...")
test_data = read_data(test_file)

print("Testing classifiers...")
scorer = Score()

Algorithms = ("Simple", "HMM", "Complex")
Algorithm_labels = [str(i + 1) + ". " + Algorithms[i] for i in range(0, len(Algorithms))]
# for (s, gt) in sorted(test_data, key=lambda x: len(x[0])):
for (s, gt) in test_data:
    print("Sentence:", s)

    outputs = {"0. Ground truth": gt}

    # run all algorithms on the sentence
    for (algo, label) in zip(Algorithms, Algorithm_labels):
        outputs[label] = solver.solve(algo, s)

        # calculate posteriors for each output under each model
    posteriors = {o: {a: solver.posterior(a, s, outputs[o]) for a in Algorithms} for o in outputs}
    Score.print_results(s, outputs, posteriors, Algorithms)

    scorer.score(outputs, gt)
    scorer.print_scores()

    print("----")

Learning model...
Loading test data...
Testing classifiers...
Sentence: ('at', 'the', 'same', 'instant', ',', 'nick', 'hit', 'the', 'barrel', 'and', 'threw', 'himself', 'upon', 'the', 'smaller', 'man', '.')
Simple Model complete!
Hidden Markov Model complete!
Monte Carlo sampling the Complex Markov Chain...


100%|█████████████████████████████████████████████████████████| 17/17 [00:07<00:00,  2.40it/s]


                  Simple     HMM Complex at    the   same  instant ,     nick  hit   the   barrel and   threw himself upon  the   smaller man   .    
0. Ground truth   -87.92 -115.81 -245.25 adp   det   adj   noun    .     noun  verb  det   noun   conj  verb  pron    adp   det   adj     noun  .    
      1. Simple   -87.22 -120.48 -257.37 adp   det   adj   noun    .     x     verb  det   noun   conj  verb  pron    adp   det   adj     noun  .    
         2. HMM   -87.75 -114.89 -245.43 adp   det   adj   noun    .     pron  verb  det   noun   conj  verb  pron    adp   det   adj     noun  .    
     3. Complex   -87.92 -115.81 -245.25 adp   det   adj   noun    .     noun  verb  det   noun   conj  verb  pron    adp   det   adj     noun  .    

==> So far scored 1 sentences with 17 words.
                   Words correct:     Sentences correct: 
   0. Ground truth:      100.00%              100.00%
         1. Simple:       94.12%                0.00%
            2. HMM:       94.12%      

100%|█████████████████████████████████████████████████████████| 21/21 [00:08<00:00,  2.42it/s]


                  Simple     HMM Complex the   gun   fired next  to    his   ear   with  a     vicious whoosh like  the   first stroke of    an    old   steam engine .    
0. Ground truth  -123.99 -155.46 -334.75 det   noun  verb  adp   adp   det   noun  adp   det   adj     prt    adp   det   adj   noun   adp   det   adj   noun  noun   .    
      1. Simple  -120.03 -152.90 -333.52 det   noun  verb  adj   prt   det   noun  adp   det   adj     .      adp   det   adj   noun   adp   det   adj   noun  noun   .    
         2. HMM  -121.22 -147.57 -321.80 det   noun  verb  adj   adp   det   noun  adp   det   adj     noun   adp   det   adj   noun   adp   det   adj   noun  noun   .    
     3. Complex  -121.22 -147.57 -321.80 det   noun  verb  adj   adp   det   noun  adp   det   adj     noun   adp   det   adj   noun   adp   det   adj   noun  noun   .    

==> So far scored 2 sentences with 38 words.
                   Words correct:     Sentences correct: 
   0. Ground truth:      100.00%    

 25%|██████████████▌                                           | 8/32 [00:03<00:11,  2.11it/s]


KeyboardInterrupt: 

In [11]:
sentence = ("the", "cat", "is", "green", "although", "it's", "eyes", "are", "blue")

# get all the emission and transition probabilities for each word in the sentence
# this is the slowest part of this method.
# The emissions items are huge so it takes time to filter them for individual words
all_probs = {}
for i in range(len(sentence)):
    word_probs = {}
    word_probs['emission'] = {k: v for k, v in solver.emissions.items()
                              if k[1] == sentence[i]}
    if i == 0:
        word_probs['initial'] = {k: v for k, v in solver.initial.items()}
    if i > 0:
        word_probs['transition'] = {k: v for k, v in solver.transitions.items()}
    if i != range(len(sentence))[-1]:
        word_probs['emission2'] = {k: v for k, v in solver.emission2s.items()
                                   if k[1] == sentence[i+1]}
    if i > 1:
        word_probs['transition2'] = {k: v for k, v in solver.transition2s.items()}

    all_probs[i] = word_probs
# set gibbs sampling iterations
n = 2500
# initiate dict for storing the samples
all_samples = {i: [] for i in range(len(sentence))}
# draw probability distribution for POS initial state
initial_probs = {}
for k1, v1 in all_probs[0]['emission'].items():
    for k2, v2 in all_probs[0]['emission2'].items():
        if k1[0] == k2[0]:
            initial_probs[k1] = v1 * v2 * all_probs[0]['initial'][k1[0]]
initial_probs = initial_probs if len(initial_probs) > 0 else {t: 1e-6 for t in solver.tag_list}
# for each sample parse the sentence to draw a state
# from the complex markov chain
for m in range(0, n):
    for i in range(len(sentence)):
        if i == 0:  # draw the first POS from the emission and the initial
            all_samples[i].append(SampleKeys(initial_probs, n=1)[0][0])
            continue
        # calculate transition, emission, and emission2 probabilities for 2nd+ POS
        tr = {k: v for k, v in all_probs[i]['transition'].items()
              if k[0] == all_samples[i-1][m]}
        tr = tr if len(tr) > 0 else {(all_samples[i-1][m], t): 1e-6 for t in solver.tag_list}

        em = {k: v for k, v in all_probs[i]['emission'].items()}
        em = em if len(em) > 0 else {(t, sentence[i]): 1e-6 for t in solver.tag_list}

        if i != range(len(sentence))[-1]:
            em2 = {k: v for k, v in all_probs[i]['emission2'].items()}
            em2 = em2 if len(em2) > 0 else {(t, sentence[i+1]): 1e-6 for t in solver.tag_list}
        if i == 1:  # draw the second POS from the emission, transition, and emission2
            next_prob = {
                emk: tr[trk] * em[emk] * em2[em2k]
                for trk, emk, em2k in product(tr.keys(), em.keys(), em2.keys())
                if trk[1] == emk[0] == em2k[0]
            }
        elif i != range(len(sentence))[-1]:  # draw the remaining POS from the emission, transition, emission2, and transition2
            tr2 = {k: v for k, v in all_probs[i]['transition2'].items()
                   if k[0] == all_samples[i-2][m]}
            tr2 = tr2 if len(tr2) > 0 else {(all_samples[i-2][m], t): 1e-6 for t in solver.tag_list}
            next_prob = {
                emk: tr[trk] * em[emk] * tr2[tr2k] * em2[em2k]
                for trk, emk, tr2k, em2k in product(tr.keys(), em.keys(), tr2.keys(), em2.keys())
                if trk[1] == emk[0] == tr2k[1] == em2k[0]
            }
        else:  # draw the remaining POS from the emission, transition, emission2, and transition2
            tr2 = {k: v for k, v in all_probs[i]['transition2'].items()
                   if k[0] == all_samples[i-2][m]}
            tr2 = tr2 if len(tr2) > 0 else {(all_samples[i-2][m], t): 1e-6 for t in solver.tag_list}
            next_prob = {
                emk: tr[trk] * em[emk] * tr2[tr2k]
                for trk, emk, tr2k in product(tr.keys(), em.keys(), tr2.keys())
                if trk[1] == emk[0] == tr2k[1]
            }
        all_samples[i].append(SampleKeys(next_prob, n=1)[0][0])
# get the POS pattern which was drawn the most - the max marginal
max_marginal = list(max(
    Counter(map(tuple, zip(*all_samples.values()))).items(),
    key=lambda x: x[1]
)[0])
print(max_marginal)

['det', 'noun', 'verb', 'adj', 'adp', 'det', 'noun', 'verb', 'adj']


In [12]:
{k: v * all_probs[0]['initial'][k[0]]
 for k, v in all_probs[0]['emission'].items()}

{('det', 'the'): 0.034302742430860116,
 ('x', 'the'): 1.4880481566396015e-06,
 ('pron', 'the'): 2.241926758097272e-06,
 ('prt', 'the'): 9.715436000802033e-07,
 ('adp', 'the'): 2.923559709831986e-07,
 ('.', 'the'): 1.4385498517437183e-07,
 ('noun', 'the'): 9.790676110349678e-08,
 ('adj', 'the'): 2.0711127937399897e-07,
 ('adv', 'the'): 1.0194848343522371e-06,
 ('verb', 'the'): 7.313841922004554e-08,
 ('num', 'the'): 1.074573795135696e-06,
 ('conj', 'the'): 8.932585012427945e-07}

In [13]:
i=1

{k: v for k, v in all_probs[i]['emission2'].items()}

{('.', 'is'): 0.003953360356302857,
 ('det', 'is'): 0.005802986720811165,
 ('noun', 'is'): 0.016782116092255554,
 ('verb', 'is'): 0.001028478947505808,
 ('adv', 'is'): 0.0020910496401837456,
 ('pron', 'is'): 0.02488804793440555,
 ('prt', 'is'): 0.008125877225382286,
 ('conj', 'is'): 0.0019270833333333334,
 ('num', 'is'): 0.0008833772190605623,
 ('adp', 'is'): 0.0003013489798450712,
 ('adj', 'is'): 0.0009404946493480085,
 ('x', 'is'): 0.00023656422718768146}

In [None]:
print("Loading test data...")
test_data = read_data(test_file)

print("Testing classifiers...")
scorer = Score()

Algorithms = ("Simple", "HMM", "Complex")
Algorithm_labels = [str(i + 1) + ". " + Algorithms[i] for i in range(0, len(Algorithms))]
for (s, gt) in test_data:

    outputs = {"0. Ground truth": gt}

    # run all algorithms on the sentence
    for (algo, label) in zip(Algorithms, Algorithm_labels):
        outputs[label] = solver.solve(algo, s)

        # calculate posteriors for each output under each model
    posteriors = {o: {a: solver.posterior(a, s, outputs[o]) for a in Algorithms} for o in outputs}
    Score.print_results(s, outputs, posteriors, Algorithms)

    scorer.score(outputs, gt)
    scorer.print_scores()

    print("----")

In [None]:
%%time

from collections import Counter
from itertools import product
import random


# this function gets random keys as given by the probabilities (values)
def SampleKeys(dictionary, n):
    keys = list(dictionary.keys())
    probabilities = list(dictionary.values())
    sampled_keys = random.choices(keys, probabilities, k=n)
    return sampled_keys

sentence = ("the", "cat", "is", "black", "although", "it's", "eyes", "are", "yellow", ".")

# get all the emission and transition probabilities for each word in the sentence
all_probs = {}
for i in range(0, len(sentence)):
    word = sentence[i]
    word_probs = {}
    word_probs['emission'] = {k: v for k, v in solver.emissions.items()
                              if k[1] == word}
    if i == 0:
        word_probs['initial'] = {k: v for k, v in solver.initial.items()}
    if i > 0:
        word_probs['transition'] = {k: v for k, v in solver.transitions.items()}
        word_probs['emission2'] = {k: v for k, v in solver.emission2s.items()
                                   if k[1] == word}
    if i > 1:
        word_probs['transition2'] = {k: v for k, v in solver.transition2s.items()}
    all_probs[i] = word_probs


# set gibbs sampling iterations
n = 2000
# initiate dict for storing the samples
all_samples = {i: [] for i in range(0, len(sentence))}
# draw probability distribution for initial state
initial_probs = {k: v * all_probs[0]['initial'][k[0]]
                 for k, v in all_probs[0]['emission'].items()}
# for each sample parse the sentence to draw a state
# from the complex markov chain
for m in range(0, n):
    for i in range(0, len(sentence)):
        if i == 0:
            all_samples[i].append(SampleKeys(initial_probs, n=1)[0][0])
            continue
        em2 = [v for k, v in all_probs[i]['emission2'].items()
               if k[0] == all_samples[i-1][m]][0]
        tr = {k: v for k, v in all_probs[i]['transition'].items()
              if k[0] == all_samples[i-1][m]}
        em = {k: v for k, v in all_probs[i]['emission'].items()}
        if i == 1:
            next_prob = {
                emk: tr[trk] * em[emk] * em2
                for trk, emk in product(tr.keys(), em.keys())
                if trk[1] == emk[0]
            }
        else:
            tr2 = {k: v for k, v in all_probs[i]['transition2'].items()
                   if k[0] == all_samples[i-2][m]}
            next_prob = {
                emk: tr[trk] * em[emk] * tr2[tr2k] * em2
                for trk, emk, tr2k in product(tr.keys(), em.keys(), tr2.keys())
                if trk[1] == emk[0] == tr2k[1]
            }
        all_samples[i].append(SampleKeys(next_prob, n=1)[0][0])

list(max(Counter(map(tuple, zip(*all_samples.values()))).items(),key=lambda x: x[1])[0])

CPU times: user 6.95 s, sys: 3.26 ms, total: 6.95 s
Wall time: 7.04 s


['det', 'noun', 'verb', 'adj', 'adp', 'prt', 'noun', 'verb', 'adj', '.']

In [None]:
solver.__dict__.keys()

dict_keys(['emissions', 'transitions', 'emission2s', 'transition2s', 'word_list', 'tag_list', 'initial'])