**Import Libraries**


In [3]:
import os
import math
import pandas as pd

**Functions for Part 1**

In [4]:
def output_prediction(prediction, data, path):
    assert(len(prediction) == len(data))
    file = open(path, "w", encoding="utf-8")
    n = len(data)
    print("Writing", n, "lines")
    for i in range(n):
        assert(len(data[i]) == len(prediction[i]))
        m = len(data[i])
        for j in range(m):
            file.write(data[i][j] + " " + prediction[i][j] + "\n")
        file.write("\n")
    print("Wrote predictions to", path)

def get_training_set_words(data):
    words = set()
    for i in data:
        if len(data) > 1:
            words.add(i[0])
    return words

def dev_open(path):
  out = [[]]
  f = open(path, "r", encoding="utf-8")
  lines_in = f.readlines()
  for word in lines_in:
    if word == "\n":
      out.append([])
    else:
      out[-1].append(word.rstrip())
  return out[:-1]


def count_words_not_in_train(dev_data, train_words):
    words_not_in_train = 0
    for sentence in dev_data:
        for word in sentence:
            if word not in train_words:
                words_not_in_train += 1
    return words_not_in_train


def read_data(path):
  dataset = []
  f = open(path,"r", encoding="utf-8")
  training_set = f.readlines()
  for line in training_set:
    if len(line) == 1:
      dataset.append("\n")
    else:
      line = line.rstrip('\n')
      line = line.rpartition(' ')
      line = list(line)
      del line[1]
      if line != ['', '']:
        dataset.append(line)
  Edataset = [ele for ele in dataset]
  return Edataset


def count_tags(training_set):
  unique_tag_count = {'START':0,'O':0,'B-positive':0,'B-neutral':0,'B-negative':0,'I-positive':0,'I-neutral':0,'I-negative':0,'STOP':0}
  for data_pair in training_set:
    if len(data_pair) > 1:
      if data_pair[1] in unique_tag_count.keys():
        unique_tag_count[data_pair[1]] += 1
    elif len(data_pair)==1:
      unique_tag_count['START'] += 1 
      unique_tag_count['STOP'] += 1
  return unique_tag_count


def count_words_for_each_tag(training_set):
  label_generate_all = {'O':{},'B-positive':{},'B-neutral':{},'B-negative':{},'I-positive':{},'I-neutral':{},'I-negative':{}}
  for data in training_set:
    if len(data) > 1:
        if data[0] not in label_generate_all[data[1]].keys():
          label_generate_all[data[1]][data[0]] = 1
        else:
          label_generate_all[data[1]][data[0]] += 1 

  return label_generate_all


def get_tags(data):
    unique_labels = set()
    for sentence in data:
        for _, label in sentence:
            unique_labels.add(label)
    return list(unique_labels)


def estimate_emission_params(count_tags, count_tag_words, smoothing_factor=1):
    emission_params = {}
    for tag_tuple, tag_word_counts in count_tag_words.items():
        tag_estimations = {}
        for word, word_count in tag_word_counts.items():
            estimated_value = word_count / (count_tags[tag_tuple] + smoothing_factor)
            tag_estimations[word] = estimated_value
        tag_estimations['#UNK#'] = smoothing_factor / (count_tags[tag_tuple] + smoothing_factor)
        emission_params[tag_tuple] = tag_estimations
    
    return emission_params


def predict_sentiment(words, e_params, word_set):
    sentiment_labels = ['O', 'B-positive', 'B-neutral', 'B-negative', 'I-positive', 'I-neutral', 'I-negative']
    predictions = []
    for word in words:
        max_label = 'O'
        max_probability = 0
        if word not in word_set:
            word = "#UNK#"
        for label in sentiment_labels:
            if word not in e_params[label]:
                continue
            current_prob = e_params[label][word]
            if current_prob > max_probability:
                max_label = label
                max_probability = current_prob
        
        predictions.append(max_label)
    
    return predictions



def make_predictions(data, e_params, word_set):
    all_predictions = []
    for sentence in data:
        all_predictions.append(predict_sentiment(sentence, e_params, word_set))
    
    return all_predictions

**Calling functions, estimating emission params and tagging words for RU dev.in**

In [58]:
train_path_RU = "RU/train"
test_path_RU = "RU/dev.in"
output_path_RU = "RU/dev.p1.out"
train_data_RU = read_data(train_path_RU)
train_words_RU = get_training_set_words(train_data_RU)
tag_counts_RU = count_tags(train_data_RU)
test_data_RU = dev_open(test_path_RU)
tags_RU = count_tags(train_data_RU)
tag_word_counts_RU = count_words_for_each_tag(train_data_RU)
emission_params_RU = estimate_emission_params(tag_counts_RU, tag_word_counts_RU)
predictions_RU = make_predictions(test_data_RU, emission_params_RU, train_words_RU)
output_prediction(predictions_RU, test_data_RU, output_path_RU)

7881
Writing 437 lines
Wrote predictions to RU/dev.p1.out


**Calling functions, estimating emission params and tagging words for ES dev.in**

In [59]:
train_path_ES = "ES/train"
test_path_ES = "ES/dev.in"
output_path_ES = "ES/dev.p1.out"
output_path_part_2_ES = "ES/dev.p2.out"
train_data_ES = read_data(train_path_ES)
train_words_ES = get_training_set_words(train_data_ES)
tag_counts_ES = count_tags(train_data_ES)
test_data_ES = dev_open(test_path_ES)
tags_ES = count_tags(train_data_ES)
tag_word_counts_ES = count_words_for_each_tag(train_data_ES)
emission_params_ES = estimate_emission_params(tag_counts_ES, tag_word_counts_ES)
predictions_ES = make_predictions(test_data_ES, emission_params_ES, train_words_ES)
output_prediction(predictions_ES, test_data_ES, output_path_ES)

print(train_words_ES)

5041
Writing 266 lines
Wrote predictions to ES/dev.p1.out
{'cobrados', 'balalao', 'magnifica', 'BADAJOZ', 'pasada', 'dice', 'Disfrutamos', 'ahumado', 'fiasco', 'aclarar', 'Ternasco', 'ha', 'apunto', 'mantel', 'etabas', 'parecidas', 'pelín', 'coeficientes', 'mousse', 'nuestras', 'Jiloca', 'ejemplo', 'internet.Aunque', 'excesivo', 'revuelto', 'precios', 'J', 'sorprendía', 'defecto', 'agrado', 'anunciado', 'supera', 'vapor', 'genial', 'cogollo', 'dorada', 'preguntaran', 'repito', 'Recuerdo', 'mostaza', 'joya', 'aprendido', 'fortuna', 'dió', '1ª', 'legales', 'gracias', 'sed', 'colaboracion', 'encargaré', 'grado', 'pocas', 'encantó', 'LOS', 'llenarte.No', 'Visité', 'guías', 'venía', 'configuración', 'Jamon', 'exceso', 'trufa', 'hosteleria', 'nuestra', 'rape', 'Extraurinadio', 'resaltar', 'empezar', 'cuidasen', 'pudiera', 'ninguna', 'Maur', 'conejo', 'espuma', 'bullicioso', 'restauración', 'ect', 'fino', 'clarisima', 'suministraba', 'Universitas', 'dto', 'empalmamos', 'desapercibido', 'decid

**Functions for Part 2**

In [29]:
def estimate_transition_parameters(data, tag_counts):
    transition_counts = {'START': {}, 'O': {}, 'B-positive': {}, 'B-neutral': {}, 'B-negative': {}, 'I-positive': {}, 'I-neutral': {}, 'I-negative': {}}
    
    for label in transition_counts.keys():
        transition_counts[label] = {'O': 0, 'B-positive': 0, 'B-neutral': 0, 'B-negative': 0, 'I-positive': 0, 'I-neutral': 0, 'I-negative': 0, 'STOP': 0}
    
    last_position = 'START'
    
    for line in data:
        if line == "\n":
            transition_counts[last_position]["STOP"] += 1
            last_position = 'START'
        else:
            next_label = line[1]
            transition_counts[last_position][next_label] += 1
            last_position = next_label
    
    transition_probabilities = {'START': {}, 'O': {}, 'B-positive': {}, 'B-neutral': {}, 'B-negative': {}, 'I-positive': {}, 'I-neutral': {}, 'I-negative': {}, 'STOP': {}}
    
    for label_in in transition_probabilities.keys():
        transition_probabilities[label_in] = {'START': 0, 'O': 0, 'B-positive': 0, 'B-neutral': 0, 'B-negative': 0, 'I-positive': 0, 'I-neutral': 0, 'I-negative': 0, 'STOP': 0}
    
    for label_in, t_counts in transition_counts.items():
        for label_out, count in t_counts.items():
            transition_probabilities[label_in][label_out] = count / tag_counts[label_in]
    
    return transition_probabilities


def viterbi_algo(data, trans_probs, emiss_probs, vocab):
    n = len(data)
    labels = ['O', 'B-positive', 'B-neutral', 'B-negative', 'I-positive', 'I-neutral', 'I-negative', 'START']

    n_inf = -math.inf
    memo = [{'START': [n_inf, None],
             'STOP': [n_inf, None],
             'O': [n_inf, None],
             'B-positive': [n_inf, None],
             'B-neutral': [n_inf, None],
             'B-negative': [n_inf, None],
             'I-positive': [n_inf, None],
             'I-neutral': [n_inf, None],
             'I-negative': [n_inf, None]} for i in range(n + 2)]

    memo[0]['START'][0] = 0

    for j in range(0, n):
        next_wd = data[j]

        for u in labels:
            max_val = n_inf
            max_lbl = None
            for v in labels:
                if (memo[j][v][0] == n_inf or trans_probs[v][u] == 0):
                    continue
                prev_val = memo[j][v][0]
                if next_wd in vocab:
                    if next_wd not in emiss_probs[u].keys():
                        continue
                    else:
                        em_prob = emiss_probs[u][next_wd]
                else:
                    em_prob = emiss_probs[u]['#UNK#']
                trans_prob = trans_probs[v][u]
                prob = prev_val + math.log(em_prob) + math.log(trans_prob)
                if max_val < prob:
                    max_val = prob
                    max_lbl = v
            
            if max_val == n_inf:
                continue
            memo[j + 1][u][0] = max_val
            memo[j + 1][u][1] = max_lbl
    
    max_val = n_inf
    max_lbl = None
    for v in labels:
        prev_val = memo[n][v][0]
        trans_prob = trans_probs[v]['STOP']
        if (prev_val == 0 or trans_prob == 0):
            continue
        prob = prev_val + math.log(trans_prob)
        if max_val < prob:
            max_val = prob
            max_lbl = v

    if max_val != n_inf:
        memo[n + 1]['STOP'][0] = max_val
        memo[n + 1]['STOP'][1] = max_lbl

    output = ['' for i in range(n)]

    if max_lbl == None:
        max_lbl = "O"

    for j in range(n + 1, 1, -1):
        max_lbl = memo[j][max_lbl][1]
        if max_lbl == None:
            max_lbl = "O"
        output[j - 2] = max_lbl
    
    return output

def viterbi_call(sep_data, trans_probs, emiss_probs, vocab):
    final_result = []
    for doc in sep_data:
        final_result.append(viterbi_algo(doc, trans_probs, emiss_probs, vocab))
    return final_result


def viterbi_to_file(test, predictions, output_path):
    tags = []
    text = []
    
    for prediction in predictions:
        for tag in prediction:
            tags.append(tag)

    for words in test:
        for word in words:
            text.append(word)
    
    df = pd.DataFrame({
        0: text,
        1: tags
    })
    
    df.to_csv(output_path, sep='\t', index=False)
    print(f"Output written to {output_path}")
    
    return df

**Calling functions for part 2 RU**

In [30]:
output_path_part_2_RU = "RU/dev.p2.out"
transition_params_RU = estimate_transition_parameters(train_data_RU, tag_counts_RU)
prediction_RU = viterbi_call(test_data_RU, transition_params_RU, emission_params_RU, train_words_RU)
result_RU = viterbi_to_file(test_data_RU, prediction_RU, output_path_part_2_RU)

Output written to RU/dev.p2.out


**Calling functions for part 2 ES**

In [60]:
output_path_part_2_RU = "RU/dev.p2.out"
transition_params_RU = estimate_transition_parameters(train_data_RU, tag_counts_RU)
print(test_data_RU)
prediction_RU = viterbi_call(test_data_RU, transition_params_RU, emission_params_RU, train_words_RU)
result_RU = viterbi_to_file(test_data_RU, prediction_RU, output_path_part_2_RU)

[['Интерьер', ',', 'интерьер', ',', 'и', 'еще', 'раз', 'интерьер', '!', '!', '!', 'общее', 'цветовое', 'решение', 'и', 'каждая', 'деталь', '-', 'просто', 'восхитительны', '!', '!', '!', 'особенно', 'на', 'контрасте', 'с', 'популярным', 'сейчас', 'минимализмом', ',', 'Дача', '-', 'самое', 'домашне-уютное', 'и', 'при', 'этом', 'очень', 'красивое', 'место', '.'], ['Средний', 'счет', 'оказался', 'весьма', 'приличным', '.', 'но', 'только', 'сугубо', 'из-за', 'вина', '.'], ['То', 'же', 'касается', 'и', 'внешнего', 'вида', 'официанток', '.'], ['Затем', 'мы', 'попросили', 'счет', ',', 'его', 'так', 'же', 'долго', 'не', 'могли', 'принести', ',', 'как', 'и', 'заказ', '.'], ['В', 'день', 'банкета', 'с', 'первой', 'же', 'секунды', 'в', 'ресторане', 'ощущалось', 'настроение', 'праздника', 'весь', 'персонал', 'улыбался', 'и', 'поздравлял', '!', 'было', 'очень', 'приятно', '!', '=)', 'кухня', 'бесподобная', ',', 'все', 'гости', 'наелись', '"', 'до', 'отвала', '"', 'все', 'было', 'очень', 'вкусно', '!

In [61]:
max_length = 0
for i in test_data_RU:
    n = len(i)
    if n>max_length:
        max_length = n
        
        
print(max_length)

103


**Functions for part 3**


In [10]:
def viterbi_algo_topk(data, trans_probs, emiss_probs, vocab, k=5):
    n = len(data)
    labels = ['O', 'B-positive', 'B-neutral', 'B-negative', 'I-positive', 'I-neutral', 'I-negative', 'START']

    n_inf = -math.inf
    memo = [{'START': [(n_inf, None)] * k,
             'STOP': [(n_inf, None)] * k, 
             'O': [(n_inf, None)] * k,
             'B-positive': [(n_inf, None)] * k,
             'B-neutral': [(n_inf, None)] * k,
             'B-negative': [(n_inf, None)] * k,
             'I-positive': [(n_inf, None)] * k,
             'I-neutral': [(n_inf, None)] * k,
             'I-negative': [(n_inf, None)] * k} for i in range(n + 2)]

    memo[0]['START'][0] = (0, None) 

    for j in range(0, n):
        next_wd = data[j]

        for u in labels:
            topk_probs = []
            for r in range(k):
                max_val = n_inf
                max_lbl = None
                for v in labels:
                    if (memo[j][v][r][0] == n_inf or trans_probs[v][u] == 0):
                        continue
                    prev_val = memo[j][v][r][0]
                    if next_wd in vocab:
                        if next_wd not in emiss_probs[u].keys():
                            continue
                        else:
                            em_prob = emiss_probs[u][next_wd]
                    else:
                        em_prob = emiss_probs[u]['#UNK#']
                    trans_prob = trans_probs[v][u]
                    prob = prev_val + math.log(em_prob) + math.log(trans_prob)
                    if max_val < prob:
                        max_val = prob
                        max_lbl = v
                topk_probs.append((max_val, max_lbl))
            
            topk_probs.sort(reverse=True, key=lambda x: x[0])

            for r in range(k):
                memo[j + 1][u][r] = topk_probs[r]
    
    topk_end_probs = []
    for r in range(k):
        max_val = n_inf
        max_lbl = None
        for v in labels:
            prev_val = memo[n][v][r][0]
            trans_prob = trans_probs[v]['STOP']
            if (prev_val == 0 or trans_prob == 0):
                continue
            prob = prev_val + math.log(trans_prob)
            if max_val < prob:
                max_val = prob
                max_lbl = v
        topk_end_probs.append((max_val, max_lbl))

    topk_end_probs.sort(reverse=True, key=lambda x: x[0])

    for r in range(k):
        if topk_end_probs[r][0] != n_inf:
            memo[n + 1]['STOP'][r] = topk_end_probs[r]

    output = [['' for _ in range(k)] for _ in range(n)]

    for r in range(k):
        max_lbl = topk_end_probs[r][1]
        if max_lbl is None:
            max_lbl = "O"
        for j in range(n + 1, 1, -1):
            max_lbl = memo[j][max_lbl][r][1]
            if max_lbl is None:
                max_lbl = "O"
            output[j - 2][r] = max_lbl
    
    return output

def viterbi_call_topk(sep_data, trans_probs, emiss_probs, vocab, k=5):
    final_result = []
    for doc in sep_data:
        final_result.append(viterbi_algo_topk(doc, trans_probs, emiss_probs, vocab, k))
    return final_result

def viterbi_to_file_topk(test, predictions, output_path, k=5):
    tags = []
    text = []
    
    for prediction in predictions:
        for tag in prediction:
            tags.append(tag)

    for words in test:
        for word in words:
            text.append(word)
    
    df = pd.DataFrame({
        0: text,
        1: tags
    })
    
    df.to_csv(output_path, sep='\t', index=False)
    print(f"Output written to {output_path}")
    
    return df

**Calling functions for part 3 RU**

In [31]:
sep_data = [
    ['Средний', 'счет', 'оказался', 'весьма', 'приличным', '.', 'но', 'только', 'сугубо', 'из-за', 'вина', '.']
]
k = 4
predictions = viterbi_call_topk(sep_data, transition_params_RU, emission_params_RU, train_words_RU, k)
output_path = "RU/dev.p3.out"
viterbi_to_file_topk(sep_data, predictions, output_path, k)

Output written to RU/dev.p3.out


Unnamed: 0,0,1
0,Средний,"[B-neutral, O, O, O]"
1,счет,"[O, O, O, O]"
2,оказался,"[O, O, O, O]"
3,весьма,"[O, O, O, O]"
4,приличным,"[O, O, O, O]"
5,.,"[O, O, O, O]"
6,но,"[O, O, O, O]"
7,только,"[O, O, O, O]"
8,сугубо,"[O, O, O, O]"
9,из-за,"[O, O, O, O]"


In [52]:
# Assuming your data is already read into a list called input_lines
input_lines = read_data("RU/dev.p2.out")

def remove_whitespace(data):
    cleaned_data = []

    for inner_list in data:
        cleaned_inner_list = [' '.join(element.split('\t')) for element in inner_list if element.strip()]
        cleaned_data.append(cleaned_inner_list)

    return cleaned_data

cleaned_data = remove_whitespace(input_lines)

In [53]:
output_file_path = "dev.p2.out"
with open(output_file_path, "w", encoding="utf-8") as output_file:
    for line in cleaned_data:
        output_file.write(" ".join(line) + "\n")

print(f"Cleaned data saved to {output_file_path}")

Cleaned data saved to dev.p2.out


In [None]:
import numpy as np

def viterbi_5thbest(sentence, unique_labels, unique_tokens, unk_token, e_table, q_table):
    rank = 5
    # Initialisation step
    n = len(sentence)
    sentence = [None] + sentence
    m = len(unique_labels)
    pi = np.zeros((n + 2, m, rank))
    
    # Forward algorithm
    for j in range(n):
        if sentence[j + 1] in unique_tokens:
            cur_word = sentence[j + 1]
        else:
            cur_word = unk_token

        for cur_index in range(0, m):
            current_e = e_table[cur_index, unique_tokens.index(cur_word)]
            if j == 0:
                current_q = q_table[0, cur_index]
                pi[j + 1, cur_index, :] = 1 * current_e * current_q
            else:
                max_probs = []
                for prev_index in range(0, m):
                    for r in range(rank):
                        current_q = q_table[prev_index + 1, cur_index]
                        cur_prob = pi[j, prev_index, r] * current_e * current_q

                        max_probs.append(cur_prob)
                max_probs.sort(reverse=True)

                if len(max_probs) > rank:
                    max_probs = max_probs[:rank]
                pi[j + 1, cur_index] = max_probs

    # Termination step
    max_probs = []
    for prev_index in range(0, m):
        for r in range(rank):
            current_q = q_table[prev_index + 1, -1]
            cur_prob = pi[-1, prev_index, r] * current_q
            max_probs.append(cur_prob)

    max_probs.sort(reverse=True)
    if len(max_probs) > rank:
        max_probs = max_probs[:rank]
    pi[n + 1, -1] = max_probs

    # Backward algorithm
    yxs = np.zeros((n + 1, rank), dtype=int) + unique_labels.index("O")
    max_probs = []

    def take_last(elem):
        return elem[-1]

    for prev_index in range(0, m):
        for r in range(rank):
            current_q = q_table[prev_index + 1, -1]
            cur_prob = pi[-1, prev_index, r] * current_q

            max_probs.append([cur_index, cur_prob])
    max_probs.sort(reverse=True, key=take_last)

    def removeRepeated(lst):
        new = []
        for elem in lst:
            if elem[1] != 0 and elem not in new:
                new.append(elem)
        return new

    max_probs = removeRepeated(max_probs)

    if len(max_probs) > rank:
        max_probs = max_probs[:rank]

    parents = [i[0] for i in max_probs]

    yxs[n, :len(max_probs)] = parents

    for j in range(n - 1, 0, -1):
        max_probs = []
        for yx in yxs[j + 1]:
            for cur_index in range(0, m):
                for r in range(rank):
                    current_q = q_table[cur_index + 1, yx]
                    cur_prob = pi[j, cur_index, r] * current_q

                    max_probs.append([cur_index, cur_prob])

        max_probs.sort(reverse=True, key=take_last)
        max_probs = removeRepeated(max_probs)

        if len(max_probs) > rank:
            max_probs = max_probs[:rank]

        parents = [i[0] for i in max_probs]
        yxs[j, :len(max_probs)] = parents

    labelled_preds = [unique_labels[y] for y in yxs.T[-1][1:]]

    return labelled_preds
