In [35]:
from collections import defaultdict

def _read_train_lines(path):
    """Return the contents of ``path`` as a list of lines without line endings."""
    with open(path) as handle:
        return handle.read().splitlines()


# Raw training lines for each language; blank entries separate sentences.
es = _read_train_lines("Data/ES/train")
ru = _read_train_lines("Data/RU/train")

In [36]:
# Development-set files for both languages, read as lists of lines
# (line endings stripped by splitlines()).
with open("Data/ES/dev.in") as f:
    dev_in_es = f.read().splitlines()
with open("Data/ES/dev.out") as f:
    dev_out_es = f.read().splitlines()
with open("Data/RU/dev.in") as f:
    dev_in_ru = f.read().splitlines()
with open("Data/RU/dev.out") as f:
    dev_out_ru = f.read().splitlines()

# The RU dev output used to be stored under the misleading name `test_out_ru`
# (it is read from dev.out, mirroring `dev_out_es`). Keep the old name as an
# alias so any cell that still references it continues to work.
test_out_ru = dev_out_ru

In [37]:
def format_sentences(data):
    """Group the labels of a line-oriented training set into sentences.

    Args:
        data: iterable of strings. A non-empty line is expected to look like
            ``"<token> <label>"``; an empty string marks the end of a sentence.

    Returns:
        A list of sentences, each a non-empty list of label strings in the
        order they appear in ``data``.

    Notes:
        * The label is taken as the text after the *last* space, so tokens
          that themselves contain spaces are kept instead of being dropped.
        * Lines without any space (no label) are skipped.
        * Consecutive blank lines do not produce empty sentences.
        * A final sentence that is not followed by a blank line is still
          returned.
    """
    formatted = []
    current = []
    for line in data:
        if line == "":
            # Sentence boundary: flush only if we actually collected labels.
            if current:
                formatted.append(current)
                current = []
            continue

        parts = line.rsplit(" ", 1)
        if len(parts) != 2:
            # Malformed line (token without label) - ignore it.
            continue
        current.append(parts[1])

    # Data that does not end with a blank line still has a pending sentence.
    if current:
        formatted.append(current)
    return formatted

In [38]:
from itertools import groupby

def estimate_transition_parameters_test(sentences):
    """Estimate HMM transition probabilities from labelled training lines.

    Each sentence returned by ``format_sentences`` is treated as the path
    ``START -> label_1 -> ... -> label_n -> END``. Every observed transition
    is counted and each row is then normalised by the total number of
    transitions leaving its source state.

    Args:
        sentences: raw training lines, as accepted by ``format_sentences``.

    Returns:
        A dict ``{from_state: {to_state: probability}}`` containing only the
        transitions that were observed. Unseen transitions are simply absent,
        so callers should use ``.get(..., default)``. The end-of-sentence
        state is stored under the key ``"END"``.
    """
    transition_counts = {}

    for one_sentence in format_sentences(sentences):
        # An empty sentence carries no information; counting it would add a
        # spurious START -> END transition (or fail when START is unseen).
        if not one_sentence:
            continue

        prev_state = 'START'
        for state in one_sentence + ["END"]:
            row = transition_counts.setdefault(prev_state, {})
            row[state] = row.get(state, 0) + 1
            prev_state = state

    # Turn counts into probabilities. The row total equals the number of
    # times the source state was left, i.e. count(from_state).
    for row in transition_counts.values():
        total = sum(row.values())
        for state in row:
            row[state] = row[state] / total

    return transition_counts

In [39]:
# Display the transition probabilities estimated from the ES training lines.
estimate_transition_parameters_test(es)

{'START': {'O': 0.9289176090468497,
  'B-positive': 0.052234787291330104,
  'B-negative': 0.014001077005923533,
  'B-neutral': 0.004846526655896607},
 'O': {'O': 0.8856896848630963,
  'B-positive': 0.03650766316514551,
  'END': 0.06344067504735663,
  'B-negative': 0.012226623041157224,
  'B-neutral': 0.0021353538832443605},
 'B-positive': {'O': 0.871551724137931,
  'I-positive': 0.11637931034482758,
  'END': 0.008620689655172414,
  'B-neutral': 0.0008620689655172414,
  'B-positive': 0.002586206896551724},
 'B-negative': {'O': 0.8110236220472441,
  'I-negative': 0.1784776902887139,
  'END': 0.010498687664041995},
 'B-neutral': {'I-neutral': 0.20833333333333334, 'O': 0.7916666666666666},
 'I-neutral': {'I-neutral': 0.6511627906976745, 'O': 0.3488372093023256},
 'I-positive': {'I-positive': 0.5700636942675159,
  'O': 0.4267515923566879,
  'END': 0.0031847133757961785},
 'I-negative': {'O': 0.39766081871345027, 'I-negative': 0.6023391812865497}}

In [40]:
def estimate_emission_params(train_data,k=1):
    """Estimate HMM emission probabilities ``count(y -> x) / count(y)``.

    Args:
        train_data: iterable of strings. A non-empty line is expected to look
            like ``"<token> <label>"``; empty strings (sentence separators)
            and lines without a label are ignored.
        k: currently unused; kept so existing callers remain valid.

    Returns:
        A tuple ``(emission_params, sentiment_counts)`` where
        ``emission_params`` maps ``(token, label)`` to its relative frequency
        among all tokens carrying that label, and ``sentiment_counts`` is a
        ``defaultdict(int)`` mapping each label to its number of occurrences
        (keys are in order of first appearance).
    """
    word_sentiment_counts = defaultdict(lambda: defaultdict(int))
    sentiment_counts = defaultdict(int)

    # Collect count(y) and count(y -> x).
    for line in train_data:
        if line == "":
            # Sentence separator: must not be counted. (Falling through here
            # would re-count the previous token/label pair.)
            continue

        # Split on the last space so tokens containing spaces are kept.
        parts = line.rsplit(" ", 1)
        if len(parts) != 2:
            continue
        word, label = parts

        sentiment_counts[label] += 1
        word_sentiment_counts[label][word] += 1

    # Normalise the counts into emission probabilities.
    emission_params = {}
    for label, word_counts in word_sentiment_counts.items():
        label_total = sentiment_counts[label]
        for word, occurrences in word_counts.items():
            emission_params[(word, label)] = occurrences / label_total

    return emission_params, sentiment_counts

In [49]:
import math
import heapq

def viterbi_topk(sentence, transition_params, emission_params, states, k):
    """Return up to ``k`` highest-scoring tag sequences for ``sentence``.

    Scores are sums of log-probabilities. At every position, for each
    candidate state, only the ``k`` best partial sequences ending in that
    state are kept.

    Args:
        sentence: list of tokens.
        transition_params: ``{from_state: {to_state: probability}}``. The
            start state is ``'START'``; the end state may be stored under
            either ``'STOP'`` or ``'END'`` (``'STOP'`` wins if both exist).
        emission_params: ``{(token, state): probability}``.
        states: iterable of state names. Names must not contain spaces,
            because partial sequences are stored as space-joined strings.
        k: number of sequences to return.

    Returns:
        A list of at most ``k`` tag sequences (each a list of state names),
        ordered from best to worst. Empty when ``sentence`` is empty or
        ``k <= 0``.

    Notes:
        Missing (or non-positive) transition/emission probabilities are
        replaced by ``1e-10`` so that their logarithm is defined.
    """
    n = len(sentence)
    if n == 0 or k <= 0:
        return []

    unseen = 1e-10

    def safe_log(prob):
        # Guard against log(0) for zero/negative table entries.
        return math.log(prob if prob > 0 else unseen)

    def transition_log(prev_state, state):
        # A state that never occurred as a source has no row at all.
        return safe_log(transition_params.get(prev_state, {}).get(state, unseen))

    def stop_log(prev_state):
        row = transition_params.get(prev_state, {})
        # The estimator in this notebook writes 'END'; other tables use 'STOP'.
        for stop_key in ('STOP', 'END'):
            if stop_key in row:
                return safe_log(row[stop_key])
        return safe_log(unseen)

    def emission_log(word, state):
        return safe_log(emission_params.get((word, state), unseen))

    # viterbi[z][seq]      -> log score of the partial sequence `seq`
    # backpointers[z][seq] -> last state of `seq`
    viterbi = [{} for _ in range(n)]
    backpointers = [{} for _ in range(n)]

    # Initialization at time step 0
    for state in states:
        viterbi[0][state] = transition_log('START', state) + emission_log(sentence[0], state)
        backpointers[0][state] = state

    # Recursion: extend every kept partial sequence by every state and keep
    # the k best extensions per ending state.
    for z in range(1, n):
        for state in states:
            emit = emission_log(sentence[z], state)
            extensions = [
                (viterbi[z - 1][prev_seq] + transition_log(prev_state, state) + emit,
                 prev_seq + " " + state)
                for prev_seq, prev_state in backpointers[z - 1].items()
            ]
            for score, seq in heapq.nlargest(k, extensions):
                viterbi[z][seq] = score
                backpointers[z][seq] = state

    # Termination step: add the transition into the end state, then rank.
    finals = [
        (viterbi[n - 1][seq] + stop_log(last_state), seq)
        for seq, last_state in backpointers[n - 1].items()
    ]
    # nlargest returns the entries sorted from best to worst.
    return [seq.split(" ") for _, seq in heapq.nlargest(k, finals)]

# # # Example usage
# transition_params = {
#     'START': {'Noun': 0.7, 'Verb': 0.2, 'Adj': 0.1},
#     'Noun': {'Noun': 0.4, 'Verb': 0.3, 'Adj': 0.3, 'STOP': 0.1},
#     'Verb': {'Noun': 0.1, 'Verb': 0.5, 'Adj': 0.3, 'STOP': 0.1},
#     'Adj': {'Noun': 0.3, 'Verb': 0.2, 'Adj': 0.5, 'STOP': 0.1},
# }

# emission_params = {
#     ('apple', 'Noun'): 0.6,
#     ('eats', 'Verb'): 0.8,
#     ('red', 'Adj'): 0.7,
#     ('apple', 'Verb'): 0.1,
#     ('eats', 'Adj'): 0.1,
#     ('red', 'Noun'): 0.1,
#     ('apple', 'Adj'): 0.1,
#     ('eats', 'Noun'): 0.1,
#     ('red', 'Verb'): 0.1,
# }

# states = ['Noun', 'Verb', 'Adj']

# sentence = ['apple', 'eats', 'red']
# k_best_seqs = viterbi_topk(sentence, transition_params, emission_params, states, k=8)
# print(k_best_seqs)


In [50]:
# import math
# import heapq

# def viterbi_topk(sentence, transition_params, emission_params, states, k):
#     n = len(sentence)
#     viterbi = [{} for _ in range(n)]
#     backpointers = [{} for _ in range(n)]

#     # Initialization at time step 0
#     for state in states:
#         emission_prob = emission_params.get((sentence[0], state), 1e-10)
#         viterbi[0][state] = math.log(transition_params['START'].get(state, 1e-10)) + math.log(emission_prob)
#         backpointers[0][state] = state

#     for z in range(1,n):
#         k_best = []
#         for state in states:
#             k_best_heap = []
#             for prev_seq in backpointers[z-1].keys():
#                 # print("test",prev_seq)
#                 prev_state = backpointers[z-1][prev_seq]
#                 emission_prob = emission_params.get((sentence[z], state), 1e-10)
#                 transition_prob = transition_params[prev_state].get(state, 1e-10)
#                 prob = viterbi[z - 1][prev_seq] + math.log(transition_prob) + math.log(emission_prob)
#                 heapq.heappush(k_best_heap,(prob, prev_seq +" "+ state, state))
#                 if len(k_best_heap) > 30:
#                     heapq.heappop(k_best_heap)
#             for probability, state_list, status in k_best_heap:
#                 viterbi[z][state_list] = probability
#                 backpointers[z][state_list] = status
#             # print(k_best_heap)

#     # Termination step
#     k_best_seqs = []
#     for prev_seq in backpointers[n - 1]:
#         # print("prev1",prev_seq,"prev_s: ",backpointers[n-1][prev_seq])
#         prev_state = backpointers[n-1][prev_seq]

#         transition_prob = transition_params[prev_state].get('STOP', 1e-10)
#         prob = viterbi[n - 1][prev_seq] + math.log(transition_prob)
#         # print(prob, prev_seq, state)
#         heapq.heappush(k_best_seqs,(prob, prev_seq, prev_state))
#         if len(k_best_seqs) > k:
#             heapq.heappop(k_best_seqs)
#     # print("k-seq",k_best_seqs)
    
#     k_best = []
#     for probability, state_list, status in k_best_seqs:
#         k_best.append(state_list.split(" "))
    
#     k_best = k_best[::-1]
#     # print(k_best)
#     return k_best

# # # # Example usage
# # transition_params = {
# #     'START': {'Noun': 0.7, 'Verb': 0.2, 'Adj': 0.1},
# #     'Noun': {'Noun': 0.4, 'Verb': 0.3, 'Adj': 0.3, 'STOP': 0.1},
# #     'Verb': {'Noun': 0.1, 'Verb': 0.5, 'Adj': 0.3, 'STOP': 0.1},
# #     'Adj': {'Noun': 0.3, 'Verb': 0.2, 'Adj': 0.5, 'STOP': 0.1},
# # }

# # emission_params = {
# #     ('apple', 'Noun'): 0.6,
# #     ('eats', 'Verb'): 0.8,
# #     ('red', 'Adj'): 0.7,
# #     ('apple', 'Verb'): 0.1,
# #     ('eats', 'Adj'): 0.1,
# #     ('red', 'Noun'): 0.1,
# #     ('apple', 'Adj'): 0.1,
# #     ('eats', 'Noun'): 0.1,
# #     ('red', 'Verb'): 0.1,
# # }

# # states = ['Noun', 'Verb', 'Adj']

# # sentence = ['apple', 'eats', 'red']
# # k_best_seqs = viterbi_topk(sentence, transition_params, emission_params, states, k=8)
# # print(k_best_seqs)


In [51]:

def run_viterbi_on_dev_set_k(dev_set, transition_params, emission_params, states, k=8):
    """Decode every sentence of ``dev_set`` with ``viterbi_topk``.

    ``dev_set`` is a flat sequence of tokens in which falsy entries (empty
    strings) separate sentences. The return value holds, for each sentence in
    order, whatever ``viterbi_topk`` returns for it with the given ``k``.
    """
    decoded = []
    for has_tokens, chunk in groupby(dev_set, key = bool):
        if not has_tokens:
            # Run of separator lines between two sentences.
            continue
        tokens = list(chunk)
        decoded.append(viterbi_topk(tokens, transition_params, emission_params, states, k))
    return decoded


In [52]:
# Estimate the HMM parameters separately for each language.
transition_params_es = estimate_transition_parameters_test(es)
emission_params_es, count_es = estimate_emission_params(es)
transition_params_ru = estimate_transition_parameters_test(ru)
emission_params_ru, count_ru = estimate_emission_params(ru)

# Each language is decoded with the label set observed in its *own* training
# data. Previously a single `count` was overwritten by the RU call, so the ES
# decoder silently ran with the RU label set.
states_es = list(count_es.keys())
states_ru = list(count_ru.keys())

# Names kept for cells that still refer to them; they hold the RU values, as
# they did before.
count = count_ru
states = states_ru

# Run Viterbi algorithm on the development set
predicted_tags_ES = run_viterbi_on_dev_set_k(dev_in_es, transition_params_es, emission_params_es, states_es)
predicted_tags_RU = run_viterbi_on_dev_set_k(dev_in_ru, transition_params_ru, emission_params_ru, states_ru)

def actual_word(test_set): 
    """Split a flat token list into sentences.

    Falsy entries (empty strings) in ``test_set`` act as sentence separators;
    the result is a list with one list of tokens per sentence.
    """
    return [
        list(chunk)
        for is_token_run, chunk in groupby(test_set, key = bool)
        if is_token_run
    ]

In [53]:
# Dev-set tokens grouped per sentence, used later to pair each token with its predicted tag.
actual_words_ES = actual_word(dev_in_es)
actual_words_RU = actual_word(dev_in_ru)

In [54]:
def _kth_best(candidates, rank):
    """Return the ``rank``-th best (1-indexed) sequence of ``candidates``.

    ``candidates`` is one sentence's list of tag sequences as produced by
    ``viterbi_topk``. When fewer than ``rank`` sequences are available the
    last one is returned; an empty candidate list yields ``[]``.
    """
    if not candidates:
        return []
    return candidates[min(rank, len(candidates)) - 1]


# 2nd- and 8th-ranked tag sequence for every dev sentence. Falling back to the
# last available candidate avoids an IndexError when a sentence has fewer
# candidates than the requested rank.
sentences_2_ES = [_kth_best(candidates, 2) for candidates in predicted_tags_ES]
sentences_8_ES = [_kth_best(candidates, 8) for candidates in predicted_tags_ES]
sentences_2_RU = [_kth_best(candidates, 2) for candidates in predicted_tags_RU]
sentences_8_RU = [_kth_best(candidates, 8) for candidates in predicted_tags_RU]

In [55]:
import sys

def compute_output(words, predicted_tags):
    """Build the output lines pairing each token with its predicted tag.

    ``words`` and ``predicted_tags`` are parallel lists of sentences. Each
    token/tag pair becomes the line ``"<token> <tag>"`` and every sentence is
    followed by an empty string. Pairing stops at the shorter of the two
    sequences, both across sentences and within a sentence.
    """
    lines = []
    for sentence_words, sentence_tags in zip(words, predicted_tags):
        lines.extend(
            token + " " + tag for token, tag in zip(sentence_words, sentence_tags)
        )
        lines.append("")  # blank line terminates the sentence
    return lines


# Build the output lines for the 8th- and 2nd-ranked tag sequences of each language
output_captured_8_ES = compute_output(actual_words_ES, sentences_8_ES)
output_captured_2_ES = compute_output(actual_words_ES, sentences_2_ES)
output_captured_8_RU = compute_output(actual_words_RU, sentences_8_RU)
output_captured_2_RU = compute_output(actual_words_RU, sentences_2_RU)

# Destination file names and folders
output_filename_2 = "dev.p3.2nd.out"  # Change this to your desired filename
output_filename_8 = "dev.p3.8th.out"
output_path_ES = "Data/ES/"  # Change this to your desired folder
output_path_RU = "Data/RU/"

# Write every captured output to its text file, one entry per line
for destination, captured in (
    (output_path_ES + output_filename_2, output_captured_2_ES),
    (output_path_ES + output_filename_8, output_captured_8_ES),
    (output_path_RU + output_filename_2, output_captured_2_RU),
    (output_path_RU + output_filename_8, output_captured_8_RU),
):
    with open(destination, "w") as file:
        file.writelines(entry + "\n" for entry in captured)