In [1]:
import numpy as np
import os

In [7]:
train_file_path = os.path.join("Data", "RU", "train")
test_file_path = os.path.join("Data", "RU", "dev.in")

train_words = []
tags = []

with open(train_file_path, "r", encoding="utf-8") as train_file:
    for l in train_file:
        if l.strip():  # Check if the line is not empty after stripping whitespace
            lst = l.split()
            x = " ".join(lst[:-1])  # Join all elements except the last as the word
            y = lst[-1]
            train_words.append(x)
            tags.append(y)

# Now you have train_words and tags populated with data


In [6]:
test_file=open(test_file_path,"r")
test_words = []

possible_encodings = ["utf-8", "utf-16", "latin-1"]  # List of possible encodings to try

for encoding in possible_encodings:
    try:
        with open(test_file_path, "r", encoding=encoding) as test_file:
            for l in test_file:
                if l.strip():
                    test_words.append(l[0:-1])
        break  # Exit the loop if successful
    except UnicodeDecodeError:
        pass  # Try the next encoding

In [8]:
def estimate_transition_parameters(train_tags):
    transition_counts = {}  # To count transitions between states
    state_counts = {}  # To count occurrences of each state

    # Count transitions and state occurrences
    for i in range(len(train_tags) - 1):
        current_state = train_tags[i]
        next_state = train_tags[i + 1]

        if current_state not in state_counts:
            state_counts[current_state] = 0
        state_counts[current_state] += 1

        if current_state not in transition_counts:
            transition_counts[current_state] = {}
        if next_state not in transition_counts[current_state]:
            transition_counts[current_state][next_state] = 0
        transition_counts[current_state][next_state] += 1
    
    # Handle q(STOP|yn) and q(y1|START)
    state_counts['START'] = len(train_tags)
    for state in state_counts:
        if state != 'START':
            transition_counts[state]['STOP'] = 1
    
    # Calculate transition probabilities
    transition_probs = {}
    for current_state, next_states in transition_counts.items():
        total_count = state_counts[current_state]
        transition_probs[current_state] = {next_state: count / total_count for next_state, count in next_states.items()}

    return transition_probs

In [9]:
### ALTERNATE EMISSION PROBABILITIES GENERATION ###
def estimate_emission_parameters(train_words, train_tags, k=1):
    emission_counts = {}  # To count emissions from states to words
    state_counts = {}  # To count occurrences of each state

    # Count emissions and state occurrences
    for i in range(len(train_words)):
        word = train_words[i]
        tag = train_tags[i]

        if tag not in state_counts:
            state_counts[tag] = 0
        state_counts[tag] += 1

        if tag not in emission_counts:
            emission_counts[tag] = {}
        if word not in emission_counts[tag]:
            emission_counts[tag][word] = 0
        emission_counts[tag][word] += 1
    
    # Calculate emission probabilities with #UNK# handling
    emission_probs = {}
    for tag, word_counts in emission_counts.items():
        total_count = state_counts[tag]
        emission_probs[tag] = {}
        for word, count in word_counts.items():
            emission_probs[tag][word] = count / total_count
        emission_probs[tag]['#UNK#'] = k / (total_count + k)

    return emission_probs

In [10]:
def emission_helper(x, y,tags, words=None, k=1):
    count_y=0
    if x=="#UNK#":
        for i in tags:
            if i==y:
                count_y+=1
        return k/(count_y+k)
    count_y_to_x=0
    for i in range(len(tags)):
        if tags[i] == y:
            count_y += 1
            if words[i] == x:
                count_y_to_x += 1
    return (count_y_to_x) / (count_y + k)

def emission(tags,train_words,test_words):
    emission_word_tag={}
    for x in np.unique(np.array(test_words)):
        y_val={}
        for y in np.unique(np.array(tags)):
            if x in train_words:
                ep = emission_helper(x, y,tags,train_words)
                y_val.update({y:ep})
            else:
                ep = emission_helper("#UNK#", y,tags)
                y_val.update({y:ep})
        emission_word_tag[x] = y_val
    return emission_word_tag

In [11]:
emission_word_tag=emission(tags,train_words,test_words)
transition_parameters=estimate_transition_parameters(tags)

print(emission_word_tag)
print(transition_parameters)

{'!': {'B-negative': 0.0, 'B-neutral': 0.0, 'B-positive': 0.0, 'I-negative': 0.0, 'I-neutral': 0.0, 'I-positive': 0.0, 'O': 0.021911318380339034}, '"': {'B-negative': 0.0, 'B-neutral': 0.009615384615384616, 'B-positive': 0.002691065662002153, 'I-negative': 0.014084507042253521, 'I-neutral': 0.057971014492753624, 'I-positive': 0.0451505016722408, 'O': 0.004589532903989933}, '%': {'B-negative': 0.0, 'B-neutral': 0.0, 'B-positive': 0.0, 'I-negative': 0.0, 'I-neutral': 0.0, 'I-positive': 0.0, 'O': 0.00029609889703160856}, '(': {'B-negative': 0.0, 'B-neutral': 0.0, 'B-positive': 0.0, 'I-negative': 0.0, 'I-neutral': 0.0, 'I-positive': 0.0, 'O': 0.00668690009129716}, ')': {'B-negative': 0.0, 'B-neutral': 0.0, 'B-positive': 0.0, 'I-negative': 0.0, 'I-neutral': 0.0, 'I-positive': 0.0, 'O': 0.013595874355368026}, ',': {'B-negative': 0.0, 'B-neutral': 0.0, 'B-positive': 0.0, 'I-negative': 0.014084507042253521, 'I-neutral': 0.057971014492753624, 'I-positive': 0.008361204013377926, 'O': 0.091790658

In [12]:
import numpy as np

def viterbi(sequence, tags, transition_parameters, emission_parameters):
    n = len(sequence)
    num_tags = len(tags)

    LTR=np.zeros((num_tags,n))
    RTL=np.zeros((num_tags,n))


    first_word=sequence[0]
    for i in range(num_tags):
        if ("START", tags[i]) not in transition_parameters:
            LTR[i, 0] = 1e-10*emission_parameters[first_word][tags[i]]
        else:
            LTR[i, 0] = transition_parameters[("START", tags[i])] * emission_parameters[first_word][tags[i]]

    for w in range(1, n):
        for i in range(0,num_tags):
            max_prob = -1
            max_backpointer = -1
            for j in range(0,num_tags):
                if (tags[j],tags[i]) not in transition_parameters:
                    prob = LTR[j, w-1] * 1e-10 * emission_parameters[sequence[w]][tags[i]]
                else:
                    prob = LTR[j, w-1] * transition_parameters[(tags[j], tags[i])] * emission_parameters[sequence[w]][tags[i]]

                if prob > max_prob:
                    max_prob = prob
                    max_backpointer = j
            
            LTR[i, w] = max_prob
            RTL[i, w] = max_backpointer
    
    stop_max_prob = -1
    stop_max_backpointer = -1
    for i in range(num_tags):
        if (tags[i],"STOP") not in transition_parameters:
            prob = LTR[i, n-1] * 1e-10
        else:
            prob = LTR[i, n-1] * transition_parameters[(tags[i], "STOP")]

        if prob > stop_max_prob:
            stop_max_prob = prob
            stop_max_backpointer = i
    
    # Retrieve the best path using backpointers
    best_path = []
    for w in range(n-1,-1,-1):
        best_path.insert(0,tags[stop_max_backpointer])
        if w==0:
            break
        stop_max_backpointer=int(RTL[stop_max_backpointer,w])

    return best_path


In [19]:
test_file=open(test_file_path,"r")

pred_output_path = os.path.join(os.getcwd(), "Data", "RU", "dev.p2.out")
pred_output=open(pred_output_path,"w")
sequence=[]

for l in test_file:
    if l!="\n":
        sequence.append(l[0:-1])
    else:
        predicted_tags=viterbi(sequence,np.unique(tags),transition_parameters,emission_word_tag)
        for i in range(0,len(predicted_tags)):
            pred_output.write(sequence[i]+" "+predicted_tags[i]+"\n")
        pred_output.write(l)
        pred_output.flush()
        sequence=[]

UnicodeDecodeError: 'charmap' codec can't decode byte 0x8f in position 150: character maps to <undefined>