# Лабораторная работа #5: Part-of-Speech tagging

В данной лабораторной работе вам предстоит построить модель, решающую задачу классификации слов в предложении на части речи. В основе этой модели должна лежать скрытая марковская модель (HMM). О том, что это такое и как с этим работать рассказывалось на лекции.

Каждая из возможных частей речи соовтетствует некоторому скрытому HMM-модели, слова играют роль наблюдений, а матрица вероятностей переходов определяется подсчётом би-грамм и уни-грамм в тренировочном датасете. Допустим мы имеем некоторую последовательность наблюдений (слов) $W = {w_1..w_N}$ и некоторую настроенную HMM-модель. Тогда, применив ним алгоритм Витерби $\href{https://neerc.ifmo.ru/wiki/index.php?title=Алгоритм_Витерби}{алгоритм Витерби}$, мы можем получить наиболее вероятную последовательность скрытых состояний $Q = {q_1..q_N}$, каждое из которых соответствует некоторой части речи. Таким образом, для каждого слова мы получим его часть речи.

In [291]:
import matplotlib.pyplot as plt
import numpy as np
from IPython.core.display import HTML
from itertools import chain
from collections import Counter, defaultdict
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution
import random
import string
from collections import namedtuple, OrderedDict

### Чтение данных и feature extraction

В качестве датасета используем упрощённую версию $\href{https://www.kaggle.com/nltkdata/brown-corpus}{Brown Corpus}$. Этот датасет содержит в себе ~57k последовательностей слов (предложений) на английском языке. Для каждого слово в каждой последовательности указана его часть речи. Суммарно, датасет включает в себя слова 11 различных частей речи. 

Как уже было отмечено, вероятностная матрица переходов определяется через подсчёт случаев (т.е. вероятность $P({w_i}|{q_j}) = \dfrac{Count(q_j, w_i)}{Count(q_j)}$). Если модель встречает слово $w$, которого не было в тренировочном наборе данных, то $P(w|q)$ будет равна нулю для любого скрытого состояния q. Соответственно, вероятность любой последовательности слов, содежащей w, будет равна нулю, и модель не сможет оценить такую последовательность. Аналогичная ситуация будет в случае со словами с опечатками, с очень редкими словами и прочими.

Мы хотим, чтобы наша модель работала на любых тестовых данных, в том числе содержащих неизвестные модели слова. Для этого давайте вычленять из слов некоторые признаки (features, фичи) и использовать их вектора для расчёта вероятностей вместо слов. Например, возможный набор фичей может выглядеть так:
(длина слова если она не более $5$ иначе $5$, является ли слово первым в предложении, содержит ли слово цифры, начинается ли предыдущее слово с большой буквы). Для предложенного набора фич, последовательность слов ['Kill', 'me', 'plea5e'] закодируется как $[[4, 1, 0], [2, 0, 0, 1], [6, 0, 1, 0]]$. Несложно заметить, что область возможных значений векторов-признаков равна 5 * 2 * 2 * 2 = 40. Хорошей идеей будет попытаться выбрать вектора так, чтобы слова одной части речи с бОльшей вероятностью кодировались одинаковыми векторами-признаками.

Чем меньше фич, чем меньше область возможных значений вектора признаков, тем больше слов кодируются одинаковыми векторами, тем хуже предсказания модели. Однако чем больше область воможных значений векторов-признаков, тем больше вероятность встретить в тестовом наборе данных слово, кодирование которого не встретилось в тренировочном датасете.

$\bf{Задание.}$ Придумайте какие-нибудь (потенциально полезные) признаки, и добавьте их в функцию $\textit{extract_features}$, принимающую последовательность слов в качестве tuple. и возвращающую последовательность векторов-признаков. Область возможных значений векторов-фичей должна быть достаточно широкой, чтобы модель чему-то обучилась, но при этом достаточно маленькой, чтобы ни одно слово из тестового набора не закодировалось невстреченным доселе  вектором-признаком.

In [292]:
def extract_features(tupled_words):
    
    feat = []
    n = len(tupled_words)
    for i in range(n):
        
        #очередное слово
        word = tupled_words[i]
        
        #вектор признаков для текущего слова
        res = []
        
        #длина слова, если она меньше 5, иначе 5
        res.append(5 if len(word) >= 5 else len(word))

        #является ли первый символ заглавной буквой
        res.append(int(word[0] <= 'Z' and word[0] >= 'A'))

        # has ing
        res.append(int(word[-3:] == 'ing'))

        # has ed
        res.append(int(word[-2:] == 'ed'))

        # has -
        res.append(int('-' in word))

        # has –
        res.append(int('–' in word))

        # num of vowels
        res.append(len([letter for letter in word.upper() if letter in 'AIEOUY']))

        # num of consonants
        res.append(len([letter for letter in word.upper() if letter in 'BCDFGHJKLMNPQRSTVWXZ']))

        # ratio of letters dictionary to number of letters
        res.append( len(set(word)) // len(word) )

        if (i > 0):
            # начинается ли предыдущее слово с заглавной буквы
            res.append(int(tupled_words[i - 1][0].isupper()))

            # previous is article
            res.append(int(tupled_words[i - 1].lower() == 'a'))
            res.append(int(tupled_words[i - 1].lower() == 'an'))
            res.append(int(tupled_words[i - 1].lower() == 'the'))
        else:
            res.append(1) #first one

        # mean of previous 2 letters of the word
        res.append(np.mean([ord(letter) for letter in word[-2:]])) if (len(word) > 1) else res.append(0)
                    
        feat.append(res)
    return feat


def hash_feature_list(coded_word):
    """вспомогательная функция, позволяющая преобразовать список 
    в хешируемый вид (строку)"""
    return ' '.join(map(lambda x: str(x), coded_word))

example = tuple(['Kill', 'me', 'plea5e'])
print(example)
print(extract_features(example))
print(hash_feature_list(extract_features(example)))

('Kill', 'me', 'plea5e')
[[4, 1, 0, 0, 0, 0, 1, 3, 0, 1, 108.0], [2, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 105.0], [5, 0, 0, 0, 0, 0, 3, 2, 0, 0, 0, 0, 0, 77.0]]
[4, 1, 0, 0, 0, 0, 1, 3, 0, 1, 108.0] [2, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 105.0] [5, 0, 0, 0, 0, 0, 3, 2, 0, 0, 0, 0, 0, 77.0]


Класс Dataset имплементирует чтение последовательностей слов (и их тегов) из файла, разделение на тренировочную и тестовую выборки и подсчёт некоторой статистики.

In [293]:
Sentence = namedtuple("Sentence", "words tags")

"""Функции для парсинга предложений и тегов"""
def read_data(filename):
    with open(filename, 'r') as f:
        sentence_lines = [l.split("\n") for l in f.read().split("\n\n")]
    return OrderedDict(((s[0], Sentence(*zip(*[l.strip().split("\t")
                        for l in s[1:]]))) for s in sentence_lines if s[0]))
def read_tags(filename):
    with open(filename, 'r') as f:
        tags = f.read().split("\n")
    return frozenset(tags)

class Subset(namedtuple("BaseSet", "sentences keys vocab c_vocab X tagset Y N stream")):
    def __new__(cls, sentences, keys):
        word_sequences = tuple([extract_features(sentences[k].words) for k in keys])
        uncoded_word_sequences = tuple([tuple([word for word in sentences[k].words]) for k in keys])
        tag_sequences = tuple([sentences[k].tags for k in keys])
        wordset = frozenset(chain(*uncoded_word_sequences))
        codeset = frozenset([hash_feature_list(x) for x in chain(*word_sequences)])
        tagset = frozenset(chain(*tag_sequences))
        N = sum(1 for _ in chain(*(sentences[k].words for k in keys)))
        stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
        return super().__new__(cls, {k: sentences[k] for k in keys}, keys, wordset, codeset, word_sequences,
                               tagset, tag_sequences, N, stream.__iter__)
def __len__(self):
        return len(self.sentences)
def __iter__(self):
        return iter(self.sentences.items())

In [294]:
class Dataset(namedtuple("_Dataset", "sentences keys vocab X tagset Y training_set testing_set N stream ustream")):
    def __new__(cls, tagfile, datafile, train_test_split=0.8, seed=239):
        tagset = read_tags(tagfile)
        sentences = read_data(datafile)
                
        keys = tuple(sentences.keys())
        wordset = frozenset(chain(*[s.words for s in sentences.values()]))
                
        word_sequences = tuple([extract_features(sentences[k].words) for k in keys])
        uncoded_word_sequences = tuple([sentences[k].words for k in keys])
        tag_sequences = tuple([sentences[k].tags for k in keys])
        N = sum(1 for _ in chain(*(s.words for s in sentences.values())))
        codeset = frozenset([hash_feature_list(x) for x in chain(*word_sequences)])
        
        # делим на трейн/тест
        _keys = list(keys)
        if seed is not None: random.seed(seed)
        random.shuffle(_keys)
        split = int(train_test_split * len(_keys))
        training_data = Subset(sentences, _keys[:split])
        testing_data = Subset(sentences, _keys[split:])
        stream = tuple(zip(chain(*word_sequences), chain(*tag_sequences)))
        ustream = tuple(zip(chain(*uncoded_word_sequences), chain(*tag_sequences)))
        
        return super().__new__(cls, dict(sentences), keys, wordset, word_sequences, tagset,
                               tag_sequences, training_data, testing_data, N, stream.__iter__, ustream.__iter__)
def __len__(self):
        return len(self.sentences)
def __iter__(self):
        return iter(self.sentences.items())
data = Dataset("tags-universal.txt", "brown-universal.txt", train_test_split=0.8)
print("Всего {} предложений.".format(len(data.sentences)))
print("Из них {} -- в тренировочном наборе.".format(len(data.training_set.sentences)))
print("А остальные {} -- в тестовом.".format(len(data.testing_set.sentences)))
print("В тренировочном наборе данных встречается {} уникальных слов, они кодируются в {} различных векторов-признаков."
      .format(len(data.training_set.vocab), len(data.training_set.c_vocab)))
print("В тестовом наборе данных {} уникальных слов, каждое из которых соответствует одному из {} различных векторов."
      .format(len(data.testing_set.vocab), len(data.testing_set.c_vocab)))
print("В тестировочном датасете встречается {} уникальных слов, которые не встречались в тренировочном наборе, а также {} векторов-признаков, не встречающихся в трейне."
      .format(len(data.testing_set.vocab - data.training_set.vocab), len(data.testing_set.c_vocab - data.training_set.c_vocab)))

Всего 57340 предложений.
Из них 45872 -- в тренировочном наборе.
А остальные 11468 -- в тестовом.
В тренировочном наборе данных встречается 50656 уникальных слов, они кодируются в 18650 различных векторов-признаков.
В тестовом наборе данных 25002 уникальных слов, каждое из которых соответствует одному из 10995 различных векторов.
В тестировочном датасете встречается 5401 уникальных слов, которые не встречались в тренировочном наборе, а также 1462 векторов-признаков, не встречающихся в трейне.


In [295]:
print("\nStream (feature_vector, tag) pairs:\n")
i = 5
for word, pair in zip(data.ustream(), data.stream()):
    print("\t", word[0], pair)
    i = i - 1
    if i == 0: break


Stream (feature_vector, tag) pairs:

	 Mr. ([3, 1, 0, 0, 0, 0, 0, 2, 1, 1, 80.0], 'NOUN')
	 Podger ([5, 1, 0, 0, 0, 0, 2, 4, 1, 1, 0, 0, 0, 107.5], 'NOUN')
	 had ([3, 0, 0, 0, 0, 0, 1, 2, 1, 1, 0, 0, 0, 98.5], 'VERB')
	 thanked ([5, 0, 0, 1, 0, 0, 2, 5, 1, 0, 0, 0, 0, 100.5], 'VERB')
	 him ([3, 0, 0, 0, 0, 0, 1, 2, 1, 0, 0, 0, 0, 107.0], 'PRON')


## Подсчёт вероятностей и построение модели

Посчитаем количество встречающихся би- и уни-грамм тэгов (надеюсь вы не забыли что такое n-граммы, с тех пор как сдавали ДЗ #2). Это необходимо, чтобы определить матрицу переходов между скрытыми состояниями.

In [296]:
def unigram_counts(sequences):
    return Counter(sequences)

tags = [tag for i, (word, tag) in enumerate(data.training_set.stream())]
tag_unigrams = unigram_counts(tags)

def bigram_counts(sequences):
    d = Counter(sequences)
    return d

tags = [tag for i, (word, tag) in enumerate(data.training_set.stream())]
o = [(tags[i],tags[i+1]) for i in range(0,len(tags)-2,2)]
tag_bigrams = bigram_counts(o)

Для вычисления условных вероятностей, а также стартового и конечного распределения состояний, нам необходимо посчитать для каждого лейбла (скрытого состояния): (А) количество векторов-признаков, помеченных этим лейблом (Б) число раз, когда предложение начиналось с этого лейбла (В) число раз, когда предложение заканчивалось этим лейблом

$\mathbf{Задание.}$ По аналогии допишите сниппет: запишите в переменную tag_ends словарь, для каждого лейбла указывающий, сколько позиций. $\textit{Хинт.}$ Можете воспользоваться специальным словарём Counter.

In [297]:
#функция для подсчёта (А)
def pair_counts(tags, words):
    d = defaultdict(lambda: defaultdict(int))
    for tag, word in zip(tags, words):
        d[tag][hash_feature_list(word)] += 1
    return d

#считаем (Б)
def starting_counts(sequences):
    d = Counter(sequences)
    return d

tags = [tag for i, (word, tag) in enumerate(data.stream())]
starts_tag = [i[0] for i in data.Y]
tag_starts = starting_counts(starts_tag)

#считаем (В)
def ending_counts(sequences):
    d = Counter(sequences)
    return d

end_tag = [i[len(i) - 1] for i in data.Y]
tag_ends = ending_counts(end_tag)

tags = [tag for i, (word, tag) in enumerate(data.training_set.stream())]
words = [word for i, (word, tag) in enumerate(data.training_set.stream())]

С помощью библиотеки $\href{https://pomegranate.readthedocs.io/en/latest/HiddenMarkovModel.html}{pomegranate}$ создадим и инициируем правильными весами скрытую марковскую модель. 

$\mathbf{Задание.}$ Посмотрите, как вычисляется вероятность результата (emission probabilities), матрица переходов (transition probabilities) и вектор начального распределения, и по аналогии вычислите конечное вероятностное распределение (т.е. вектор, в каждой компоненте которого содержится вероятность того, что предложение кончается соответствующим тегом).

In [298]:
EPS = 1e-11

#инициализация 
basic_model = HiddenMarkovModel(name="base-hmm-tagger")

tags = [tag for i, (word, tag) in enumerate(data.stream())]
words = [word for i, (word, tag) in enumerate(data.stream())]

#посчитаем (А)
tags_count=unigram_counts(tags)
tag_words_count=pair_counts(tags,words)

starting_tag_list=[i[0] for i in data.Y]
ending_tag_list=[i[-1] for i in data.Y]

#количество ралзличных тегов, появлявшихся в начале и конце предложения
starting_tag_count=starting_counts(starting_tag_list)
ending_tag_count=ending_counts(ending_tag_list)      

#список всех хотя бы раз задействованных состояний
to_pass_states = []

# вычислим вероятности результата:
# переберём список тегов вместе со списком
# фича-векторов, помеченных соответствующим лейблом
for tag, words_dict in tag_words_count.items():
    
    #Сумма вероятностей наблюдений по всем
    #векторам-состояниям, имевшим метку tag 
    total = float(sum(words_dict.values()))
    
    #для каждого вектора-состояния вычислим его условную вероятность
    #при текущем состоянии как его долю в сумме вероятностей по всем векторам
    distribution = {word: count/total for word, count in words_dict.items()}
    tag_emissions = DiscreteDistribution(distribution)
    tag_state = State(tag_emissions, name=tag)
    to_pass_states.append(tag_state)

basic_model.add_states()

#вычислим начальное распределение вероятностей
start_prob={}

# Для каждого тега посчитаем его компонент в начальном распределении
# как долю случаев, когда текущий тег был первым в предложении относительно
# всех появлений данного тега в датасете
for tag in tags:
    start_prob[tag]=starting_tag_count[tag]/tags_count[tag]

#добавляем соответсвующие ребро в модель
for tag_state in to_pass_states :
    basic_model.add_transition(basic_model.start,tag_state,start_prob[tag_state.name])

#аналогично вычислим итоговое распределение
end_prob={}
for tag in tags:
    end_prob[tag]=ending_tag_count[tag]/tags_count[tag]


#добавляем соответсвующие ребро в модель
for tag_state in to_pass_states :
    basic_model.add_transition(tag_state,basic_model.end,end_prob[tag_state.name])

# вычислим матрицу переходов между скрытыми состояниями:
# перебем все встреченные биграммы для каждой 
# пары лейблов key = (fr, to) посчитаем вероятность перехода 
# из fr в to, как частное количества би-грамм (fr, to) 
# и общего количества вхождений тега fr.
transition_prob_pair={}
for key in tag_bigrams.keys():
    transition_prob_pair[key]=tag_bigrams.get(key)/tags_count[key[0]]
for tag_state in to_pass_states :
    for next_tag_state in to_pass_states :
        try:
            prob_pair = transition_prob_pair[(tag_state.name,next_tag_state.name)]
        except KeyError:
            #если такой биграммы не встретилось -- возвращаем очень маленькое значение
            prob_pair = EPS
        basic_model.add_transition(tag_state,next_tag_state,prob_pair)

#"выпекаем" HMM
basic_model.bake()

In [299]:
# функция, вычисляющая наиболее вероятную последовательность
# скрытых состояний (частей речи) с помощью алгоритма Витерби
def simplify_decoding(X, model, extracted=False):
    if(not extracted):
        X = extract_features(X)
    _, state_path = model.viterbi([hash_feature_list(t) for t in X])
    return [state[1].name for state in state_path[1:-1]]

In [300]:
for key in data.testing_set.keys[:2]:
    print("Sentence Key: {}\n".format(key))
    print(str(data.sentences[key].words) + str(":\n-----------------"))
    print("Predicted labels:\n-----------------")
    print(simplify_decoding(data.sentences[key].words, basic_model))
    print()
    print("Actual labels:\n--------------")
    print(data.sentences[key].tags)
    print("\n")

Sentence Key: b100-56005

('But', ',', 'at', 'the', 'start', ',', 'his', 'new', 'life', 'felt', 'invigorating', '.'):
-----------------
Predicted labels:
-----------------
['CONJ', '.', 'ADP', 'DET', 'NOUN', '.', 'DET', 'ADJ', 'NOUN', 'VERB', 'VERB', '.']

Actual labels:
--------------
('CONJ', '.', 'ADP', 'DET', 'NOUN', '.', 'DET', 'ADJ', 'NOUN', 'VERB', 'VERB', '.')


Sentence Key: b100-15268

('Then', ',', 'too', ',', 'European', 'drivers', 'have', 'reputations', 'for', 'being', 'somewhat', 'crazy', 'on', 'the', 'road', 'and', 'some', 'Americans', 'are', 'not', 'particularly', 'keen', 'on', 'getting', 'mixed', 'up', 'with', 'them', '.'):
-----------------
Predicted labels:
-----------------
['ADV', '.', 'ADV', '.', 'ADJ', 'NOUN', 'VERB', 'NOUN', 'ADP', 'VERB', 'ADV', 'ADJ', 'ADP', 'DET', 'NOUN', 'CONJ', 'DET', 'NOUN', 'VERB', 'ADV', 'ADV', 'VERB', 'PRON', 'VERB', 'VERB', 'PRT', 'ADP', 'NOUN', '.']

Actual labels:
--------------
('ADV', '.', 'ADV', '.', 'ADJ', 'NOUN', 'VERB', 'NOUN',

## Предсказание частей речи и оценка результатов

In [301]:
def predict(X, model):
    """функция, предсказывающая лейбл для каждого слова каждого предложения из X"""
    answer = []
    for observations in X:
        most_likely_tags = simplify_decoding(observations, model, extracted=True)
        answer.append(tuple(most_likely_tags))
    return tuple(answer)

Теперь хотелось бы как-то оценить результаты работы нашей модели.

Самой простой и наивной score-функцией для задачи классификации является точность. Точность вычисляется как частное правильно классифицированных примеров и всех примеров. 

$\bf{Задание.} Если вы всё сделали грамотно -- ваша модель получит точность более 0.7. Если нет -- попробуйте добавить признаков в кодирование слов.

Однако такая метрика имеет ряд недостатков и зачастую является непоказательной. Пожалуй, наиболее популярной функцией оценки качества классифицирующей модели является $\href{https://en.wikipedia.org/wiki/F1_score}{F1-score}$ (F1-мера). F1-мера считается отдельно для каждого класса $k$, и является средним гармоническим между $\bf{precision}$ (доля правильно классифицированных примеров из $k$ среди всех примеров, классифицированных как k) и $\bf{recall}$ (доля правильно классифицированных примеров из $k$ среди всех примеров из k). 

$\bf{Задание.}$ Реализуйте функцию F1score. При желании можете воспользоваться готовым решением из библиотеки sklearn, но написание "ручками" всячески приветствуется.

In [302]:
from sklearn.metrics import f1_score
from sklearn.preprocessing import MultiLabelBinarizer

def accuracy(Y_pred, Y_true):
    correct, total = 0, 0
    for i in range(len(Y_pred)):
        correct = correct + sum([int(f == s) for f, s in zip(Y_pred[i], Y_true[i])])
        total += len(Y_pred[i])
    return float(correct / total)

def f1score(Y_pred, Y_true, macro = False):
    """macro:
        если False: вернуть список, содержащий значения f1-score для каждого лейбла
        если True: вернуть значение макро f1-score
    """
    #TODO: YOUR CODE
    if(macro):
        return 0
    else:
        return [0 for _ in data.tagset]

Y_pred = predict(data.testing_set.X, basic_model)

acc = accuracy(Y_pred, data.testing_set.Y)
print("Точность: {:.2f}%".format(100 * acc))

assert(acc < 0.7, "Вы получили точность менее 0.7 на тестовом датасете. Попробоуйте изменить список признаков.")

f1 = f1_score(y_true=MultiLabelBinarizer().fit_transform(data.testing_set.Y), y_pred=MultiLabelBinarizer().fit_transform(Y_pred), average=None)
print("F1-scores: " + str(f1))

mf1 = f1_score(y_true=MultiLabelBinarizer().fit_transform(data.testing_set.Y), y_pred=MultiLabelBinarizer().fit_transform(Y_pred), average='macro')
print("macro F1 score: {:.2f}%".format(100 * mf1))

Точность: 88.43%
F1-scores: [0.99973378 0.89485428 0.97490569 0.86641814 0.98863636 0.9755603
 0.9849016  0.80642805 0.92306396 0.87944757 0.9831758  0.61354582]
macro F1 score: 90.76%
