In [1]:
import collections
import itertools
import numpy as np
import re

np.set_printoptions(2)

In [2]:
sos = '<s>'
eos = '</s>'
unk = '<unk>'
def read_docs(dataset_year, max_docs_count: int):
  is_doc = False
  docs_count = 0

  with open(f'dataset/cg.Folha.{dataset_year}', encoding='windows-1252') as file: 
    for line in file:
      if docs_count >= max_docs_count:
        break

      if line.startswith('<DOC>'):
        is_doc = True
        continue

      if line.startswith('</DOC>'):
        is_doc = False
        docs_count += 1
        continue

      if is_doc is False:
        continue

      if line.startswith(sos):
        yield (sos, sos)
      elif line.startswith(eos):
        yield (eos, eos)
      else:
        splitted_line = line.split('\t')
        if len(splitted_line) == 2:
          word = splitted_line[0]
          tag = list(filter(lambda x: re.match(r'^[A-Z]', x), splitted_line[1].split()))[0]
          yield (word, tag)

In [3]:
def get_transition_counter(tags):
  current_it, next_it = itertools.tee(tags)
  next(next_it)

  counter = collections.Counter(list(zip(current_it, next_it)))

  return counter

In [4]:
def train(dataset):
  word_tag_counter = collections.Counter()
  tag_counter = collections.Counter()
  transition_counter = collections.Counter()
  sentence_tags = []

  for word, tag in dataset:
    tag_counter.update({ tag: 1 })
    word_tag_counter.update({ (word, tag): 1 })
    
    sentence_tags.append(tag)
    if tag == eos:
      transition_counter += get_transition_counter(sentence_tags)
      sentence_tags = []

  return {
    'word_tag_counter': word_tag_counter,
    'tag_counter': tag_counter,
    'transition_counter': transition_counter
  }

In [5]:
def emission_probability(model, word_tag):
  words_count = len(set([word for (word, _) in model['word_tag_counter'].keys()]))
  tag_count = model['tag_counter'][word_tag[1]]

  return (model['word_tag_counter'][word_tag] + 1) / (tag_count + words_count)

def transition_probability(model, tag_transition):
  words_count = len(set([word for (word, _) in model['word_tag_counter'].keys()]))
  tag_count = model['tag_counter'][tag_transition[0]]
  return (model['transition_counter'][tag_transition] + 1) / (tag_count + words_count)

def get_initial_probability(model):
  initial_prob = { sos: 1 }
  for tag in model['tag_counter'].keys():
    if tag == sos:
      continue
    
    initial_prob.update({ tag: 0 })

  return initial_prob

In [6]:
def viterbi(model, word_tags):
  initial_probability = get_initial_probability(model)
  tags = list(model['tag_counter'].keys())
  probability_matrix = np.zeros((len(tags), len(word_tags)))
  tag_matrix = np.full((len(tags), len(word_tags)), '', dtype=object)

  for index, tag in enumerate(tags):
    probability_matrix[index, 0] = initial_probability[tag] * emission_probability(model, word_tags[0])

  for idx_word_tag in range(1, len(word_tags)):
    for idx_next_tag, next_tag in enumerate(tags):
      for idx_current_tag, current_tag in enumerate(tags):
        current_prob = probability_matrix[idx_current_tag, idx_word_tag - 1] * transition_probability(model, (current_tag, next_tag)) * emission_probability(model, (word_tags[idx_word_tag][0], next_tag))
        if probability_matrix[idx_next_tag, idx_word_tag] < current_prob:
          probability_matrix[idx_next_tag, idx_word_tag] = current_prob
          tag_matrix[idx_next_tag, idx_word_tag] = idx_current_tag

  predict_tag_index = int(np.argmax(probability_matrix[:, -1]))
  predict_tags_indexes = [predict_tag_index]
  for idx in range(len(word_tags) - 1, 0, -1):
    predict_tag_index = int(tag_matrix[predict_tag_index, idx])
    predict_tags_indexes.append(predict_tag_index)

  predict_tags = [tags[i] for i in reversed(predict_tags_indexes)]

  return predict_tags

In [7]:
model = train(read_docs('1995', max_docs_count=100))

word_tag_acc = []
sentences = []
for word_tag in read_docs('1994', max_docs_count=2):
  word_tag_acc.append(word_tag)
  if word_tag[0] == eos:
    sentences.append(word_tag_acc)
    word_tag_acc = []

total = success = 0
for s in sentences:
  target_tags = [tag for _, tag in s]
  predict_tags = viterbi(model, s)
  success += sum([1 if target == predict else 0 for target, predict in zip(target_tags, predict_tags)])
  total += len(target_tags)

print(f'acuracy is {success/total}')

acuracy is 0.7151335311572701
