In [2]:
import random
import warnings

import nltk
from nltk.corpus import brown
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution

In [3]:
# Only hit the network when the Brown corpus is not already installed.
# An unconditional download fails noisily (e.g. on SSL/certificate problems or
# when offline) even though a local copy would let the notebook run fine.
try:
    nltk.data.find('corpora/brown')
except LookupError:
    nltk.download('brown')

[nltk_data] Error loading brown: <urlopen error [SSL:
[nltk_data]     CERTIFICATE_VERIFY_FAILED] certificate verify failed:
[nltk_data]     unable to get local issuer certificate (_ssl.c:1135)>


False

In [4]:
# Load every tagged sentence of the Brown corpus, asking NLTK to map the tags
# to its 'universal' tagset. (Calling tagged_sents() without the tagset
# argument would keep the corpus's default tags instead.)
corpus = list(brown.tagged_sents(tagset='universal'))

# Seed the global RNG before shuffling so the sentence order -- and therefore
# the train/test split below -- is the same on every run.
random.seed(42)
random.shuffle(corpus)

# Index separating the first 80% of the shuffled sentences from the remainder.
split = int(0.8 * len(corpus))

# First 80% for training, remaining 20% held out for testing.
train = corpus[:split]
test = corpus[split:]

print('Train samples:', len(train))
print('Test samples:', len(test))

Train samples: 45872
Test samples: 11468


In [5]:
def rearrange_data(sequences):
    """Separate tagged sentences into parallel word and tag sequences.

    Parameters
    ----------
    sequences : iterable
        Each element is an iterable of ``(word, tag)`` pairs.

    Returns
    -------
    tuple
        ``(x, y, w, t)`` where ``x`` is a list of word lists, ``y`` is the
        parallel list of tag lists, ``w`` is the set of all distinct words and
        ``t`` is the set of all distinct tags.
    """
    word_sequences, tag_sequences = [], []
    vocabulary, tagset = set(), set()

    for tagged_sentence in sequences:
        words, tags = [], []
        for token, label in tagged_sentence:
            words.append(token)
            tags.append(label)

        # Fold this sentence's tokens into the running vocabularies.
        vocabulary.update(words)
        tagset.update(tags)

        word_sequences.append(words)
        tag_sequences.append(tags)

    return word_sequences, tag_sequences, vocabulary, tagset

In [6]:
# For each split: *_x holds the word sequences, *_y the parallel tag
# sequences, *_words the set of distinct words and *_tagset the set of
# distinct tags found in that split.
train_x, train_y, train_words, train_tagset = rearrange_data(train)
test_x, test_y, test_words, test_tagset = rearrange_data(test)

In [7]:
def pair_counts(sequences_A, sequences_B):
    """Count co-occurrences of aligned elements from two parallel collections.

    Parameters
    ----------
    sequences_A, sequences_B : iterables of sequences
        Paired position by position; pairing stops at the shorter input at
        both the outer and the inner level.

    Returns
    -------
    dict
        Nested mapping ``{a: {b: count}}`` giving how many times element ``a``
        from ``sequences_A`` was aligned with element ``b`` from
        ``sequences_B``.
    """
    counts = {}

    for first_seq, second_seq in zip(sequences_A, sequences_B):
        for outer_key, inner_key in zip(first_seq, second_seq):
            # Fetch (or create) the inner table for this outer key, then bump.
            inner_table = counts.setdefault(outer_key, {})
            inner_table[inner_key] = inner_table.get(inner_key, 0) + 1

    return counts


In [8]:
def unigram_counts(sequences):
    """Count how many times each element occurs across all sequences.

    Parameters
    ----------
    sequences : iterable of iterables
        For example, a list of tag sequences.

    Returns
    -------
    dict
        Mapping from element to its total number of occurrences.
    """
    totals = {}

    for sequence in sequences:
        for item in sequence:
            totals[item] = totals.get(item, 0) + 1

    return totals

In [9]:
def bigram_counts_brown(sequences):
    """Count adjacent element pairs within each sequence.

    Parameters
    ----------
    sequences : iterable of sequences
        For example, a list of tag lists. Pairs never span two sequences.

    Returns
    -------
    dict
        Mapping from ``(previous, next)`` tuples to the number of times that
        pair appears consecutively.
    """
    pair_totals = {}

    for sequence in sequences:
        # Pair each element with its immediate successor.
        for pair in zip(sequence, sequence[1:]):
            pair_totals[pair] = pair_totals.get(pair, 0) + 1

    return pair_totals

In [10]:
def starting_counts(sequences):
    """Count how often each element appears first in a sequence.

    Parameters
    ----------
    sequences : iterable of sequences
        For example, a list of tag lists.

    Returns
    -------
    dict
        Mapping from element to the number of sequences that begin with it.
        Empty sequences have no first element and are skipped rather than
        raising ``IndexError``.
    """
    tag_starts = {}

    for sequence in sequences:
        # An empty sequence contributes no starting element.
        if len(sequence) == 0:
            continue
        first = sequence[0]
        tag_starts[first] = tag_starts.get(first, 0) + 1

    return tag_starts

In [11]:
def ending_counts(sequences):
    """Count how often each element appears last in a sequence.

    Parameters
    ----------
    sequences : iterable of sequences
        For example, a list of tag lists.

    Returns
    -------
    dict
        Mapping from element to the number of sequences that end with it.
        Empty sequences have no last element and are skipped rather than
        raising ``IndexError``.
    """
    tag_ends = {}

    for sequence in sequences:
        # An empty sequence contributes no ending element.
        if len(sequence) == 0:
            continue
        last = sequence[-1]
        tag_ends[last] = tag_ends.get(last, 0) + 1

    return tag_ends

In [12]:
# All count tables are computed from the training split only.
# emission_counts: {tag: {word: count}} -- tags are passed first so they
# become the outer keys.
emission_counts = pair_counts(train_y, train_x)
# tag_unigrams: {tag: number of training tokens carrying that tag}
tag_unigrams = unigram_counts(train_y)
# tag_bigrams: {(tag, next_tag): number of adjacent occurrences}
tag_bigrams = bigram_counts_brown(train_y)
# tag_starts / tag_ends: {tag: number of sentences beginning / ending with it}
tag_starts = starting_counts(train_y)
tag_ends = ending_counts(train_y)

In [18]:
brown_model = HiddenMarkovModel(name="brown-hmm-tagger")

# One state per tag. Each state's emission distribution is the relative
# frequency of every word among the training tokens carrying that tag:
#   P(word | tag) = count(tag, word) / count(tag)
states = dict()
for tag, words in emission_counts.items():
    probs = {w:c / tag_unigrams[tag] for w, c in words.items()}
    emissions = DiscreteDistribution(probs)
    state = State(emissions, name=tag)
    brown_model.add_states(state)
    # Keep the state object by tag name so the transitions below can refer to it.
    states[tag] = state

# Start transitions: share of training sentences that begin with each tag
# (n is the total number of sentence starts counted).
n = sum(tag_starts.values())
for tag, counts in tag_starts.items():
    brown_model.add_transition(brown_model.start, states[tag], counts / n)

# Tag-to-tag transitions: count(tag1 followed by tag2) / count(tag1).
for (tag1, tag2), counts in tag_bigrams.items():
    brown_model.add_transition(states[tag1], states[tag2], counts / tag_unigrams[tag1])

# End transitions: count(sentences ending with tag) / count(tag).
for tag, counts in tag_ends.items():
    brown_model.add_transition(states[tag], brown_model.end, counts / tag_unigrams[tag])

# NOTE(review): bake() presumably finalizes the model structure and must run
# before the model is used -- confirm against the pomegranate documentation.
brown_model.bake()

print('Edges Brown model:', brown_model.edge_count())

Edges Brown model: 168


In [20]:
def replace_unknown_brown(sequence, vocabulary):
    """Return a copy of ``sequence`` with out-of-vocabulary words replaced by 'nan'.

    Parameters
    ----------
    sequence : iterable of words
    vocabulary : container supporting ``in`` (e.g. a set of known words)

    Returns
    -------
    list
        Same length and order as ``sequence``; every word not found in
        ``vocabulary`` is replaced by the string ``'nan'``.
    """
    # NOTE(review): 'nan' is presumably chosen because pomegranate treats it
    # as a missing observation -- confirm against the pomegranate docs.
    cleaned = []
    for token in sequence:
        cleaned.append(token if token in vocabulary else 'nan')
    return cleaned

def simplify_decoding_brown(X, model, vocabulary):
    """Decode a word sequence into the names of its most likely states.

    Parameters
    ----------
    X : sequence of words
    model : object exposing ``viterbi(observations)``, which returns a pair
        whose second item is the decoded state path
    vocabulary : container of known words, used to mask unknown words first

    Returns
    -------
    list
        The ``name`` of the state in each path entry, excluding the first and
        last entries of the path.
    """
    observations = replace_unknown_brown(X, vocabulary)
    _log_probability, path = model.viterbi(observations)

    # The first and last path entries are dropped (presumably the model's
    # start and end states -- confirm); each entry's second item is the state.
    inner_path = path[1:-1]
    return [entry[1].name for entry in inner_path]

In [21]:
def accuracy_brown(X, Y, model, vocabulary):
    """Return the fraction of tokens whose predicted tag matches the true tag.

    Parameters
    ----------
    X : iterable of word sequences
    Y : iterable of tag sequences, parallel to ``X``
    model : tagger model, passed through to ``simplify_decoding_brown``
    vocabulary : container of known words, passed through to
        ``simplify_decoding_brown``

    Returns
    -------
    float
        ``correct / total`` over all tokens in ``X``. Returns ``0.0`` when
        there are no tokens to score (instead of raising
        ``ZeroDivisionError``).

    Notes
    -----
    A sentence that fails to decode still counts toward the total, so all of
    its tokens are scored as incorrect. Such failures are tallied and
    reported with a ``RuntimeWarning`` rather than silently ignored.
    """
    correct = total_predictions = failed = 0
    for observations, actual_tags in zip(X, Y):
        try:
            most_likely_tags = simplify_decoding_brown(observations, model, vocabulary)
            correct += sum(p == t for p, t in zip(most_likely_tags, actual_tags))
        except Exception:
            # Best effort: keep scoring the remaining sentences. Catching
            # Exception (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            failed += 1
        total_predictions += len(observations)

    if failed:
        warnings.warn(
            "{} sequence(s) could not be decoded and were scored as incorrect".format(failed),
            RuntimeWarning,
            stacklevel=2,
        )

    if total_predictions == 0:
        return 0.0
    return correct / total_predictions

In [22]:
# Both evaluations use the training vocabulary, so any test word that never
# appeared in training is replaced by 'nan' before decoding.
training_acc = accuracy_brown(train_x, train_y, brown_model, train_words)
print("training accuracy: {:.2f}%".format(100 * training_acc))

testing_acc = accuracy_brown(test_x, test_y, brown_model, train_words)
print("testing accuracy: {:.2f}%".format(100 * testing_acc))

training accuracy: 97.54%
testing accuracy: 96.06%


In [28]:
# Tag a hand-written, pre-tokenized example sentence.
# A shorter alternative input to try: ["I", "love", "you"]
sentence = ['Where', 'can', 'I', 'find', 'the', 'supermarket', 'in', 'this', 'town', '?']

# Decode with the trained model, then print one "word tag" pair per line.
tags = simplify_decoding_brown(sentence, brown_model, train_words)
for word, tag in zip(sentence, tags):
    print(word, tag)

Where ADV
can VERB
I PRON
find VERB
the DET
supermarket NOUN
in ADP
this DET
town NOUN
? .
