In [1]:
import math
import re
from collections import defaultdict

import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

In [2]:
def parseData(filePath):
    """Read an annotated corpus file into token rows and raw sentences.

    Lines starting with ``# sent_id`` set the sentence id attached to the
    token rows that follow. Lines starting with ``# text`` contribute one
    entry to the sentence list. Any other non-empty line that does not start
    with ``#`` is split on whitespace and kept only when it has at least five
    fields.

    Args:
        filePath: Path of the file to read.

    Returns:
        A ``(data, listOfSentences)`` tuple. ``data`` is a list of
        ``[sentId, field0, field1, field3]`` rows built from the
        whitespace-separated fields of each token line. ``listOfSentences``
        holds, in file order, the text following the first ``=`` of every
        ``# text`` line.
    """
    data = []
    listOfSentences = []
    sentId = ""

    # Explicit UTF-8 so parsing does not depend on the platform default
    # encoding (assumes the corpus files are UTF-8 encoded).
    with open(filePath, 'r', encoding='utf-8') as file:
        for rawLine in file:
            line = rawLine.strip()
            if not line:
                continue

            if line.startswith("# sent_id"):
                # Split once only: everything after the first "=" is the value.
                sentId = line.split("=", 1)[-1].strip()
            elif line.startswith("# text"):
                # Splitting on every "=" and keeping the last piece would
                # truncate sentences that themselves contain "=".
                listOfSentences.append(line.split("=", 1)[-1].strip())
            elif not line.startswith("#"):
                divisions = line.split()
                if len(divisions) >= 5:
                    data.append([sentId, divisions[0], divisions[1], divisions[3]])

    return data, listOfSentences


# Parse the training split: `data` holds the token rows, `trainSentences` the raw "# text" strings.
data, trainSentences = parseData("./NLP2/train.txt")

In [3]:
def calculateTagProbs(data):
    """Estimate tag-to-next-tag probabilities from consecutive rows.

    The tag of every row (index 3) is paired with the tag of the row that
    immediately follows it in ``data``. Rows are paired purely by position,
    so the last row of one sentence is paired with the first row of the next.

    Args:
        data: Sequence of rows whose element 3 is the tag.

    Returns:
        A nested defaultdict where ``tagProbs[current][next]`` is the number
        of times ``next`` followed ``current`` divided by the number of pairs
        that started with ``current``. Looking up an unseen pair yields 0.0.
    """
    followerCounts = defaultdict(lambda: defaultdict(int))
    leaderTotals = defaultdict(int)

    # Walk the rows as (row, following row) pairs.
    for row, following in zip(data, data[1:]):
        leader = row[3]
        followerCounts[leader][following[3]] += 1
        leaderTotals[leader] += 1

    tagProbs = defaultdict(lambda: defaultdict(float))
    for leader, followers in followerCounts.items():
        total = leaderTotals[leader]
        for follower, seen in followers.items():
            tagProbs[leader][follower] = seen / total

    return tagProbs

def calculateWordTagProbs(data):
    """Estimate add-one-smoothed word-given-tag probabilities.

    Args:
        data: Sequence of rows whose element 2 is the word and element 3 the
            tag.

    Returns:
        A 5-tuple ``(wordTagProbs, tagCnts, vocabularySet, wordTagCounts,
        tagCounts)``:

        * ``wordTagProbs[tag][word]`` is ``(count + 1) / (tagCount + V)`` for
          every (tag, word) pair seen in ``data``, where ``V`` is the number
          of distinct words. Unseen pairs read as 0.0 (nested defaultdict).
        * ``tagCnts`` is the number of distinct tags.
        * ``vocabularySet`` is the set of distinct words.
        * ``wordTagCounts[tag][word]`` is the raw pair count.
        * ``tagCounts[tag]`` is the raw tag count.
    """
    vocabularySet = set()
    wordTagCounts = defaultdict(lambda: defaultdict(int))
    tagCounts = defaultdict(int)

    for row in data:
        word, tag = row[2], row[3]
        wordTagCounts[tag][word] += 1
        tagCounts[tag] += 1
        vocabularySet.add(word)

    tagCnts = len(tagCounts)
    vocabularySize = len(vocabularySet)

    wordTagProbs = defaultdict(lambda: defaultdict(float))
    for tag, words in wordTagCounts.items():
        # Add-one smoothing adds 1 to every count, so the denominator must
        # grow by the vocabulary size; without it values can exceed 1.
        denominator = tagCounts[tag] + vocabularySize
        for word, count in words.items():
            wordTagProbs[tag][word] = (count + 1) / denominator

    return wordTagProbs, tagCnts, vocabularySet, wordTagCounts, tagCounts

In [4]:
def viterbi(sentence, tagProbs, wordTagProbs):
    """Return the most probable tag sequence for ``sentence``.

    Scores are accumulated as log-probabilities so that long sentences do not
    underflow to 0.0, which would leave every candidate tied and let the tag
    name decide the result.

    Args:
        sentence: Sequence of words to tag.
        tagProbs: ``tagProbs[prevTag][tag]`` transition probabilities.
        wordTagProbs: ``wordTagProbs[tag][word]`` emission probabilities. Its
            keys define the set of candidate tags, and the start distribution
            is uniform over those keys.

    Returns:
        A two-element list ``[prob, path]``: the probability of the best path
        (may be 0.0 when it is below float range) and the list of tags, one
        per word. An empty sentence yields ``[1.0, []]``.

    Raises:
        ValueError: If ``wordTagProbs`` has no tags.

    Note:
        Both tables are read with plain indexing. When they are defaultdicts,
        every pair that is looked up but absent gets a zero entry inserted;
        this matches the previous behaviour, which code iterating over the
        tables afterwards may rely on.
    """
    if not sentence:
        return [1.0, []]

    tags = list(wordTagProbs.keys())
    if not tags:
        raise ValueError("wordTagProbs contains no tags to decode with")

    negInf = float("-inf")

    def logOf(probability):
        # log(0) is undefined; -inf keeps impossible paths at the bottom.
        return math.log(probability) if probability > 0 else negInf

    # Base case: uniform start distribution over the candidate tags.
    logPrior = -math.log(len(tags))
    scores = {}
    pathFollowed = {}
    for tag in tags:
        scores[tag] = logPrior + logOf(wordTagProbs[tag][sentence[0]])
        pathFollowed[tag] = [tag]

    # Recursive case: extend the best path into each tag, one word at a time.
    for word in sentence[1:]:
        newScores = {}
        newPaths = {}
        for tag in tags:
            logEmission = logOf(wordTagProbs[tag][word])
            bestScore, bestPrev = max(
                (scores[prevTag] + logOf(tagProbs[prevTag][tag]) + logEmission, prevTag)
                for prevTag in tags
            )
            newScores[tag] = bestScore
            newPaths[tag] = pathFollowed[bestPrev] + [tag]
        scores = newScores
        pathFollowed = newPaths

    finalScore, finalTag = max((scores[tag], tag) for tag in tags)
    return [math.exp(finalScore), pathFollowed[finalTag]]

In [5]:
# Add-one smoothing is needed for the word/tag (emission) probabilities, on the
# training data as well as on the test data; the tag-to-tag probabilities are
# left unsmoothed.
# `x` and `y` receive the fourth and fifth return values of
# calculateWordTagProbs (per-tag word counts and per-tag totals); they are
# passed to smoothenTransitionProbability further down.
sentence = ["I", "am", "a", "boy", "."]
wordTagProbs, tagCnts, vocabularySet, x, y = calculateWordTagProbs(data)
tagProbs = calculateTagProbs(data)
viterbi(sentence, tagProbs, wordTagProbs)[1]

['PRP', 'VBP', 'DT', 'NN', '.']

In [6]:
def getCorrespondingTags(data):
    """Group the tag column (index 3) of ``data`` by sentence id (index 0).

    Returns:
        A list with one list of tags per distinct sentence id, ordered by the
        first appearance of each id; tags keep their row order.
    """
    tagsBySentence = {}
    for row in data:
        tagsBySentence.setdefault(row[0], []).append(row[3])
    return list(tagsBySentence.values())

def getCorrespondingSentence(data):
    """Group the word column (index 2) of ``data`` by sentence id (index 0).

    Returns:
        A list with one list of words per distinct sentence id, ordered by
        the first appearance of each id; words keep their row order.
    """
    wordsBySentence = {}
    for row in data:
        wordsBySentence.setdefault(row[0], []).append(row[2])
    return list(wordsBySentence.values())

def getSmoothingCount(data, emissionProbs=None):
    """Count the rows whose (tag, word) pair has probability 0.

    Args:
        data: Sequence of rows whose element 2 is the word and element 3 the
            tag.
        emissionProbs: ``emissionProbs[tag][word]`` table to inspect. When
            omitted, the notebook-level ``wordTagProbs`` is used, as before.

    Returns:
        The number of rows for which ``emissionProbs[tag][word] == 0``.

    Note:
        The table is read with plain indexing, so a defaultdict table gains a
        zero entry for every pair that was absent.
    """
    if emissionProbs is None:
        # Backward-compatible default: fall back to the notebook global.
        emissionProbs = wordTagProbs
    return sum(1 for row in data if emissionProbs[row[3]][row[2]] == 0)

# Parse the test split; the raw "# text" strings returned second are not needed here.
testData, _ = parseData("./NLP2/test.txt")
# Gold tags, one list per sentence id.
getTags = getCorrespondingTags(testData)
# Number of test rows whose (tag, word) pair currently has probability 0.
# Computed before the smoothing step below gives those pairs a non-zero value.
smoothingCount = getSmoothingCount(testData)
# Words, one list per sentence id, grouped the same way as getTags.
sentences = getCorrespondingSentence(testData)
print(sentences)

[['The', 'prevalence', 'of', 'discrimination', 'across', 'racial', 'groups', 'in', 'contemporary', 'America', ':'], ['Results', 'from', 'a', 'nationally', 'representative', 'sample', 'of', 'adults'], ['Introduction', '.'], ['Personal', 'experiences', 'of', 'discrimination', 'and', 'bias', 'have', 'been', 'the', 'focus', 'of', 'much', 'social', 'science', 'research', '.', '[', '1', '-', '3', ']'], ['Sociologists', 'have', 'explored', 'the', 'adverse', 'consequences', 'of', 'discrimination', '[', '3', '–', '5', ']', ';'], ['psychologists', 'have', 'examined', 'the', 'mental', 'processes', 'that', 'underpin', 'conscious', 'and', 'unconscious', 'biases', '[', '6', ']', ';'], ['neuroscientists', 'have', 'examined', 'the', 'neurobiological', 'underpinnings', 'of', 'discrimination', '[', '7', '–', '9', ']', ';'], ['and', 'evolutionary', 'theorists', 'have', 'explored', 'the', 'various', 'ways', 'that', 'in', '-', 'group', '/', 'out', '-', 'group', 'biases', 'emerged', 'across', 'the', 'histor

In [7]:
# Perform Smoothing
def smoothenTransitionProbability(data, tagProbs, wordTagProbs, countTags, countWords, vocabularySize=None):
    """Apply add-one smoothing to the word/tag table ``wordTagProbs`` in place.

    Despite the name, the table that is updated is ``wordTagProbs`` (word
    given tag); ``tagProbs`` is accepted but never read.

    Every (tag, word) pair found in ``data`` is first given an entry, then
    every entry of the table is set to
    ``(countWords[tag][word] + 1) / (vocabularySize + countTags[tag])``.

    Args:
        data: Sequence of rows whose element 2 is the word and element 3 the
            tag.
        tagProbs: Unused; kept so existing calls keep working.
        wordTagProbs: ``wordTagProbs[tag][word]`` table, modified in place.
        countTags: ``countTags[tag]`` raw tag counts.
        countWords: ``countWords[tag][word]`` raw pair counts.
        vocabularySize: Number of distinct words used in the denominator.
            When omitted, ``len(vocabularySet)`` of the notebook-level
            ``vocabularySet`` is used, as before.

    Returns:
        The same ``wordTagProbs`` object that was passed in.

    NOTE(review): the first pass keys on the tag column of ``data``. When
    ``data`` carries gold test tags, a previously unseen word only receives an
    entry under its gold tag -- confirm this is intended before reporting
    test metrics.
    """
    if vocabularySize is None:
        # Backward-compatible default: fall back to the notebook global.
        vocabularySize = len(vocabularySet)

    def smoothed(tag, word):
        # Plain indexing on purpose: defaultdict count tables yield 0 for
        # unseen keys, exactly as before.
        return (countWords[tag][word] + 1) / (vocabularySize + countTags[tag])

    # Pass 1: make sure every (tag, word) pair of `data` has an entry.
    for row in data:
        tag, word = row[3], row[2]
        wordTagProbs[tag][word] = smoothed(tag, word)

    # Pass 2: re-estimate every entry of the table with the same formula.
    for tag, words in wordTagProbs.items():
        for word in words:
            words[word] = smoothed(tag, word)

    return wordTagProbs

# Argument order matters: `y` (per-tag totals) is passed as countTags and
# `x` (per-tag word counts) as countWords. The smoothed table replaces wordTagProbs.
wordTagProbs = smoothenTransitionProbability(testData, tagProbs, wordTagProbs, y, x)

In [9]:
# Decode every test sentence; element [1] of viterbi's return value is the tag path.
results = []
for sentence in sentences:
    results.append(viterbi(sentence, tagProbs, wordTagProbs)[1])

# Flatten the per-sentence lists so gold (A_flat) and predicted (B_flat) tags
# line up token by token.
A_flat = [item for sublist in getTags for item in sublist]
B_flat = [item for sublist in results for item in sublist]

# Calculate metrics
# zero_division=0 states scikit-learn's fallback value explicitly for labels
# whose score is undefined, so the numbers are unchanged but no warning is raised.
accuracy = accuracy_score(A_flat, B_flat)
precision = precision_score(A_flat, B_flat, average='weighted', zero_division=0)
recall = recall_score(A_flat, B_flat, average='weighted', zero_division=0)
f1 = f1_score(A_flat, B_flat, average='weighted', zero_division=0)

print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)
print("F1 Score:", f1)

Accuracy: 0.918040293040293
Precision: 0.9743806923244058
Recall: 0.918040293040293
F1 Score: 0.9429857781143647


  _warn_prf(average, modifier, msg_start, len(result))


In [None]:
def evaluate_model(test_data, transition_probabilities, emission_probabilities):
    """Score tagger predictions against gold labels.

    Each item of ``test_data`` is unpacked into four fields. The second is
    split on whitespace into words and the fourth is used as the gold label,
    repeated once per word. The words are decoded with ``viterbi_algorithm``,
    whose three return values are read as (ignored, predicted tags, smoothing
    increment).

    NOTE(review): ``viterbi_algorithm`` is not defined in the cells visible
    next to this one -- presumably it lives elsewhere; confirm before running.

    Returns:
        ``(accuracy, precision, recall, f1, smoothing_count)`` where the three
        middle scores use weighted averaging and ``smoothing_count`` is the
        running total of the increments returned by ``viterbi_algorithm``.
    """
    expected = []
    predicted = []
    smoothing_count = 0

    for row in test_data:
        _, sentence, _, gold_tag = row
        words = sentence.split()
        expected.extend([gold_tag] * len(words))

        decoded = viterbi_algorithm(
            words, transition_probabilities, emission_probabilities, smoothing_count
        )
        _, predicted_tags, smoothing = decoded
        predicted.extend(predicted_tags)
        smoothing_count += smoothing

    accuracy = accuracy_score(expected, predicted)
    precision = precision_score(expected, predicted, average='weighted')
    recall = recall_score(expected, predicted, average='weighted')
    f1 = f1_score(expected, predicted, average='weighted')

    return accuracy, precision, recall, f1, smoothing_count

def main():
    """Train on the training split, then report metrics for both splits.

    Reads both files with ``read_data``, estimates transition and emission
    probabilities from the training rows only, evaluates on the training and
    test rows with ``evaluate_model`` and prints the resulting metrics.

    NOTE(review): ``read_data``, ``compute_transition_probabilities`` and
    ``compute_emission_probabilities`` are not defined in the cells visible
    next to this one -- presumably they live elsewhere; confirm before running.
    """
    # The training path previously pointed at test.txt as well, which made the
    # model train and evaluate on the same file.
    train_file_path = "./NLP2/train.txt"
    test_file_path = "./NLP2/test.txt"

    train_data = read_data(train_file_path)
    test_data = read_data(test_file_path)

    transition_probabilities = compute_transition_probabilities(train_data)
    emission_probabilities = compute_emission_probabilities(train_data)

    train_accuracy, train_precision, train_recall, train_f1, _ = evaluate_model(train_data, transition_probabilities, emission_probabilities)
    test_accuracy, test_precision, test_recall, test_f1, smoothing_count = evaluate_model(test_data, transition_probabilities, emission_probabilities)

    print("Evaluation Metrics for Train Data:")
    print(f"Accuracy: {train_accuracy}")
    print(f"Precision: {train_precision}")
    print(f"Recall: {train_recall}")
    print(f"F1-score: {train_f1}")

    print("\nEvaluation Metrics for Test Data:")
    print(f"Accuracy: {test_accuracy}")
    print(f"Precision: {test_precision}")
    print(f"Recall: {test_recall}")
    print(f"F1-score: {test_f1}")

    print(f"\nWords where Add-One Smoothing was applied in the test dataset: {smoothing_count}")

# Run the pipeline when this cell/script is executed directly.
if __name__ == "__main__":
    main()