In [21]:
# Open the datasets.

# ES: read every line of the training file, newline characters included.
# The encoding is given explicitly so the result does not depend on the
# platform's default codec; readFile() in this notebook reads the same path
# as UTF-8.

file_path_ES = r"Data/ES/train"

with open(file_path_ES, "r", encoding="utf-8") as file_ES:
    read_line_ES = file_ES.readlines()

FileNotFoundError: [Errno 2] No such file or directory: 'Data/ES/train'

In [3]:
# RU: read every line of the training file, newline characters included.
# The encoding is given explicitly so the result does not depend on the
# platform's default codec (readFile() in this notebook also reads its
# input as UTF-8).

file_path_RU = r"Data/RU/train"

with open(file_path_RU, "r", encoding="utf-8") as file_RU:
    read_line_RU = file_RU.readlines()

In [18]:
# Convert the raw training lines into flat lists of (word, label) tuples.
# Sentence boundaries are marked with sentinel tuples:
#   START -> ('', 'START')
#   STOP  -> ('', 'STOP')

# ES: a line that is exactly "\n" closes the current sentence; every other
# line is split on whitespace into exactly (word, label).
# NOTE(review): a whitespace-only line such as " \n" is not treated as a
# boundary here and would fail the two-value unpacking.

train_data_ES = [("", "START")]  # START state for the first sentence

for line in read_line_ES:
    if line == "\n":
        # end of sentence: close it and open the next one
        train_data_ES += [("", "STOP"), ("", "START")]
        continue
    word, label = line.strip().split()
    train_data_ES.append((word, label))

# ----------------------------------------------------------------------------------------------------------------

# RU: a line counts as a token only when splitting on a single space yields
# exactly two fields; anything else is treated as a sentence boundary.

train_data_RU = [("", "START")]  # START state for the first sentence

for line in read_line_RU:
    fields = line.strip().split(" ")
    if len(fields) != 2:
        # end of sentence: close it and open the next one
        train_data_RU += [("", "STOP"), ("", "START")]
        continue
    word, label = fields
    train_data_RU.append((word, label))
        

## Transition Parameter:

In [19]:
# Per-label counters, every label starting at zero. The key order matters:
# code elsewhere in this notebook derives its state lists (and state indices)
# from label_count.keys().

label_count = dict.fromkeys(
    [
        "START",
        "B-positive",
        "I-positive",
        "O",
        "B-negative",
        "I-negative",
        "B-neutral",
        "I-neutral",
        "STOP",
    ],
    0,
)

# Counts of observed state transitions, keyed by (previous_state, current_state).
# eg: {(START, B-positive):1, (B-neutral, I-neutral):3}

label_transitions = dict()

def estimate_transition_parameters(data):
    """Estimate transition probabilities from a tagged sequence.

    Parameters
    ----------
    data : sequence of (word, state) pairs
        Only the state (index 1) of each element is read. Consecutive
        elements are treated as consecutive states.

    Returns
    -------
    dict
        Maps (current_state, previous_state) to
        count(previous_state -> current_state) / count(previous_state),
        where count(previous_state) is the number of transitions that leave
        previous_state. The ("START", "STOP") entry, i.e. the artificial
        STOP -> START hop between sentences, is omitted.
        An empty or single-element ``data`` yields an empty dict.
    """
    # Counts are local to the call, so estimating parameters for one data set
    # can never leak into the estimate for another.
    transition_counts = {}   # (prev_state, current_state) -> occurrences
    origin_counts = {}       # prev_state -> number of outgoing transitions

    states = [elem[1] for elem in data]
    for prev_state, current_state in zip(states, states[1:]):
        origin_counts[prev_state] = origin_counts.get(prev_state, 0) + 1
        pair = (prev_state, current_state)
        transition_counts[pair] = transition_counts.get(pair, 0) + 1

    # Probabilities are computed only after all counting is finished, so every
    # ratio uses the final denominator for its previous state.
    transition_probability = {}
    for (prev_state, current_state), count in transition_counts.items():
        if prev_state == "STOP" and current_state == "START":
            # sentence-boundary artefact, not a real transition
            continue
        # Key order follows the notebook's convention: (current, previous).
        transition_probability[(current_state, prev_state)] = count / origin_counts[prev_state]

    return transition_probability
        
        
# Transition parameters estimated from the ES training sequence.
transition_parameters_ES = estimate_transition_parameters(train_data_ES)

# Uncomment to inspect the ES estimates:
# print("Transition Parameters for ES:")
# print(transition_parameters_ES)

print()  # emits one blank line in the cell output

# Transition parameters estimated from the RU training sequence.
transition_parameters_RU = estimate_transition_parameters(train_data_RU)

# Uncomment to inspect the RU estimates:
# print("Transition Parameters for RU:")
# print(transition_parameters_RU)



# ----------------------------------------------------------------------------------------

# e.g. transition_probability = {('O', 'START'): 0.9289176090468497}

# Keys are ordered (current_state, previous_state), so this entry means that
# the transition probability from state START to state O is about 0.929.




## Emission Parameters:

In [6]:
# File reading helper
def readFile(filePath: str):
    """Return every line of the UTF-8 text file at ``filePath``.

    Lines are returned in file order as a list of strings, each keeping its
    trailing newline when the file has one.
    """
    with open(filePath, "r", encoding="utf-8") as handle:
        lines = list(handle)
    return lines
    
def loadFile(file: list):
    """Return a new list with leading/trailing whitespace stripped from each entry."""
    stripped = []
    for entry in file:
        stripped.append(entry.strip())
    return stripped

In [7]:
# Count label occurrences and (word, label) co-occurrences
def WordCount(file: list):
    """Tally labels and word/label pairs from stripped "word label" lines.

    Empty strings in ``file`` are skipped. For every other entry the first
    two whitespace-separated fields are taken as the word and its label.

    Returns a single dict holding two kinds of keys:
      * each of the seven labels -> how many times the label occurred
      * "<word>_<label>"         -> how many times that pairing occurred
    A label outside the seven predefined ones raises KeyError.
    """
    counts = dict.fromkeys(
        [
            "O",
            "B-positive",
            "B-negative",
            "B-neutral",
            "I-positive",
            "I-negative",
            "I-neutral",
        ],
        0,
    )

    for entry in file:
        if entry == "":
            continue
        fields = entry.split()
        entity, label = fields[0], fields[1]
        pair_key = f"{entity}_{label}"
        counts[pair_key] = counts.get(pair_key, 0) + 1
        counts[label] += 1

    return counts

In [8]:
#Function to calculate emission values
def calculate_emission(input_x: dict, k=1):
    """Compute smoothed emission values from the counts built by WordCount.

    Parameters
    ----------
    input_x : dict
        Holds a count for each of the seven labels plus a count for every
        "<word>_<label>" key.
    k : number, default 1
        Smoothing constant.

    Returns
    -------
    (emissionResult, output_x)
        ``output_x`` maps every "<word>_<label>" key of ``input_x`` to
        (count + k) / (count(label) + k * (total_labels + 1)) and adds one
        "#UNK#<label>" entry per label (no underscore in that key) with value
        k / (count(label) + k * (total_labels + 1)), where ``total_labels`` is
        the sum of the seven label counts.
        ``emissionResult`` is the running product of all word/label emission
        values (it starts at 1; the #UNK# entries are not included).
        NOTE(review): for a large vocabulary this product will likely
        underflow to 0.0.
    """
    label_l = ["O", "B-positive", "B-negative", "B-neutral", "I-positive", "I-negative", "I-neutral"]
    total_labels = sum(input_x[label] for label in label_l)

    emissionResult = 1
    output_x = {}

    for key, value in input_x.items():
        if key in label_l:
            continue  # plain label totals are not emissions
        # Split on the LAST underscore only: labels never contain "_", but
        # words may, and a plain split("_") would fail to unpack for them.
        _, label = key.rsplit("_", 1)
        em_value = (value + k) / (input_x[label] + k * (total_labels + 1))
        output_x[key] = em_value
        emissionResult *= em_value

    # Add emission probability for #UNK# token for each label
    for label in label_l:
        output_x[f"#UNK#{label}"] = k / (input_x[label] + k * (total_labels + 1))

    return emissionResult, output_x

In [9]:
#Function for Sentiment Prediction
def predSentiment(input_sentence, emission_params):
    """Predict one label per word using emission values only.

    Parameters
    ----------
    input_sentence : iterable of str
        The words to tag.
    emission_params : dict
        Emission values keyed by "<word>_<label>", as produced by
        calculate_emission.

    Returns
    -------
    list of str
        For each word, the label whose "<word>_<label>" entry has the largest
        value. A word with no positive entry for any label is tagged "O".
        The "#UNK#<label>" entries are not consulted.
    """
    label_l = ("O", "B-positive", "B-negative", "B-neutral", "I-positive", "I-negative", "I-neutral")

    predicted_tags = []
    for word in input_sentence:
        max_prob = 0
        max_tag = "O"
        for label in label_l:
            # Exact key lookup: a prefix test would also match other words
            # that merely begin with `word`, and would yield the whole
            # "<word>_<label>" key rather than the label.
            prob = emission_params.get(f"{word}_{label}", 0)
            if prob > max_prob:
                max_prob = prob
                max_tag = label
        predicted_tags.append(max_tag)
    return predicted_tags

In [20]:
#Define file paths to use
trainFilePath = "Data/ES/train"
devInFilePath = "Data/ES/dev.in"
devOutFilePath = "Data/ES/dev.out"      # presumably the reference output; it is no longer written to
devPredFilePath = "Data/ES/dev.p1.out"  # predictions produced by this cell

#Read and load file paths
trainFile = readFile(trainFilePath)
loadTrainFile = loadFile(trainFile)

testFile = loadTrainFile

x = WordCount(testFile)
emissionResult, emissionParams = calculate_emission(x)

# Read development set data
# NOTE(review): assumes dev.in holds one token per line with blank lines
# between sentences, mirroring the "word label" layout of the train file —
# confirm against the data.
devInFile = readFile(devInFilePath)
processedDevInFile = loadFile(devInFile)

# Predict a tag for every token line; blank (sentence-separator) lines get an
# empty prediction list.
sentiment_prediction = [
    predSentiment([token] if token else [], emissionParams)
    for token in processedDevInFile
]

# Write the predictions to dev.p1.out as "token tag" lines, keeping the blank
# lines that separate sentences. Writing to a separate file leaves dev.out
# untouched.
with open(devPredFilePath, "w", encoding="utf-8") as f:
    for token, token_tags in zip(processedDevInFile, sentiment_prediction):
        if token_tags:
            f.write(f"{token} {token_tags[0]}\n")
        else:
            f.write("\n")

FileNotFoundError: [Errno 2] No such file or directory: 'Data/ES/train'

## Viterbi Algorithm:

In [11]:
import numpy as np

In [16]:
def viterbi_best_k(emission_probability, transition_probability, data, k):
    """Run a Viterbi-style forward pass and backtrack, then print the result.

    Parameters
    ----------
    emission_probability : dict
        Looked up with keys of the form "<word>_<state>". Missing keys are
        inserted with value 0, so the caller's dict is modified.
    transition_probability : dict
        Looked up with keys (current_state, previous_state). Missing keys are
        inserted with value 0, so the caller's dict is modified.
    data : sequence
        Only ``data[j+1][0]`` (the word at position j+1) is read.
    k : int
        Number of states requested per position from top_k_states.

    Returns
    -------
    None. The single best sequence and the per-position top-k lists are
    printed.

    The state inventory is taken from the module-level ``label_count`` dict.
    """

    # NOTE(review): n is assigned but never used below; the loops are bounded
    # by len(states), not by the length of the data.
    n = len(data)

    # All states except the START/STOP sentinels.
    states = list(state for state in label_count.keys() if state!="START" and state!="STOP")

    # Possible previous states: everything except STOP.
    states_with_START = list(state for state in label_count.keys() if state!="STOP")

    # Full inventory; a state's position in this list is its column index in pi.
    all_states = list(state for state in label_count.keys())

    # 2D score table pi[position][state index].
    # With label_count as defined in this notebook, START is index 0 and STOP is index 8.
    # NOTE(review): both dimensions are len(all_states), so the number of
    # positions does not depend on the input length.

    pi=[[0 for i in range(len(all_states))] for j in range(len(all_states))]

    # Output sequence array.
    # NOTE(review): this list is replaced before it is read (see backtracking).

    output = ["" for i in range(k)]

    # 1. Initialisation: position 0 is in state index 0 with score 1.

    pi[0][0] = 1


    # 2. Forward pass: for each position j+1 and state u, keep the best
    #    pi[j][v] * emission(word_{j+1} | u) * transition(v -> u) over v.
    # NOTE(review): j runs over range(0, len(states)-1), so only rows
    # 1 .. len(states)-1 of pi are filled.

    for j in range(0, len(states)-1):

        for u in states:

            max_score = float('-inf')

            for v in states_with_START:

                # Unseen word/state pairs get emission 0 (written into the dict).
                if f"{data[j+1][0]}_{u}" not in emission_probability:
                    emission_probability[f"{data[j+1][0]}_{u}"] = 0

                emission = emission_probability[f"{data[j+1][0]}_{u}"]


                # Unseen transitions get probability 0 (written into the dict).
                if (u,v) not in transition_probability:
                    transition_probability[(u,v)] = 0

                transition = transition_probability[(u,v)]

                score = pi[j][all_states.index(v)] * emission * transition
                max_score = max(max_score, score)

            pi[j+1][all_states.index(u)] = max_score

    # 3. Final step: best score of moving into STOP.
    # NOTE(review): this reads row len(states) of pi, which the forward loop
    # above does not write, so every entry read here is still its initial 0.

    max_score = float('-inf')

    for v in states_with_START:

        if ("STOP",v) not in transition_probability:
                transition_probability[("STOP",v)] = 0

        transition = transition_probability[("STOP",v)]

        score = pi[len(states)][all_states.index(v)] * transition
        max_score = max(max_score, score)

    pi[len(states)+1][all_states.index("STOP")] = max_score


    # Backtracking:

    # Step 1: choose the state at position len(states) that best leads to STOP.

    output = ["" for i in range(len(all_states))]
    k_output = [[] for i in range(len(states))]

    max_score = float('-inf')

    for u in states:

        if ("STOP", u) not in transition_probability:
            transition_probability[("STOP", u)] = 0
        transition = transition_probability[("STOP", u)]

        score = pi[len(states)][all_states.index(u)] * transition
        if score > max_score:
            max_score = score

            output[len(states)] = u

    # NOTE(review): u here is whatever the loop above ended on (the last entry
    # of states), not necessarily the state stored in output[len(states)].
    k_output[len(states)-1] = [u]

    # Step 2: walk backwards, picking for each position j the state u that
    # maximises pi[j][u] * transition(u -> output[j+1]).

    for j in range(len(states)-1, 0, -1):

        max_score = float('-inf')

        for u in states:

            # NOTE(review): v is not used inside this loop; each u is simply
            # scored once per v with the same value.
            for v in states_with_START:

                if (output[j+1],u) not in transition_probability:
                    transition_probability[(output[j+1],u)] = 0

                transition = transition_probability[(output[j+1],u)]

                score = pi[j][all_states.index(u)] * transition

                if score > max_score:
                    max_score = score
                    best_state_j = u

        output[j] = best_state_j

    # Collect the top-k candidate states for each earlier position.
    for j in range(len(states) - 1, 0, -1):

        k_output[j-1] = top_k_states(j, pi, transition_probability, k, output)

    print("normal output: ",output)

    print("k output: ",k_output)
    
    
def top_k_states(j, pi, transition_probability, k, output):
    """Return up to ``k`` states for position ``j``, best score first.

    For every non-sentinel state u the transition value stored under
    (output[j + 1], u) is looked up (a missing key is inserted with value 0,
    so ``transition_probability`` may be modified). u's score is the largest
    pi[j][index(v)] * transition over all states v other than STOP. States are
    then ordered by descending score, ties keeping their original order, and
    the first ``k`` state names are returned.

    The state inventory comes from the module-level ``label_count`` dict.
    """
    all_states = list(label_count.keys())
    states_with_START = [name for name in all_states if name != "STOP"]
    states = [name for name in states_with_START if name != "START"]

    # One (state, best score, arg-best previous state) entry per state.
    ranked = []
    for u in states:
        transition = transition_probability.setdefault((output[j + 1], u), 0)

        best_score = float('-inf')
        best_prev = None
        for v in states_with_START:
            candidate = pi[j][all_states.index(v)] * transition
            if candidate > best_score:
                best_score, best_prev = candidate, v

        ranked.append((u, best_score, best_prev))

    # Highest score first; the sort is stable, so ties keep state order.
    ranked = sorted(ranked, key=lambda entry: entry[1], reverse=True)

    return [entry[0] for entry in ranked[:k]]

    
    
# Run the decoder on the ES emission and transition parameters with k = 5.
# The function prints its results ("normal output" / "k output") rather than
# returning them. Note that it may insert zero-valued entries into both
# parameter dicts passed in here.
viterbi_best_k(emissionParams, transition_parameters_ES, train_data_ES, 5)

normal output:  ['', 'O', 'O', 'O', 'O', 'O', 'O', 'B-positive', '']
k output:  [['O', 'B-positive', 'B-negative', 'B-neutral', 'I-positive'], ['O', 'B-positive', 'B-negative', 'B-neutral', 'I-positive'], ['O', 'B-positive', 'B-negative', 'B-neutral', 'I-positive'], ['O', 'B-positive', 'B-negative', 'B-neutral', 'I-positive'], ['O', 'B-positive', 'B-negative', 'B-neutral', 'I-positive'], ['O', 'B-positive', 'I-positive', 'B-negative', 'I-negative'], ['I-neutral']]
