In [2]:
import re
import os
import numpy as np
import copy

In [3]:
# Defining Helper variables to be used later

labelList = ["O", "B-positive", "B-negative", "B-neutral", "I-positive", "I-negative", "I-neutral"]
labelDictInit = {   
        "START": 0,
        "O": 0,
        "B-positive": 0,
        "B-negative": 0,
        "B-neutral": 0,
        "I-positive": 0,
        "I-negative": 0,
        "I-neutral": 0,
        "END": 0
    }

NUMBER_OF_LABELS = len(labelList)

# Initialise a random number generator with a fixed seed for reproducible results and deterministic behavior
rng = np.random.default_rng(1004519 + 1004103 + 1004555)

# Defining the filePath for the datasets
folderPath = os.path.abspath(os.getcwd())

EsTrainFilePath = os.path.join(folderPath, "../Data/ES/train")
EsTrain1FilePath = os.path.join(folderPath, "../Data/ES/train1")
EsDevInFilePath = os.path.join(folderPath, "../Data/ES/dev.in")
EsDevOutFilePath = os.path.join(folderPath, "../Data/ES/dev.out")
EsPredOutputFilePath = os.path.join(folderPath, "../Data/ES/dev.p1.out")

RuTrainFilePath = os.path.join(folderPath, "../Data/RU/train")
RuDevInFilePath = os.path.join(folderPath, "../Data/RU/dev.in")
RuDevOutFilePath = os.path.join(folderPath, "../Data/RU/dev.out")
RuPredOutputFilePath = os.path.join(folderPath, "../Data/RU/dev.p1.out")

ES = [
    EsTrainFilePath,
    labelDictInit,
    EsDevInFilePath,
    EsDevOutFilePath,
    EsPredOutputFilePath]

RU = [
    RuTrainFilePath,
    labelDictInit,
    RuDevInFilePath,
    RuDevOutFilePath,
    RuPredOutputFilePath]

languages = [ES]


In [4]:
# Helper functions to read and parse data
def readFile(filePath: str):
    with open(filePath, "r", encoding="utf-8") as f:
        return f.readlines()
    
def processFile(file: list):
    return [word[:len(word)-1] for word in file]

def getAllUniqueTokens(input_data):
    # Might want to somehow ensure that this order stays consistent between runs
    return list(set(item.split(" ")[0] for item in input_data))

def getSentences(input_data):
    return [sentence for sentence in [group.split('\n') for group in '\n'.join(input_data).split('\n\n')] if sentence != ['']]



### Part 1

1. Write a function that estimates the emission parameters from the training set using MLE (maximum likelihood estimation):
<br>
$$
e(x|y) = \frac{{\text{{Count}}(y \rightarrow x)}}{{\text{{Count}}(y)}}
$$

2. Set k to 1, implement this fix into your function for computing the emission parameters

$$
e(x|y) = \begin{cases}
\frac{{\text{{Count}}(y \rightarrow x)}}{{\text{{Count}}(y)+k}}, & \text{{if the word token }} x \text{{ appears in the training set}} \\
\frac{k}{{\text{{Count}}(y)+k}}, & \text{{if word token }} x \text{{ is the special token \#UNK\#}}
\end{cases}




In [5]:
# Calculating Emissions Function

def calcCountofEachWord(file: list, labelDict_in: dict):
    tokenDict_out = {}

    for i in range(len(file)):
        if file[i] != "":
            l = file[i].split()
            token = l[0]
            label = l[1]
            key = (token, label)
            if label in labelDict_in:
                labelDict_in[label] += 1

            else:
                labelDict_in[label] = 1
                

            if key in tokenDict_out:
                tokenDict_out[key] += 1
            else:
                tokenDict_out[key] = 1
    return tokenDict_out, labelDict_in

def calcEmission(tokenDict_in: dict, labelDict_in: dict, uniqueTokensList_in: list, k: float = 1.0):
    emissionDict_out = {}
    unknownDict = {}

    for token, label in tokenDict_in.keys():
        if token not in emissionDict_out:
            emissionDict_out[token] = {}  # Create an empty inner dictionary for the token
        if labelDict_in[label] != 0:
            e = tokenDict_in[(token, label)] / (labelDict_in[label] + k)
            emissionDict_out[token][label] = e  # Update the emission value for the specific label

    for key, labelCount in labelDict_in.items():
        if key not in ["START", "END"]:
            # Creating entry for unknown words
            unknownToken = "#UNK"
            e = k / (labelCount + k)
            unknownDict[key] = e
            emissionDict_out[unknownToken] = unknownDict

    return emissionDict_out

def getLabel(tokenInput: str, uniqueTokensList_in: list, emissionsDict_in: dict):
    if tokenInput in uniqueTokensList_in:
        x = max(emissionsDict_in[tokenInput], key=emissionsDict_in[tokenInput].get)
        # print('x -> {tokenInput} Current max probability is: ', x)
        return x
    else:
        y = max(emissionsDict_in["#UNK"], key=emissionsDict_in["#UNK"].get)
        # print(f'y -> {tokenInput} Current max probability is: {y}')
        return y

def calcSentimentAnalysis(tokenList: list, trainedData: dict, uniqueTokensList_in: list):

    predictedTokenList = []

    for token in tokenList:
        if token:
            if token in trainedData:
                predictedTokenList.append(token + " " + getLabel(token, uniqueTokensList_in, trainedData))
            else:
                predictedTokenList.append(token + " " + getLabel(token, uniqueTokensList_in, trainedData))
        else:
            predictedTokenList.append("")
    return predictedTokenList


def evalModel(predictedTokenFilePath: str, actualTokenFilePath: str):
    
    # Reading both files
    predictedTokenFile = processFile(readFile(predictedTokenFilePath))
    actualTokenFile = processFile(readFile(actualTokenFilePath))

    # print("\n=============Processed Files=============")
    # print(f"Predicted Token File: {predictedTokenFile}")
    # print(f"Actual Token File: {actualTokenFile}")
    # print("=============Processed Files=============\n")

    # Extracting the predicted and actual labels
    predictedLabels = [line.split()[1] for line in predictedTokenFile if line]
    actualLabels = [line.split()[1] for line in actualTokenFile if line]

    # print("\n=============Label Lists=============")
    # print(f"Predicted Labels: {predictedLabels}")
    # print(f"Actual Labels: {actualLabels}")
    # print("=============Label Lists=============\n")
    
    # Checking if the shape of the label lists are the same
    assert len(predictedLabels) == len(actualLabels) 
    
    # Precision
    correctPredictions = 0
    # Calculate metrics for each label
    totalPredicted = len(predictedLabels)
    totalActual = len(actualLabels)

    for i in range(totalPredicted):    
        if predictedLabels[i] == actualLabels[i]:
            correctPredictions += 1

    precision = round(correctPredictions/len(predictedLabels), 5) if (totalPredicted) != 0 else 0

    # Recall
    recall = round(correctPredictions/len(actualLabels), 5) if (totalActual) != 0 else 0

    # F1
    f1 = round(2 * ((precision * recall) / (precision + recall)), 5) if ((totalPredicted) != 0 ) and ((totalActual) != 0) else 0

    print("\n=============Evaluation Metrics=============")
    print("Precision: ", precision)
    print("Recall: ", recall)
    print("F1: ", f1)
    print("=============Evaluation Metrics=============\n\n")

    return precision, recall, f1
    
    

In [6]:
def predictAndWrite(
        trainFilePath: str, 
        labelDictIn: dict, 
        devInFilePath: str, 
        devOutFilePath: str,
        predOutputFilePath: str,
    ):
    # Processing the file to separate line by line
    trainData = processFile(readFile(filePath=trainFilePath))
    uniqueTokensList = getAllUniqueTokens(trainData)

    # Calculating the count of each token to the label
    tokenDict, labelDictOut = calcCountofEachWord(trainData, labelDictIn)

    # print(f"Token Dict is: \n{tokenDict}")
    # print(f"Label Dict is: \n{labelDictOut}")

    # Calculating the emission value for each unique token
    emissionsDict = calcEmission(tokenDict, labelDictOut, uniqueTokensList)

    # Reading test file
    testInData = processFile(readFile(devInFilePath))

    # Predicting test data
    predictedLabels = calcSentimentAnalysis(testInData, emissionsDict, uniqueTokensList)

    # Writing to file
    with open(predOutputFilePath, "w+", encoding="utf-8") as file:
        for line in predictedLabels:
            file.write(line + "\n")

    # Calculating Precision
    precision, recall, f1 = evalModel(predOutputFilePath, devOutFilePath)

    return uniqueTokensList, emissionsDict, precision, recall, f1


In [7]:
# Running the function
for language in languages:
    uniqueTokensList, emissionsDict, precision, recall, f1 = predictAndWrite(
            language[0],
            language[1],
            language[2],
            language[3],
            language[4]
        )
    
emissionsDict


Precision:  0.6308
Recall:  0.6308
F1:  0.6308




{'Estuvimos': {'O': 0.00020664003306240529},
 'hace': {'O': 0.0008954401432704229},
 'poco': {'O': 0.0018942003030720485},
 'mi': {'O': 0.0024796803967488635},
 'pareja': {'O': 0.00044772007163521146},
 'y': {'O': 0.0352665656426505,
  'I-negative': 0.011627906976744186,
  'I-positive': 0.015873015873015872},
 'yo': {'O': 0.0012398401983744318, 'I-negative': 0.005813953488372093},
 'comiendo': {'O': 0.00034440005510400884},
 'resultó': {'O': 0.00013776002204160352},
 'todo': {'O': 0.0039606006336961016, 'I-negative': 0.005813953488372093},
 'muy': {'O': 0.013638242182118749, 'I-positive': 0.006349206349206349},
 'bien': {'O': 0.005682600909216145, 'B-positive': 0.0008613264427217916},
 ',': {'O': 0.05730816916930707,
  'I-positive': 0.006349206349206349,
  'I-negative': 0.01744186046511628},
 'tanto': {'O': 0.0013431602149056344},
 'la': {'O': 0.026002204160352666,
  'I-positive': 0.0380952380952381,
  'I-negative': 0.01744186046511628,
  'I-neutral': 0.022727272727272728},
 'comida': 

In [8]:
correctFile = processFile(readFile(os.path.join(folderPath, "correct_dev.p1.out")))
ourFile = processFile(readFile(os.path.join(folderPath, "../Data/ES/dev.p1.out")))

out = []

for i in range(len(correctFile)):
    if correctFile[i] != ourFile[i]:
        out.append( (correctFile[i], ourFile[i]))
print(out)
print(len(out))

[]
0


In [57]:

def calcEndStates_TransitionsDict(
    filePathIn: str,
    labelListIn: list
):  
    # Reading the file
    file = processFile(readFile(filePathIn))

    # Dict to count the number of START and END
    endStatesDict = {
        'START': {'count': 0},
        'STOP': {'count': 0}
    }

    for label in labelListIn:
        endStatesDict[label] = {'count': 0, 'tokenList': []}

    # Dict -> Keys = Label, Value = Dict of Label: Count 
    # => Counts the number of Transitions from Label x -> y
    transitionDict = {}
    transitionDict['START'] = {}
    for label in labelListIn: 
        transitionDict['START'][label] = 0
    transitionDict['START']['STOP'] = 0
    for label1 in labelListIn:
        transitionDict[label1] = {}
        for label2 in labelListIn:
            transitionDict[label1][label2] = 0
        transitionDict[label1]['STOP'] = 0

    
    prevLabel = "START"

    for line in file:

        try:
            token, label = line.split(" ")
            # Blank => Start and End of sentence
            if prevLabel == "START":
                currLabel = label
                

                endStatesDict["START"]['count'] += 1
                transitionDict["START"][currLabel] += 1
                endStatesDict[currLabel]["count"] += 1
                endStatesDict[currLabel]["tokenList"].append(token)

                prevLabel = currLabel
            else:
                currLabel = label

                transitionDict[prevLabel][currLabel] += 1
                endStatesDict[currLabel]["count"] += 1
                endStatesDict[currLabel]["tokenList"].append(token)

                prevLabel = currLabel    
        except:
            # STOP
            currLabel = "STOP"

            endStatesDict["STOP"]["count"] += 1
            transitionDict[prevLabel]["STOP"] += 1

            # Preparing to transition from STOP to START
            prevLabel = "START"



    calcTransitionsDict = {}

    for state in transitionDict.keys():
        calcTransitionsDict[state] = {}
        for nextState in transitionDict[state].keys():
            try: calcTransitionsDict[state][nextState] = round(transitionDict[state][nextState] / endStatesDict[state]['count'], 5)
            except: calcTransitionsDict[state][nextState] = 0.0

    return transitionDict, endStatesDict ,calcTransitionsDict


def viterbiAlgo(
    endStatesDictIn: dict,
    calcTransitionsDictIn: dict,
    labelListIn: list,
    filePathIn: str,
    emissionsDictIn: dict,
):

    # Defining dicts
    pathsDict = {}
    prevTag = "START"
    sentenceCounter = 0
    # Paths Dict = {
    #   line number : { token : { label : prob }
    # }

    # Policy Dict = {
    #   token : { label : prob }
    # }

    # Read the file
    file = processFile(readFile(filePathIn))
    sentences = getSentences(file)
    lineCount1 = 0
    lineCount2 = 0
    for i in range(len(file)):
        lineCount1 += 1

    # Loop through the sentences
    for sentence in sentences:
        policyDict = {}
        # Loop through each line in the sentence
        for line in sentence:
            lineCount2 += 1
            if line != "":
                probLabelDict = {}
                for label in labelListIn:
                    try:
                        token, tag = line.split(" ")
                        currTag = tag
                        prob = (calcTransitionsDictIn[prevTag][currTag]* emissionsDictIn[token][label])
                        probLabelDict[label] = prob

                    except:
                        continue
                policyDict[token] = probLabelDict
        
        sentenceCounter += 1

        pathsDict[sentenceCounter] = policyDict

    print(lineCount1)
    print(lineCount2)
    return pathsDict


def findOptimalPaths(pathsDictIn: dict):
    optimalPaths = {}
    for i in pathsDictIn.keys():
        optimalPaths[i] = {}
        for token in pathsDictIn[i].keys():
            optimalPaths[i][token] = max(pathsDictIn[i][token], key=pathsDictIn[i][token].get)

    return optimalPaths

def writeOptimalPaths(optimalPathsIn: dict):
    with open("testingOptimalPaths.txt", "w") as f:
        for i in optimalPathsIn.keys():
            for token in optimalPathsIn[i].keys():
                f.write(token + " " + optimalPathsIn[i][token] + "\n")
            f.write("\n")
        


## Calling the functions
transitionDict, endStatesDict ,calcTransitionsDict = calcEndStates_TransitionsDict(EsTrainFilePath, labelList)


pathsDict = viterbiAlgo(
    endStatesDictIn=endStatesDict,
    calcTransitionsDictIn=calcTransitionsDict,
    labelListIn=labelList,
    filePathIn=EsTrainFilePath,
    emissionsDictIn=emissionsDict
)

optimalPaths = findOptimalPaths(pathsDict)
optimalPaths
writeOptimalPaths(optimalPaths)

33033
31177


In [59]:
ourFile = processFile(readFile(os.path.join(folderPath, "./testingOptimalPaths.txt")))
correctFile = processFile(readFile(EsTrainFilePath))

out = []

# for i in range(len(correctFile)):
#     if correctFile[i] != ourFile[i]:
#         out.append( (correctFile[i], ourFile[i]))
print(ourFile[63:66])
print(correctFile[63:66])

print(len(ourFile))
print(len(correctFile)) # why is the len of ourFile less than the correctFile?
print(len(correctFile)- len(ourFile))

missing_lines = []

for i, correct_line in enumerate(correctFile):
    if i >= len(ourFile) or ourFile[i] != correct_line:
        missing_lines.append(correct_line)

# Print the missing lines
for line in missing_lines:
    print("Missing Line:", line)

['y O', 'agradable O', '. O']
['Comida B-positive', 'exquisita O', '. O']
29082
33033
3951
Missing Line: yo O
Missing Line: y O
Missing Line: resultó O
Missing Line: todo O
Missing Line: muy O
Missing Line: bien O
Missing Line: , O
Missing Line: tanto O
Missing Line: la O
Missing Line: comida B-positive
Missing Line: , O
Missing Line: el O
Missing Line: vino B-positive
Missing Line: , O
Missing Line: el O
Missing Line: trato B-positive
Missing Line: , O
Missing Line: la O
Missing Line: decoración B-positive
Missing Line: … O
Missing Line: nos O
Missing Line: gustó O
Missing Line: todo O
Missing Line: mucho O
Missing Line: . O
Missing Line: 
Missing Line: Por O
Missing Line: poner O
Missing Line: algún O
Missing Line: pero O
Missing Line: , O
Missing Line: quizá O
Missing Line: el O
Missing Line: jamón B-negative
Missing Line: no O
Missing Line: era O
Missing Line: todo O
Missing Line: lo O
Missing Line: " O
Missing Line: ibérico O
Missing Line: " O
Missing Line: que O
Missing Line: cab

In [None]:
l1 = ['Estuvimos O', 'hace O', 'poco O', 'mi O', 'pareja O', 'y O', 'yo I-negative', 'comiendo O', 'resultó O', 'todo I-negative', 'muy O', 'bien O', ', O', 'tanto O', 'la I-positive', 'comida B-positive', 'el I-neutral', 'vino B-negative', 'trato B-positive', 'decoración B-positive', '… B-negative', 'nos O', 'gustó O', 'mucho I-negative', '. O']


l2 = ['Estuvimos O', 'hace O', 'poco O', 'mi O', 'pareja O', 'y O', 'yo O', 'comiendo O', 'y O', 'resultó O', 'todo O', 'muy O', 'bien O', ', O', 'tanto O', 'la O', 'comida B-positive', ', O', 'el O', 'vino B-positive', ', O', 'el O', 'trato B-positive', ', O', 'la O']

for i in range(len(l1)):
    if l1 != 

In [10]:
### Part 3

# Helper Functions    
def update_sorted_paths(array, value):
        if value[0] > array[0][0]: array[0] = value
        array.sort(key=lambda x:x[0])

def update_policy_for_tag(policy, step, tag, word, transition_probs, emission_probs, k):
    """
    Update the policy for a specific tag at a given step.
    
    Args:
    - policy: Dictionary containing the policy for each step and tag.
    - step: Current step in the sequence.
    - tag: Current tag being considered.
    - word: Current word in the sequence.
    - transition_probs: Dictionary containing transition probabilities.
    - emission_probs: Dictionary containing emission probabilities.
    - k: Number of top paths to consider.
    
    Returns:
    - None. The policy dictionary is updated in-place.
    """
    if step not in policy:
        policy[step] = {}
    
    kth_list = [(-1, None, None) for _ in range(k)]
    for prev_tag in policy[step - 1]:
        for i in range(k):
            prev_prob, _, _ = policy[step - 1][prev_tag][i]
            trans_prob = transition_probs.get((prev_tag, tag), 0)
            emis_prob = emission_probs.get((tag, word), 0)
            prob = prev_prob * trans_prob * emis_prob
            update_sorted_paths(kth_list, (prob, prev_tag, i))
    
    policy[step][tag] = kth_list

# get policy
def get_policy(step, y,policy_dict):
    # Base case, where step is 0
    if step == 0:
        return 1 if y == "START" else 0
    else: 
        return policy_dict[step][y]
    
def get_path_value(path_records, position, tag, index):
    """
    Helper function to retrieve the value of a path for a given position and tag.
    
    Args:
    - path_records (dict): A nested dictionary containing path records.
    - position (int): The position in the sequence.
    - tag (str): The tag for which the value needs to be retrieved.
    - index (int): The index of the kth best path (for k-best Viterbi).
    
    Returns:
    - float: The probability value of the path at the given position and tag.
    """
    # Check if the position exists in the path records
    if position in path_records:
        # Check if the tag exists for the given position
        if tag in path_records[position]:
            # Retrieve the value (probability) for the given tag and index
            value_tuple = path_records[position][tag]
            if index < len(value_tuple):
                return value_tuple[index][0]  # Return the probability value
    # Return None if the value doesn't exist
    return None

def backtrack_kth_best_sequences(path_records, final_position, k, sequence_length):
    """
    Helper function to backtrack and find the k-th best sequences.
    
    Args:
    - path_records (dict): A nested dictionary containing path records.
    - final_position (int): The final position in the sequence.
    - k (int): The index of the kth best path (for k-best Viterbi).
    - sequence_length (int): The length of the sequence.
    
    Returns:
    - list: The k-th best sequence of tags.
    """
    # Initialize an empty list to store the k-th best sequence
    kth_best_sequence = []
    
    # Start from the final position and 'STOP' tag
    current_position = final_position
    current_tag = 'STOP'
    
    # Backtrack through the path records
    while current_position > 0:
        # Retrieve the previous tag and index for the current tag and k
        _, previous_tag, previous_k = path_records[current_position][current_tag][k]
        
        # Add the previous tag to the k-th best sequence
        kth_best_sequence.append(previous_tag)
        
        # Update the current tag and position for the next iteration
        current_tag = previous_tag
        current_position -= 1
        k = previous_k  # Update k to the previous k for backtracking
    
    # Reverse the k-th best sequence to get the correct order
    kth_best_sequence.reverse()
    
    # Return the k-th best sequence
    return kth_best_sequence

def kth_best_seq(observed_seq,label_list,known_words,get_emission,get_transition,k):
    path_records = {}

    #Initialization
    curr_pos = 1
    path_records[curr_pos]= {}
    word = observed_seq[0]
    if word not in known_words:
        word = "#UNK#"
    for label in label_list:
        top_kpath = [(-1,None,None) for _ in range (k)]
        emission_prob = get_emission(word,label)
        transition_prob = get_transition("START",label)
        update_sorted_paths(top_kpath,(emission_prob*transition_prob,"START",0))
        path_records[curr_pos][label] = k
    curr_pos += 1

    #Recursive Step
    for word in observed_seq[1:]:
        if word not in known_words:
              word = "#UNK#"
        path_records[curr_pos] = {}
        for curr_label in label_list:
             update_policy_for_tag(path_records, curr_pos, curr_label, word, get_emission, get_transition, label_list, k)
        curr_pos += 1

    #Termination
    top_k_paths = [(-1, None, None) for _ in range(k)]
    for prev_tag in label_list:
        for i in range(k):
            prob = get_path_value(path_records, curr_pos - 1, prev_tag, i) * get_transition(prev_tag, 'STOP')
            update_sorted_paths(top_k_paths, (prob, prev_tag, i))
    path_records[curr_pos]["STOP"] = top_k_paths

    # Backtrack to find the k-th best sequences
    sequences = backtrack_kth_best_sequences(path_records, curr_pos, k, len(observed_seq))
    return sequences



# Part 3 : write 2nd and 8th best output using Viterbi algorithm 
def evaluate_viterbi_kth(testfilepath, label_list, known_words,get_emission,get_transition,k): 
    f = open(testfilepath, "r", encoding="utf8") 
    w2 = open(f"Data/{testfilepath[5:7]}/dev.p3.2nd.out", "w", encoding="utf8") 
    w8 = open(f"Data/{testfilepath[5:7]}/dev.p3.8th.out", "w", encoding="utf8") 
    lines = f.readlines() 
    sentence = [] 
    for line in lines: 
        # if line is not empty, add the word to the sentence 
        if line != "\n": sentence.append(line[:-1]) 
        # if line is empty, run viterbi on the sentence 
        else: 
            best_sequences = kth_best_seq(sentence,8) 
            second_best = best_sequences[1] 
            eighth_best = best_sequences[7] 
            for tag2, tag8, word in zip(second_best, eighth_best, sentence): 
                w2.write(f"{word} {tag2}\n") 
                w8.write(f"{word} {tag8}\n") 
            sentence = [] 
            w2.write("\n") 
            w8.write("\n") 
    w2.close() 
    w8.close() 
    f.close()

In [11]:
evaluate_viterbi_kth(
    EsDevInFilePath,
    labelList,
    uniqueTokensList,
    emissionsDict,
    transitionDict,
    k=8
    
)

FileNotFoundError: [Errno 2] No such file or directory: 'Data/TD/dev.p3.2nd.out'