In [2]:
#imports
import pandas as pd
import numpy as np

#Read the data
language = 'RU'
file_path = f"Data/{language}/train"
labeled_data = []

with open(file_path, 'r', encoding='utf-8') as file:
    lines = file.readlines()

# Every sentence is wrapped in a ('START', 'START') / ('STOP', 'STOP') pair so that
# sentence boundaries take part in the emission and transition statistics.
# NOTE(review): each blank line closes one sentence and opens the next, so consecutive
# or trailing blank lines produce empty START/STOP sentences - confirm this is intended.
labeled_data.append(('START', 'START'))
for line in lines:
    line = line.strip()
    if line:
        try:
            # Split on the last space only: some RU tokens contain spaces themselves.
            data_point, label = line.rsplit(' ', 1)
        except ValueError:
            # rsplit returned a single piece, i.e. the line has no "<token> <label>" shape.
            print("Error at line " + line)
        else:
            labeled_data.append((label, data_point))  #Storing as a list of tuples (label, data_point)
    else:
        labeled_data.append(('STOP', 'STOP')) #If line is empty it signals the end of a sentence, and the start of a new one
        labeled_data.append(('START', 'START'))
labeled_data.append(('STOP', 'STOP'))

#Global data
df = pd.DataFrame(labeled_data, columns=['Label', 'Data'])
labels = df['Label'].unique()            # numpy array, in order of first appearance
words = df['Data'].unique()
label_counts = df['Label'].value_counts()  # Series: label -> number of occurrences

In [3]:
#Part 1
#Generate Emission Matrix
#emission_matrix[label][word]

# Rows are words, columns are labels; each cell counts how often the word carried that label.
word_label_counts_matrix = pd.crosstab(df['Data'], df['Label'])
k = 1
# '#UNK#' row: k pseudo-observations under every real label and none under START/STOP.
# The row is built by column name so it does not depend on where START/STOP sort.
unk_row = pd.DataFrame(
    [[0 if label in ('START', 'STOP') else k for label in word_label_counts_matrix.columns]],
    columns=word_label_counts_matrix.columns,
    index=['#UNK#'],
)
word_label_counts_matrix = pd.concat([word_label_counts_matrix, unk_row]) # Append the new row to the DataFrame
# NOTE(review): the denominator is count(label), not count(label) + k, although k
# pseudo-counts were added above - confirm which formula is intended.
emission_matrix = word_label_counts_matrix.div(label_counts, axis=1)

# Replace any value of 0 with 0.00001. Necessary otherwise some sentences will have no valid path.
def replace_zero_except_start_stop(val, col=None):
    """Return 0.00001 for a zero probability unless it belongs to START or STOP.

    val: a single emission probability.
    col: the label (column) the value belongs to. It is passed explicitly so the
         function no longer depends on a global loop variable; with the default
         None every zero is replaced.
    """
    if val == 0 and col not in ['START', 'STOP']:
        return 0.00001
    return val

for col in emission_matrix.columns:
    emission_matrix[col] = emission_matrix[col].apply(replace_zero_except_start_stop, args=(col,))

# Find the highest value and its corresponding column name for each row
max_value_columns = emission_matrix.idxmax(axis=1)
max_emission = pd.DataFrame({'Max_Emission': max_value_columns})

print(emission_matrix)

Label  B-negative  B-neutral  B-positive  I-negative  I-neutral  I-positive  \
!        0.000010   0.000010    0.000010    0.000010   0.000010    0.000010   
"        0.000010   0.009662    0.002693    0.014184   0.058824    0.045226   
%        0.000010   0.000010    0.000010    0.000010   0.000010    0.000010   
&        0.000010   0.000010    0.000010    0.000010   0.000010    0.003350   
'        0.000010   0.000010    0.000010    0.000010   0.000010    0.000010   
...           ...        ...         ...         ...        ...         ...   
–        0.000010   0.000010    0.000010    0.000010   0.000010    0.000010   
—        0.000010   0.000010    0.000010    0.000010   0.000010    0.000010   
…        0.000010   0.000010    0.000010    0.000010   0.000010    0.000010   
№        0.000010   0.000010    0.000010    0.000010   0.000010    0.001675   
#UNK#    0.002252   0.004831    0.000539    0.007092   0.014706    0.001675   

Label         O  START  STOP  
!      0.021912    0

In [4]:
#Part 1
#Read test data and implement naive max emission estimation

# File locations for the Part 1 baseline; all are derived from the global `language`.
train_file_path = f'Data/{language}/train'  # NOTE(review): not referenced by any of the visible cells
dev_file_path = f'Data/{language}/dev.in'  # input that the prediction loop in this cell reads line by line
dev_out_file_path = f'Outputs/{language}/dev.p1.out'  # where the tagged lines are written

def calc_argmax(word):
    """Return the label whose emission probability for `word` is highest.

    A word with no row in `emission_matrix` (i.e. not seen in training) is scored
    through the '#UNK#' row. Ties go to the label that comes first in `labels`.
    """
    # Index membership is a hash lookup; the previous version scanned all of df per token.
    if word not in emission_matrix.index:
        word = '#UNK#'
    word_emissions = emission_matrix.loc[word]
    return max(labels, key=lambda label: word_emissions[label])
        
outputs = []

# Tag every non-empty line of dev.in with its max-emission label.
# Blank lines separate sentences and are written back as truly empty lines
# (previously they were written as a single space).
with open(dev_file_path, "r", encoding="utf-8") as f:
    for line in f:
        line = line.replace("\n", "")
        if line != "":
            tag = calc_argmax(line)
            output = f"{line} {tag}"
        else:
            output = ""
        outputs.append(output)

# Open the file in write mode and write each element of the array as a new line
with open(dev_out_file_path, "w", encoding='utf-8') as file:
    for output in outputs:
        file.write(output + "\n")

In [29]:
#Part 2
#Generate MLE transition matrix
#transition_matrix[label][prev_label] -> likelihood of label following prev_label

def max_likelihood_estimation(prev_label, label):
    """Estimate P(label | prev_label) from consecutive rows of the global `df`.

    Returns count(prev_label immediately followed by label) / count(prev_label),
    or 0.0 when prev_label never occurs. `df` is only read: the shifted label
    column is a local Series, no helper column is added to the frame.
    """
    label_column = df['Label']
    prev_label_count = (label_column == prev_label).sum()
    if prev_label_count == 0:
        return 0.0
    previous_labels = label_column.shift(1)  # label of the row directly above
    label_follows_previous_count = ((label_column == label) & (previous_labels == prev_label)).sum()
    return label_follows_previous_count / prev_label_count

# Rows are the previous label, columns the current label, so
# transition_matrix[label][prev_label] reads P(label | prev_label).
transition_matrix = pd.DataFrame(index=labels, columns=labels, dtype=float)
for prev_label in labels:
    for label in labels:
        # A single .loc assignment writes into the frame itself; chained
        # transition_matrix[label][prev_label] = ... may write to a temporary copy.
        transition_matrix.loc[prev_label, label] = max_likelihood_estimation(prev_label, label)

print(transition_matrix)

               START B-positive         O I-positive      STOP B-negative  \
START            0.0   0.112308  0.844597        0.0  0.000326   0.021548   
B-positive       0.0   0.001616   0.80937   0.188476  0.000539        0.0   
O                0.0   0.037235  0.874451        0.0  0.075482   0.009327   
I-positive       0.0   0.001675  0.582915   0.413735  0.001675        0.0   
STOP        0.999674        0.0       0.0        0.0       0.0        0.0   
B-negative       0.0        0.0  0.815315        0.0       0.0        0.0   
I-negative       0.0        0.0  0.574468        0.0  0.007092        0.0   
B-neutral        0.0        0.0  0.864734        0.0       0.0        0.0   
I-neutral        0.0        0.0  0.411765        0.0       0.0        0.0   

           I-negative B-neutral I-neutral  
START             0.0  0.021221       0.0  
B-positive        0.0       0.0       0.0  
O                 0.0  0.003504       0.0  
I-positive        0.0       0.0       0.0  
STOP     

In [30]:
#Part 2
#Viterbi Algorithm

def viterbi(sentence): #sentence eg ['START','Plato','degustación',':','un','poco','abundante','de','más',',','pero','bien','cocinado','.','STOP']
    """Return the highest-scoring label sequence for `sentence`.

    sentence: list of tokens that starts with 'START' and ends with 'STOP'; every
              token must have a row in `emission_matrix`.
    Returns the labels of the inner tokens (START and STOP removed).
    Raises ValueError when no path with a non-zero score reaches STOP.

    Scores are plain probability products.
    NOTE(review): long sentences can underflow to 0 and then count as "no path".
    """
    label_list = list(labels)
    label_total = len(label_list)
    word_total = len(sentence)
    start_index = label_list.index('START')
    stop_index = label_list.index('STOP')

    # transition[prev, cur] and emission[position, label] as plain float arrays.
    # Working by position also avoids column labels that repeat when a word repeats.
    transition = transition_matrix.loc[label_list, label_list].to_numpy(dtype=float)
    emission = emission_matrix.loc[list(sentence), label_list].to_numpy(dtype=float)

    #scores[position, label] is the highest score of a path from START to this node
    scores = np.zeros((word_total, label_total))
    back_pointers = np.full((word_total, label_total), -1, dtype=int)  # -1 means "no path"
    scores[0, start_index] = 1.0  #Init START emitting START to 1

    #DP algorithm
    label_positions = np.arange(label_total)
    for position in range(1, word_total): #Start from second element, skip START
        # candidate[prev, cur] = score(prev) * emission(cur, word) * transition(prev -> cur)
        candidate = (scores[position - 1][:, None] * emission[position][None, :]) * transition
        best_previous = candidate.argmax(axis=0)  # first maximum wins, like a strict '>' scan
        best_score = candidate[best_previous, label_positions]
        scores[position] = best_score
        back_pointers[position] = np.where(best_score > 0, best_previous, -1)

    #Starting from STOP at the last position, follow the back pointers.
    path_indices = [stop_index]
    for position in range(word_total - 1, 0, -1):
        previous = back_pointers[position, path_indices[-1]]
        if previous < 0:
            raise ValueError("No valid label path for sentence")
        path_indices.append(previous)

    best_path = [label_list[index] for index in reversed(path_indices)]
    return best_path[1:-1] #Remove START and STOP

#Helper function to replace all new words with #UNK# and add START and STOP
def add_startstop_replace_new(sentence):
    """Prepare a token list for decoding, modifying it in place.

    Tokens without a row in `emission_matrix` (not seen in training) become
    '#UNK#'; then 'START' is inserted at the front and 'STOP' appended.
    Returns None.
    """
    known_words = emission_matrix.index  # hash lookup instead of scanning df once per token
    for index, word in enumerate(sentence):
        if word not in known_words:
            sentence[index] = '#UNK#'
    sentence.insert(0, 'START')
    sentence.append('STOP')

#Wrapper function, call this.
def predict_sentence_sentiment(sentence):
    """Tag one sentence (list of tokens) and return its list of labels.

    Works on a copy, so the caller's list is left untouched.
    """
    padded_tokens = sentence.copy()
    # Swaps unseen words for '#UNK#' and adds the START/STOP markers, in place.
    add_startstop_replace_new(padded_tokens)
    predicted_labels = viterbi(padded_tokens)
    return predicted_labels

#sentence = ['Plato','degustación',':','un','poco','abundante','de','más',',','pero','bien','cocinado','.']
#predict_sentence_sentiment(sentence)

In [14]:
#Part 2
#Predict sentiment of dev.in, write to dev.p2.out

#Read the data
file_path = f"Data/{language}/dev.in"

with open(file_path, 'r', encoding='utf-8') as file:
    lines = file.readlines()

#Read data into array of sentence arrays (a blank line starts a new sentence)
sentences = [[]]
sentence_number = 0
for line in lines:
    line = line.strip()
    if line:
        sentences[sentence_number].append(line)
    else:
        sentences.append([])
        sentence_number += 1

sentences_with_prediction = []
for sentence in sentences:
    try:
        prediction = predict_sentence_sentiment(sentence)
        sentence_with_prediction = [s1 + ' ' + s2 for s1, s2 in zip(sentence, prediction)]
    except Exception as error:
        # Report the failure, but still emit the sentence (tagged 'O') so the output
        # keeps the same number of sentences and tokens as dev.in.
        print("Error at sentence: ")
        print(sentence)
        print(repr(error))
        sentence_with_prediction = [word + ' O' for word in sentence]
    sentences_with_prediction.append(sentence_with_prediction)

# Name of the output file
file_name = f"Outputs/{language}/dev.p2.out"

# Open the file in write mode and write each element of the array as a new line
with open(file_name, "w", encoding='utf-8') as file:
    for sentence in sentences_with_prediction:
        for line in sentence:
            file.write(line + "\n")
        file.write("\n")

In [31]:
#Part 3
#Viterbi Algorithm modified - find k-th best path

def viterbi_multiple(sentence, k): #sentence eg ['START','Plato','degustación',':','un','poco','abundante','de','más',',','pero','bien','cocinado','.','STOP']
    """Return the k best label sequences for `sentence`, best first.

    sentence: list of tokens that starts with 'START' and ends with 'STOP'.
    k:        number of paths to keep per (label, position) node.
    Returns a list of k label lists (START and STOP removed). When fewer than k
    paths have a non-zero score, the remaining entries repeat the last valid path
    (or are [] if there is none).
    """
    label_total = len(labels)
    word_total = len(sentence)

    #viterbi_matrix[0,1,2] = 0.001 means third best score to first label, second word
    viterbi_matrix = np.zeros((label_total, word_total, k))
    #best_back_path_matrix[0,1,2] = (2,1) means third best score to first label, second word points back to second label, second best score
    best_back_path_matrix = np.empty((label_total, word_total, k), dtype=object)
    best_back_path_matrix[:, :, :] = None

    START_LABEL_INDEX = np.where(labels == 'START')[0][0]
    STOP_LABEL_INDEX = np.where(labels == 'STOP')[0][0]

    #Init START emitting START to 1
    viterbi_matrix[START_LABEL_INDEX, 0, 0] = 1 #Init (START, START, path 0) to 1

    #DP algorithm
    for word_index in range(1, word_total): #Start from second element, skip START
        word = sentence[word_index]
        for label_index, label in enumerate(labels):
            # Looked up once per (word, label) instead of once per candidate.
            emission = emission_matrix[label][word]
            if emission == 0:
                continue  # every candidate would score 0 and never be inserted
            transitions_into_label = transition_matrix[label]
            for prev_label_index, prev_label in enumerate(labels):
                transition = transitions_into_label[prev_label]
                if transition == 0:
                    continue
                for depth in range(k): #Iterate over all k best paths from previous node
                    previous_score = viterbi_matrix[prev_label_index, word_index-1, depth]
                    if previous_score == 0:
                        break  # slots are sorted descending, so the rest are 0 too
                    score = previous_score * emission * transition

                    #Find the position to insert the new value to maintain descending order
                    #Use negation as searchsorted only works for ascending
                    position = np.searchsorted(-viterbi_matrix[label_index, word_index, :], -score, side='right')

                    #If position == k, the score is smaller than existing scores
                    #Shift all values in both matrices from 'position' by one, then insert new element
                    if position < k:
                        for pointer in range(k-1, position, -1):
                            viterbi_matrix[label_index, word_index, pointer] = viterbi_matrix[label_index, word_index, pointer-1]
                            best_back_path_matrix[label_index, word_index, pointer] = best_back_path_matrix[label_index, word_index, pointer-1]
                        viterbi_matrix[label_index, word_index, position] = score
                        best_back_path_matrix[label_index, word_index, position] = (prev_label_index, depth)

    final_scores = viterbi_matrix[STOP_LABEL_INDEX, word_total-1, :]

    #From all the highest scoring paths at STOP, find the best back path.
    best_paths = []
    best_path = []
    for path_number in range(k):
        if final_scores[path_number] != 0: #If score is 0 (no path), just put as previous path.
            pointer = STOP_LABEL_INDEX
            depth_pointer = path_number
            best_path = []
            for word_index in range(word_total-1, -1, -1):
                best_path.append(labels[pointer])
                if word_index > 0:
                    pointer, depth_pointer = best_back_path_matrix[pointer, word_index, depth_pointer]
            best_path = best_path[::-1] #Reverse the array
            best_path = best_path[1:-1] #Remove START and STOP
        best_paths.append(best_path)
    return best_paths

#Wrapper function, call this.
def predict_sentence_sentiment_multiple(sentence, k):
    """Return the k best label sequences for one sentence (list of tokens).

    The input list is copied first, so the caller's list is left untouched.
    """
    padded_tokens = sentence.copy()
    # Same preprocessing as Part 2: '#UNK#' substitution plus START/STOP markers.
    add_startstop_replace_new(padded_tokens)
    ranked_paths = viterbi_multiple(padded_tokens, k)
    return ranked_paths

#sentence = ['Plato','degustación',':','un','poco','abundante','de','más',',','pero','bien','cocinado','.']
#predict_sentence_sentiment_multiple(sentence, 4)

In [37]:
#Part 3
#Predict sentiment of dev.in, output to dev.p3.2nd.out and dev.p3.8th.out

#Read the data
file_path = f"Data/{language}/dev.in"

with open(file_path, 'r', encoding='utf-8') as file:
    lines = file.readlines()

#Read data into array of sentence arrays (a blank line starts a new sentence)
sentences = [[]]
sentence_number = 0
for line in lines:
    line = line.strip()
    if line:
        sentences[sentence_number].append(line)
    else:
        sentences.append([])
        sentence_number += 1

second_sentences_with_prediction = []
eighth_sentences_with_prediction = []
for sentence in sentences:
    try:
        prediction = predict_sentence_sentiment_multiple(sentence, 8)
        # prediction[1] is the 2nd best path, prediction[7] the 8th best.
        second_sentence_with_prediction = [s1 + ' ' + s2 for s1, s2 in zip(sentence, prediction[1])]
        eighth_sentence_with_prediction = [s1 + ' ' + s2 for s1, s2 in zip(sentence, prediction[7])]
    except Exception as error:
        # Report the failure, but still emit the sentence (tagged 'O') in both files
        # so they keep the same number of sentences and tokens as dev.in.
        print("Error at sentence: ")
        print(sentence)
        print(repr(error))
        second_sentence_with_prediction = [word + ' O' for word in sentence]
        eighth_sentence_with_prediction = list(second_sentence_with_prediction)
    second_sentences_with_prediction.append(second_sentence_with_prediction)
    eighth_sentences_with_prediction.append(eighth_sentence_with_prediction)

# Both files share one format: one "token label" line per token, blank line after each sentence.
for file_name, tagged_sentences in (
    (f"Outputs/{language}/dev.p3.2nd.out", second_sentences_with_prediction),
    (f"Outputs/{language}/dev.p3.8th.out", eighth_sentences_with_prediction),
):
    with open(file_name, "w", encoding='utf-8') as file:
        for sentence in tagged_sentences:
            for line in sentence:
                file.write(line + "\n")
            file.write("\n")

In [32]:
#Part 4
#Design Challenge
#Introduce 'entity penalty': a normalization hyperparameter that penalizes the score based on the number of entities.
#The previous algorithm tends to generate too many entities: twice as many as the gold entities.
#A higher entity penalty -> fewer entities predicted
#Test different entity penalties to find best hyperparameter

def viterbi_multiple_with_scores(sentence, k): #sentence eg ['START','Plato','degustación',':','un','poco','abundante','de','más',',','pero','bien','cocinado','.','STOP']
    """k-best Viterbi decoding that also returns the score of every path.

    Named differently from Part 3's `viterbi_multiple` on purpose: that function
    returns only the paths, and redefining it with a tuple return value broke the
    Part 3 cell whenever it was run after this one.

    sentence: list of tokens that starts with 'START' and ends with 'STOP'.
    k:        number of paths to keep per (label, position) node.
    Returns (best_paths, final_scores): k label lists, best first, with START and
    STOP removed, and the length-k array of their scores at STOP. Entries whose
    score is 0 repeat the last valid path (or are [] if there is none).
    """
    label_total = len(labels)
    word_total = len(sentence)

    #viterbi_matrix[0,1,2] = 0.001 means third best score to first label, second word
    viterbi_matrix = np.zeros((label_total, word_total, k))
    #best_back_path_matrix[0,1,2] = (2,1) means third best score to first label, second word points back to second label, second best score
    best_back_path_matrix = np.empty((label_total, word_total, k), dtype=object)
    best_back_path_matrix[:, :, :] = None

    START_LABEL_INDEX = np.where(labels == 'START')[0][0]
    STOP_LABEL_INDEX = np.where(labels == 'STOP')[0][0]

    #Init START emitting START to 1
    viterbi_matrix[START_LABEL_INDEX, 0, 0] = 1 #Init (START, START, path 0) to 1

    #DP algorithm
    for word_index in range(1, word_total): #Start from second element, skip START
        word = sentence[word_index]
        for label_index, label in enumerate(labels):
            # Looked up once per (word, label) instead of once per candidate.
            emission = emission_matrix[label][word]
            if emission == 0:
                continue  # every candidate would score 0 and never be inserted
            transitions_into_label = transition_matrix[label]
            for prev_label_index, prev_label in enumerate(labels):
                transition = transitions_into_label[prev_label]
                if transition == 0:
                    continue
                for depth in range(k): #Iterate over all k best paths from previous node
                    previous_score = viterbi_matrix[prev_label_index, word_index-1, depth]
                    if previous_score == 0:
                        break  # slots are sorted descending, so the rest are 0 too
                    score = previous_score * emission * transition

                    #Find the position to insert the new value to maintain descending order
                    #Use negation as searchsorted only works for ascending
                    position = np.searchsorted(-viterbi_matrix[label_index, word_index, :], -score, side='right')

                    #If position == k, the score is smaller than existing scores
                    #Shift all values in both matrices from 'position' by one, then insert new element
                    if position < k:
                        for pointer in range(k-1, position, -1):
                            viterbi_matrix[label_index, word_index, pointer] = viterbi_matrix[label_index, word_index, pointer-1]
                            best_back_path_matrix[label_index, word_index, pointer] = best_back_path_matrix[label_index, word_index, pointer-1]
                        viterbi_matrix[label_index, word_index, position] = score
                        best_back_path_matrix[label_index, word_index, position] = (prev_label_index, depth)

    final_scores = viterbi_matrix[STOP_LABEL_INDEX, word_total-1, :]

    #From all the highest scoring paths at STOP, find the best back path.
    best_paths = []
    best_path = []
    for path_number in range(k):
        if final_scores[path_number] != 0: #If score is 0 (no path), just put as previous path.
            pointer = STOP_LABEL_INDEX
            depth_pointer = path_number
            best_path = []
            for word_index in range(word_total-1, -1, -1):
                best_path.append(labels[pointer])
                if word_index > 0:
                    pointer, depth_pointer = best_back_path_matrix[pointer, word_index, depth_pointer]
            best_path = best_path[::-1] #Reverse the array
            best_path = best_path[1:-1] #Remove START and STOP
        best_paths.append(best_path)
    return best_paths, final_scores

#Wrapper function, call this.
def predict_sentence_sentiment_entity_penalty(sentence, k, entity_penalty):
    """Pick, among the k best paths, the one with the highest entity-penalized score.

    sentence:       list of tokens (not modified; a copy is decoded).
    k:              number of candidate paths to consider.
    entity_penalty: each entity (a label starting with 'B-') scales the path score
                    by an extra -entity_penalty, i.e. score * (1 - count * penalty).
    Returns the chosen list of labels.
    """
    sentence_copy = sentence.copy()
    add_startstop_replace_new(sentence_copy) #Same as qn2
    best_paths, final_scores = viterbi_multiple_with_scores(sentence_copy, k)

    highest_penalized_score = float('-inf')  # so a path is chosen even if every penalized score is negative
    best_penalized_path = None
    for path, path_score in zip(best_paths, final_scores):
        # Every entity starts with exactly one B-* label (B-positive, B-negative, B-neutral).
        entity_count = sum(1 for label in path if label.startswith('B-'))
        penalized_score = path_score * (1 - entity_count * entity_penalty)
        if penalized_score > highest_penalized_score:
            best_penalized_path = path
            highest_penalized_score = penalized_score

    return best_penalized_path

#sentence = ['Bonito', 'restaurante', ',', 'buen', 'ambiente', 'y', 'servicio', '.']
#print(predict_sentence_sentiment_entity_penalty(sentence, 4, 0.99))

In [33]:
#Part 4
#Predict sentiment of dev.in, output to dev.p4.out

#Read the data
file_path = f"Data/{language}/dev.in"

with open(file_path, 'r', encoding='utf-8') as file:
    lines = file.readlines()

#Read data into array of sentence arrays (a blank line starts a new sentence)
sentences = [[]]
sentence_number = 0
for line in lines:
    line = line.strip()
    if line:
        sentences[sentence_number].append(line)
    else:
        sentences.append([])
        sentence_number += 1

#Tried several different values, this works well for ES dataset
#NOTE(review): `language` is set to 'RU' at the top of the notebook - confirm 0.1 also suits that dataset
entity_penalty = 0.1
k = 4 #Number of candidate paths re-ranked per sentence

sentences_with_prediction = []
for sentence in sentences:
    try:
        prediction = predict_sentence_sentiment_entity_penalty(sentence, k, entity_penalty)
        sentence_with_prediction = [s1 + ' ' + s2 for s1, s2 in zip(sentence, prediction)]
    except Exception as error:
        # Report the failure, but still emit the sentence (tagged 'O') so the output
        # keeps the same number of sentences and tokens as dev.in.
        print("Error at sentence: ")
        print(sentence)
        print(repr(error))
        sentence_with_prediction = [word + ' O' for word in sentence]
    sentences_with_prediction.append(sentence_with_prediction)

# Name of the output file
file_name = f"Outputs/{language}/dev.p4.out"

# Open the file in write mode and write each element of the array as a new line
with open(file_name, "w", encoding='utf-8') as file:
    for sentence in sentences_with_prediction:
        for line in sentence:
            file.write(line + "\n")
        file.write("\n")