In [60]:
import torch
import numpy as np
torch.set_printoptions(precision=10)

device = torch.device('cuda:2' if torch.cuda.is_available() else 'cpu')

In [17]:
train_file: str = 'data/train.tsv'
test_file: str = ''
val_split: float = 0.99

states: dict = {
    '0': 0,
    '1': 0,
}

In [5]:
def load_data(file: str) -> list:
    print("Loading data from file {}...".format(file))
    file = open(file, 'r')
    data = []
    for line in file:
        pieces = line.rstrip("\n").split("\t")
        data.append(pieces)
    print("Loaded {} sentences".format(len(data)))
    return data

In [18]:
train_data: list = load_data(train_file)

Loading data from file data/train.tsv...
Loaded 8368167 sentences


In [19]:
if len(test_file) > 0:
    test_data: list = load_data(test_file)
print("Splitting data...")
num_train_samples: int = int(len(train_data)*(1-val_split))
val_data: list = train_data[num_train_samples:]
print(len(val_data), " validation characters")
train_data: list = train_data[:num_train_samples]
print(len(train_data), " training characters")

Splitting data...
8284486  validation characters
83681  training characters


In [7]:
print(train_data[0])

['時', '0']


In [20]:
def compute_probabilities_from_counts(counts_dict: dict) -> dict:
    counts_sum: int = sum(counts_dict.values())
    probabilities_dict: dict = {}
    for count_id in counts_dict:
        count = counts_dict[count_id]
        probabilities_dict[count_id] = count / counts_sum
    assert round(sum(probabilities_dict.values()), 2) == 1.0, "All probabilities should sum to 1 but got {}".format(round(sum(probabilities_dict.values()), 2))
    return probabilities_dict

In [21]:
def key_with_max_val(d: dict) -> str:
    """https://stackoverflow.com/questions/268272/getting-key-with-maximum-value-in-dictionary"""
    v = list(d.values())
    k = list(d.keys())
    return k[v.index(max(v))]

In [22]:
def generate_initial_state_probabilities(data: list) -> dict:
    initial_state_counts: dict = states.copy()
    initial_state: str = data[0][1]
    if initial_state in initial_state_counts:
        initial_state_counts[initial_state] += 1
    initial_state_probabilities: dict = compute_probabilities_from_counts(initial_state_counts)
    return initial_state_probabilities

In [38]:
def generate_transition_state_probabilities(data: list) -> dict:
    # create a dictionary with two levels, the first being the previous state and the second being the current state
    transition_state_counts: dict = {state: states.copy() for state in states}
    # since we enumerate over a list that excludes the first item, the enumeration index is one behind
    for prev_idx, word in enumerate(data[1:]):
        prev_state: str = data[prev_idx][1]
        current_state: str = word[1]
        if prev_state in transition_state_counts and current_state in transition_state_counts[prev_state]:
            transition_state_counts[prev_state][current_state] += 1
    for state in transition_state_counts:
        transition_state_counts[state]['STOP'] = 0
    transition_state_counts[current_state]['STOP'] += 1
    transition_state_probabilities: dict = {state: {} for state in states}
    for prev_state in transition_state_counts:
        transition_state_probabilities[prev_state] = compute_probabilities_from_counts(transition_state_counts[prev_state])
    return transition_state_probabilities

In [56]:
def generate_emission_probabilities(data: list, sequence: list) -> dict:
    all_observations: dict = {obs: 0 for obs in set(sequence)}
    emission_counts_by_state: dict = {state: all_observations for state in states}
    for word_state_pair in data:
        word, state = word_state_pair
        if state in emission_counts_by_state:
            # initialize word in state dict if the first occurrence of word X in state Y
            if word not in emission_counts_by_state[state]:
                emission_counts_by_state[state][word] = 0
            emission_counts_by_state[state][word] += 1
    emission_probabilities_by_state: dict = {state: {} for state in states}
    for state in emission_counts_by_state:
        emission_probabilities_by_state[state] = compute_probabilities_from_counts(emission_counts_by_state[state])
    return emission_probabilities_by_state

In [57]:
def fit(data: list, sequence: list) -> tuple:
        print("Fitting model to provided dataset...")
        initial_state_probabilities = generate_initial_state_probabilities(data)
        transition_probabilities = generate_transition_state_probabilities(data)
        emission_probabilities = generate_emission_probabilities(data, sequence)
        print("Model ready.")
        return initial_state_probabilities, transition_probabilities, emission_probabilities

In [77]:
def get_forward_prob(observations: list) -> dict:
    forward_probabilities: dict = {}
    for idx, observation in enumerate(observations):
        for state in initial:
            if idx == 0:
                probability = initial[state]*emission[state][observation] if observation in emission[state] else 0
                forward_probabilities[state] = torch.DoubleTensor([probability])
            else:
                probability = 0
                for prev_state in transitions:
                    probability += forward_probabilities[prev_state][idx-1]*transitions[prev_state][state]
                probability *= emission[state][observation]
                forward_probabilities[state] = torch.cat((forward_probabilities[state], torch.DoubleTensor([probability])))
    return forward_probabilities

In [78]:
def get_backward_prob(observations: list) -> dict:
    backward_probabilities: dict = {}
    for i in range(len(observations)):
        idx = len(observations)-i-1
        for state in initial:
            if idx == len(observations)-1:
                probability = transitions[state]['STOP']
                backward_probabilities[state] = torch.DoubleTensor([probability])
            else:
                probability = 0
                for next_state in transitions:
                    probability += backward_probabilities[state][0]*transitions[state][next_state]*emission[next_state][observations[idx+1]]
                backward_probabilities[state] = torch.cat((torch.DoubleTensor([probability]), backward_probabilities[state]))
    return backward_probabilities

In [79]:
def get_state_total_prob(forward_prob: dict, backward_prob: dict) -> dict:
    state_total_prob: dict = {}
    for state in forward_prob:
        state_total_prob[state] = forward_prob[state] * backward_prob[state]
    return state_total_prob

In [80]:
def get_total_prob(state_total_prob: dict, observations: list) -> torch.DoubleTensor:
    combined_state_totals = [state_total_prob[state] for state in state_total_prob]
    return torch.stack(combined_state_totals, dim=0).sum(dim=0)

In [81]:
def get_new_state_prob(state_total_prob: dict, total_prob: torch.DoubleTensor) -> dict:
    new_state_prob: dict = {}
    for state in state_total_prob:
        new_state_prob[state] = state_total_prob[state] / total_prob
    return new_state_prob

In [82]:
def get_observation_state_prob(new_state_prob: dict, observations: list) -> dict:
    observation_state_prob: dict = {}
    for state in new_state_prob:
        observation_state_prob[state] = {}
        for tag in set(observations):
            observation_state_prob[state][tag] = torch.DoubleTensor([])
            for idx, observation in enumerate(observations):
                probability = new_state_prob[state][idx] if observation == tag else 0
                observation_state_prob[state][tag] = torch.cat((observation_state_prob[state][tag], torch.DoubleTensor([probability])))
    return observation_state_prob

In [83]:
def get_transition_state_prob(forward_prob: dict, backward_prob: dict, total_prob: torch.DoubleTensor, observations: list) -> dict:
    transition_state_prob: dict = {}
    for prev_state in emission:
        transition_state_prob[prev_state] = {}
        for state in emission:
            transition_state_prob[prev_state][state] = torch.DoubleTensor([])
            for i, observation in enumerate(observations[1:]):
                idx = i + 1
                probability = forward_prob[prev_state][idx-1]*backward_prob[state][idx]
                probability *= transitions[prev_state][state]*emission[state][observation]
                probability /= total_prob[idx]
                transition_state_prob[prev_state][state] = torch.cat((transition_state_prob[prev_state][state], torch.DoubleTensor([probability])))
    return transition_state_prob

In [84]:
def get_emissions(observation_state_prob: dict, new_state_prob: dict) -> dict:
    emissions = {}
    for state in new_state_prob:
        emissions[state] = {}
        for tag in observation_state_prob[state]:
            emissions[state][tag] = torch.sum(observation_state_prob[state][tag])/torch.sum(new_state_prob[state])
    return emissions

In [85]:
def get_transitions(transition_state_prob: dict, new_state_prob: dict) -> dict:
    transitions = {}
    for prev_state in transition_state_prob:
        transitions[prev_state] = {}
        for state in transition_state_prob:
            transitions[prev_state][state] = torch.sum(transition_state_prob[prev_state][state])/torch.sum(new_state_prob[state])
        transitions[prev_state]['STOP'] = new_state_prob[prev_state][-1]/torch.sum(new_state_prob[prev_state])
    return transitions

In [86]:
def get_init(new_state_prob: dict) -> dict:
    init = {}
    for state in new_state_prob:
        init[state] = new_state_prob[state][0]
    return init

In [89]:
def iterate(observations: list):
    forward: dict = get_forward_prob(observations)
    print('A')
    backward: dict = get_backward_prob(observations)
    print('B')
    state_total: dict = get_state_total_prob(forward_prob, backward_prob)
    print('C')
    total: torch.DoubleTensor = get_total_prob(state_total_prob, observations)
    print('D')
    new_state: dict = get_new_state_prob(state_total_prob, total_prob)
    print('E')
    observation_state: dict = observation_state_prob(new_state_prob, observations)
    print('F')
    transition_state: dict = transition_state_prob(forward_prob, backward_prob, total_prob, observations)
    emissions = get_emissions(observation_state_prob, new_state_prob)
    transitions = get_transitions(transition_state_prob, new_state_prob)
    init = get_init(new_state_prob)
    return init, transitions, emissions

In [90]:
val_sequence = [pair[0] for pair in val_data]
initial, transitions, emission  = fit(train_data, val_sequence)
iterate(val_sequence[:1000])
print(e.transitions)

Fitting model to provided dataset...
Model ready.
A
B


TypeError: 'function' object is not iterable

In [66]:
init = {'H': 0.5, 'C': 0.5}
emit = {'H': {'1': 0.1, '2': 0.2, '3': 0.7},
        'C': {'1': 0.7, '2': 0.2, '3': 0.1}}
transit = {'H': {'H': 0.8, 'C': 0.1, 'STOP': 0.1},
           'C': {'H': 0.1, 'C': 0.8, 'STOP': 0.1}}
e = SemiSupervisedHMM(init, emit, transit)
i = e.iterate(['2', '3', '3', '2', '3', '2', '3', '2', '2', '3', '1', '3', '3', '1', '1', '1', 
                 '2', '1', '1', '1', '3', '1', '2', '1', '1', '1', '2', '3', '3', '2', '3', '2', '2'])

# for c, y in enumerate(i['H']['C']):
#     print(c+1, y.item())
# for c, y in enumerate(i['C']['H']):
#     print(c+1, y.item())

ValueError: too many dimensions 'str'

In [None]:
class Node:
    def __init__(self, state: str, probability: float, back_pointer):
        self.back_pointer: Node = back_pointer
        self.state: str = state
        self.probability: float = probability


def keys_match(dict_a: dict, dict_b: dict) -> bool:
    return dict_a.keys() == dict_b.keys()


def key_with_max_val(d: dict) -> str:
    """https://stackoverflow.com/questions/268272/getting-key-with-maximum-value-in-dictionary"""
    v = list(d.values())
    k = list(d.keys())
    return k[v.index(max(v))]


def node_with_max_prob(d: dict) -> Node:
    max_node = Node(None, 0.0, None)
    for node in d.values():
        if node.probability > max_node.probability:
            max_node = node
    return max_node


class Viterbi:
    def __init__(self, initial_probabilities: dict, emission_probabilities: dict, transition_probabilities: dict):
        assert keys_match(initial_probabilities, emission_probabilities) and\
               keys_match(initial_probabilities, transition_probabilities), "Hidden states must be consistent!"
        self.initial = initial_probabilities
        self.emission = emission_probabilities
        self.transitions = transition_probabilities

    def predict_path(self, observations: list) -> list:
        matrix: list = [{}]

        for state in self.initial:
            matrix[0][state] = Node(state, self.initial[state]*self.emission[state][observations[0]], None)

        # fill initial probabilities
        for prev_idx, observation in enumerate(observations[1:]):
            matrix.append({})
            for state in self.transitions:
                transitions: dict = {}
                for prev_state in matrix[prev_idx]:
                    prev_prob = matrix[prev_idx][prev_state].probability
                    transition_prob = self.transitions[prev_state][state]*prev_prob
                    transitions[prev_state] = transition_prob
                last_state = key_with_max_val(transitions)
                probability = self.emission[state][observation]*transitions[last_state]
                matrix[prev_idx+1][state] = Node(state, probability, matrix[prev_idx][last_state])

        current_node: Node = node_with_max_prob(matrix[-1])
        sequence: list = []
        while current_node is not None:
            sequence.insert(0, current_node.state)
            current_node = current_node.back_pointer

        return sequence