In [1]:
%matplotlib inline


Bad key savefig.frameon in file /home/michele/miniconda3/lib/python3.8/site-packages/matplotlib/mpl-data/stylelib/_classic_test.mplstyle, line 421 ('savefig.frameon : True')
You probably need to get an updated matplotlibrc file from
https://github.com/matplotlib/matplotlib/blob/v3.3.3/matplotlibrc.template
or from the matplotlib source distribution

Bad key verbose.level in file /home/michele/miniconda3/lib/python3.8/site-packages/matplotlib/mpl-data/stylelib/_classic_test.mplstyle, line 472 ('verbose.level  : silent      # one of silent, helpful, debug, debug-annoying')
You probably need to get an updated matplotlibrc file from
https://github.com/matplotlib/matplotlib/blob/v3.3.3/matplotlibrc.template
or from the matplotlib source distribution

Bad key verbose.fileo in file /home/michele/miniconda3/lib/python3.8/site-packages/matplotlib/mpl-data/stylelib/_classic_test.mplstyle, line 473 ('verbose.fileo  : sys.stdout  # a log filename, sys.stdout or sys.stderr')
You probably need to g

# SNLP exercise sheet 4
###############################################################################

In [2]:
import math
from math import log, inf
import sys
import numpy as np
from collections import OrderedDict, defaultdict
import random
from datetime import datetime

This function can be used for importing the corpus.
Parameters: path_to_file: string; path to the file containing the corpus
Returns: list of list; the first layer list contains the sentences of the corpus;
    the second layer list contains tuples (token,label) representing a labelled sentence

In [3]:
def import_corpus(path_to_file):
    sentences = []
    sentence = []
    f = open(path_to_file)

    while True:
        line = f.readline()
        if not line: break

        line = line.strip()
        if len(line) == 0:
            sentences.append(sentence)
            sentence = []
            continue

        parts = line.split(' ')
        sentence.append((parts[0], parts[-1]))

    f.close()
    return sentences

Utility functions

In [4]:
def orderedDict2array(orderedDict):
    return np.array(list(orderedDict.values()))

def full_log(x):
    if x == 0:
        return -inf
    else:
        return log(x)

def build_features(token, label, prev_label):
    features = set()
    features.add('t=' + token + '/l=' + label)
    features.add('l1=' + prev_label + '/l2=' + label)
    return features

print(build_features('cat', 'NN', 'start'))

{'l1=start/l2=NN', 't=cat/l=NN'}


In [5]:
class LinearChainCRF(object):
    # training corpus
    corpus = None
    
    # (numpy) array containing the parameters of the model
    # has to be initialized by the method 'initialize'
    # An OrderedDict can be casted to a numpy array, but we can use strings to access its elements
    theta = OrderedDict()
    
    # set containing all features observed in the corpus 'self.corpus'
    # choose an appropriate data structure for representing features
    # each element of this set has to be assigned to exactly one component of the vector 'self.theta'
    features = set()
    
    # set containing all lables observed in the corpus 'self.corpus'
    labels = set()
    
    
    def initialize(self, corpus):
        '''
        build two sets 'self.features' and 'self.labels'
        '''
        self.corpus = corpus
        
        # add starting label
        self.labels.add('start')
        
        # extract all tokens and labels
        for sentence in corpus:
            
            prev_label = 'start'
            for token, label in sentence:
                
                self.labels.add(label)
                
                features = build_features(token, label, prev_label)
                for feature in features:
                    self.features.add(feature)
                    
                prev_label = label
                
        # initialize theta
        # sorted is used to establish a clear order in the feature set
        for feature in sorted(self.features):
            self.theta[feature] = 1
            
    #
    def get_active_features(self, token, label, prev_label):
        observed_features = build_features(token, label, prev_label)
        return observed_features.intersection(self.features)
    
    #
    def compute_psi(self, token, label, prev_label):
        
        param_sum = 0.0
        for feature in self.get_active_features(token, label, prev_label):
            param_sum += self.theta[feature]
            
        return math.exp(param_sum)

    # Exercise 1 a) ###################################################################
    def forward_variables(self, sentence):
        '''
        Compute the forward variables for a given sentence.
        Parameters: sentence: list of strings representing a sentence.
        Returns: data structure containing the matrix of forward variables
        '''
        
        T = len(sentence)
        # alpha is indexed from 0 to T - 1 to be consistent with the array indexing
        alpha = {}        
        
        # initialization
        # this dictionary contains labels as keys (j)
        alpha[0] = {}
        for label in self.labels:
            alpha[0][label] = self.compute_psi(sentence[0], label, 'start')
            
        # inductive case
        t = 1
        while t <= T - 1:
            alpha[t] = {}
            for label in self.labels:
                s = 0.0
                for prev_label in self.labels:
                    s += self.compute_psi(sentence[t-1], label, prev_label) * alpha[t-1][prev_label]
                    
                alpha[t][label] = s
            
            t += 1
            
        return alpha
        
        
    def backward_variables(self, sentence):
        '''
        Compute the backward variables for a given sentence.
        Parameters: sentence: list of strings representing a sentence.
        Returns: data structure containing the matrix of backward variables
        '''
        
        T = len(sentence)
        # beta is indexed from 0 to T - 1 to be consistent with the array indexing
        beta = {}        
        
        # initialization
        # this dictionary contains labels as keys (j)
        beta[T-1] = {}
        for label in self.labels:
            beta[T-1][label] = 1
            
        # inductive case
        t = T - 2
        while t >= 0:
            beta[t] = {}
            for label in self.labels:
                s = 0.0
                for prev_label in self.labels:
                    s += self.compute_psi(sentence[t+1], label, prev_label) * beta[t+1][prev_label]
                    
                beta[t][label] = s
            
            t -= 1
            
        return beta
        
    
    # Exercise 1 b) ###################################################################
    def compute_z(self, alpha_final):
        '''
        Compute the partition function Z(x).
        Parameters: alpha_final: final values of forward variables
        Returns: float;
        '''

        return sum(alpha_final.values())
        
            
    # Exercise 1 c) ###################################################################
    def marginal_probability(self, sentence, t, y_t, y_t_minus_one):
        '''
        Compute the marginal probability of the labels given by y_t and y_t_minus_one given a sentence.
        Parameters: sentence: list of strings representing a sentence.
                    y_t: element of the set 'self.labels'; label assigned to the word at position t
                    y_t_minus_one: element of the set 'self.labels'; label assigned to the word at position t-1
        Returns: float: probability;
        '''
        
        assert t >= 0
        assert t < len(sentence)
        assert y_t in self.labels
        assert y_t_minus_one in self.labels
        
        alpha = self.forward_variables(sentence)
        beta = self.backward_variables(sentence)
        z = self.compute_z(alpha[len(sentence)-1])
        
        if t == 0:
            if y_t_minus_one != 'start':
                # return zero if the start lable is not start
                return 0.0
            return 1/z * self.compute_psi(sentence[t], y_t, y_t_minus_one) * beta[t][y_t]
        else:
            return 1/z * alpha[t-1][y_t_minus_one] * self.compute_psi(sentence[t], y_t, y_t_minus_one) * beta[t][y_t]
    
    
    # Exercise 1 d) ###################################################################
    def expected_feature_count(self, sentence, feature):
        '''
        Compute the expected feature count for the feature referenced by 'feature'
        Parameters: sentence: list of strings representing a sentence.
                    feature: a feature; element of the set 'self.features'
        Returns: float;
        '''
        
        expected_count = 0.0
        for t, token in enumerate(sentence):
            for prev_label in self.labels:
                for label in self.labels:
                    features = build_features(token, label, prev_label)
                    if feature in features:
                        expected_count += self.marginal_probability(sentence, t, label, prev_label)
                            
        return expected_count
    
    def full_expected_feature_count(self, sentence):
        feature_count = OrderedDict()
        
        # use theta.keys instead of features because it is ordered
        for feature in self.theta.keys():
            feature_count[feature] = self.expected_feature_count(sentence, feature)
            
        return orderedDict2array(feature_count)    
    
#     def _full_expected_feature_count(self, sentence):
#         feature_count = OrderedDict()
        
#         for feature in self.theta.keys():
#             feature_count[feature] = 0.0

#         for t, token in enumerate(sentence):
#             for prev_label in self.labels:
#                 for label in self.labels:
#                     features = build_features(token, label, prev_label)
#                     for feature in features:
#                         # if the combination is a know feature
#                         if feature in feature_count:
#                             # sum up its marginal probability
#                             feature_count[feature] += self.marginal_probability(sentence, t, label, prev_label)
            
#         return orderedDict2array(feature_count)
    
    def full_empirical_feature_count(self, sentence):
        
        # initialize feature count to zero
        feature_count = OrderedDict()
        # use theta.keys instead of features because it is ordered
        for feature in self.theta.keys():
            feature_count[feature] = 0
            
        # count features
        prev_label = 'start'
        for token, label in sentence:
            observed_features = build_features(token, label, prev_label)
            
            for feature in observed_features:
                assert feature in feature_count, feature
                feature_count[feature] += 1
                    
            prev_label = label
            
        return orderedDict2array(feature_count)
        
    
    # Exercise 1 e) ###################################################################
    def train(self, num_iterations, learning_rate=0.01):
        '''
        Method for training the CRF.
        Parameters: num_iterations: int; number of training iterations
                    learning_rate: float
        '''
        
        sentence = random.choice(self.corpus)
        # tokens only
        x = list(map(lambda pair: pair[0], sentence))
        
        empirical_count = self.full_empirical_feature_count(sentence)
        expected_count = self.full_expected_feature_count(x)
        
        gradient = empirical_count - expected_count
        
        for i, key in enumerate(self.theta.keys()):
            self.theta[key] += learning_rate * gradient[i]
        
    
    
    # Exercise 2 ###################################################################
    def most_likely_label_sequence(self, sentence):
        '''
        Compute the most likely sequence of labels for the words in a given sentence.
        Parameters: sentence: list of strings representing a sentence.
        Returns: list of lables; each label is an element of the set 'self.labels'
        '''

        T = len(sentence) - 1

        # (label, observation_index) --> log probability
        trellis = defaultdict(lambda: -inf)

        # list to keep track of the values
        gamma_list = []

        # label with maximum probability
        current_gamma = ""
        
        # maximum logarithmic probability
        max_probability = -inf
        
        for label in self.labels:
            trellis[(label, 0)] = full_log(self.compute_psi(sentence[0], label, 'start'))
            
            if trellis[(label, 0)] > max_probability:
                max_probability = trellis[label, 0]
                current_gamma = label

        gamma_list.append(current_gamma)

        t = 1
        while t <= T:
            max_probability = -inf

            # label of the current delta
            for label_j in self.labels:
                delta_j = -inf
                
                # label of the previous delta
                for label_i in self.labels:                    
                    delta_j_candidate = trellis[label_i,t-1] + full_log(self.compute_psi(sentence[t], label_j, label_i))

                    # consider the maximum
                    if delta_j_candidate > delta_j:
                        delta_j = delta_j_candidate
                        current_gamma = label_j

                trellis[label_j, t] = delta_j

            gamma_list.append(current_gamma)
            t += 1

        # print(trellis)
        return gamma_list

    

In [6]:
corpus = import_corpus('corpus_pos.txt')

# random.shuffle(corpus)

split_limit = int(len(corpus) * 0.8)
training_corpus = corpus[:split_limit]
test_corpus = corpus[split_limit:]

print(training_corpus[0])
print(test_corpus[0])

[('A', 'DT'), ('Lorillard', 'NNP'), ('spokewoman', 'NN'), ('said', 'VBD'), ('This', 'DT'), ('is', 'VBZ'), ('an', 'DT'), ('old', 'JJ'), ('story', 'NN')]
[('IBM', 'NNP'), ("'s", 'POS'), ('750', 'CD'), ('million', 'CD'), ('*U*', '-NONE-'), ('debenture', 'NN'), ('offering', 'NN'), ('dominated', 'VBD'), ('activity', 'NN'), ('in', 'IN'), ('the', 'DT'), ('corporate', 'JJ'), ('debt', 'NN'), ('market', 'NN')]


In [7]:
lcrf = LinearChainCRF()
lcrf.initialize(training_corpus[:5])

print(orderedDict2array(lcrf.theta))
print(lcrf.labels)
# print(lcrf.features)

[1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1]
{'CC', '-NONE-', 'VBP', 'VBD', 'NN', 'VBN', 'NNS', 'RB', 'start', 'CD', 'VBZ', 'VBG', 'DT', 'JJ', 'PRP', 'TO', 'POS', 'IN', 'NNP'}


In [8]:
sentence = training_corpus[0]
x = list(map(lambda pair: pair[0], sentence))
print(x)

alpha = lcrf.forward_variables(x)
print(alpha[0])

beta = lcrf.backward_variables(x)
print(beta[len(sentence)-1])

['A', 'Lorillard', 'spokewoman', 'said', 'This', 'is', 'an', 'old', 'story']
{'CC': 1.0, '-NONE-': 1.0, 'VBP': 1.0, 'VBD': 1.0, 'NN': 1.0, 'VBN': 1.0, 'NNS': 1.0, 'RB': 1.0, 'start': 1.0, 'CD': 1.0, 'VBZ': 1.0, 'VBG': 1.0, 'DT': 7.38905609893065, 'JJ': 1.0, 'PRP': 2.718281828459045, 'TO': 1.0, 'POS': 1.0, 'IN': 2.718281828459045, 'NNP': 2.718281828459045}
{'CC': 1, '-NONE-': 1, 'VBP': 1, 'VBD': 1, 'NN': 1, 'VBN': 1, 'NNS': 1, 'RB': 1, 'start': 1, 'CD': 1, 'VBZ': 1, 'VBG': 1, 'DT': 1, 'JJ': 1, 'PRP': 1, 'TO': 1, 'POS': 1, 'IN': 1, 'NNP': 1}


In [9]:
z = lcrf.compute_z(alpha[len(sentence)-1])
print(z)

10234784412549.58


In [10]:
print(lcrf.expected_feature_count(x, 't=Lorillard/l=NNP'))

0.20351840033930563


In [11]:
print(len(sentence))
print(lcrf.full_empirical_feature_count(sentence).sum())

9
18


In [12]:
then = datetime.now()
print(lcrf.full_expected_feature_count(x).sum())
print(datetime.now() - then)

5.42653902455379
0:00:10.622554


In [13]:
then = datetime.now()
lcrf.train(1)
print(datetime.now() - then)

0:00:10.601994


In [14]:
sentence = test_corpus[0]
x = list(map(lambda pair: pair[0], sentence))

print(x)

y = lcrf.most_likely_label_sequence(x)
print(y)

['IBM', "'s", '750', 'million', '*U*', 'debenture', 'offering', 'dominated', 'activity', 'in', 'the', 'corporate', 'debt', 'market']
['DT', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP', 'NNP']
