In [5]:
from __future__ import division
import nltk
import copy
import re
import matplotlib.pyplot as plt
import numpy as np
import sklearn_crfsuite
from sklearn_crfsuite import scorers
from sklearn_crfsuite import metrics
from nltk.corpus import brown
from nltk import bigrams, ngrams, trigrams 
from nltk.probability import FreqDist as FreqDist  
from numpy.polynomial.polynomial import polyfit
from collections import Counter
from string import digits
from scipy import stats
import operator
from collections import OrderedDict

In [6]:
from nltk.corpus import treebank,brown

corpus = brown.tagged_sents(tagset='universal')[:-100] 

tag_dict={}
word_dict={}

for sent in corpus:
    for elem in sent:
        w = elem[0]
        tag= elem[1]

        if w not in word_dict:
            word_dict[w]=0

        if tag not in tag_dict:
            tag_dict[tag]=0

        word_dict[w]+=1
        tag_dict[tag]+=1
unigram_prob=dict()
V=len(word_dict)
for word in word_dict.keys():
    unigram_prob[word]=word_dict[word]/V
test_data= brown.tagged_sents(tagset='universal')[-100:]


# Part I: POS Tagging with HMM

## Build the start, emission and transition probability matrices for the HMM

In [3]:
start={}
transition={}
emission={}
context={}
for line in corpus:
    previous='<s>'
    for word,tag in line:
        if(previous=='<s>'):                           # First tag of the sentence
            if tag not in context:
                context[tag]=0
            context[tag]+=1
            if tag not in start:
                start[tag]=0.0
            start[tag]+=1
            previous=tag
        else:                                          # Rest of the word-tag pairs
            if tag not in context:
                context[tag]=0
            context[tag]+=1
            if previous not in transition:
                transition[previous]={}
            if tag not in transition[previous]:
                transition[previous][tag]=0
            transition[previous][tag]+=1
            previous=tag
        if tag not in emission:
            emission[tag]={}
        if word not in emission[tag]:
            emission[tag][word]=0
        emission[tag][word]+=1
    if '</s>' not in transition[previous]:
        transition[previous]['</s>']=0
    transition[previous]['</s>']+=1
emission_raw=copy.deepcopy(emission)
for tag in transition:                                 # Normalizing the counts into probability scores
    total_count=float(sum(transition[tag].values())) 
    for next_tag in transition[tag]:
        transition[tag][next_tag]/=(total_count)
for tag in start:                                      # Normalizing the counts into probability scores
    start[tag]/=float(len(corpus))

for tag in emission:                                   # Normalizing the counts into probability scores
    total_count=float(sum(emission[tag].values())) 
    for word in emission[tag]:
        emission[tag][word]/=(total_count)        

## POS Taggging with HMM Model and Additive Smoothing

In [4]:
def smoothed_emission(tag, word,delta):                          # Emission probabilities with additive smoothing
    count_pair=emission_raw[tag].get(word,0)
    prob=(count_pair+delta)/(context[tag]+delta*V)
    return prob

def log(x):
    if (x==0 ):
        return -np.inf
    else:
        return np.log(x)
    
def pos_tagger(sentence,delta):
    viterbi_scores=dict()
    backpointer=dict()
    N=len(context.keys())                                         # Number of states except accept state '</s>'
    T=len(sentence)
    words=['dummy']
    words.extend(sentence)
    for s in context.keys():                                      # Initialization Step
        viterbi_scores[s,1]=log((start.get(s,0))*smoothed_emission(s,words[1],delta))
        backpointer[s,1]=0
    for t in range(2,T+1):                                        # Recursion Step
        for s in context.keys():
            max_prob=-np.inf
            for s1 in context.keys():
                prob=viterbi_scores[s1,t-1]
                prob+=log(transition[s1].get(s,0))
                prob+=log(smoothed_emission(s,words[t],delta))
                if(prob>=max_prob):
                    viterbi_scores[s,t]=prob
                    backpointer[s,t]=s1
                    max_prob=prob
    max_prob=-np.inf
    for s in context.keys():                                      # Termination Step                                 
        prob=(viterbi_scores[s,T])+log(transition[s].get('</s>',0))
        if prob>=max_prob:
            max_prob=prob
            viterbi_scores['</s>',T+1]=prob
            backpointer['</s>',T+1]=s      
    tag_sequence=list()
    tag='</s>'
    tag_sequence.append(tag)
    for t in range(T+1,0,-1):                                     # Trace the backpointers to get the tag sequence
        tag=backpointer[tag,t]
        tag_sequence.append(tag)
    tag_sequence.reverse()
    tag_sequence=tag_sequence[1:len(tag_sequence)-1]
    return tag_sequence

## Model States

In [5]:
context.keys()

dict_keys(['DET', 'NOUN', 'ADJ', 'VERB', 'ADP', '.', 'ADV', 'CONJ', 'PRT', 'PRON', 'NUM', 'X'])

## Measuring accuracy on the test set

In [6]:
def find_accuracy(predicted_tags, test_tags):
    num_sentences=len(predicted_tags)
    scores=list()
    total_count=0
    num_tags=0
    for i in range(num_sentences):
        count=0
        predicted=predicted_tags[i]
        test=test_tags[i]
        len_sentence=len(test)
        for j in range(len_sentence):
            if(predicted[j]==test[j]):
                count+=1
                total_count+=1
            #else:
                #print(predicted[j],test[j])
        #if(count/len_sentence<0.7):
            #print(test_sentences[i])
            #print(predicted)
            #print(test)
            #print("\n")
        scores.append(count/len_sentence)
        num_tags+=len_sentence
    print(total_count/num_tags)
    return np.mean(np.asarray(scores))

In [7]:
test_sentences=[[word[0] for word in sentence] for sentence in test_data]
test_tags=[[word[1] for word in sentence] for sentence in test_data]

In [28]:
predicted_tags=[pos_tagger(sentence,0.001) for sentence in test_sentences]

In [29]:
find_accuracy(predicted_tags,test_tags)

0.9227215455690886


0.9103542356640717

In [None]:
lis=['S.', 'J.', 'Perelman']
pos_tagger(lis)

# Part II: POS Tagging with CRF

In [76]:
train_sents= corpus

def baselineFeatures(sent,i):
    word = sent[i][0]
    postag = sent[i][1]

    # Common features for all words
    features = {
        'bias':1.0,
        #'word.lower': word.lower(),
        'postag=': postag
    }
    return features
def word2features(sent,i):
    #word = sent[i][0]
    
    #features ={
    #'bias': 1.0,
    #}
    word = sent[i][0]
    postag = sent[i][1]

    # Common features for all words
    features = {
        'bias':1.0,
        'word.lower': word.lower(),
        'word[-3:]': word[-3:],
        'word[-2:]=' : word[-2:],
        'word.isupper': word.isupper(),
        'word.istitle': word.istitle(),
        'word.isdigit': word.isdigit(),
        'postag=': postag
    }

    # Features for words that are not
    # at the beginning of a sentument
    if i > 0:
        word1 = sent[i-1][0]
        postag1 = sent[i-1][1]
        features.update({
            '-1:word.lower=' : word1.lower(),
            '-1:word.istitle' : word1.istitle(),
            '-1:word.isupper' : word1.isupper(),
            '-1:word.isdigit' : word1.isdigit(),
            '-1:postag': postag1
        })
    else:
        # Indicate that it is the 'beginning of a sentence'
        features.update({'BOS':True})

    # Features for words that are not
    # at the end of a sentument
    if i < len(sent)-1:
        word1 = sent[i+1][0]
        postag1 = sent[i+1][1]
        features.update({
            '+1:word.lower': word1.lower(),
            '+1:word.istitle' :word1.istitle(),
            '+1:word.isupper' : word1.isupper(),
            '+1:word.isdigit' : word1.isdigit(),
            '+1:postag':postag1
        })
    else:
        # Indicate that it is the 'end of a sentument'
        features.update({'EOS':True})

    return features
def sent2features(sent):
    return [baselineFeatures(sent,i) for i in range(len(sent))]

def sent2labels(sent):
    return [label for _,label in sent]



In [77]:
sent2features(corpus[0])

[{'bias': 1.0, 'postag=': 'DET'},
 {'bias': 1.0, 'postag=': 'NOUN'},
 {'bias': 1.0, 'postag=': 'NOUN'},
 {'bias': 1.0, 'postag=': 'ADJ'},
 {'bias': 1.0, 'postag=': 'NOUN'},
 {'bias': 1.0, 'postag=': 'VERB'},
 {'bias': 1.0, 'postag=': 'NOUN'},
 {'bias': 1.0, 'postag=': 'DET'},
 {'bias': 1.0, 'postag=': 'NOUN'},
 {'bias': 1.0, 'postag=': 'ADP'},
 {'bias': 1.0, 'postag=': 'NOUN'},
 {'bias': 1.0, 'postag=': 'ADJ'},
 {'bias': 1.0, 'postag=': 'NOUN'},
 {'bias': 1.0, 'postag=': 'NOUN'},
 {'bias': 1.0, 'postag=': 'VERB'},
 {'bias': 1.0, 'postag=': '.'},
 {'bias': 1.0, 'postag=': 'DET'},
 {'bias': 1.0, 'postag=': 'NOUN'},
 {'bias': 1.0, 'postag=': '.'},
 {'bias': 1.0, 'postag=': 'ADP'},
 {'bias': 1.0, 'postag=': 'DET'},
 {'bias': 1.0, 'postag=': 'NOUN'},
 {'bias': 1.0, 'postag=': 'VERB'},
 {'bias': 1.0, 'postag=': 'NOUN'},
 {'bias': 1.0, 'postag=': '.'}]

In [78]:
X_train=[sent2features(s) for s in train_sents]
y_train=[sent2labels(s) for s in train_sents]

X_test=[sent2features(s) for s in test_data]
y_test=[sent2labels(s) for s in test_data]


In [None]:

crf = sklearn_crfsuite.CRF(
    algorithm='lbfgs', 
    c1=0.1, 
    c2=0.1, 
    max_iterations=100, 
    all_possible_transitions=True
)
crf.fit(X_train, y_train)

In [None]:
y_pred = crf.predict(X_test)
labels=list(crf.classes_)

metrics.flat_f1_score(y_test, y_pred, 
                      average='weighted', labels=labels)

In [None]:
sorted_labels = sorted(
    labels, 
    key=lambda name: (name[1:], name[0])
)
print(metrics.flat_classification_report(
    y_test, y_pred, labels=sorted_labels, digits=3
))

In [74]:
len(train_sents)

57240

In [75]:
len(test_data)

100

In [48]:
word='ej'
word[-1:]

'j'