# Hidden Markov Model

In [28]:
import collections
import itertools
import random
import math
from sys import float_info
from collections import defaultdict
from pathlib import Path
from operator import itemgetter
from pprint import pprint

## Constants

In [29]:
DATA_ROOT = Path("data")
DATASETS = ["SG", "CN", "EN", "AL"]

## Helper Functions

In [57]:
def load_dataset(path, split=True, shuffle=False):
    """
    Load a dataset from a specified path.
    
    Args:
        path: The path to read the data from
        split (bool): Whether to split labels from each line of data
    """
    with open(path) as f:
        sequences = [sent.split("\n") for sent in f.read().split("\n\n")][:-1]
    if shuffle:
        random.shuffle(sequences)
    if split:
        sequences = [[pair.split() for pair in seq] for seq in sequences]
        sequences = [[[pair[i] for pair in s] for s in sequences] for i in [0, 1]]
    return sequences


def pairwise(sequence, include_start_stop=True):
    """
    Rolling window over iterable (with offset=1 and window_size=2)

    Args:
        sequence: The iterable to window over
        include_start_stop (bool): If True, adds START & STOP are added to either end of output
        
    Examples:
        >>> pairwise([1, 2, 3], include_start_stop=True)
        [("START", 1), (1, 2), (2, 3), (3, "STOP")]

        >>> pairwise([1, 2, 3, 4], include_start_stop=False)
        [(1, 2), (2, 3), (3, 4)]
    """
    a, b = itertools.tee(sequence)
    next(b)
    pairs = zip(a, b)
    if include_start_stop:
        pairs = itertools.chain([("START", sequence[0])], pairs, [(sequence[-1], "STOP")])
    return pairs


def flatten(sequences):
    """
    Flatten a nested sequence
    """
    return itertools.chain.from_iterable(sequences)


def count(sequences, as_probability=False):
    """
    Get a dictionary of word-count pairs in a dataset.

    Args:
        sequences: The sequence (or collection of sequences) of words to count
        as_probability (bool): Whether to return the counts as probabilties (over the entire dataset)
    """
    counts = dict(collections.Counter(flatten(sequences)))
    if as_probability:
        counts = {k: v / sum(counts.values()) for k, v in counts.items()}
    return counts


def smooth(inputs, thresh):
    """
    Replace tokens appearing less than `thresh` times with a "#UNK#" token.

    Args:
        inputs: The collection of sequences to smooth
        thresh (bool): The minimum number of occurrences required for a word to not be replaced
    """
    inputs = list(inputs)
    to_replace = {k for k, v in count(inputs, as_probability=False).items() if v < thresh}
    return [["#UNK#" if x in to_replace else x for x in sub] for sub in inputs]


def clean_inputs(inputs, emission):
    """
    For each token in the given inputs, replace it with "#UNK#" if it doesn't appear in the emission probability list.
    """
    return [[x if x in emission else "#UNK#" for x in sub] for sub in inputs]


def get_token_map(sequences):
    """
    Get token_to_id and id_to_token maps from a collection of sequences
    """
    tokens = set(flatten(sequences))
    return {token: i for i, token in enumerate(tokens)}


def encode_numeric(sequences):
    """
    Encode a collection of token sequences as numerical values
    """
    token_map = get_token_map(sequences)
    return [[token_map[token] for token in sequence] for sequence in sequences], token_map


def decode_numeric(sequences, token_map):
    """
    Decode a collection of token ID sequences to tokens
    """
    token_map = {i: token for i, token in token_map.items()}  # Reverse token map
    return [[token_map[val] for val in sequence] for sequence in sequences]


def pprint_dict(d, max_entires=40):
    pprint(dict(itertools.islice(d.items(), max_entires)))

## Part 3

### Emission Parameters

In [96]:
def get_emission_parameters(inputs, outputs):
    """
    Estimate emission paramters from a collection of input-output pairs 
    """
    n_inputs = max(flatten(inputs)) + 1  # Input space size
    n_outputs = max(flatten(outputs)) + 1  # Output space size
    emission_matrix = [[0 for _ in range(n_outputs)] for _ in range(n_inputs)]

    for inp, out in zip(inputs, outputs):
        for i, o in zip(inp, out):
            emission_matrix[i][o] += 1

    for i in range(n_inputs):
        row_sum = sum(emission_matrix[i])
        for j in range(n_outputs):
            emission_matrix[i][j] /= row_sum

    return emission_matrix

In [None]:
for dataset in DATASETS:
    print(f"Dataset: {dataset}")
    features, labels = load_dataset(f"data/{dataset}/train")
    features = smooth(features, 3)  # Input feature smoothing

    # Numerically encode dataset
    feature_ids, feature_map = encode_numeric(features)
    label_ids, label_map = encode_numeric(labels)

    # Calculate Emission Parameters
    emission_parameters = get_emission_parameters(feature_ids, label_ids)
    print(emission_parameters[:10])

Dataset: SG
[[0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.3333333333333333, 0.0, 0.3333333333333333, 0.3333333333333333], [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.1, 0.5, 0.0, 0.1, 0.3], [0.0, 0.0, 0.3333333333333333, 0.0, 0.0, 0.3333333333333333, 0.3333333333333333], [0.2, 0.0, 0.0, 0.1, 0.0, 0.5, 0.2], [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0], [0.0, 0.10526315789473684, 0.0, 0.10526315789473684, 0.0, 0.7894736842105263, 0.0], [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0]]
Dataset: CN


### Transition Parameters

In [6]:
def get_transition_parameters(sequences):
    """
    Estimate transition paramters from a collection of sequences
    """
    transitions = defaultdict(lambda: defaultdict(int))  # Number of transitions along each edge
    for sequence in sequences:
        for origin, dest in pairwise(sequence, include_start_stop=True):
            transitions[origin][dest] += 1
    transition_parameters = defaultdict(lambda: defaultdict(float), {
        origin: defaultdict(float, {
            dest: count / sum(dests.values()) for dest, count in dests.items()
        }) for origin, dests in transitions.items()
    })
    return transition_parameters

*Not sure if I should pass `features` or `labels` to `get_transition_probabilities`...*

In [104]:
for dataset in DATASETS:
    print(f"Dataset: {dataset}")
    features, labels = load_dataset(f"data/{dataset}/train")
    transition_parameters = get_transition_parameters(labels)
    pprint_dict(transition_parameters)

Dataset: SG
{'B-negative': defaultdict(<class 'float'>,
                           {'B-negative': 0.004140786749482402,
                            'B-neutral': 0.002329192546583851,
                            'I-negative': 0.28442028985507245,
                            'O': 0.681935817805383,
                            'STOP': 0.02717391304347826}),
 'B-neutral': defaultdict(<class 'float'>,
                          {'B-negative': 0.00024764735017335313,
                           'B-neutral': 0.010599306587419515,
                           'B-positive': 0.0011391778107974245,
                           'I-neutral': 0.47206537890044575,
                           'O': 0.4958395245170877,
                           'STOP': 0.020108964834076277}),
 'B-positive': defaultdict(<class 'float'>,
                           {'B-neutral': 0.002011802575107296,
                            'B-positive': 0.00536480686695279,
                            'I-positive': 0.4134924892703863,
     

### Viterbi Algorithm

In [105]:
def log(x):
    return math.log(x) if x else -float('inf')


def viterbi(observations, transition, emission):
    label_space = transition.keys()
    n = len(observations)
    path = defaultdict(lambda: defaultdict(tuple))

    # Base case
    for label in label_space:
        path[0][label] = (log(0), None)
        emit_prob = emission[observations[0]][label]
        trans_prob = transition["START"][label]
        path[1][label] = (log(trans_prob) + log(emit_prob), "START")
    path[0]["START"] = (log(1), None)

    path = {origin: {dest: count for dest, count in dests.items()} for origin, dests in path.items()}
    print(path)

In [106]:
for dataset in DATASETS:
    print(f"Dataset: {dataset}")
    features, labels = load_dataset(f"data/{dataset}/train")
    feature_counts = count(features)
    label_counts = count(labels)
    emission_parameters = get_emission_parameters(features, labels)
    transition_parameters = get_transition_parameters(labels)

    dev_features = load_dataset(f"data/{dataset}/dev.in", split=False)
    smoothed_dev_features = smooth(dev_features)
    print(dev_features[0])
    predicted_dev_labels = viterbi(dev_features[0], transition_parameters, emission_parameters)

Dataset: SG
['Tour', 'Scotland', 'followers', 'to', 'visit', '@beley_crysta2', ',', '@MyExpatJob', ',', ',', '@paridise15', ',', '@JCBlakeney76', ',', '@UnwearyWorld', ',', '@szilviade', ',', '@garrywatts1231']
{0: {'START': (0.0, None), 'O': (-inf, None), 'B-positive': (-inf, None), 'I-positive': (-inf, None), 'B-neutral': (-inf, None), 'I-neutral': (-inf, None), 'B-negative': (-inf, None), 'I-negative': (-inf, None)}, 1: {'START': (-inf, 'START'), 'O': (-0.3760144579374646, 'START'), 'B-positive': (-5.808641770632523, 'START'), 'I-positive': (-inf, 'START'), 'B-neutral': (-inf, 'START'), 'I-neutral': (-inf, 'START'), 'B-negative': (-inf, 'START'), 'I-negative': (-inf, 'START')}}
Dataset: CN
['一', '觉醒', '来天', '都', '黑', '了', '。', '梦到', '了', '继科', '哥哥', '，', '给', '他', '做饭', '忙', '得', '团团转', '，', '继科', '哥哥', '很乖', '，', '不怎么', '爱', '说话', '。']
{0: {'START': (0.0, None), 'O': (-inf, None), 'B-neutral': (-inf, None), 'I-neutral': (-inf, None), 'B-positive': (-inf, None), 'I-positive': (-inf,