In [23]:
# models.py

from nerdata import *
from utils import *

import numpy as np

In [24]:
# Scoring function for sequence models based on conditional probabilities.
# Scores are provided for three potentials in the model: initial scores (applied to the first tag),
# emissions, and transitions. Note that CRFs typically don't use potentials of the first type.
class ProbabilisticSequenceScorer(object):
    """Looks up scores for the three potentials of a sequence model.

    The constructor stores its arguments unchanged:
      tag_indexer: kept for callers; the scoring methods do not read it.
      word_indexer: maps a word to a column of emission_log_probs; must offer
          contains(), index_of() and get_index().
      init_log_probs: indexed by tag index.
      transition_log_probs: indexed by [previous tag index, current tag index].
      emission_log_probs: indexed by [tag index, word index].
    """

    def __init__(self, tag_indexer, word_indexer, init_log_probs, transition_log_probs, emission_log_probs):
        # Score tables first, then the two indexers; the assignments are independent.
        self.init_log_probs = init_log_probs
        self.transition_log_probs = transition_log_probs
        self.emission_log_probs = emission_log_probs
        self.tag_indexer = tag_indexer
        self.word_indexer = word_indexer

    def score_init(self, sentence, tag_idx):
        """Return the initial score of tag_idx (sentence is accepted but unused)."""
        return self.init_log_probs[tag_idx]

    def score_transition(self, sentence, prev_tag_idx, curr_tag_idx):
        """Return the score of moving from prev_tag_idx to curr_tag_idx (sentence is unused)."""
        return self.transition_log_probs[prev_tag_idx, curr_tag_idx]

    def score_emission(self, sentence, tag_idx, word_posn):
        """Return the score of tag_idx emitting the word at position word_posn.

        A word the indexer does not contain is scored through the "UNK" entry.
        """
        indexer = self.word_indexer
        token = sentence.tokens[word_posn].word
        if indexer.contains(token):
            column = indexer.index_of(token)
        else:
            # Out-of-vocabulary word: fall back to the shared "UNK" column.
            column = indexer.get_index("UNK")
        return self.emission_log_probs[tag_idx, column]

In [25]:
class HmmNerModel(object):
    """Holds the parameters of an HMM tagger; decoding is still a stub.

    The constructor stores the two indexers and the three score tables
    (initial, transition, emission) on the instance without modification.
    """

    def __init__(self, tag_indexer, word_indexer, init_log_probs, transition_log_probs, emission_log_probs):
        self.tag_indexer, self.word_indexer = tag_indexer, word_indexer
        self.init_log_probs, self.transition_log_probs, self.emission_log_probs = (
            init_log_probs,
            transition_log_probs,
            emission_log_probs,
        )

    def decode(self, sentence):
        """Placeholder for HMM decoding; always raises.

        Intended contract: take a LabeledSentence and return a new copy of it
        carrying the chunks predicted by the HMM (BadNerModel is named as an
        example implementation).

        Raises:
            Exception: always, with the message "IMPLEMENT ME".
        """
        raise Exception("IMPLEMENT ME")

In [30]:
# T is the number of observations, N is the number of states
def viterbi(T, N, init_counts, emission_counts, transition_counts):
    """Find the most probable state sequence of an HMM with the Viterbi algorithm.

    All three parameter tables are plain probabilities (not logs); the function
    converts them to log space internally so long sequences do not underflow.

    Args:
        T: number of observations (time steps).
        N: number of states.
        init_counts: length-N sequence; init_counts[s] is the probability of
            starting in state s.
        emission_counts: N x T table; emission_counts[s, t] is the probability
            that state s emits the observation seen at time t.
        transition_counts: N x N table; transition_counts[p, s] is the
            probability of moving from state p to state s.

    Returns:
        (best_path, best_path_prob): best_path is a list of T state indices in
        time order; best_path_prob is the probability of that path as a float.
        For T <= 0 the result is ([], 1.0).

    Raises:
        ValueError: if T > 0 and N < 1.
    """
    if T <= 0:
        # An empty observation sequence has exactly one (empty) path.
        return [], 1.0
    if N < 1:
        raise ValueError("viterbi needs at least one state")

    # Zero probabilities legitimately become -inf in log space; silence the warning.
    with np.errstate(divide="ignore"):
        log_init = np.log(np.asarray(init_counts, dtype=float)[:N])
        log_emit = np.log(np.asarray(emission_counts, dtype=float)[:N, :T])
        log_trans = np.log(np.asarray(transition_counts, dtype=float)[:N, :N])

    path_log_prob = np.full((N, T), -np.inf)
    backpointer = np.zeros((N, T), dtype=int)

    # Initialisation belongs to the first time step, i.e. column 0.
    path_log_prob[:, 0] = log_init + log_emit[:, 0]

    for t in range(1, T):
        # candidates[p, s] = best log-prob of being in p at t-1 and then moving to s.
        candidates = path_log_prob[:, t - 1][:, None] + log_trans
        backpointer[:, t] = candidates.argmax(axis=0)
        path_log_prob[:, t] = candidates.max(axis=0) + log_emit[:, t]

    last_state = int(path_log_prob[:, T - 1].argmax())
    best_log_prob = path_log_prob[last_state, T - 1]

    # backpointer[s, t] is the predecessor (at t-1) of state s at time t, so walk
    # from the last step down to step 1 and then flip into time order.
    best_path = [last_state]
    for t in range(T - 1, 0, -1):
        best_path.append(int(backpointer[best_path[-1], t]))
    best_path.reverse()

    return best_path, float(np.exp(best_log_prob))

In [41]:
# Hand-entered example parameters for trying out viterbi().
# Shapes: init_counts has 7 entries, transition_counts is 7x7, emission_counts is 7x5.
# NOTE(review): despite the "_counts" names the values look like probabilities - confirm.
init_counts = np.array(
    [0.2767, 0.0006, 0.0031, 0.0453, 0.0449, 0.0510, 0.2026]
)

transition_counts = np.array([
    [0.3777, 0.0110, 0.0009, 0.0084, 0.0584, 0.0090, 0.0025],
    [0.0008, 0.0002, 0.7968, 0.0005, 0.0008, 0.1698, 0.0041],
    [0.0322, 0.0005, 0.0050, 0.0837, 0.0615, 0.0514, 0.2231],
    [0.0366, 0.0004, 0.0001, 0.0733, 0.4509, 0.0036, 0.0036],
    [0.0096, 0.0176, 0.0014, 0.0086, 0.1216, 0.0177, 0.0068],
    [0.0068, 0.0102, 0.1011, 0.1012, 0.0120, 0.0728, 0.0479],
    [0.1147, 0.0021, 0.0002, 0.2157, 0.4744, 0.0102, 0.0017],
])

emission_counts = np.array([
    [0.000032, 0, 0, 0.000048, 0],
    [0, 0.308431, 0, 0, 0],
    [0, 0.000028, 0.000672, 0, 0.000028],
    [0, 0, 0.000340, 0, 0],
    [0, 0.000200, 0.000223, 0, 0.002337],
    [0, 0, 0.010446, 0, 0],
    [0, 0, 0, 0.506099, 0],
])

# Total number of entries in each table.
size_t = transition_counts.size
size_e = emission_counts.size

# Bare expression: shows the value when run as a notebook cell, no effect as a script.
size_t
# Disabled experiment, kept for reference.
# NOTE(review): the second line takes the log of transition_counts; this looks like
# a typo for emission_counts - confirm before re-enabling.
#transition_counts = np.log(transition_counts)
#emission_counts = np.log(transition_counts)
#viterbi(5,7,init_counts, emission_counts, transition_counts)

49

In [32]:
# Retrieves a word's index based on its count. If the word occurs only once, treat it as an "UNK" token
# At test time, unknown words will be replaced by UNKs.
def get_word_index(word_indexer, word_counter, word):
    """Return the indexer entry for word, mapping rare words to "UNK".

    A word whose count in word_counter is below 1.5 is looked up as "UNK";
    any other word is looked up under its own string. Exactly one
    word_indexer.get_index() call is made either way.
    """
    is_rare = word_counter.get_count(word) < 1.5
    lookup_key = "UNK" if is_rare else word
    return word_indexer.get_index(lookup_key)

In [7]:
class CrfNerModel(object):
    """Holds the parameters of a CRF tagger; decoding is still a stub.

    The constructor stores the tag indexer, the feature indexer and the
    feature weights on the instance without modification.
    """

    def __init__(self, tag_indexer, feature_indexer, feature_weights):
        self.feature_weights = feature_weights
        self.feature_indexer = feature_indexer
        self.tag_indexer = tag_indexer

    def decode(self, sentence):
        """Placeholder for CRF decoding; always raises.

        Intended contract: take a LabeledSentence and return a new copy of it
        carrying the chunks predicted by the CRF (BadNerModel is named as an
        example implementation).

        Raises:
            Exception: always, with the message "IMPLEMENT ME".
        """
        raise Exception("IMPLEMENT ME")

In [8]:
# Trains a CrfNerModel on the given corpus of sentences.
def train_crf_model(sentences):
    """Index tags and cache emission features for a corpus; training itself is a stub.

    Steps performed before the function raises:
      1. Every BIO tag of every sentence is registered in a fresh tag Indexer.
      2. For every (sentence, word position, tag) triple the emission features are
         extracted with add_to_indexer=True, which grows a fresh feature Indexer,
         and stored in a nested list.

    Raises:
        Exception: always, with the message "IMPLEMENT THE REST OF ME".
    """
    tag_indexer = Indexer()
    for sent in sentences:
        for bio_tag in sent.get_bio_tags():
            tag_indexer.get_index(bio_tag)

    print("Extracting features")
    feature_indexer = Indexer()
    num_sentences = len(sentences)
    num_tags = len(tag_indexer)
    # feature_cache[sentence][word][tag] -> feature indices for that combination.
    feature_cache = [
        [[[] for _ in range(0, num_tags)] for _ in range(0, len(sentences[s]))]
        for s in range(0, num_sentences)
    ]
    for s in range(0, num_sentences):
        current = sentences[s]
        if s % 100 == 0:
            # Progress report every 100 sentences.
            print("Ex %i/%i" % (s, num_sentences))
        for w in range(0, len(current)):
            per_tag = feature_cache[s][w]
            for t in range(0, num_tags):
                tag = tag_indexer.get_object(t)
                per_tag[t] = extract_emission_features(current, w, tag, feature_indexer, add_to_indexer=True)
    print("Training")
    raise Exception("IMPLEMENT THE REST OF ME")

In [9]:
# Extracts emission features for tagging the word at word_index with tag.
# add_to_indexer is a boolean variable indicating whether we should be expanding the indexer or not:
# this should be True at train time (since we want to learn weights for all features) and False at
# test time (to avoid creating any features we don't have weights for).
def extract_emission_features(sentence, word_index, tag, feature_indexer, add_to_indexer):
    """Build the emission feature indices for tagging one word with one tag.

    Features are emitted in a fixed order, each through maybe_add_feature:
      * word and POS of the previous, current and next token, with "<s>"/"<S>"
        before the sentence start and "</s>"/"</S>" past its end;
      * prefixes and suffixes of the current word, lengths 1 to 3;
      * whether the first character is upper-case (False for an empty word);
      * the word shape (upper -> "X", lower -> "x", digit -> "0", other -> "?").

    Args:
        sentence: object exposing tokens (each with .word and .pos) and len().
        word_index: position of the word being tagged.
        tag: tag string; it prefixes every feature name.
        feature_indexer: indexer handed to maybe_add_feature.
        add_to_indexer: handed to maybe_add_feature; True at train time so the
            indexer grows, False at test time so no unseen features are created.

    Returns:
        Integer numpy array built from the collected feature list.
    """
    feats = []
    curr_word = sentence.tokens[word_index].word
    sentence_len = len(sentence)

    # Lexical and POS features on the previous, current and next token.
    for idx_offset in range(-1, 2):
        position = word_index + idx_offset
        if position < 0:
            active_word, active_pos = "<s>", "<S>"
        elif position >= sentence_len:
            active_word, active_pos = "</s>", "</S>"
        else:
            token = sentence.tokens[position]
            active_word, active_pos = token.word, token.pos
        offset_label = repr(idx_offset)
        maybe_add_feature(feats, feature_indexer, add_to_indexer, tag + ":Word" + offset_label + "=" + active_word)
        maybe_add_feature(feats, feature_indexer, add_to_indexer, tag + ":Pos" + offset_label + "=" + active_pos)

    # Character n-grams of the current word; slicing clamps to the word length.
    max_ngram_size = 3
    for ngram_size in range(1, max_ngram_size + 1):
        maybe_add_feature(feats, feature_indexer, add_to_indexer, tag + ":StartNgram=" + curr_word[:ngram_size])
        maybe_add_feature(feats, feature_indexer, add_to_indexer, tag + ":EndNgram=" + curr_word[-ngram_size:])

    # Slice instead of indexing so an empty word yields False rather than IndexError.
    maybe_add_feature(feats, feature_indexer, add_to_indexer, tag + ":IsCap=" + repr(curr_word[:1].isupper()))

    # Word shape, one symbol per character.
    shape = []
    for ch in curr_word:
        if ch.isupper():
            shape.append("X")
        elif ch.islower():
            shape.append("x")
        elif ch.isdigit():
            shape.append("0")
        else:
            shape.append("?")
    # The shape is deliberately kept as a list: repr() of the list is the historical
    # feature name, and changing it would invalidate existing feature indexers.
    maybe_add_feature(feats, feature_indexer, add_to_indexer, tag + ":WordShape=" + repr(shape))
    return np.asarray(feats, dtype=int)
