In [33]:
import math, sys

# Names of the artificial boundary states: every decoded path is seeded with
# START_STATE_KEY and finishes by transitioning into STOP_STATE_KEY.
START_STATE_KEY = "START"
STOP_STATE_KEY = "STOP"
# Presumably the placeholder token for out-of-vocabulary words; it is not
# referenced by the decoder code visible in this file -- TODO confirm usage.
UNK_KEY = "#UNK#"
# Very large negative integer used in place of log(0) (see the test tables,
# where it marks impossible emissions/transitions). Being finite, sums of
# several such values stay ordinary numbers.
LOG_ZERO_PROB = -(sys.maxsize**2)

def output_function(word_seq,log_emission_params,log_transmission_params,k):
    """Decode the k best state paths for a sequence of observed words.

    word_seq: list of [word_1,word_2,word_3,...]
    log_emission_params: NON-SPARSE dict of {
      "source_state": {
        "target_word": logarithmic probability
      }
    }
    log_transmission_params: same nesting, read as
      log_transmission_params[source_state][target_state]
    k: number of best candidates to output

    Returns the list of (log score, path) tuples stored for STOP_STATE_KEY
    after the final step; each path is a list of state names that begins
    with START_STATE_KEY and ends with STOP_STATE_KEY.
    """
    # The table maps "state the path currently ends in" -> candidate list.
    # Before any word is consumed there is a single zero-cost path at START.
    table = {START_STATE_KEY: [(0,[START_STATE_KEY])]}

    # One Viterbi step per observed word.
    for word in word_seq:
        table = viterbi_best_k(table,log_emission_params,log_transmission_params,word,k)

    # Passing the STOP key as the "word" makes the last step close every path.
    table = viterbi_best_k(table,log_emission_params,log_transmission_params,STOP_STATE_KEY,k)

    return table[STOP_STATE_KEY]

def filter_best_k(ls, k):
    """Return the k entries of ``ls`` with the highest first element.

    ``ls`` holds indexable items (here: (score, path) tuples) and is not
    modified. The result is ordered best-first; entries with equal scores
    keep their relative input order. Fewer than k items are returned when
    ``ls`` is shorter than k.
    """
    ranked = list(ls)
    # A reversed sort is still stable, so ties stay in input order.
    ranked.sort(key=lambda candidate: candidate[0], reverse=True)
    return ranked[:k]

def viterbi_best_k(probabilities,log_emission_params,log_transmission_params,word,k):
    """Advance the k-best Viterbi table by one observation.

    probabilities: dict mapping a state to the list of (log score, path)
        candidates currently ending in that state.
    log_emission_params: dict of state -> {word: log probability}.
    log_transmission_params: dict of source state -> {target state: log
        probability}; it must have a row for every state in
        ``probabilities`` and a column for every target state.
    word: the observed word, or STOP_STATE_KEY to run the closing step in
        which the only target is the STOP state and nothing is emitted.
        (A real token equal to STOP_STATE_KEY is therefore indistinguishable
        from the closing step.)
    k: number of candidates kept per target state.

    Returns a new dict of target state -> at most k best (log score, path)
    candidates, each path extended by the target state.

    A word missing from a state's emission table is scored with that
    state's UNK_KEY entry when present, otherwise with LOG_ZERO_PROB,
    instead of failing on a None score.
    """
    is_stop_step = word==STOP_STATE_KEY
    if is_stop_step:
        target_states = [STOP_STATE_KEY]
    else:
        # START is never re-entered, and STOP is only reachable through the
        # dedicated closing step, so neither is a valid target here even if
        # the transition table happens to contain a row for it.
        target_states = [
            state for state in log_transmission_params
            if state not in (START_STATE_KEY, STOP_STATE_KEY)
        ]

    output_proba_dict = dict()
    for target_state in target_states:
        if is_stop_step:
            # The STOP state emits nothing: log(1) == 0.
            log_emission_prob = 0
        else:
            emissions = log_emission_params[target_state]
            log_emission_prob = emissions.get(word)
            if log_emission_prob is None:
                log_emission_prob = emissions.get(UNK_KEY, LOG_ZERO_PROB)

        candidates = []
        for source_state, source_paths in probabilities.items():
            log_transmission_prob = log_transmission_params[source_state][target_state]
            candidates.extend(
                (score+log_transmission_prob+log_emission_prob, path+[target_state])
                for score, path in source_paths
            )
        output_proba_dict[target_state] = filter_best_k(candidates,k)

    return output_proba_dict

In [34]:
# Testing
# Small hand-checkable HMM: three states (X, Y, Z) and three words (a, b, c).
# Scores are compared in probability space with an absolute tolerance.
epsilon = 10**-5

test_emission_proba_dict = {
    "X": {
        "a": math.log(0.4),
        "b": math.log(0.6),
        "c": LOG_ZERO_PROB
    },
    "Y": {
        "a": math.log(0.2),
        "b": LOG_ZERO_PROB,
        "c": math.log(0.8)
    },
    "Z": {
        "a": math.log(0.2),
        "b": math.log(0.6),
        "c": math.log(0.2)
    }
}

test_transmission_proba_dict = {
    START_STATE_KEY: {
        "X": math.log(0.5),
        "Y": LOG_ZERO_PROB,
        "Z": math.log(0.5),
        STOP_STATE_KEY: LOG_ZERO_PROB
    },
    "X": {
        "X": LOG_ZERO_PROB,
        "Y": math.log(0.4),
        "Z": math.log(0.4),
        STOP_STATE_KEY: math.log(0.2)
    },
    "Y": {
        "X": math.log(0.2),
        "Y": LOG_ZERO_PROB,
        "Z": math.log(0.2),
        STOP_STATE_KEY: math.log(0.6)
    },
    "Z": {
        "X": math.log(0.4),
        "Y": math.log(0.6),
        "Z": LOG_ZERO_PROB,
        STOP_STATE_KEY: LOG_ZERO_PROB
    }
}

# Test 1: single best path. START->Z->Y->STOP scores
# 0.5*0.6 * 0.6*0.8 * 0.6 = 0.0864.
test_1 = output_function(["b","c"],test_emission_proba_dict,test_transmission_proba_dict,1)
test_1_expected_sequence = [START_STATE_KEY,"Z","Y",STOP_STATE_KEY]
assert abs(math.exp(test_1[0][0])-0.0864)<=epsilon
# Whole-list equality checks both the length and every element of the path.
assert test_1[0][1]==test_1_expected_sequence

# Test 2: three best paths. Only two paths have non-zero probability, so the
# third score underflows to 0 and its path is intentionally left unchecked.
test_2 = output_function(["b","c"],test_emission_proba_dict,test_transmission_proba_dict,3)
test_2_expected_scores = [0.0864,0.0576,0]
test_2_expected_sequences = [[START_STATE_KEY,"Z","Y",STOP_STATE_KEY],[START_STATE_KEY,"X","Y",STOP_STATE_KEY]]
assert len(test_2)==len(test_2_expected_scores)
for candidate, expected_score in zip(test_2,test_2_expected_scores):
    assert abs(math.exp(candidate[0])-expected_score)<=epsilon
for candidate, expected_sequence in zip(test_2,test_2_expected_sequences):
    assert candidate[1]==expected_sequence

In [3]:
def convert_to_log_emission_params(emission_dict):
    """Expand a sparse (word, state)-keyed table into a dense nested dict.

    emission_dict: mapping whose keys are indexable pairs with the word at
        position 0 and the state at position 1.

    Returns {state: {word: value}} containing every state/word combination
    seen in the keys. Values are copied unchanged (no logarithm is taken
    here) and combinations absent from ``emission_dict`` are filled with 0.

    NOTE(review): if the values are log-probabilities, a fill value of 0
    corresponds to probability 1 -- confirm this is intended.
    """
    states = {pair[1] for pair in emission_dict}
    words = {pair[0] for pair in emission_dict}
    return {
        state: {word: emission_dict.get((word, state), 0) for word in words}
        for state in states
    }

0.0
