### Celtic Mutations -- From Scratch
### Hidden Markov Model

In [66]:
train_file: str = 'data/train.tsv'
test_file: str = ''
val_split: float = 0.1

states: dict = {
    'S': 0,
    'U': 0,
    'T': 0,
    'H': 0,
    'N': 0
}

In [67]:
def load_data(file: str) -> list:
    print("Loading data from file {}...".format(file))
    file = open(file, 'r')
    data = []
    sentence = []
    for line in file:
        pieces = line.rstrip("\n").split("\t")
        if pieces[0] == '<S>':
            data.append(sentence)
            sentence = []
        else:
            sentence.append(pieces)
    print("Loaded {} sentences".format(len(data)))
    return data

In [68]:
train_data: list = load_data('data/train.tsv')
if len(test_file) <= 0:
    print("Splitting data...")
    num_train_samples: int = int(len(train_data)*(1-val_split))
    val_data: list = train_data[num_train_samples:]
    print(len(val_data), " validation samples")
    train_data: list = train_data[:num_train_samples]
    print(len(train_data), " training samples")

Loading data from file data/train.tsv...
Loaded 395922 sentences
Splitting data...
39593  validation samples
356329  training samples


In [69]:
def compute_probabilities_from_counts(counts_dict: dict) -> dict:
    counts_sum: int = sum(counts_dict.values())
    probabilities_dict: dict = {}
    for count_id in counts_dict:
        count = counts_dict[count_id]
        probabilities_dict[count_id] = count / counts_sum
    assert round(sum(probabilities_dict.values()), 2) == 1.0, "All probabilities should sum to 1 but got {}".format(round(sum(probabilities_dict.values()), 2))
    return probabilities_dict

In [70]:
def get_max_float_id_from_dict(float_dict: dict) -> str:
    max_value: float = -0.1
    max_id: float = None
    for dict_id in float_dict:
        if float_dict[dict_id] > max_value:
            max_value = float_dict[dict_id]
            max_id = dict_id
    return max_id


In [71]:
def generate_initial_state_probabilities(data: list) -> dict:
    initial_state_counts: dict = states.copy()
    for sentence in data:
        initial_state: str = sentence[0][1]
        if initial_state in initial_state_counts:
            initial_state_counts[initial_state] += 1
    initial_state_probabilities: dict = compute_probabilities_from_counts(initial_state_counts)
    return initial_state_probabilities

In [72]:
def generate_transition_state_probabilities(data: list) -> dict:
    # create a dictionary with two levels, the first being the previous state and the second being the current state
    transition_state_counts: dict = {state: states.copy() for state in states}
    for sentence in data:
        # since we enumerate over a list that excludes the first item, the enumeration index is one behind
        for prev_idx, word in enumerate(sentence[1:]):
            prev_state: str = sentence[prev_idx][1]
            current_state: str = word[1]
            if prev_state in transition_state_counts and current_state in transition_state_counts[prev_state]:
                transition_state_counts[prev_state][current_state] += 1
    transition_state_probabilities: dict = {state: {} for state in states}
    for prev_state in transition_state_counts:
        transition_state_probabilities[prev_state] = compute_probabilities_from_counts(transition_state_counts[prev_state])
    return transition_state_probabilities

In [73]:
def generate_emission_probabilities(data: list) -> dict:
    emission_counts_by_state: dict = {state: {} for state in states}
    for sentence in data:
        for word_state_pair in sentence:
            word, state = word_state_pair
            if state in emission_counts_by_state:
                # initialize word in state dict if the first occurrence of word X in state Y
                if word not in emission_counts_by_state[state]:
                    emission_counts_by_state[state][word] = 0
                emission_counts_by_state[state][word] += 1
    emission_probabilities_by_state: dict = {state: {} for state in states}
    for state in emission_counts_by_state:
        emission_probabilities_by_state[state] = compute_probabilities_from_counts(emission_counts_by_state[state])
    return emission_probabilities_by_state

In [74]:
class HMM:
    def __init__(self, data):
        self.initial_state_probabilities, self.transition_probabilities, self.emission_probabilities = self.fit(data)

    @staticmethod
    def fit(data: list) -> tuple:
        initial_state_probabilities = generate_initial_state_probabilities(data)
        transition_probabilities = generate_transition_state_probabilities(data)
        emission_probabilities = generate_emission_probabilities(data)
        return initial_state_probabilities, transition_probabilities, emission_probabilities

    def predict(self, sentence: str) -> dict:
        words: list = sentence.split(" ")
        state_sequence = []
        state_probabilities: dict = self.initial_state_probabilities.copy()
        for idx, word in enumerate(words):
            for state in state_probabilities:
                if word in self.emission_probabilities[state]:
                    state_probabilities[state] = state_probabilities[state]*self.emission_probabilities[state][word]
                else:
                    state_probabilities[state] = 0
            state_sequence.append(get_max_float_id_from_dict(state_probabilities))
            # for next word, initialize probabilities as transition probabilities from the previous state
            state_probabilities = self.transition_probabilities[state_sequence[idx]].copy()
        return state_sequence

    def evaluate(self, sentences: list, labels: list):
        total, correct = 0, 0
        for sentence_idx, sentence in enumerate(sentences):
            predicted_sequence = self.predict(sentence)
            total += len(predicted_sequence)
            correct += sum([int(predicted == labels[sentence_idx][tag_idx]) for tag_idx, predicted in enumerate(predicted_sequence)])
        print(correct / total)

In [75]:
model = HMM(train_data)

In [79]:
print(model.predict(" ".join([word[0] for word in val_data[0]])))
print([word[1] for word in val_data[0]])

['N', 'N', 'S', 'N', 'N', 'N', 'N', 'N', 'N']
['N', 'N', 'U', 'N', 'N', 'N', 'N', 'N', 'N']


In [77]:
model.predict(" ".join([word[0] for word in val_data[1]]))
print([word[1] for word in val_data[1]])

['N', 'N', 'N', 'N']


In [78]:
val_sentences = []
val_labels = []
for sentence in val_data:
    val_sentences.append((" ".join(word_state_pair[0] for word_state_pair in sentence)))
    val_labels.append([word_state_pair[1] for word_state_pair in sentence])
model.evaluate(val_sentences, val_labels)

0.8981434392455854
