# EECS 498 - Assignment 3 - Viterbi.py
### By: Alexander "AJ" Goldstein - uniquename: ajva

In [154]:
import csv
import sys
import pandas as pd
import numpy as np
import math
from __future__ import division
import random

## Pre-processing Functions:

### 1) train data processing

In [155]:
# input 1: trainFile (string) - name of train data file
# output 1: 
# output 2: 

def read_in_and_process_train_data(trainFile):
    
    # collect and store raw counts required for algorithm
    allTags = [] # collect a list of all unique tags
    tagFreqs_dict = {} # store corresponding frequency for each unique tag (KEY: tag, VALUE: count)
    tagPairsFreq_dict = {} # store corresponding freq for each unique tag pair (KEY: concatenated tags, VALUE: count)
    newWordTag_dict  = {} # store each new word as an individual dictionary of tag frequencies (KEY: tag, VALUE: count)
    corpusWords_dict = {} # store all unique words (KEY: word, VALUE: newWordTag_dict)
    
    with open(trainFile) as data:
        
        # Step One: store all unique tags, tag pairs, and their corresponding counts
        for line in data:
            splitLine = line.split()
            prevTag = 'BOSE'
            for unit in splitLine:
                word, slash, tag = unit.partition('/')
                
                # handle individual tag counts
                if tag not in allTags:
                    allTags.append(tag)
                    tagFreqs_dict[tag] = 1
                else:
                    tagFreqs_dict[tag] += 1
                
                # handle tag pair counts
                tagPair = prevTag + '_' + tag
                
                if tagPair not in tagPairsFreq_dict.keys():
                    tagPairsFreq_dict[tagPair] = 1
                else:
                    tagPairsFreq_dict[tagPair] += 1
                    
                prevTag = tag
        
        # reset read file pointer
        data.seek(0)
        
        # Step Two: store all unique words and their tag counts
        for line in data:
            splitLine = line.split()
            for unit in splitLine:
                word, slash, tag = unit.partition('/')
                
                if word not in corpusWords_dict.keys():
                    corpusWords_dict[word] = dict.fromkeys(allTags, 0)
                    corpusWords_dict[word][tag] += 1
                    
                else:
                    corpusWords_dict[word][tag] += 1
        
        
    return allTags, tagFreqs_dict, tagPairsFreq_dict, newWordTag_dict, corpusWords_dict

### 2) test data processing

In [156]:
# separate out sentences from their tags
def read_in_and_strip_test_data(testFile):
    
    # collect and store sentences (list of words) and their corresponding tags
    sentToTags_dict = {}
    
    with open(testFile) as data:
        
        for line in data:
            splitLine = line.split()
            sentence = []
            tags = []
            for unit in splitLine:
                word, slash, tag = unit.partition('/')
                sentence.append(word)
                tags.append(tag)
            
            sentence = tuple(sentence) # convert to tuple to store as dict key
            sentToTags_dict[sentence] = tags
        
    testSentences = sentToTags_dict.keys()
    testSolTags = sentToTags_dict.values()
    
    return testSentences, testSolTags, sentToTags_dict

## Probability Functions

In [157]:
def tag_given_tag_prob(tag, prevTag, tagPairsFreq_dict):
    
    tagPair = prevTag + '_' + tag
    
    if tagPair in tagPairsFreq_dict:
        tagPairCount = tagPairsFreq_dict[tagPair]
        totalTagPairs = sum(tagPairsFreq_dict.values())
        probability = float(tagPairCount/totalTagPairs)
    else:
        # if new tagPair, assume random
        probability = float(1/len(tagPairsFreq_dict.keys()))
    
    return probability

In [158]:
def word_given_tag_prob(word, tag, corpusWords_dict, tagFreqs_dict):
    
    if word in corpusWords_dict:
        wordTagFreq = corpusWords_dict[word][tag]
        totalTagFreq = tagFreqs_dict[tag]
        probability = float(wordTagFreq/totalTagFreq)
        
    else:
        # if new word, assume random
        probability = float(1/len(tagFreqs_dict.keys()))
        
    return probability

In [159]:
def maxFunction(probScores, allTags, currTag, prevWord, tagPairsFreq_dict):
    maxProb = 0
    maxTag = ''
    for tag in allTags:
        prob = probScores[(tag,prevWord)] * tag_given_tag_prob(currTag, tag, tagPairsFreq_dict)
        if prob > maxProb:
            maxProb = prob
            maxTag = tag
    
    return maxProb, maxTag

In [160]:
# find the tag with the maximum probability given a word
def maxProbTag(probScores, allTags, word):
    maxProb = 0
    maxTag = ''
    for tag in allTags:
        prob = probScores[(tag,word)]
        if prob >= maxProb:
            maxProb = prob
            maxTag = tag
            
    return maxTag

## Viterbi Algorithm

In [161]:
def run_Viterbi_algo(testSentences, allTags, tagFreqs_dict, tagPairsFreq_dict, corpusWords_dict):
    
    # create storage for sentence tag predictions
    sentToTagPreds_dict = {}
    
    # create and set up storage for tag-word combinations
    tagWordTuples = []
    allWords = corpusWords_dict.keys()
    for word in allWords:
        for tag in allTags:
            tagWordTuples.append((tag, word))
    
    # iterate through every test sentence
    for num, sentence in enumerate(testSentences):

        # 0) create storage for this sentence's predictions, probability scores, and back tag pointer
        tagPreds = ['']*len(sentence)
        probScores = dict.fromkeys(tagWordTuples, 0)
        backTagPtr = dict.fromkeys(tagWordTuples, 0)

        # 1) Initialization
        for tag in allTags:
            wordTagProb = word_given_tag_prob(sentence[0], tag, corpusWords_dict, tagFreqs_dict)
            tagTagProb = tag_given_tag_prob(tag,'BOSE', tagPairsFreq_dict)
            probScores[(tag, sentence[0])] =  wordTagProb * tagTagProb
            backTagPtr[(tag, sentence[0])] = None

        # 2) Iteration
        prevWord = sentence[0]
        for word in sentence[1:]:
            for tag in allTags:
                maxProb, maxPrevTag = maxFunction(probScores, allTags, tag, prevWord, tagPairsFreq_dict)
                wordTagProb = word_given_tag_prob(word, tag, corpusWords_dict, tagFreqs_dict)
                probScores[(tag, word)] = wordTagProb * maxProb
                backTagPtr[(tag, word)] = maxPrevTag
            prevWord = word
        
        # 3) Sequence Identification
        
        # 3.1) find most probable tag for last word
        lastWord = sentence[len(sentence)-1]
        if lastWord in allWords:
            lastTagPred = maxProbTag(probScores, allTags, lastWord)
        else:
            lastTagPred = random.choice(allTags)
        tagPreds[len(sentence)-1] = lastTagPred # store tag
            
        # 3.2) loop through all words (except last) in reverse order
        for index, word in enumerate(reversed(sentence[:-1])):
            
            # get nextWord in sentence to use for backTagPtr
            nextWord = sentence[len(sentence)-1-index]
            
            # if last word, set tag to above
            if nextWord == lastWord:
                nextTagPred = lastTagPred
            # if nextWord was in training data, find it's most probable tag
            elif nextWord in allWords:
                nextTagPred = maxProbTag(probScores, allTags, nextWord)
            else:
                # otherwise, randomize the tag
                nextTagPred = random.choice(allTags)
            
            # store tag for previous word using next word
            tagPreds[len(sentence)-2-index] = backTagPtr[(nextTagPred, nextWord)]

        # 3.3) store predictions for this sentence
        sentence = tuple(sentence)
        sentToTagPreds_dict[sentence] = tagPreds
        
    return sentToTagPreds_dict

## Simple Baseline Algorithm

In [162]:
def run_simple_baseline(testSentences, allTags, corpusWords_dict):
    
    # define all words from training set
    allWords = corpusWords_dict.keys()
    
    # create storage for sentence tag predictions
    sentToTagPreds_dict = {}
    
    # iterate through every test sentence
    for sentNum, sentence in enumerate(testSentences):
        # create storage
        tagPreds = ['']*len(sentence)
        for wordNum, word in enumerate(sentence):
            
            # generate tag prediction
            if word in allWords:
                wordTag_dict = corpusWords_dict[word]
                maxTag = max(wordTag_dict, key=wordTag_dict.get)
            else:
                maxTag = random.choice(allTags)
            
            # store predicted tag
            tagPreds[wordNum] = maxTag
        
        # store predicted sentence of tags
        sentence = tuple(sentence)
        sentToTagPreds_dict[sentence] = tagPreds
    
    return sentToTagPreds_dict

## Calculate Model Accuracy

In [163]:
def calculate_accuracy(sentToTagPreds_dict, sentToTagTruth_dict):
    
    correctTagCount = 0
    totalTags = 0
    incorrectPreds = []
    
    for sentence in sentToTagPreds_dict.keys():
        truthSentTags = sentToTagTruth_dict[sentence]
        predSentTags = sentToTagPreds_dict[sentence]
        for wordIndex, (truthTag, predTag) in enumerate(zip(truthSentTags, predSentTags)):
            totalTags += 1
            if predTag == truthTag:
                correctTagCount += 1
            else:
                # store word, incorrect prediction, and truth tag
                incorrectPreds.append([sentence[wordIndex], predTag, truthTag])
    
    accuracy = float(correctTagCount/totalTags)
    
    return accuracy, incorrectPreds

## Write Predictions Output File

In [173]:
def write_predictions_output(sentToTagPreds_dict):
    
    output = "POS.test.out"
    f = open(output, "w")

    for sentence, sentTags in sentToTagPreds_dict.items():
        for word, tag in zip(sentence, sentTags):
            f.write(word + '/' + tag + ' ')
        f.write('\n')

    f.close()

## Print Incorrect Predictions

In [174]:
def print_incorrect_predictions(incorrectPreds):
    print('incorrect predictions:' + '\n')
    for pred in incorrectPreds[:10]:
        print('word: ' + pred[0] + ' ')
        print('prediction: ' + pred[1] + ' ')
        print('truth: ' + pred[2] + '\n')

## Main Function

### 1) Read in file names

In [175]:
# SCRIPT NOTE: switch out arguments
# read in file names
trainFile = "POS.test"
testFile = "POS.train"

#trainFile = sys.argv[1]
#testFile = sys.argv[2]

largeTrain = "POS.train.large"

### 2) Read in and process data

In [176]:
# train data
allTags, tagFreqs_dict, tagPairsFreq_dict, newWordTag_dict, corpusWords_dict = read_in_and_process_train_data(trainFile)

# test data
testSentences, testSolTags, sentToTagsTruth_dict = read_in_and_strip_test_data(testFile)

### 3) Run Viterbi algorithm on test data

In [177]:
# run algorithm
sentToTagPreds_dict = run_Viterbi_algo(testSentences[:50], allTags, tagFreqs_dict, tagPairsFreq_dict, corpusWords_dict)

### 4) Calculate Viterbi accuracy

In [178]:
# compare sentToTagPreds_dict to sentToTagsTruth_dict
accuracy, incorrectPreds = calculate_accuracy(sentToTagPreds_dict, sentToTagsTruth_dict)
print('Viterbi model accuracy:', accuracy)

# print incorrect predictions
print_incorrect_predictions(incorrectPreds)

('Viterbi model accuracy:', 0.7231245166279969)
incorrect predictions:

word: EST 
prediction: NN 
truth: NP

word: Nov. 
prediction: DT 
truth: NP

word: extended 
prediction: DT 
truth: VBN

word: year 
prediction: NP 
truth: NN

word: , 
prediction: CC 
truth: ,

word: , 
prediction: IN 
truth: ,

word: New 
prediction: JJ 
truth: NP

word: York-based 
prediction: JJ 
truth: NP

word: of 
prediction: CC 
truth: IN

word: bought 
prediction: VBN 
truth: VBD



### 5) Write predictions output file

In [179]:
# write predictions output file
write_predictions_output(sentToTagPreds_dict)

### 6) Run simple baseline model

In [180]:
baselinePreds = run_simple_baseline(testSentences[:50], allTags, corpusWords_dict)

### 7) Calculate baseline accuracy

In [181]:
# compare sentToTagPreds_dict to sentToTagsTruth_dict
accuracy, incorrectPreds = calculate_accuracy(baselinePreds, sentToTagsTruth_dict)
print('Baseline model accuracy:', accuracy)

# print incorrect predictions
print_incorrect_predictions(incorrectPreds)

('Baseline model accuracy:', 0.679814385150812)
incorrect predictions:

word: offer 
prediction: VB 
truth: NN

word: EST 
prediction: VBN 
truth: NP

word: Nov. 
prediction: EX 
truth: NP

word: extended 
prediction: ) 
truth: VBN

word: York-based 
prediction: JJ 
truth: NP

word: arm 
prediction: ABC/NP 
truth: NN

word: bought 
prediction: VBN 
truth: VBD

word: controlling 
prediction: VBN 
truth: VBG

word: glass 
prediction: '' 
truth: NN

word: venture 
prediction: MD 
truth: NN



### 8) Process & test using large training data

In [None]:
# train data
allTags, tagFreqs_dict, tagPairsFreq_dict, newWordTag_dict, corpusWords_dict = read_in_and_process_train_data(largeTrain)

In [183]:
# run Viterbi algorithm
sentToTagPreds_dict = run_Viterbi_algo(testSentences[:50], allTags, tagFreqs_dict, tagPairsFreq_dict, corpusWords_dict)

In [184]:
# calculate Viterbi accuracy
accuracy, incorrectPreds = calculate_accuracy(sentToTagPreds_dict, sentToTagsTruth_dict)
print('Viterbi model accuracy (LARGE TRAIN):', accuracy)

('Viterbi model accuracy (LARGE TRAIN):', 0.7215777262180975)


In [185]:
# run Baseline model
baselinePreds = run_simple_baseline(testSentences[:50], allTags, corpusWords_dict)

In [186]:
# calculate baseline accuracy
accuracy, incorrectPreds = calculate_accuracy(baselinePreds, sentToTagsTruth_dict)
print('Baseline model accuracy (LARGE TRAIN):', accuracy)

('Baseline model accuracy (LARGE TRAIN):', 0.6767208043310131)
