# 50.007 Machine Learning Project Part 2

____

### Imports and File Reads

In [10]:
import math
import heapq
from collections import defaultdict
from itertools import groupby

# Read data from files using UTF-8 encoding
with open("Data/ES/train", encoding="utf-8") as f:
    es = f.read().splitlines()
with open("Data/RU/train", encoding="utf-8") as f:
    ru = f.read().splitlines()
with open("Data/ES/dev.in", encoding="utf-8") as f:
    dev_in_es = f.read().splitlines()
with open("Data/ES/dev.out", encoding="utf-8") as f:
    dev_out_es = f.read().splitlines()


### Estimating Emission Parameters for Hidden Markov Model

In [11]:
def estimate_emission_params(train_data, k=1):
    word_sentiment_counts = defaultdict(lambda: defaultdict(int))
    sentiment_counts = defaultdict(int)
    emission_params = {}

    # Count occurrences of (label, word) pairs
    for sentence in train_data:
        try:
            if sentence != "":
                x, label = sentence.split(" ")
        except:
            continue

        sentiment_counts[label] += 1
        word_sentiment_counts[label][x] += 1

    # Calculate emission parameters
    for key in word_sentiment_counts:
        for word in word_sentiment_counts[key]:
            emission_params[(word, key)] = word_sentiment_counts[key][word] / sentiment_counts[key]

    return emission_params, sentiment_counts

# Estimate emission parameters
es_para, count = estimate_emission_params(es)
print("Emission Parameters for ('palo', 'O'): ", es_para[("palo", "O")])

# Extract states
states = list(count.keys())
print("States:", states)


Emission Parameters for ('palo', 'O'):  3.238656605240146e-05
States: ['O', 'B-positive', 'B-negative', 'B-neutral', 'I-neutral', 'I-positive', 'I-negative']


### Estimating Transition Probabilities for Hidden Markov Model


In [12]:
def estimate_transition_parameters_test(sentences):
    transition_counts = {}
    state_counts = {}

    # Group sentences
    list_of_sentences = [list(sub) for ele, sub in groupby(sentences, key=bool) if ele]

    # Count state and transition occurrences
    for one_sentence in list_of_sentences:
        prev_state = 'START'

        for one_word in one_sentence:
            if one_word != "":
                word, state = one_word.split(" ")

                # Count state occurrences
                if state_counts.get(prev_state):
                    state_counts[prev_state] += 1
                else:
                    state_counts[prev_state] = 1

                # Count transition occurrences
                if prev_state not in transition_counts:
                    transition_counts[prev_state] = {}
                if state not in transition_counts[prev_state]:
                    transition_counts[prev_state][state] = 1
                else:
                    transition_counts[prev_state][state] += 1

                prev_state = state

        # Count 'END' transitions
        if "END" not in transition_counts[prev_state]:
            transition_counts[prev_state]["END"] = 1
        else:
            transition_counts[prev_state]["END"] += 1

    # Calculate transition probabilities
    for from_state, to_states in transition_counts.items():
        total_from_state_count = state_counts[from_state]
        for to_state, count in to_states.items():
            transition_counts[from_state][to_state] = count / total_from_state_count

    return transition_counts

# Estimate transition parameters
transition_params = estimate_transition_parameters_test(es)
print("Transition Parameters:", transition_params)


Transition Parameters: {'START': {'O': 0.9289176090468497, 'B-positive': 0.052234787291330104, 'B-negative': 0.014001077005923533, 'B-neutral': 0.004846526655896607}, 'O': {'O': 0.9456845511712573, 'B-positive': 0.038980620012503214, 'END': 0.06773802081418012, 'B-negative': 0.013054830287206266, 'B-neutral': 0.002279998529033207}, 'B-positive': {'O': 0.8791304347826087, 'I-positive': 0.11739130434782609, 'END': 0.008695652173913044, 'B-neutral': 0.0008695652173913044, 'B-positive': 0.0026086956521739132}, 'B-negative': {'O': 0.8196286472148541, 'I-negative': 0.18037135278514588, 'END': 0.010610079575596816}, 'B-neutral': {'I-neutral': 0.20833333333333334, 'O': 0.7916666666666666}, 'I-neutral': {'I-neutral': 0.6511627906976745, 'O': 0.3488372093023256}, 'I-positive': {'I-positive': 0.5718849840255591, 'O': 0.4281150159744409, 'END': 0.003194888178913738}, 'I-negative': {'O': 0.39766081871345027, 'I-negative': 0.6023391812865497}}


### Enhanced Viterbi Algorithm with Backpointers for Sequence Tagging

In [13]:
def vertibi_algorithm_test(sentence, transition_params, emission_params, states):
    n = len(sentence)
    num_states = len(states)
    viterbi = [{} for _ in range(n)]
    backpointers = [{} for _ in range(n)]

    # Initialization at time step 0
    word = sentence[0]
    for i in range(num_states):
        viterbi[0][states[i]] = math.log(emission_params.get((word, states[i]), 1e-10) + transition_params["START"].get(states[i], 1e-10))
    
    # Forward pass
    for z in range(1, n):
        word = sentence[z]
        for i in range(num_states):
            max_prob = float('-inf')
            save_state = ""
            for j in range(num_states):
                prob = viterbi[z - 1].get(states[j], float('-inf')) + math.log(emission_params.get((word, states[i]), 1e-10) + transition_params[states[j]].get(states[i], 1e-10))
                if prob > max_prob:
                    max_prob = prob
                    save_state = states[j]
            viterbi[z][states[i]] = max_prob
            backpointers[z][states[i]] = save_state

    # Termination step
    max_prob = float('-inf')
    final_state = 'STOP'
    for j in range(num_states):
        prob = viterbi[n - 1].get(states[j], float('-inf')) + math.log(transition_params[states[j]].get("STOP", 1e-10))
        if prob > max_prob:
            max_prob = prob
            final_state = states[j]
    
    # Backtracking step
    best_path = [final_state]
    for m in range(n - 1, 0, -1):
        best_path.insert(0, backpointers[m][best_path[0]])
    
    return best_path

def run_viterbi_on_dev_set(dev_set, transition_params, emission_params, states):
    output = []
    list_of_sentences = [list(sub) for ele, sub in groupby(dev_set, key=bool) if ele]
    for sentence in list_of_sentences:
        best_path = vertibi_algorithm_test(sentence, transition_params, emission_params, states)
        output.append(best_path)

    return output

### Model Evaluation Metrics

In [14]:
def compute_metrics(true_tags, predicted_tags):
    true_positive = 0
    false_positive = 0
    false_negative = 0
    
    for true_seq, pred_seq in zip(true_tags, predicted_tags):
        for true, pred in zip(true_seq, pred_seq):
            # Print for debugging purposes
            print(true, pred, true == pred)
            
            if true == pred and true != 'O':
                true_positive += 1
                print("True Positive Count:", true_positive)
            elif true != pred and true != 'O' and pred != 'O':
                false_positive += 1
                false_negative += 1  # Counting false negatives
    
    print("True Positive:", true_positive)
    print("False Positive:", false_positive)
    print("False Negative:", false_negative)
    
    # Calculate precision, recall, and F-score
    precision = true_positive / (true_positive + false_positive) if true_positive + false_positive > 0 else 0
    recall = true_positive / (true_positive + false_negative) if true_positive + false_negative > 0 else 0
    f_score = 2 * (precision * recall) / (precision + recall) if precision + recall > 0 else 0
    
    return precision, recall, f_score


### Precision and F-score Calculation Functions

In [15]:
# Calculate precision from true and predicted labels
def calculate_precision(true_labels, predicted_labels):
    correct_count = 0
    total_count = 0
    
    for true_seq, pred_seq in zip(true_labels, predicted_labels):
        for true, pred in zip(true_seq, pred_seq):
            if true == pred:
                correct_count += 1
            total_count += 1
    
    precision = correct_count / total_count if total_count > 0 else 0
    return precision

# Calculate F-score using recall and precision
def calculate_f_score(recall, precision):
    v1 = 1 / precision if precision > 0 else 1
    v2 = 1 / recall if recall > 0 else 1
    f_score = 2 / (v1 + v2) if (v1 + v2) > 0 else 0
    return f_score


### Enhanced Viterbi Algorithm with Backpointers for Analysis Tagging

In [16]:
# Viterbi Algorithm Implementation with Backpointers
def viterbi_algorithm_2(sentence, transition_params, emission_params, states):
    n = len(sentence)
    num_states = len(states)
    viterbi = [{} for _ in range(n)]
    backpointers = [{} for _ in range(n)]

    # Initialization at time step 0
    for state in states:
        emission_prob = emission_params.get((sentence[0], state), 1e-10)
        viterbi[0][state] = math.log(transition_params['START'].get(state, 1e-10)) + math.log(emission_prob)
        backpointers[0][state] = 'START'

    # Forward pass
    for t in range(1, n):
        for state in states:
            max_prob = float('-inf')
            prev_state = None
            for prev_state in states:
                transition_prob = transition_params[prev_state].get(state, 1e-10)
                emission_prob = emission_params.get((sentence[t], state), 1e-10)
                prob = viterbi[t - 1].get(prev_state, 1e-10) + math.log(transition_prob) + math.log(emission_prob)
                if prob > max_prob:
                    max_prob = prob
                    backpointers[t][state] = prev_state
            viterbi[t][state] = max_prob

    # Termination step
    max_prob = float('-inf')
    final_state = None
    for state in states:
        transition_prob = transition_params[state].get('STOP', 1e-10)
        prob = viterbi[n - 1][state] + math.log(transition_prob)
        if prob > max_prob:
            max_prob = prob
            final_state = state

    # Backtracking step
    best_path = [final_state]
    for t in range(n - 1, 0, -1):
        best_path.insert(0, backpointers[t][best_path[0]])

    return best_path

# Run Viterbi algorithm on the development set using viterbi_algorithm_2
def run_viterbi_on_dev_set_2(dev_set, transition_params, emission_params, states):
    output = []
    list_of_sentences = [list(sub) for ele, sub in groupby(dev_set, key=bool) if ele]
    for sentence in list_of_sentences:
        best_path = viterbi_algorithm_2(sentence, transition_params, emission_params, states)
        output.append(best_path)

    return output


## Final Model Training

In [17]:
# Train the model on the training set
transition_params = estimate_transition_parameters_test(es)
emission_params, count = estimate_emission_params(es)

# Extract the list of states from the count dictionary
states = list(count.keys())

# Run Viterbi algorithm on the development set using both implementations
predicted_tags = run_viterbi_on_dev_set(dev_in_es, transition_params, emission_params, states)
predicted_tags_2 = run_viterbi_on_dev_set_2(dev_in_es, transition_params, emission_params, states)

# Function to extract actual tags from a test set
def actual_tags(test_set):
    tags = []
    list_of_sentences = [list(sub) for ele, sub in groupby(test_set, key=bool) if ele]
    for sentence in list_of_sentences:
        innerlist = []
        for word in sentence:
            w, state = word.split()
            innerlist.append(state)
        tags.append(innerlist)
    return tags

# Calculate precision using the scores function
precision = calculate_precision(actual_tags(dev_out_es), predicted_tags)
print("Precision using implementation 1:", precision)

precision_2 = calculate_precision(actual_tags(dev_out_es), predicted_tags_2)
print("Precision using implementation 2:", precision_2)

# The following lines are commented out as they depend on the compute_metrics function:
# precision, recall, f_score = compute_metrics(actual_tags(dev_out_es), predicted_tags)
# print("Precision:", precision)
# print("Recall:", recall)
# print("F-score:", f_score)


Precision using implementation 1: 0.9271799628942486
Precision using implementation 2: 0.935064935064935
