In [None]:
import numpy as np

def read_sentence(f):
  """
  Read the next sentence from an open tab-separated corpus file.

  Each token line has the form "word<TAB>tag"; sentences are separated by
  blank lines.

  Args:
    f: an open text-mode file object positioned at the start of a sentence
       (or at blank lines preceding one).

  Returns:
    A list of (word, tag) tuples for the next sentence. An empty list is
    returned only when end-of-file is reached without reading any token, so
    callers can use it as an end-of-corpus signal.

  Raises:
    ValueError: if a non-blank line contains no tab separator.
  """
  sentence = []
  while True:
    line = f.readline()
    if not line:
      # End of file: hand back whatever was collected (possibly nothing).
      return sentence
    line = line.strip()
    if not line:
      # Blank (or whitespace-only) line. It terminates the sentence only if
      # we have already read tokens; otherwise it is a stray separator that
      # must be skipped so it is not mistaken for end-of-corpus.
      if sentence:
        return sentence
      continue
    word, sep, tag = line.partition("\t")
    if not sep:
      raise ValueError("expected 'word<TAB>tag' but got: %r" % line)
    sentence.append((word, tag))

def read_corpus(file):
  """
  Read every sentence from a tab-separated corpus file.

  Args:
    file: path of a UTF-8 text file in the format understood by
          read_sentence.

  Returns:
    A list of sentences, each a list of (word, tag) tuples. Reading stops at
    the first empty list returned by read_sentence.
  """
  sentences = []
  # The context manager guarantees the handle is closed, even if parsing
  # raises part-way through the file.
  with open(file, 'r', encoding='utf-8') as f:
    while True:
      sentence = read_sentence(f)
      if sentence == []:
        return sentences
      sentences.append(sentence)

In [None]:
# Load the tagged training corpus and estimate the HMM parameters from counts.
training = read_corpus('train.upos.tsv')
TAGS = ['ADJ', 'ADP', 'ADV', 'AUX', 'CCONJ', 'DET', 'INTJ', 'NOUN', 'NUM', 
        'PART', 'PRON', 'PROPN', 'PUNCT', 'SCONJ', 'SYM', 'VERB', 'X']
NUM_TAGS = len(TAGS)
# Tag -> index lookup; avoids a linear TAGS.index() scan for every token.
# A tag that is not listed in TAGS raises KeyError here.
TAG_INDEX = {tag: idx for idx, tag in enumerate(TAGS)}

# Smoothing constant: added to every per-tag denominator of the emission
# estimates and used as the numerator for words never seen in training.
alpha = 0.1
tag_counts = np.zeros(NUM_TAGS)
# transition_counts[cur, prev] = number of times tag `cur` directly follows
# tag `prev` within a sentence.
transition_counts = np.zeros((NUM_TAGS,NUM_TAGS))
# word -> vector of per-tag occurrence counts
obs_counts = {}

for sent in training:
  prev = None
  for word, tag in sent:
    pos = TAG_INDEX[tag]
    tag_counts[pos] += 1
    if prev is not None:
      transition_counts[pos, prev] += 1
    prev = pos
    if word not in obs_counts:
      obs_counts[word] = np.zeros(NUM_TAGS)
    (obs_counts[word])[pos] += 1

# Distribution over tags used as the starting message: overall tag frequency.
X0 = tag_counts / np.sum(tag_counts)

# Column-normalise so that TPROBS[cur, prev] = Pr(cur | prev). A tag that never
# occurs as a predecessor has an all-zero column; leave it at zero instead of
# producing 0/0 = NaN, which would otherwise poison every Viterbi message.
prev_totals = np.sum(transition_counts, axis=0)
TPROBS = np.divide(transition_counts, prev_totals,
                   out=np.zeros_like(transition_counts),
                   where=prev_totals > 0)

# Emission estimates Pr(word | tag); '#UNSEEN' holds the vector used for any
# word that does not occur in the training data.
OPROBS = {'#UNSEEN': np.divide(alpha*np.ones(NUM_TAGS), tag_counts+alpha)}
for word, counts in obs_counts.items():
  OPROBS[word] = np.divide(counts, tag_counts+alpha)

In [None]:
def unigram(obs):
  """
  Predict the tag of a single word with the unigram model.

  Every tag is scored by X0[tag] * Pr(obs | tag); the emission vector is
  OPROBS[obs] when the word is known and OPROBS['#UNSEEN'] otherwise. The
  highest-scoring tag (first one on ties) is returned as its name from TAGS.
  """
  emission = OPROBS[obs] if obs in OPROBS else OPROBS['#UNSEEN']
  # Element-wise prior * likelihood for all tags at once.
  scores = X0 * emission
  return TAGS[np.argmax(scores)]

In [None]:
def evaluate(sentences, method):
  """
  Tag every sentence with the chosen model and print accuracy figures.

  Args:
    sentences: list of sentences, each a list of (word, tag) pairs holding
               the gold tags.
    method: 'unigram' to tag each word independently with unigram(), or
            'viterbi' to tag whole sentences with viterbi().

  Prints the accuracy over all words and, if any word is missing from
  OPROBS, the accuracy over those unseen words. Returns None. An unknown
  method or a prediction of the wrong length prints a message and aborts.
  """
  # Validate once up front so a bad method is reported even for empty input.
  if method not in ('unigram', 'viterbi'):
    print("invalid method!")
    return

  correct = 0
  correct_unseen = 0
  num_words = 0
  num_unseen_words = 0

  for sentence in sentences:
    if not sentence:
      # Nothing to tag or score in an empty sentence.
      continue
    words = [pair[0] for pair in sentence]
    pos = [pair[1] for pair in sentence]
    # A word counts as unseen when it has no emission entry of its own.
    unseen = [word not in OPROBS for word in words]
    if method == 'unigram':
      predict = [unigram(w) for w in words]
    else:
      predict = viterbi(words)

    if len(predict) != len(pos):
      print("incorrect number of predictions")
      return
    correct += sum(1 for i,j in zip(pos, predict) if i==j)
    correct_unseen += sum(1 for i,j,k in zip(pos, predict, unseen) if i==j and k)
    num_words += len(words)
    num_unseen_words += sum(unseen)

  if num_words == 0:
    # Avoid dividing by zero when there is nothing to evaluate.
    print("No words to evaluate")
    return

  print("Accuracy rate on all words: ", correct/num_words)
  if num_unseen_words > 0:
    print("Accuracy rate on unseen words: ", correct_unseen/num_unseen_words)

# Score the unigram tagger on the data it was trained on ...
print("Training data evaluation")
evaluate(training, 'unigram')
# ... and on the held-out test corpus. `test` is kept as a notebook-level
# variable and is reused by the Viterbi evaluation cell further down.
test = read_corpus('test.upos.tsv')
print("")
print("Test data evaluation")
evaluate(test, 'unigram')

In [None]:
def elapse_time(m):
  """
  Advance a Viterbi message by one timestep.

  For each current tag (row i of TPROBS) the best predecessor is the prior
  tag j that maximises TPROBS[i, j] * m[j].

  Returns:
    A pair (mprime, prior_tags): mprime[i] is that maximum score and
    prior_tags[i] the index of the predecessor achieving it (first one on
    ties), stored as int8.
  """
  best_scores = np.zeros(NUM_TAGS)
  back_pointers = np.zeros(NUM_TAGS, dtype=np.int8)
  incoming = np.asarray(m)
  for current, row in enumerate(TPROBS):
    # Score of reaching `current` from every possible prior tag.
    candidates = row * incoming
    best_prev = np.argmax(candidates)
    back_pointers[current] = best_prev
    best_scores[current] = candidates[best_prev]
  return best_scores, back_pointers
# Smoke test: use X0 as the initial message m0 and advance it by one timestep.
# m1prime is kept at notebook level and reused by the observe() test cell below.
m1prime, tags = elapse_time(X0)
print(m1prime)

In [None]:
def observe(mprime, obs):
  """
  Weight a message by the emission probabilities of an observed word.

  Args:
    mprime: per-tag scores, as produced by elapse_time.
    obs: the observed word.

  Returns:
    A new array m with m[i] = Pr(obs | tag i) * mprime[i]. Words without an
    entry in OPROBS use the OPROBS['#UNSEEN'] vector.
  """
  # Unknown words fall back to the '#UNSEEN' emission vector.
  emission = OPROBS[obs] if obs in OPROBS else OPROBS["#UNSEEN"]
  # An element-wise product equals multiplying by diag(emission), without
  # building the dense matrix. It also keeps a NaN/inf in one entry from
  # spreading to the other tags through 0 * NaN terms of a matrix product.
  return emission * np.asarray(mprime)
# Smoke test of the '#UNSEEN' fallback, reusing m1prime from the elapse_time
# test cell — assumes "asd" does not occur in the training data.
print(observe(m1prime, "asd"))
  

In [None]:
def viterbi(observations):
  """
  Given a list of word observations, return a list of predicted tags.

  Starts from the message X0 and, for every word, applies elapse_time followed
  by observe while recording the back-pointers. The most likely tag sequence
  is then recovered by following the back-pointers from the best final tag.

  Args:
    observations: sequence of words forming one sentence.

  Returns:
    A list of tag names from TAGS, one per word; [] for an empty sentence.
  """
  if len(observations) == 0:
    return []

  m = X0
  pointers = []
  # "Forward" phase of the Viterbi algorithm
  for obs in observations:
    mprime, prior_tags = elapse_time(m)
    m = observe(mprime, obs)
    pointers.append(prior_tags)
    # Rescale so the largest score is 1. Dividing by a positive constant does
    # not change which tag/predecessor is best, but it stops the product of
    # many small probabilities from underflowing to all zeros on long
    # sentences (which would make every position decode as TAGS[0]).
    peak = np.max(m)
    if peak > 0:
      m = m / peak

  # "Backward" phase of the Viterbi algorithm
  index = int(np.argmax(m))  # best final tag (first one on ties)
  indexsequence = [index]
  # pointers[t][i] is the best tag at position t-1 given tag i at position t;
  # pointers[0] refers to the state before the first word and is not needed.
  for step in range(len(pointers) - 1, 0, -1):
    index = int(pointers[step][index])
    indexsequence.append(index)
  indexsequence.reverse()
  return [TAGS[i] for i in indexsequence]
# Smoke test on a one-word sentence. "#UNSEEN" is itself a key of OPROBS (it
# stores the fallback emission vector), so this word is looked up directly.
print(viterbi(["#UNSEEN"]))


In [None]:
# Tag three short phrases that all contain the word 'round' in different
# contexts — presumably to check that the predicted tag depends on context.
print(viterbi(['a', 'round', 'circle']))
print(viterbi(['play', 'another', 'round']))
print(viterbi(['walk', 'round', 'the', 'fence']))

In [None]:
# Score the Viterbi tagger on the training corpus and on the test corpus.
# `test` is the corpus loaded in the unigram evaluation cell, so that cell
# must have been run first.
print("Training data evaluation")
evaluate(training, 'viterbi')
print("")
print("Test data evaluation")
evaluate(test, 'viterbi')