In [1]:
import numpy as np

# Load data

with open("data/model_init.txt") as model_init:
    model_init.readline() # "initial: 6"
    initial = np.array([float(x) for x in model_init.readline().split()])
    model_init.readline()
    model_init.readline() # "transition: 6"
    transition = np.array([[float(x) for x in model_init.readline().split()]
                           for _ in range(6)])
    model_init.readline()
    model_init.readline() # "emission: 6"
    emission = np.array([[float(x) for x in model_init.readline().split()]
                          for _ in range(6)])

vocabulary = "ABCDEF"
lookup = {letter: index for index, letter in enumerate(vocabulary)}
train_data = []
for i in range(1, 6):
    train_data_i = []
    with open(f"data/seq_model_0{i}.txt") as data:
        for line in data:
            train_data_i.append(np.array([lookup[letter] for letter in line.rstrip()]))
    train_data.append(np.array(train_data_i))
    print(f"Train data {i} loaded: {train_data[i-1].shape}")

with open(f"data/dev_data.txt") as data:
    dev_data = []
    for line in data:
        sequence, label = line.split("\t")
        sequence = np.array([lookup[letter] for letter in sequence])
        label = int(label) - 1
        dev_data.append((sequence, label))
print(f"Development data loaded: {len(dev_data)}")

print()
print("Starting initial probabilities:")
print(initial)
print("initial[i] is the probability of starting in state i")
print()
print("Starting transition probabilities:")
print(transition)
print("transition[i,j] is the probability of transitioning from state i to state j")
print()
print("Starting emission probabilities")
print(emission)
print("emission[v,i] is the probability of emitting v in state i")

Train data 1 loaded: (10000, 50)
Train data 2 loaded: (10000, 50)
Train data 3 loaded: (10000, 50)
Train data 4 loaded: (10000, 50)
Train data 5 loaded: (10000, 50)
Development data loaded: 500

Starting initial probabilities:
[0.2 0.1 0.2 0.2 0.2 0.1]
initial[i] is the probability of starting in state i

Starting transition probabilities:
[[0.3 0.3 0.1 0.1 0.1 0.1]
 [0.1 0.3 0.3 0.1 0.1 0.1]
 [0.1 0.1 0.3 0.3 0.1 0.1]
 [0.1 0.1 0.1 0.3 0.3 0.1]
 [0.1 0.1 0.1 0.1 0.3 0.3]
 [0.3 0.1 0.1 0.1 0.1 0.3]]
transition[i,j] is the probability of transitioning from state i to state j

Starting emission probabilities
[[0.2 0.2 0.1 0.1 0.1 0.1]
 [0.2 0.2 0.2 0.2 0.1 0.1]
 [0.2 0.2 0.2 0.2 0.2 0.2]
 [0.2 0.2 0.2 0.2 0.2 0.2]
 [0.1 0.1 0.2 0.2 0.2 0.2]
 [0.1 0.1 0.1 0.1 0.2 0.2]]
emission[v,i] is the probability of emitting v in state i


In [8]:
class HMM:
    def __init__(self, initial, transition, emission):
        self.initial = initial.copy()
        self.transition = transition.copy()
        self.emission = emission.copy()
        self.num_emissions, self.num_states = emission.shape
    
    def save(self, filename):
        with open(filename, "wb") as out_file:
            np.savez(out_file, initial=self.initial, transition=self.transition, emission=self.emission)
    
    @classmethod
    def load(cls, filename):
        with open(filename, "rb") as in_file:
            arrays = np.load(in_file)
            hmm = cls(arrays["initial"], arrays["transition"], arrays["emission"])
            arrays.close()
        return hmm
    
    def forward(self, observation):
        # Input: int array of size (length,). observation[t] is the observation at time t
        # Output: forward trellis - double array of size (length, self.num_states)
        length = len(observation)
        alpha = np.zeros((length, self.num_states), dtype=np.double)
        alpha[0] = self.initial * self.emission[observation[0]]
        for t in range(1, length):
            alpha[t] = np.matmul(alpha[t-1], self.transition) * self.emission[observation[t]]
        return alpha
    
    def forward_batch(self, observations):
        # OPTIONAL
        # Input: int array of size (length, num_observations). observation[t, i] is the observation at time t in sequence i
        # Output: forward trellis - double array of size (length, num_observations, self.num_states)
        length, num_observations = observations.shape
        alpha = np.zeros((length, num_observations, self.num_states), dtype=np.double)
        alpha[0] = self.initial * self.emission[observations[0]]
        for t in range(1, length):
            alpha[t] = np.matmul(alpha[t-1], self.transition) * self.emission[observations[t]]
        return alpha

        
    def backward(self, observation):
        # Input: int array of size (length,). observation[t] is the observation at time t
        # Output: backward trellis - double array of size (length, self.num_states)
        length = len(observation)
        beta = np.zeros((length, self.num_states), dtype=np.double)
        beta[length-1] = 1
        for t in reversed(range(length-1)):
            beta[t] = np.matmul(self.emission[observation[t+1]] * beta[t+1], self.transition.T)
        return beta
    
    def backward_batch(self, observations):
        # OPTIONAL
        # Input: int array of size (length, num_observations). observation[t, i] is the observation at time t in sequence i
        # Output: backward trellis - double array of size (length, num_observations, self.num_states)
        length, num_observations = observations.shape
        beta = np.zeros((length, num_observations, self.num_states), dtype=np.double)
        beta[length-1] = 1
        for t in reversed(range(length-1)):
            beta[t] = np.matmul(self.emission[observations[t+1]] * beta[t+1], self.transition.T)
        return beta
        
    def viterbi(self, observation):
        # Input: int array of size (length,). observation[t] is the observation at time t
        # Output: tuple with (viterbi_path, likelihood)
        # viterbi_path is an int array of size (length,). viterbi_path[t] is state at time t in the most likely state sequence.
        # likelihood: double scalar. The probability of the most likely path given the observation
        length = len(observation)
        delta = np.zeros((length, self.num_states), dtype=np.double)
        psi = np.zeros((length-1, self.num_states), dtype=int)
        delta[0] = self.initial * self.emission[observation[0]]

        for t in range(1, length):
            inner = delta[t-1, :, None] * self.transition
            delta[t] = np.max(inner, 0) * self.emission[observation[t]]
            psi[t-1] = np.argmax(inner, 0)

        best_path = np.empty((length,), dtype=int)
        probability = np.max(delta[length-1])
        best_path[length-1] = np.argmax(delta[length-1])
        for t in reversed(range(length-1)):
            best_path[t] = psi[t, best_path[t+1]]
        return best_path, probability
        
    def baum_welch(self, observations):
        # Input: int array of size (num_observations, length). Each observations[i] is an independent observation sequence. observations[i, t] is the observation in sample i at time t
        # Output: likelihoods: double array of size (num_observations,). likelihoods[i] is the probability of observing the sequence observations[i] given the (old) model parameters
        # Side Effect: Update self.initial, self.transition and self.emission according to the Baum-Welch reestimation rule for multiple observation sequences.
        num_observations, length = observations.shape
        observations = np.ascontiguousarray(observations.T)
        alpha = self.forward_batch(observations)
        beta = self.backward_batch(observations)

        assert alpha.shape == beta.shape == (length, num_observations, self.num_states)

        observation_probability = alpha[-1].sum(1)
        gamma = alpha * beta / observation_probability[np.newaxis, :, np.newaxis]
        xi = alpha[:length-1, :, :, np.newaxis] * self.transition * (beta * self.emission[observations])[1:, :, np.newaxis, :]
        xi /= observation_probability[np.newaxis, :, np.newaxis, np.newaxis]

        assert gamma.shape == (length, num_observations, self.num_states)
        assert xi.shape == (length-1, num_observations, self.num_states, self.num_states)

        self.initial[:] = gamma[0].mean(0)
        self.transition[:] = xi.sum(axis=(0,1)) / gamma[:length-1].sum(axis=(0,1))
        sum_gamma = gamma.sum(axis=(0,1))
        for vk in range(self.num_emissions):
            self.emission[vk] = gamma[observations == vk].sum(0) / sum_gamma
        return observation_probability

In [9]:
import os
from tqdm.notebook import tqdm
hmms = [HMM(initial, transition, emission) for i in range(5)]
batch_size = 50

os.makedirs("models", exist_ok=True)

for n,(dataset, hmm) in enumerate(zip(train_data, hmms)):
    pbar = tqdm(total=len(dataset))
    for i in range(0, len(dataset), batch_size):
        likelihood = hmm.baum_welch(dataset[i:i+batch_size])
        pbar.set_description(f"log-likelihood: {np.mean(np.log(likelihood)):.2f}")
        pbar.update(batch_size)
    pbar.close()
    hmm.save(os.path.join("models", f"model_{n+1}"))


  0%|          | 0/10000 [00:00<?, ?it/s]

  0%|          | 0/10000 [00:00<?, ?it/s]

  0%|          | 0/10000 [00:00<?, ?it/s]

  0%|          | 0/10000 [00:00<?, ?it/s]

  0%|          | 0/10000 [00:00<?, ?it/s]

In [11]:
correct = 0
pbar = tqdm(total=len(dev_data))
for sample, label in dev_data:
    decision = np.argmax([hmm.viterbi(sample)[1] for hmm in hmms])
    correct += (decision == label)
    pbar.update()
pbar.close()
print(f"Accuracy: {correct / len(dev_data):.1%}")

  0%|          | 0/500 [00:00<?, ?it/s]

Accuracy: 82.6%
