In [11]:
import numpy as np 

# Path of the training file; it is passed to the MLE_* functions, which read
# it as one "word label" pair per line.
train_dir = "dataset/train"
# NOTE(review): presumably the unlabelled evaluation input -- it is not used
# by any of the cells shown here; confirm against the later cells.
test_dir = "dataset/dev.in"


# Names of the sentence-boundary pseudo-states. The transition-counting code
# below writes the same "START"/"STOP" strings as literals rather than reading
# these constants, so the two must be kept in sync by hand.
START_STATE_KEY = "START"
STOP_STATE_KEY = "STOP"

def MLE_emission_parameters(train_dir="dataset/train"):
    '''Estimate log emission parameters log(Count(y->x) / Count(y)) from a training file.

    Every line is split on single spaces. Only lines that yield exactly two
    fields ("word label") are counted; all other lines (blank separators,
    entries with extra spaces) are ignored.

    :param train_dir: path of the training file to read (opened as UTF-8)
    :type train_dir: str

    :return: count_y_dict, Count(y); keys are labels such as 'O', values are counts
    :rtype: dict

    :return: count_y_to_x_dict, Count(y->x); keys are (word, label) tuples such as ('!', 'O'), values are counts
    :rtype: dict

    :return: emission_tranisition_dict; keys are strings of the form "emission: {label}+{word}", values are 0-d numpy arrays holding the natural log of Count(y->x)/Count(y)
    :rtype: dict

    '''
    label_totals = {}
    pair_totals = {}

    with open(train_dir, "r", encoding="utf8") as handle:
        for raw_line in handle:
            fields = raw_line.split(" ")
            if len(fields) != 2:
                # Not a "word label" line: nothing to count.
                continue
            word = fields[0]
            # Only the second field can carry the line terminator.
            label = fields[1].replace("\n", "")

            label_totals[label] = label_totals.get(label, 0) + 1
            pair = (word, label)
            pair_totals[pair] = pair_totals.get(pair, 0) + 1

    # Turn the raw counts into log relative frequencies.
    log_params = {}
    for (word, label), pair_count in pair_totals.items():
        ratio = pair_count / label_totals.get(label)
        log_params[f"emission: {label}+{word}"] = np.where(
            ratio != 0, np.log(ratio), float("-inf")
        )

    return label_totals, pair_totals, log_params

In [12]:
# Build the emission counts and log-parameters from the training file, then
# preview the first five (key, value) entries of the parameter dict.
count_y_dict, count_y_to_x_dict, emission_tranisition_dict = MLE_emission_parameters(train_dir)
print(list(emission_tranisition_dict.items())[0:5])

[('emission: O+All', array(-9.01768561)), ('emission: O+in', array(-4.5403488)), ('emission: O+all', array(-5.78556456)), ('emission: O+,', array(-3.24728345)), ('emission: O+the', array(-3.09164887))]


In [13]:
def MLE_transition_parameters(emission_tranisition_dict, train_dir="dataset/train"):
    '''Estimate log transition parameters log(Count(y_prev, y) / Count(y_prev)).

    The training file is read line by line. A line that splits on single
    spaces into exactly two fields contributes its second field as a label;
    any other line (in particular a blank line) is treated as a sentence
    boundary. A "START" pseudo-state precedes the first label of each
    sentence and a "STOP" pseudo-state follows the last one. A sentence that
    is still open when the file ends is closed as well, so a file without a
    trailing blank line does not lose its final "STOP" transition.

    :param emission_tranisition_dict: parameter dict to extend; it is updated in place and also returned
    :type emission_tranisition_dict: dict

    :param train_dir: path of the training file to read (opened as UTF-8)
    :type train_dir: str

    :return: count_y_dict, Count(y_prev); keys are labels plus "START" and "STOP", values are counts
    :rtype: dict

    :return: count_y_to_y_dict, Count(y_prev, y); keys are (prev_label, label) tuples such as ('START', 'O'), values are counts
    :rtype: dict

    :return: emission_tranisition_dict; the input dict with added keys of the form "transition: {prev_label}+{label}", values are 0-d numpy arrays holding the natural log of Count(y_prev, y)/Count(y_prev)
    :rtype: dict

    '''
    count_y_dict = {}
    count_y_to_y_dict = {}

    def _bump(counter, key):
        # Increment counter[key], starting from zero when the key is new.
        counter[key] = counter.get(key, 0) + 1

    def _close_sentence(last_label):
        # Record the transition from the sentence's last label into "STOP".
        _bump(count_y_dict, "STOP")
        _bump(count_y_to_y_dict, (last_label, "STOP"))

    prev_label = ""
    with open(train_dir, "r", encoding="utf8") as f:
        for line in f:
            fields = line.split(" ")
            # NOTE(review): a non-blank line with more or fewer than two
            # fields also ends the current sentence here, whereas the
            # emission counter merely skips such lines -- confirm intended.
            label = fields[1].replace("\n", "") if len(fields) == 2 else ""

            if label == "":
                # Sentence boundary: close the sentence only if one is open,
                # so consecutive boundary lines are not double counted.
                if prev_label != "":
                    _close_sentence(prev_label)
            else:
                if prev_label == "":
                    # First label of a new sentence.
                    _bump(count_y_dict, "START")
                    _bump(count_y_to_y_dict, ("START", label))
                else:
                    _bump(count_y_to_y_dict, (prev_label, label))
                _bump(count_y_dict, label)
            prev_label = label

    # The file may end without a trailing boundary line; close the sentence
    # that is still open so its final label gets a "STOP" transition too.
    if prev_label != "":
        _close_sentence(prev_label)

    # Turn the raw counts into log relative frequencies.
    for (prev_label, label), pair_count in count_y_to_y_dict.items():
        prob = pair_count / count_y_dict[prev_label]
        # prob is always > 0 because every stored pair was seen at least once;
        # the np.where form is kept so values stay 0-d numpy arrays, matching
        # what MLE_emission_parameters stores in the same dict.
        emission_tranisition_dict[f"transition: {prev_label}+{label}"] = np.where(
            prob != 0, np.log(prob), float("-inf")
        )

    return count_y_dict, count_y_to_y_dict, emission_tranisition_dict

In [14]:
# Add the transition log-parameters to the dict that already holds the
# emission ones, then show its last five entries.
# NOTE(review): this rebinds `count_y_dict` to the transition-side counts
# (which include "START"/"STOP"), replacing the emission-side counts bound by
# the earlier MLE_emission_parameters call -- confirm no later cell expects
# the emission version under this name.
count_y_dict, count_y_to_y_dict, emission_tranisition_dict = MLE_transition_parameters(emission_tranisition_dict, train_dir)
list(emission_tranisition_dict.items())[-5:]

[('transition: B-positive+STOP', array(-4.25268812)),
 ('transition: I-positive+STOP', array(-4.56434819)),
 ('transition: B-positive+B-positive', array(-7.08590146)),
 ('transition: B-neutral+STOP', array(-3.24519313)),
 ('transition: B-negative+STOP', array(-4.83628191))]