# Description

This work aims to provide a basic understanding of Conditional random fields (CRF), Viterbi algorithm and
Markov chains.

# Tasks

In [1]:
import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.optim import Adam
import numpy as np

Task 1. Creating a CRF model to predict a sequence of 15 dice rolls (fair cube, biased cube).
<br>Probabilities:
* P (the first cube in the sequence is "fair") = 0.5
* P (current "fair" cubes | previous "fair" cubes) = 0.8
* P (current "biased" cubes | previous "biased" cubes) = 0.35

In [2]:
class CRF(nn.Module):
    
    def __init__(self, n_dice, log_likelihood):
        super(CRF, self).__init__()
        self.n_states = n_dice
        self.transition = torch.nn.init.normal(nn.Parameter(torch.randn(n_dice, n_dice + 1)), -1, 0.1)
        self.loglikelihood = log_likelihood
    

    def to_scalar(self, var):
        return var.view(-1).data.tolist()[0]


    def argmax(self, vec):
        _, idx = torch.max(vec, 1)
        return self.to_scalar(idx)
        
    # numerically stable log sum exp
    # source: http://pytorch.org/tutorials/beginner/nlp/advanced_tutorial.html
    def log_sum_exp(self, vec):
        max_score = vec[0, self.argmax(vec)]
        max_score_broadcast = max_score.view(1, -1).expand(1, vec.size()[1])
        return max_score + torch.log(torch.sum(torch.exp(vec - max_score_broadcast)))
    
    
    def _data_to_likelihood(self, rolls):
        """Converts a numpy array of rolls (integers) to log-likelihood.
        Input is one [1, n_rolls]
        """
        return Variable(torch.FloatTensor(self.loglikelihood[rolls]), requires_grad=False)
        
    
    def _compute_likelihood_numerator(self, loglikelihoods, states):
        """Computes numerator of likelihood function for a given sequence.
        
        We'll iterate over the sequence of states and compute the sum 
        of the relevant transition cost with the log likelihood of the observed
        roll. 
        Input:
            loglikelihoods: torch Variable. Matrix of n_obs x n_states. 
                            i,j entry is loglikelihood of observing roll i given state j
            states: sequence of labels
        Output:
            score: torch Variable. Score of assignment. 
        """
        
        prev_state = self.n_states
        score = Variable(torch.Tensor([0]))
        
        for index, state in enumerate(states):
            score += self.transition[state, prev_state] + loglikelihoods[index, state]
            prev_state = state
        return score
    
    def _compute_likelihood_denominator(self, loglikelihoods):
        """Implements the forward pass of the forward-backward algorithm.
        
        We loop over all possible states efficiently using the recursive
        relationship: alpha_t(j) = \sum_i alpha_{t-1}(i) * L(x_t | y_t) * C(y_t | y{t-1} = i)
        Input:
            loglikelihoods: torch Variable. Same input as _compute_likelihood_numerator.
                            This algorithm efficiently loops over all possible state sequences
                            so no other imput is needed.
        Output:
            torch Variable. 
        """

        # stores the current value of alpha at timestep t
        prev_alpha = self.transition[:, self.n_states] + loglikelihoods[0].view(1, -1)
        for roll in loglikelihoods[1:]:
            alpha_t = []

            # loop over all possible states
            for next_state in range(self.n_states):
                
                # compute all possible costs of transitioning to next_state
                feature_function = self.transition[next_state, :self.n_states].view(1, -1) + \
                                   roll[next_state].view(1, -1).expand(1, self.n_states)
                alpha_t_next_state = prev_alpha + feature_function
                alpha_t.append(self.log_sum_exp(alpha_t_next_state))
            
            prev_alpha = torch.stack(alpha_t).view(1, -1)
        
        return self.log_sum_exp(prev_alpha)
    
    def _viterbi_algorithm(self, loglikelihoods):
        """Implements Viterbi algorithm for finding most likely sequence of labels.
        
        Very similar to _compute_likelihood_denominator but now we take the maximum
        over the previous states as opposed to the sum. 
        Input:
            loglikelihoods: torch Variable. Same input as _compute_likelihood_denominator.
        Output:
            tuple. First entry is the most likely sequence of labels. Second is
                   the loglikelihood of this sequence. 
        """

        argmaxes = []
        # prev_delta will store the current score of the sequence for each state
        prev_delta = self.transition[:, self.n_states].contiguous().view(1, -1) + loglikelihoods[0].view(1, -1)

        for roll in loglikelihoods[1:]:
            local_argmaxes = []
            next_delta = []
            for next_state in range(self.n_states):
                feature_function = self.transition[next_state,:self.n_states].view(1, -1) + roll.view(1, -1) + prev_delta
                most_likely_state = self.argmax(feature_function)
                score = feature_function[0][most_likely_state]
                next_delta.append(score)
                local_argmaxes.append(most_likely_state)
            
            prev_delta = torch.stack(next_delta).view(1, -1)
            argmaxes.append(local_argmaxes)
        
        final_state = self.argmax(prev_delta)
        final_score = prev_delta[0][final_state]
        path_list = [final_state]

        # backtrack through the argmaxes to find most likely state
        for states in reversed(argmaxes):
            final_state = states[final_state]
            path_list.append(final_state)
        
        return np.array(path_list), final_score
        
    def neg_log_likelihood(self, rolls, states):
        """Compute neg log-likelihood for a given sequence.
        
        Input: 
            rolls: numpy array, dim [1, n_rolls]. Integer 0-5 showing value on dice.
            states: numpy array, dim [1, n_rolls]. Integer 0, 1. 0 if dice is fair.
        """
        
        loglikelihoods = self._data_to_likelihood(rolls)
        states = torch.LongTensor(states)
        sequence_loglik = self._compute_likelihood_numerator(loglikelihoods, states)
        denominator = self._compute_likelihood_denominator(loglikelihoods)
        return denominator - sequence_loglik
               
    
    def forward(self, rolls):
        loglikelihoods = self._data_to_likelihood(rolls)
        return self._viterbi_algorithm(loglikelihoods)

In [3]:
def crf_train_loop(model, rolls, targets, n_epochs, learning_rate=0.01):
    # train of CRF with Adam optimizer
    optimizer = Adam(model.parameters(), lr=learning_rate, weight_decay=1e-4)
    
    for epoch in range(n_epochs):
        batch_loss = []
        N = rolls.shape[0]
        model.zero_grad()
        
        for index, (roll, labels) in enumerate(zip(rolls, targets)):
            # forward pass
            neg_log_likelihood = model.neg_log_likelihood(roll, labels)
            batch_loss.append(neg_log_likelihood)
            
            if index % 50 == 0:
                ll = torch.cat(batch_loss).mean()
                ll.backward()
                optimizer.step()
                batch_loss = []
        print("Epoch {}: loss is {:.4f}".format(epoch, ll.data.numpy()))
    return model

__Set input data__

In [4]:
# two dice one is fair, one is loaded
fair_dice = np.array([1/6]*6)
loaded_dice = np.array([0.04, 0.04, 0.04, 0.04, 0.04, 0.8])
probabilities = {'fair': fair_dice,
                'loaded': loaded_dice}

In [5]:
# if dice is fair at time t, 0.6 chance we stay fair, 0.4 chance it is loaded at time 2
transition_mat = {'fair': np.array([0.6, 0.4, 0.0]),
                 'loaded': np.array([0.3, 0.7, 0.0]),
                 'start': np.array([0.5, 0.5, 0.0])}
states = ['fair', 'loaded', 'start']
state2ix = {'fair': 0, 'loaded': 1, 'start': 2}

log_likelihood = np.hstack([np.log(fair_dice).reshape(-1,1), np.log(loaded_dice).reshape(-1,1)])

In [6]:
def simulate_data(n_timesteps):
    data = np.zeros(n_timesteps)
    prev_state = 'start'
    state_list = np.zeros(n_timesteps)
    for n in range(n_timesteps):
        next_state = np.random.choice(states, p=transition_mat[prev_state])
        state_list[n] = state2ix[next_state]
        next_data = np.random.choice([0,1,2,3,4,5], p=probabilities[next_state])
        data[n] = next_data
        prev_state = next_state
    return data, state_list

In [7]:
n_obs = 15
rolls = np.zeros((5000, n_obs)).astype(int)
targets = np.zeros((5000, n_obs)).astype(int)

for i in range(5000):
    data, dices = simulate_data(n_obs)
    rolls[i] = data.reshape(1, -1).astype(int)
    targets[i] = dices.reshape(1, -1).astype(int)

__Model training__

In [8]:
model = CRF(2, log_likelihood)
model = crf_train_loop(model, rolls, targets, 10, 0.001)
torch.save(model.state_dict(), "./checkpoint.hdf5")

  


Epoch 0: loss is 6.5545
Epoch 1: loss is 6.4096
Epoch 2: loss is 6.3399
Epoch 3: loss is 6.3023
Epoch 4: loss is 6.2950
Epoch 5: loss is 6.2904
Epoch 6: loss is 6.2987
Epoch 7: loss is 6.2963
Epoch 8: loss is 6.3064
Epoch 9: loss is 6.2998


In [9]:
model.load_state_dict(torch.load("./checkpoint.hdf5"));

__Set test data__

In [10]:
data, dices = simulate_data(15)
test_rolls = data.reshape(1, -1).astype(int)
test_targets = dices.reshape(1, -1).astype(int)

In [11]:
print(test_rolls[0])
print(model.forward(test_rolls[0])[0])
print(test_targets[0])

[1 3 1 0 5 5 5 0 4 5 2 3 5 3 5]
[1 1 0 1 0 0 1 0 0 1 1 1 0 0 0]
[0 0 0 0 0 1 1 0 0 0 0 0 0 0 1]


In [12]:
print(np.exp(list(model.parameters())[0].data.numpy()))

[[0.4652566  0.23195508 0.39975867]
 [0.27500758 0.50441575 0.38241985]]


In [13]:
data, dices = simulate_data(15)
test_rolls = data.reshape(1, -1).astype(int)
test_targets = dices.reshape(1, -1).astype(int)

In [14]:
print(test_rolls[0])
print(model.forward(test_rolls[0])[0])
print(test_targets[0])

[2 5 3 2 3 5 5 5 5 4 1 3 4 0 5]
[1 1 0 0 0 0 0 1 1 1 1 0 0 0 0]
[0 1 0 0 0 1 1 0 1 0 0 0 0 0 1]
