# Скрытые марковские модели
Курс NLP, семинар 5.  
Осень 2015.

В данном задании вы реализуете:

- Obvious POS-tagging
- Contextless POS-tagging
- Viterbi POS-tagging

**Подпись**: *(введите сюда свои ФИО + отделение + курс)*

In [13]:
import nltk
from collections import Counter, defaultdict
from nltk.corpus import conll2000

## Введение

В данном семинаре мы будем решать задачу [POS (Part Of Speech)-tagging](https://en.wikipedia.org/wiki/Part-of-speech_tagging)'а, используя несколько подходов. 

## Очевидный POS-tagging

Для начала напишем простейший POS-теггер. У каждого слова определим какими тегами оно бывает отмечено в обучающем корпусе, а для предсказания выбираем наиболее частотный тег. Формула может быть записана следующим образом:
$$
    tag(w) = \arg \max_{i \in 1 .. |Tags| } P(tag_i \mid w).
$$
Достаточно крупный минус данного метода заключается в том, что если какое-то слово в обучающем корпусе мы не встретили, то тег на нем никаким образом уже поставить не сможем. 

### Реализуйте:
1. Класс **SimpleNormalizer**, осуществляющий обычную нормализацию частот Counter'ов, превращающих их в вероятностное распределение.
2. Класс **WordTagModel**, хранящий для каждого слова вероятности быть отмеченным тем или иным тегом.
3. Класс **ObviousPOSTagger**, сопоставляющий последовательности слов последовательность тегов.

In [100]:
class BaseNormalizer:
    def normalize(self, counter):
        ### YOUR CODE HERE
        # Normalize counter to get a probability distribution.
        freqs = [counter[x] for x in counter]
        S = sum(freqs)
        probs = {}
        for x in counter:
            probs[x] = counter[x]/S
        return probs
        ### END YOUR CODE

class WordTagModel:
    def __init__(self, tagged_sents, normalizer=BaseNormalizer()):
        self.normalizer = normalizer
        self.model = defaultdict(Counter)
        ### YOUR CODE HERE
        # For each word add Counter with tags corresponding to this word.
        # Don't forget to normalize your counters!
        tuples = []
        for x in tagged_sents:
            tuples+= x
        counted_wt_pairs = Counter(tuples)
        model = Counter()
        for pair in counted_wt_pairs:
            word = pair[0]
            model[word] = {} 
        
        for pair in counted_wt_pairs:
            word = pair[0]
            model[word][pair[1]] = counted_wt_pairs[pair]
        
        for x in model:
            model[x] = self.normalizer.normalize(model[x])
            
        self.model = model
            
        ### END YOUR CODE
    
    def __getitem__(self, word):
        return self.model[word]

    
class ObviousPOSTagger:
    def __init__(self, word_tag_storage):
        self.wts = word_tag_storage
        
    def tag(self, sent):
        tags = []
        ### YOUR CODE HERE
        # Get the sequence of tags corresponding to this sentence.
        # If your algorithm never saw the word then return 'NN'.
        for x in sent:
            
                if x in self.wts.model.keys():
                    key = self.wts[x].most_common(1)[0]
                    tags.append(key)
                else:
                    tags.append('NN')
            
        ### END YOUR CODE
        return list(zip(sent, tags))

In [101]:
ts = [ [('s','t')],[('s','l')],[('s','l')]]
wtm = WordTagModel(ts)
print(wtm.model)
l = {'a':3, 'b':2}
am = max(l)
print(am)
b = ObviousPOSTagger(wtm)
s = ['s']
print(b.tag(s))

Counter({'s': {'t': 0.3333333333333333, 'l': 0.6666666666666666}})
b


AttributeError: 'dict' object has no attribute 'most_common'

Загрузим корпус, обучим на нем нашу модель и создадим POS-теггер.

In [None]:
# Don't forget to download conll2000 corpus using nltk.download()
nltk.download('conll2000')
train_sents = conll2000.tagged_sents()[:8000]
test_sents = conll2000.tagged_sents()[8000:]

In [98]:
wtm = WordTagModel(train_sents)
obv_postagger = ObviousPOSTagger(wtm)

Проверим, все ли работает. Для этого Вам предоставляется простенький классик **SanityChecker**.

In [92]:
class SanityChecker:
    def __init__(self, test_phrases, test_answers):
        self.test_phrases = test_phrases
        self.test_answers = test_answers
    
    def check(self, postagger):
        for phrase, answer in zip(self.test_phrases, self.test_answers):
            your_answer = postagger.tag(phrase.split())
            if your_answer != answer:
                print('Real answer = {}.\nYour answer = {}.'.format(answer, your_answer))
                print('Correct your code!')
                return
        print('Sanity check passed.')

In [99]:
test_phrases = [
    'The students model the language .',
    'They present excellent results .',
    'The fitting is perfect .',
    'Students master the craft of computer programming .',
]
test_answers = [
    [('The', 'DT'), ('students', 'NNS'), ('model', 'NN'), ('the', 'DT'), ('language', 'NN'), ('.', '.')],
    [('They', 'PRP'), ('present', 'JJ'), ('excellent', 'NN'), ('results', 'NNS'), ('.', '.')],
    [('The', 'DT'), ('fitting', 'JJ'), ('is', 'VBZ'), ('perfect', 'JJ'), ('.', '.')],
    [('Students', 'NN'), ('master', 'NN'), ('the', 'DT'), ('craft', 'NN'), ('of', 'IN'), ('computer', 'NN'), ('programming', 'NN'), ('.', '.')],
]

sanity_cheker = SanityChecker(test_phrases, test_answers)
sanity_cheker.check(obv_postagger)

ValueError: too many values to unpack (expected 2)

При POS-теггинге естественно измерять точность посредством **accuracy**. Напишите функцию, ее реализующую.

In [None]:
def accuracy(test_sents, postagger):
    ### YOUR CODE HERE

    ### END YOUR CODE

Барабанная дробь! С какой же точностью в итоге рабоает наш пос-теггер?

In [None]:
print('Accuracy of ObviousPOSTagger:', accuracy(test_sents, obv_postagger))
print('Accuracy of ObviousPOSTagger on last 200 sents:', accuracy(test_sents[-200:], obv_postagger))

___
<font color='red'>Ответьте на следующие вопросы (внутри ipython ноутбука):</font>

Каким образом можно было бы улучшить **ObviousPOSTagger**? 
___

## Бесконтекстный POS-tagging

Вторым разминочным этапом будет написание бесконтекстного pos-теггера. Мы стараемся максимизировать вероятность слова, считая, что на это влияет лишь его тег. Принцип его работы можно записать следующей формулой:
$$
    tag(w) = \arg \max_{i \in 1 .. |Tags| } P(w \mid tag_i)P(tag_i),
$$

Минуса с тем, что незнакомому слову мы не сможем поставить тег, уже нет, ибо мы можем некоторым образом сгладить $P(word \mid tag)$ и избавиться от нулевых вероятностей.  

### Реализуйте:
1. Класс **CustomNormalizer**, осуществляющий кастомную нормализацию Counter'ов (придумайте что-нибудь на Ваш вкус).
2. Класс **EmissionModel**, хранящий для каждого тега вероятности быть присвоенным тому или иному слову.
3. Класс **TransitionModel**, отвечающий за вероятность $P(tag_i)$.
4. Класс **ObviousPOSTagger**, сопоставляющий последовательности слов последовательность тегов.

In [None]:
class CustomNormalizer:
    ### YOUR CODE HERE
    def __init__(self):
        # Maybe you will add some new parameters to normalizer.
        
    ### END YOUR CODE
    
    def normalize(self, counter):
        ### YOUR CODE HERE
        # You can make some custom normaliztion, but add all additional
        # parameters in the initialization section.
        # In fact it must not be even a probability distribution :).

        ### END YOUR CODE

        
class EmissionModel:
    def __init__(self, tagged_sents, normalizer=BaseNormalizer()):
        self.normalizer = normalizer
        self.model = defaultdict(Counter)
        ### YOUR CODE HERE
        # For each tag add Counter with words corresponding to this tag.
        # Don't forget to add UNK token and to normalize!
        # This model should be able to give probabilities P(word | tag).

        ### END YOUR CODE
        
    def add_unk_token(self):
        for tag in self.model:
            self.model[tag]['UNK'] = 0.1
        
    def tags(self):
        return self.model.keys()
    
    def __getitem__(self, tag):
        return self.model[tag]
    
    def __call__(self, word, tag):
        if word not in self[tag]:
            return self[tag]['UNK']
        return self[tag][word]

    
class TransitionModel:
    def __init__(self, tag_seqs, normalizer=BaseNormalizer()):
        self.normalizer = normalizer
        self.model = Counter()
        ### YOUR CODE HERE
        # Create counter with tag counts.
        # Don't forget to normalize!
        # This model should be able to give probabilities P(tag).

        ### END YOUR CODE
        
    def tags(self):
        return self.model.keys()
    
    def __getitem__(self, tag):
        return self.model[tag]
    
    def __call__(self, tag):
        return self.model[tag]

    
class ContextlessPOSTagger:
    def __init__(self, emission_model, transition_model):
        self.em = emission_model
        self.tm = transition_model
        
    def tag(self, sent):
        tags = []
        ### YOUR CODE HERE
        # Get the sequence of tags corresponding to this sentence.

        ### END YOUR CODE
        return list(zip(sent, tags))

Скачаем корпус и обучим на нем **EmissionModel** и **TransitionModel** с **BaseNormalizer**.

In [None]:
em = EmissionModel(train_sents)
tm = TransitionModel([[tag for word, tag in sent] for sent in train_sents])
cl_postagger = ContextlessPOSTagger(em, tm)

Проверочка, все ли работает.

In [None]:
test_phrases = [
    'The students model the language .',
    'They present excellent results .',
    'The fitting is perfect .',
    'Students master the craft of computer programming .',
]
test_answers = [
    [('The', 'DT'), ('students', 'NNS'), ('model', 'NN'), ('the', 'DT'), ('language', 'NN'), ('.', '.')],
    [('They', 'PRP'), ('present', 'JJ'), ('excellent', 'NN'), ('results', 'NNS'), ('.', '.')],
    [('The', 'DT'), ('fitting', 'JJ'), ('is', 'VBZ'), ('perfect', 'JJ'), ('.', '.')],
    [('Students', 'NN'), ('master', 'NN'), ('the', 'DT'), ('craft', 'NN'), ('of', 'IN'), ('computer', 'NN'), ('programming', 'NN'), ('.', '.')],
]

sanity_cheker = SanityChecker(test_phrases, test_answers)
sanity_cheker.check(cl_postagger)

Вроде как Ваш POS-теггер работает корректно, теперь можно опробовать его с кастомной нормализацией.

In [None]:
### YOUR CODE HERE
# Change BaseNormalizer by your CustomNormalizer.
em = EmissionModel(train_sents, BaseNormalizer())
tm = TransitionModel([[tag for word, tag in sent] for sent in train_sents], BaseNormalizer())
cl_postagger = ContextlessPOSTagger(em, tm)
### END YOUR CODE

Посчитаем точность нашего бесконтекстного пос-теггера.  
В идеале она может достичь порога **92**.

In [None]:
print('Accuracy of ContextlessPOSTagger:', accuracy(test_sents, cl_postagger))
print('Accuracy of ContextlessPOSTagger on last 200 sents:', accuracy(test_sents[-200:], cl_postagger))

## POS-tagging на основе алгоритма Витерби

Ну вот, мы и добрались! 

Картинка для ~~привлечения внимания~~ понимания скрытых марковских моделей ([HMM](https://en.wikipedia.org/wiki/Hidden_Markov_model)).
![](http://i.imgur.com/BIRG7PM.png?)

Тег будет отвечать за скрытую переменную (на рисунке $x(t)$), а слово из предложения за наблюдаемую переменную (на рисунке $y(t)$). В итоге нам понадобятся две модели:

* Первая будет отвечать за вероятности $P(word \mid tag)$.
* Вторая же будет отвечать за вероятности $P(tag_n \mid tag_1, \dots, tag_{n-1})$.

### Задание
Прежде чем приступать к написанию POS-теггера на основе алгоритма Витерби, нужно написать **TransitionModel**, которая умеет выдавать вероятности $P(tag_2 | tag_1)$. Договоримся, что сглаживать ее будем посредством StupidBackoff.

In [None]:
class TransitionModel:
    def __init__(self, tag_seqs, normalizer=BaseNormalizer(), multiplier=0.05):
        self.normalizer = normalizer
        self.mult = multiplier
        # Some strange realization but nevertheless.
        # In 1 you should store tag probabilities P(tag).
        # In 2 you should store conditional tag probabilities P(tag_2 | tag_1)
        # for each tag_1.
        self.model = {1: Counter(), 2: defaultdict(Counter)}
        ### YOUR CODE HERE
        # Don't forget normalization.

        ### END YOUR CODE
        
    def tags(self):
        return self.model.keys()
    
    def __call__(self, tag, old_tag=None):
        # Stupid backoff smoothing is sitting here.
        if old_tag == None:
            return self.model[1][tag]
        if tag not in self.model[2][old_tag]:
            return self.model[1][tag] * self.mult
        return self.model[2][old_tag][tag]

Цель алгоритма Витерби является определение наиболее вероятной последовательности скрытых переменных $x_1, \dots, x_T$. В нашем распоряжении есть:

- пространство состояний $S$
- начальные вероятности $\pi_i$ нахождения в состоянии $i$
- вероятностями $a_{i,j}$ перехода из состояния $i$ в состояние $j$
- вероятности $P(y_i \mid k), k \in S$
- наблюдаемые $y_1, \dots, y_T$

Несложно поверить, что наиболее вероятная последовательность состояний $x_1, \dots, x_T$ задается рекуррентными соотношениями:

- $V_{1,k} = P(y_1 \mid k) \cdot \pi_k$
- $V_{t,k} = P(y_t \mid k) \cdot \max_{x \in S} \{a_{x,k} \cdot V_{t-1,x}\}$

$V_{t, k}$ — наибольшая вероятность последовательности состояний длины $t$, заканчивающееся состоянием $k$. В это несложно поверить по одной простой причине, что $x_t$ зависит только от $x_{t-1}$, поэтому нам нужно помнить лишь наибольшие вероятности попадания в каждое из состояний на предыдущем шаге. На этом же основании можно найти не только вероятности, но и самый вероятный путь.

In [None]:
class HMMPOSTagger:
    def __init__(self, transition_model, emission_model):
        self.tm = transition_model
        self.em = emission_model
        
    def tag(self, sent):
        # Viterbi algorithm should be realized here.
        ### YOUR CODE HERE

        ### END YOUR CODE

Обучаем **EmissionModel** и **TransitionModel** с **BaseNormalizer**.

In [None]:
em = EmissionModel(train_sents)
tm = TransitionModel([[tag for word, tag in sent] for sent in train_sents])
hmm_postagger = HMMPOSTagger(tm, em)

Проверочка, все ли работает.

In [None]:
test_phrases = [
    'The students model the language .',
    'They present excellent results .',
    'The fitting is perfect .',
    'Students master the craft of computer programming .',
]
test_answers = [
    [('The', 'SYM'), ('students', 'FW'), ('model', 'FW'), ('the', 'DT'), ('language', 'NN'), ('.', '.')],
    [('They', 'SYM'), ('present', 'FW'), ('excellent', 'FW'), ('results', 'FW'), ('.', '.')],
    [('The', 'SYM'), ('fitting', 'FW'), ('is', 'FW'), ('perfect', 'FW'), ('.', '.')],
    [('Students', 'SYM'), ('master', 'FW'), ('the', 'DT'), ('craft', 'NN'), ('of', 'IN'), ('computer', 'NN'), ('programming', 'NN'), ('.', '.')],
]

sanity_cheker = SanityChecker(test_phrases, test_answers)
sanity_cheker.check(hmm_postagger)

Вроде как Ваш POS-теггер работает корректно, теперь можно опробовать его с кастомной нормализацией.

In [None]:
### YOUR CODE HERE
em = EmissionModel(train_sents, BaseNormalizer())
tm = TransitionModel([[tag for word, tag in sent] for sent in train_sents], BaseNormalizer())
hmm_postagger = HMMPOSTagger(tm, em)
### END YOUR CODE

Посчитаем точность нашего бесконтекстного пос-теггера.  
В идеале она может достичь порога **93.5**.

In [None]:
print('Accuracy of HMMPOSTagger:', accuracy(test_sents[-200:], hmm_postagger))

### Развернутый HMM-POStagger
Суть крайне проста, давайте будем считать, что не предыдущий тег влияет на следующий за ним, а наоборот. Посмотрим, что при этом у нас выйдет. Реализуйте эту идею в **RevHMMPOSTagger** просто переписав лишь функцию **tag**.

**Примечание**. Для того, чтобы вызвать какую-то функцию у класса от которого Вы наследовались используйте следующую конструкцию:
```python
class Father:
    def hit(self):
        print('Take this!')

class Son(Father):
    def hit(self):
        print('Call father.')
        super(Son, self).hit()
```

In [None]:
class RevHMMPOSTagger(HMMPOSTagger):
    def tag(self, sent):
        ### YOUR CODE HERE
        # Just reverse the sentence and then reverse the output.

        ### END YOUR CODE

Тренируем модель с кастомными нормализаторами.

In [None]:
### YOUR CODE HERE
# Don't forget to reverse train.
em = EmissionModel([sent[::-1] for sent in train_sents], BaseNormalizer())
tm = TransitionModel([[tag for word, tag in sent][::-1] for sent in train_sents], BaseNormalizer())
rev_hmm_postagger = RevHMMPOSTagger(tm, em)
### END YOUR CODE

Возможно, получится что-то лучше?  
В идеале можно получить что-то порядка **94.5**.

In [None]:
print('Accuracy of HMMPOSTagger:', accuracy(test_sents[-200:], rev_hmm_postagger))

### Композиция POS-теггеров
Напишем совершенно простенькую композицию POS-теггеров и поймем, получим ли мы от этого какую-то пользу.

In [None]:
class POSTaggersComposition:
    def __init__(self, pos_taggers):
        self.pos_taggers = pos_taggers
    
    def vote(self, objects):
        return Counter(objects).most_common(1)[0][0]
    
    def tag(self, sent):
        tags = [[tag for word, tag in postagger.tag(sent)] for postagger in self.pos_taggers]
        tags = [self.vote(cur_tags) for cur_tags in zip(*tags)]
        return list(zip(sent, tags))

Тренируем...

In [None]:
composition = POSTaggersComposition([hmm_postagger, rev_hmm_postagger, cl_postagger])

Барабанная дробь!

In [None]:
print('Accuracy of HMMPOSTagger:', accuracy(test_sents[-200:], composition))

## NLTK Taggers

В NLTK тоже есть pos-теггеры! Так как использовать их достаточно просто, сразу привожу кусочек кода.

In [None]:
unigram_tagger = nltk.UnigramTagger(train_sents)
bigram_tagger = nltk.BigramTagger(train_sents)
combined_bigram_tagger = nltk.BigramTagger(train_sents, backoff=unigram_tagger)

In [None]:
print('Accuracy of UnigramTagger:', accuracy(test_sents[-200:], unigram_tagger))
print('Accuracy of BigramTagger:', accuracy(test_sents[-200:], bigram_tagger))
print('Accuracy of Unigram+BigramTagger:', accuracy(test_sents[-200:], combined_bigram_tagger))

Как видим, pos-теггеры семинаристов отработали лучше, чем NLTK-шные.  
**Вывод:** не все хорошо, что библиотечное!