In [1]:
import sys
import numpy as np
from collections import defaultdict
from itertools import product

<b> Step 1. Calculate transition and emission probabilities </b>

In [228]:
def build_words_tags_sets(data, word_idx, tag_idx):
    # Create a set of tags and set of words to process unseen words & get counts
    set_of_tags = set()
    set_of_tags.add("<s>"); set_of_tags.add("</s>")
    set_of_words = set()
    total_list_of_words = []

    # Add in UNK word class
    set_of_words.add("UNK")

    for line in data:
        if line != '\n':
            word_tag = line.strip().split('\t')
            set_of_tags.add(word_tag[tag_idx])
            set_of_words.add(word_tag[word_idx])
            total_list_of_words.append(word_tag[word_idx])
    return set_of_tags, set_of_words, total_list_of_words

In [229]:
def build_lists_and_probabilities(train_data, set_of_tags, set_of_words, unk_count, smoothing=True):

    # Get counts of tag, words, word-tags
    word_tag_count = defaultdict(int) # C(t, w)
    word_count = defaultdict(int) # C(w)
    tag_count = defaultdict(int) # C(t)

    tag_sequence = ["<s>"]  # Sequence of tags, a sentence ends with .
    tag_sequence_count = 0 
    tag_tag_count = defaultdict(int) # C(t_{i-1}, t_i)

    # print('{}'.format(train_data[0][2:]))

    for line in train_data:
         if line != '\n':
            word_tag = line.strip().split('\t')

            word_tag_count[(word_tag[1], word_tag[2])] += 1
            word_count[word_tag[1]] += 1
            tag_count[word_tag[2]] += 2

            # Get all tags from all lines
            tag_sequence.append(word_tag[2])
            if word_tag[1] == ".":
                tag_sequence.append("</s>")
                for i in range(0, len(tag_sequence)-1):
                    tag_tag_count[(tag_sequence[i], tag_sequence[i+1])] += 1
                tag_sequence = ["<s>"]
                tag_sequence_count += 1

    tag_count["<s>"] = tag_sequence_count
    tag_count["</s>"] = tag_sequence_count
    
    # For UNK words, count from corpus
    word_count["UNK"] = unk_count

    # Combine tag set and word set 
    all_word_tag_combos = list(product(set_of_words, set_of_tags))
    for word_tag_combo in all_word_tag_combos:
        if word_tag_combo not in word_tag_count.keys():
            word_tag_count[word_tag_combo] = 0

    # Same thing for tag_tag_count
    all_tag_tag_combos = list(product(set_of_tags, set_of_tags))
    for tag_tag_combo in all_tag_tag_combos:
        if tag_tag_combo not in tag_tag_count.keys():
            tag_tag_count[tag_tag_combo] = 0

    # Get lists and probs
    # To smooth or to not smooth - that is the question...
    if smoothing:
        transition_probs = laplace_smooth_transition(tag_tag_count, tag_count, word_count)
        emission_probs = laplace_smooth_emission(word_tag_count, tag_count, word_count) 

    transition_probs = calculate_transition(tag_tag_count, tag_count)
    emission_probs = calculate_emission(word_tag_count, tag_count)
    
    list_of_tags = list(sorted(set_of_tags))
    list_of_words = list(sorted(set_of_words))
    
    return list_of_tags, list_of_words, word_count, transition_probs, emission_probs


In [230]:
def calculate_transition(tag_tag_count, tag_count):
    # Calculate transition probabilities
    # P(t_i | t_{i-1}) = C(t_{i-1, t_i}) / C(t_{i-1})
    transition_probs = {}
    for tag_tag in tag_tag_count:
        count_tag1tag2 = tag_tag_count[tag_tag]
        count_tag1 = tag_count[tag_tag[0]]
        transition_probs[tag_tag] = count_tag1tag2 / count_tag1        
    return transition_probs

In [231]:
def calculate_emission(word_tag_count, tag_count):
    # Calculate emission probabilities
    # P(w | t) = C(t, w) / C(t)
    # word-tag pairings with 0 prob not included, will be 0 by default if not in dict
    # No smoothing yet
    emission_probs = {}
    for word_tag in word_tag_count:
        count_word_tag = word_tag_count[word_tag]
        count_tag = tag_count[word_tag[1]]
        emission_probs[word_tag] = count_word_tag / count_tag
    return emission_probs

In [44]:
def laplace_smooth_transition(tag_tag_count, tag_count, word_count):
    vocab_size = len(set(word_count))
    
    transition_probs = {}
    for tag_tag in tag_tag_count:
        count_tag1tag2 = tag_tag_count[tag_tag]
        count_tag1 = tag_count[tag_tag[0]]
        new_prob = (count_tag1tag2 + 1) / (count_tag1 + vocab_size)     
        transition_probs[tag_tag] = new_prob
    return transition_probs

In [45]:
def laplace_smooth_emission(word_tag_count, tag_count, word_count):
    vocab_size = len(set(word_count))
    
    emission_probs = {}
    for word_tag in word_tag_count:
        count_word_tag = word_tag_count[word_tag]
        count_tag = tag_count[word_tag[1]]
        new_prob_1 = (count_word_tag + 1) / (count_tag + vocab_size)
        emission_probs[word_tag] = new_prob_1
    return emission_probs

<b> Step 2. Viterbi algorithm </b>

In [46]:
def viterbi_pos_tagger(list_of_tags, input_sentence, transition_probs, emission_probs):
    # Initializing matrices & vector
    # Matrix with tags vs words
    p = np.zeros(shape=(len(list_of_tags), len(input_sentence)))
    # Matrix for backtrace
    back = np.zeros(shape=(len(list_of_tags), len(input_sentence)), dtype=np.int)

    # Initializing step: P(tag|start) * P(word1|tag)
    for tag_i, tag in enumerate(list_of_tags):
        # Fill first col of matrix p & back matrices
        tag_given_start = ('<s>', tag)
        word_given_tag = (input_sentence[0], tag)
        p[tag_i, 0] = transition_probs[tag_given_start] * emission_probs[word_given_tag]
        back[tag_i, 0] = 0  # RECHECK this - not sure how to initialize back pointer

    # Recursion step - go through every tag for each token:
    for word_i in range(1, len(input_sentence)):
        for tagi1, tag1 in enumerate(list_of_tags):
            # For each tag, get its prob given all other tags:
            # Prev column * P(tag|all tags) * P(word|tag)
            # Fill in viterbi matrix
            p[tagi1, word_i] = np.max([p[tagi2, word_i - 1] * transition_probs[tag2, tag1] * emission_probs[input_sentence[word_i], tag1] for tagi2,tag2 in enumerate(list_of_tags)])
            # Fill in backpointer
            back[tagi1, word_i] = np.argmax([p[tagi2, word_i - 1] * transition_probs[tag2, tag1] * emission_probs[input_sentence[word_i], tag1] for tagi2,tag2 in enumerate(list_of_tags)])

    # Termination steps
    best_path_prob = np.max([p[tag_i, len(input_sentence)-1] for tag_i, tag in enumerate(list_of_tags)])        
    best_path_pointer = np.argmax([p[tag_i, len(input_sentence)-1] for tag_i, tag in enumerate(list_of_tags)])        

    return p, back, best_path_pointer, best_path_prob

In [47]:
def backtrace(back, best_path_pointer, input_sentence, list_of_tags):
    path_idx = [best_path_pointer]
    for column_i, column in enumerate(back.T[::-1]): # Starts at end 
        max_tag_idx = max(column)
        path_idx.append(max_tag_idx)
    # print(path_idx)

    tag_seq = []
    for i in range(0,len(path_idx)-1):
        tag_seq.append(list_of_tags[path_idx[i]])
        
    tag_seq = tag_seq[::-1]

    # print('Input sentence: {}'.format(input_sentence))
    # print('Part of speech: {}'.format(tag_seq))
    
    return tag_seq

After splitting data into training & test set:
    1. Build word & tag sets for entire data & training data
    2. Words in entire data but not training data are turned into UNK & counted 
    3. Use training data and the UNK counts to build transition and emission matrices
    4. Load test set & turn tokens into a list, preserve "/n"
    5. Run viterbi function on test set, get list of tags 
    6. Write output file with idx-token-tag
    7. Compare output file and test set & calculate accuracy

In [208]:
# SHUFFLE TRAINING DATA TO CREATE TRAIN & TEST SET: 80% train, 20% test
# Write to new files in same format

def split_train_test(train_data, train_test_split):
    data_array = np.array([ele.strip().split("\t") for ele in train_data if ele.strip() != ""])
    token_sentence_idxs = data_array[:,0]
    tokens = data_array[:,1]
    tags = data_array[:,2]
    sentences = []
    sentence = []
    
    for token, tag in zip(tokens, tags):
        if token == '.':
            sentence.append(np.array([token, tag]))
            sentences.append(np.array(sentence))
            sentence = []
        else:
            sentence.append(np.array([token, tag]))

    sentences = np.array(sentences)
    np.random.shuffle(sentences)
    
    num_total_sentences = sentences.shape[0]
    num_train_sentences = int(num_total_sentences * train_test_split)
    train_sentences = sentences[:num_train_sentences]
    test_sentences = sentences[num_train_sentences:]
    
    return train_sentences, test_sentences

def write_new_stuff(sentences, dataset_key):
    with open("{}_set_shuffled.txt".format(dataset_key), "w") as outfile:
        for sentence in sentences:
            for i, (token, tag) in enumerate(sentence, 1):
                out_write = "{}\t{}\t{}\n".format(i, token, tag)
                outfile.write(out_write)
            out_write = "\n"
            outfile.write(out_write)

            
with open('berp-POS-training.txt', 'r') as train_file:
    train_data = train_file.readlines()
train_test_split = 0.8
train_sentences, test_sentences = split_train_test(train_data, train_test_split)
write_new_stuff(train_sentences, "training")
write_new_stuff(test_sentences, "test")

In [209]:
# BUILD WORD AND TAG SETS FOR THE DATA FILES

# Entire data
with open('berp-POS-training.txt', 'r') as entire_file:
    entire_set = entire_file.readlines()

# Train set
with open('training_set_shuffled.txt', 'r') as train_file:
    train_set = train_file.readlines()
    
# Test set
with open('test_set_shuffled.txt', 'r') as test_file:
    test_set = test_file.readlines()
    
# Build word & tag sets
entire_tags, entire_words, entire_total_word_list = build_words_tags_sets(entire_set)
train_tags, train_words, train_total = build_words_tags_sets(train_set)
test_tags, test_words, test_total = build_words_tags_sets(test_set)

In [210]:
# DEAL WITH UNSEEN WORDS IN TRAIN SET
# Across the given lexicon, count words unseen in train set
unk_count = 0
unk_words = []
for word in entire_words:
    if word != '/n':
        if word not in train_words:
            unk_words.append(word)
            unk_count += entire_total_word_list.count(word)

In [248]:
# PROCESS TEST SET: REPLACE UNSEEN WORDS WITH 'UNK' & GET LIST OF GROUND-TRUTH TAGS
def process_testset(test_set, train_words, word_idx, tag_idx):
    test_data = np.array([ele.strip().split("\t") for ele in test_set if ele.strip() != ""])

    # nums = test_data[:,0]
    tokens = test_data[:,word_idx]
    tags = test_data[:,tag_idx]
    
    original_sentences = []
    original_sentence = []

    sentences_tags = []
    sentence_tags = []

    sentences_with_unk = []
    sentence_with_unk = []

    for token, tag in zip(tokens, tags):

        if token == '.':
            original_sentence.append(token)
            original_sentences.append(original_sentence)
            original_sentence = []
        else:
            original_sentence.append(token)

        # Replace unseen with 'UNK'
        if token not in train_words:
            token = "UNK"

        if token == '.':
            sentence_with_unk.append(token)
            sentences_with_unk.append(sentence_with_unk)
            sentence_with_unk = []
        else:
            sentence_with_unk.append(token)

        if tag == '.':
            sentence_tags.append(tag)
            sentences_tags.append(sentence_tags)
            sentence_tags = []
        else:
            sentence_tags.append(tag)

    return original_sentences, sentences_with_unk, sentences_tags

original_sentences, sentences_with_unk, sentences_tags = process_testset(test_set, train_words, 1, 2)


In [212]:
# RUN VITERBI ON INPUT TOKENS FROM TEST SET

SMOOTHING = True

# Use training data and the UNK counts to build transition and emission matrices
list_of_tags, list_of_words, word_count, transition_probs, emission_probs = build_lists_and_probabilities(train_set, train_tags, train_words, unk_count, smoothing=SMOOTHING)

# Run viterbi function on test set, get list of tags 
sequence_tags = []
for seq_i, seq in enumerate(sentences_with_unk):
    sys.stdout.write("\rSentence [{}/{}]".format(seq_i+1, len(sentences_with_unk)))
    sys.stdout.flush()
    p, back, best_path_pointer, best_path_prob = viterbi_pos_tagger(list_of_tags, seq, transition_probs, emission_probs)
    output_tags = backtrace(back, best_path_pointer, seq, list_of_tags)
    sequence_tags.append(output_tags)

Sentence [3174/3174]

Calculate accuracy of model on test set:

In [213]:
# Flatten lists
flatten_list = lambda target_list : [tag for tag_list in target_list for tag in tag_list]
ground_truth = flatten_list(sentences_tags)
viterbi_output = flatten_list(sequence_tags)
original_sentences_flatten = flatten_list(original_sentences)
sentences_with_unk_flatten = flatten_list(sentences_with_unk)

In [258]:
ground_truth

['CD',
 'NNS',
 'UH',
 '.',
 'PRP',
 'VBP',
 'TO',
 'VB',
 'JJ',
 'NN',
 '.',
 'PRP',
 'VBP',
 'TO',
 'VB',
 'NN',
 'NN',
 '.',
 'PRP',
 'MD',
 'VB',
 'JJR',
 'NN',
 'IN',
 'NN',
 'NN',
 'UH',
 '.',
 'CD',
 'TO',
 'CD',
 'NNS',
 '.',
 'VBZ',
 'EX',
 'DT',
 'JJ',
 'NN',
 'IN',
 'DT',
 'NN',
 '.',
 'RB',
 'PRP',
 'VBP',
 'DT',
 'JJ',
 'NN',
 '.',
 'PRP',
 'VBP',
 'TO',
 'VB',
 'JJR',
 'IN',
 'DT',
 'NN',
 '.',
 'NN',
 'EX',
 'VBD',
 'DT',
 'JJ',
 'NN',
 'IN',
 'DT',
 'NN',
 'RB',
 'WDT',
 'PRP',
 'VBD',
 'RB',
 'MD',
 'PRP',
 'VB',
 'PRP',
 'IN',
 'DT',
 'NN',
 'NNS',
 'IN',
 'NN',
 '.',
 'NN',
 'UH',
 '.',
 'UH',
 'PRP',
 'MD',
 'VB',
 'TO',
 'VB',
 'DT',
 'JJ',
 'NN',
 'IN',
 'RB',
 'JJR',
 'IN',
 'CD',
 'NNS',
 'IN',
 'NNS',
 '.',
 'NN',
 'POS',
 'NNS',
 'NN',
 '.',
 'NNP',
 'IN',
 'NN',
 '.',
 'JJ',
 'CC',
 'JJ',
 'NN',
 '.',
 'VB',
 'PRP',
 'JJR',
 'IN',
 'DT',
 'NN',
 '.',
 'PRP',
 'MD',
 'VB',
 'TO',
 'VB',
 'CD',
 'NNS',
 '.',
 'DT',
 'NN',
 'VBZ',
 'RB',
 'VB',
 '.',
 'RB',
 '.'

In [220]:
# Get accuracy
def calculate_accuracy(system_output, ground_truth):
    count_correct = 0
    nums = 0
    for tag_pred, tag_target in zip(system_output, ground_truth):
        if tag_pred == tag_target:
            count_correct += 1
        nums += 1
    return count_correct/nums

calculate_accuracy(viterbi_output, ground_truth)

0.9477372648838055

In [261]:
# Accuracy on UNKNOWN words...
total_unk = 0
accurate_tag = 0
for i, word in enumerate(sentences_with_unk_flatten):
    if word == "UNK":
        total_unk += 1
        # print(original_sentences_flatten[i])
        # print('Ground truth tag: {}'.format(ground_truth[i]))
        # print('Viterbi tag: {} \n'.format(viterbi_output[i]))
        if ground_truth[i] == viterbi_output[i]:
            accurate_tag +=1
            
unk_accuracy = accurate_tag / total_unk

0.0

In [223]:
def create_word_tag_list(sentences_flatten, output_tags):
    sentences = []
    sentence = []
    for token, tag in zip(sentences_flatten, output_tags):
        if token == '.':
            sentence.append(np.array([token, tag]))
            sentences.append(np.array(sentence))
            sentence = []
        else:
            sentence.append(np.array([token, tag]))
            
    return sentences

sentences = create_word_tag_list(original_sentences_flatten, viterbi_output)


In [217]:
with open("{}.txt".format('viterbi_output'), "w") as outfile:
    for sentence in sentences:
        for i, (token, tag) in enumerate(sentence, 1):
            out_write = "{}\t{}\t{}\n".format(i, token, tag)
            outfile.write(out_write)
        out_write = '\n'
        outfile.write(out_write)

In [227]:
# Try on given test set
# Read test set
with open('assgn2-test-set.txt', 'r') as assgn2_test_file:
    assgn2_test_set = assgn2_test_file.readlines()

In [253]:
assgn2_test_tags, assgn2_test_words, assgn2_test_total = build_words_tags_sets(assgn2_test_set, 1, 0)
assgn2_original_sentences, assgn2_sentences_with_unk, assgn2_sentences_tags = process_testset(assgn2_test_set, train_words, 1, 0)

In [255]:
# Run viterbi function on test set, get list of tags
assgn2_sequence_tags = []
for seq_i, seq in enumerate(assgn2_sentences_with_unk):
    sys.stdout.write("\rSentence [{}/{}]".format(seq_i+1, len(assgn2_sentences_with_unk)))
    sys.stdout.flush()
    p, back, best_path_pointer, best_path_prob = viterbi_pos_tagger(list_of_tags, seq, transition_probs, emission_probs)
    output_tags = backtrace(back, best_path_pointer, seq, list_of_tags)
    assgn2_sequence_tags.append(output_tags)

Sentence [733/733][['PRP', 'VB', 'JJ', 'NN', 'UH', 'MD', 'PRP', 'VB', 'PRP', '.', '.', '.'], ['UH', 'PRP', 'MD', 'VB', 'TO', 'VB', 'IN', 'NN', 'NN', 'IN', 'UH', 'NN', '.'], ['RB', 'RB', 'RB', 'RB', 'IN', 'CD', 'CC', 'CD', 'NNS', 'IN', 'NN', '.'], ['PRP', 'VBP', 'DT', 'NN', 'RB', 'PRP', 'VBP', 'RB', 'VB', '.'], ['VB', 'PRP', 'RBR', 'IN', 'JJ', 'NN', 'TO', 'VB', '.'], ['UH', 'PRP', 'MD', 'VB', 'TO', 'VB', 'JJ', 'NN', 'IN', 'NN', '.'], ['PRP', 'VBP', 'RB', 'VB', 'PRP', 'VBP', 'RB', 'VBP', 'TO', 'VB', 'NN', 'RBR', 'IN', 'JJ', 'NNS', 'PRP', 'VBP', 'TO', 'VB', 'IN', 'NN', '.'], ['VBP', 'PRP', 'VB', 'IN', 'DT', 'NN', 'VBN', 'NN', 'HYPH', 'NN', '.'], ['UH', 'PRP', 'VBP', 'DT', 'JJ', 'NN', 'VBZ', 'DT', 'RBS', 'RB', 'IN', '.', '.', '.', '.', '.', '.', '.', '.', '.', '.'], ['UH', 'VB', 'PRP', 'JJ', 'NNS', 'IN', 'NN', '.'], ['PRP', 'MD', 'VB', 'TO', 'VB', 'IN', 'NN', 'NN', 'NN', 'IN', 'DT', 'JJ', 'NN', '.'], ['PRP', 'MD', 'VB', 'IN', 'CD', 'NNS', 'IN', 'NN', 'CC', 'PRP', 'MD', 'VB', '.'], ['MD', '




In [257]:
def write_new_txt(sentences, fname):
    with open("{}.txt".format(fname), "w") as outfile:
        for sentence in sentences:
            for i, (token, tag) in enumerate(sentence, 1):
                out_write = "{}\t{}\t{}\n".format(i, token, tag)
                outfile.write(out_write)
            out_write = "\n"
            outfile.write(out_write)
            
assgn2_viterbi_output = flatten_list(assgn2_sequence_tags)
assgn2_original_sentences_flatten = flatten_list(assgn2_original_sentences)
assgn2_sentences_with_unk_flatten = flatten_list(assgn2_sentences_with_unk)
assgn2_sentences = create_word_tag_list(assgn2_original_sentences_flatten, assgn2_viterbi_output)
write_new_txt(assgn2_sentences, "TEST-nguyen-dieumy-assgn2-test-output")
