Import libraries and read data

In [26]:
import numpy as np
import pandas as pd
import random

obs = pd.read_csv('data/observationProbs.csv', index_col=0)
test = pd.read_csv('data/testData.csv', index_col=0)
trans = pd.read_csv('data/transitionProbs.csv', index_col=0)

## Part 1: Viterbi Algorithm

In [24]:
def viterbi(transitions, emissions, priors, events):
    # Get dimensions of transitions and fill tables
    T = len(np.trim_zeros(events))
    K = transitions.shape[0]
    M = np.empty((T, K), 'd')
    C = np.empty((T, K), 'B')
    
    # Initialize table using START probabilities
    M[0,:] = priors * emissions[events[0] - 1, :]
    C[0,:] = 0
    
    # Iterate over samples from testData and calculate next values
    for i in range(1, T):
        M[i, :] = np.max(M[i - 1, :] * transitions * emissions[events[i] - 1, :], 1)
        C[i, :] = np.argmax(M[i - 1, :] * transitions, 1)
    
    # Find last hidden state using table
    X = np.empty(T, 'B')
    X[-1] = np.argmax(M[T-1, :])
    
    # Go back through samples in reverse and find most likely hidden states
    for i in reversed(range(1, T)):
        X[i - 1] = C[i, X[i]]
    return X

#### Prep data for algorithm

In [3]:
priors = trans['START'].to_numpy()
priors = priors[np.arange(priors.size - 1)]

transitions = trans.to_numpy()
transitions = np.delete(transitions, transitions.shape[0] - 1, 0)
transitions = np.delete(transitions, transitions.shape[1] - 1, 1)
emissions = obs.to_numpy()
sequences = test.to_numpy()

#### Input all sequences into algorithm and get most likely hidden states

In [30]:
viterbi_results = {}

for idx, events in enumerate(sequences):
    viterbi_results[idx] = viterbi(transitions, emissions, priors, events)
    print(viterbi_results[idx])
    

[1 1 1 1 1]
[1 1 1 1]
[0 0 0 0 0]
[0 0 0]
[0 0 0 0 0]
[0 0 0 0]
[1 1 1]
[0 0 0 0]
[0 0 1 1 1]
[0 0 0]


0 = C

1 = H

## Part 2: Likelihood Weighting

In [53]:
def eisner_one_sample_gen(length):
    rand = random.random()
    sample = (0 if rand < .5 else 1,)
    weight = .5
    threshold = .86 / .93
    for i in range(1, length):
        rand = random.random()
        sample += (sample[i-1] if rand < threshold else 1 - sample[i-1],)
        weight *= threshold if rand < threshold else 1 - threshold
    weight *= .07
    return weight, sample

def gen_lw_samples(events, num_samples):
    samples = []
    for i in range(num_samples):
        samples.append(eisner_one_sample_gen(len(events)))
    return samples

def get_most_likely(samples):
    sample_weights = {}
    for weight, sample in samples:
        if sample in sample_weights:
            sample_weights[sample] += weight
        else:
            sample_weights[sample] = weight
    return max(sample_weights, key=sample_weights.get)

for idx, events in enumerate(sequences):
    trimmed = np.trim_zeros(events)
    samples = gen_lw_samples(trimmed, 75000)
    print(get_most_likely(samples))

(1, 1, 1, 1, 1)
(1, 1, 1, 1)
(0, 0, 0, 0, 0)
(1, 1, 1)
(1, 1, 1, 1, 1)
(1, 1, 1, 1)
(0, 0, 0)
(0, 0, 0, 0)
(0, 0, 0, 0, 0)
(0, 0, 0)


0 = C

1 = H

The sampler will not converge because we are not supplying any information about the actual test
data to the sampler, apart from the sequence length. For each sequence, the sampler will converge
to either H -> H -> ... or C -> C -> ... because after a 50/50 chance of picking hot or cold for
Day 1, the heights weights will be for the repetitive sequence, so it comes down to which sequence
occurs more in the generated sample, which is random.

The likelihood weight may match our Viterbi answer sometimes when Viterbi expects all hot or all cold.