# Компʼютерний практикум №15
Виконав студент групи ЗК-41мп Гломозда Костянтин

TRANSFER LEARNING (ЧАСТИНА II).
ЗБЕРЕЖЕННЯ МОДЕЛІ, ЗАВАНТАЖЕННЯ МОДЕЛІ, ЗМІНА
ЗАВАНТАЖЕНОЇ МОДЕЛІ

# Import necessary libraries

In [1]:
import torch
import torch.nn as nn
from torch.autograd import Variable
from torch.optim import Adam
import numpy as np

# Define the CRF class

In [2]:
class CRF(nn.Module):
    def __init__(self, n_dice, log_likelihood):
        super(CRF, self).__init__()
        self.n_states = n_dice
        self.transition = torch.nn.init.normal_(nn.Parameter(torch.randn(n_dice, n_dice + 1)), -1, 0.1)
        self.loglikelihood = log_likelihood

    def to_scalar(self, var):
        return var.view(-1).data.tolist()[0]

    def argmax(self, vec):
        _, idx = torch.max(vec, 1)
        return self.to_scalar(idx)

    def log_sum_exp(self, vec):
        max_score = vec[0, self.argmax(vec)]
        max_score_broadcast = max_score.view(1, -1).expand(1, vec.size()[1])
        return max_score + torch.log(torch.sum(torch.exp(vec - max_score_broadcast)))

    def _data_to_likelihood(self, rolls):
        return Variable(torch.FloatTensor(self.loglikelihood[rolls]), requires_grad=False)

    def _compute_likelihood_numerator(self, loglikelihoods, states):
        prev_state = self.n_states
        score = Variable(torch.Tensor([0]))
        for index, state in enumerate(states):
            score += self.transition[state, prev_state] + loglikelihoods[index, state]
            prev_state = state
        return score

    def _compute_likelihood_denominator(self, loglikelihoods):
        prev_alpha = self.transition[:, self.n_states] + loglikelihoods[0].view(1, -1)
        for roll in loglikelihoods[1:]:
            alpha_t = []
            for next_state in range(self.n_states):
                feature_function = self.transition[next_state, :self.n_states].view(1, -1) + \
                                   roll[next_state].view(1, -1).expand(1, self.n_states)
                alpha_t_next_state = prev_alpha + feature_function
                alpha_t.append(self.log_sum_exp(alpha_t_next_state))
            prev_alpha = torch.stack(alpha_t).view(1, -1)
        return self.log_sum_exp(prev_alpha)

    def _viterbi_algorithm(self, loglikelihoods):
        argmaxes = []
        prev_delta = self.transition[:, self.n_states].view(1, -1) + loglikelihoods[0].view(1, -1)
        for roll in loglikelihoods[1:]:
            local_argmaxes = []
            next_delta = []
            for next_state in range(self.n_states):
                feature_function = self.transition[next_state, :self.n_states].view(1, -1) + \
                                   roll.view(1, -1) + prev_delta
                most_likely_state = self.argmax(feature_function)
                score = feature_function[0][most_likely_state]
                next_delta.append(score)
                local_argmaxes.append(most_likely_state)
            prev_delta = torch.stack(next_delta).view(1, -1)
            argmaxes.append(local_argmaxes)
        final_state = self.argmax(prev_delta)
        final_score = prev_delta[0][final_state]
        path_list = [final_state]
        for states in reversed(argmaxes):
            final_state = states[final_state]
            path_list.append(final_state)
        return np.array(path_list), final_score

    def neg_log_likelihood(self, rolls, states):
        loglikelihoods = self._data_to_likelihood(rolls)
        states = torch.LongTensor(states)
        sequence_loglik = self._compute_likelihood_numerator(loglikelihoods, states)
        denominator = self._compute_likelihood_denominator(loglikelihoods)
        return denominator - sequence_loglik

    def forward(self, rolls):
        loglikelihoods = self._data_to_likelihood(rolls)
        return self._viterbi_algorithm(loglikelihoods)

# Define the training function

In [3]:
def crf_train_loop(model, rolls, targets, n_epochs, learning_rate=0.01):
    optimizer = Adam(model.parameters(), lr=learning_rate, weight_decay=1e-4)
    for epoch in range(n_epochs):
        batch_loss = []
        N = rolls.shape[0]
        model.zero_grad()
        for index, (roll, labels) in enumerate(zip(rolls, targets)):
            neg_log_likelihood = model.neg_log_likelihood(roll, labels)
            batch_loss.append(neg_log_likelihood)
            if index % 50 == 0:
                ll = torch.cat(batch_loss).mean()
                ll.backward()
                optimizer.step()
                batch_loss = []
    return model

# Generate synthetic data for training

In [4]:
fair_dice = np.array([1 / 6] * 6)
loaded_dice = np.array([0.04, 0.04, 0.04, 0.04, 0.04, 0.8])
probabilities = {'fair': fair_dice, 'loaded': loaded_dice}
transition_mat = {'fair': np.array([0.8, 0.2, 0.0]),
                  'loaded': np.array([0.35, 0.65, 0.0]),
                  'start': np.array([0.5, 0.5, 0.0])}
states = ['fair', 'loaded', 'start']
state2ix = {'fair': 0, 'loaded': 1, 'start': 2}
log_likelihood = np.hstack([np.log(fair_dice).reshape(-1, 1),
                            np.log(loaded_dice).reshape(-1, 1)])

def simulate_data(n_timesteps):
    data = np.zeros(n_timesteps)
    prev_state = 'start'
    state_list = np.zeros(n_timesteps)
    for n in range(n_timesteps):
        next_state = np.random.choice(states, p=transition_mat[prev_state])
        state_list[n] = state2ix[next_state]
        next_data = np.random.choice([0, 1, 2, 3, 4, 5], p=probabilities[next_state])
        data[n] = next_data
        prev_state = next_state
    return data, state_list

# Prepare training data

In [5]:
n_obs = 15
rolls = np.zeros((5000, n_obs)).astype(int)
targets = np.zeros((5000, n_obs)).astype(int)
for i in range(5000):
    data, dices = simulate_data(n_obs)
    rolls[i] = data.reshape(1, -1).astype(int)
    targets[i] = dices.reshape(1, -1).astype(int)

# Train the CRF model

In [6]:
model = CRF(2, log_likelihood)
model = crf_train_loop(model, rolls, targets, 10, 0.001)
torch.save(model.state_dict(), "./models/crf.hdf5")
model.load_state_dict(torch.load("./models/crf.hdf5"))

  model.load_state_dict(torch.load("./models/crf.hdf5"))


<All keys matched successfully>

# Test the trained model

In [7]:
data, dices = simulate_data(15)
test_rolls = data.reshape(1, -1).astype(int)
test_targets = dices.reshape(1, -1).astype(int)
print(test_rolls[0])
print(model.forward(test_rolls[0])[0])
print(test_targets[0])
print(np.exp(list(model.parameters())[0].data.numpy()))

[5 4 1 3 4 1 2 4 0 5 5 5 5 5 5]
[1 1 1 1 1 1 1 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 1 1 1 1 1 1]
[[0.6096541  0.18964128 0.33120134]
 [0.22679496 0.48702103 0.42212826]]


DNA solving

# Import necessary libraries

In [8]:
import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Variable

# Define transition probabilities and emission probabilities

In [9]:
transition_probabilities = {
    'H': {'H': 0.5, 'L': 0.5},
    'L': {'H': 0.4, 'L': 0.6}
}

emission_probabilities = {
    'H': {'A': 0.2, 'C': 0.3, 'G': 0.3, 'T': 0.2},
    'L': {'A': 0.3, 'C': 0.2, 'G': 0.2, 'T': 0.3}
}

states = ['H', 'L']
observations = ['G', 'C', 'A', 'C', 'T', 'G', 'A', 'A']

# Define the Viterbi algorithm for hidden state prediction

In [10]:
def viterbi_algorithm(obs, states, start_prob, trans_prob, emit_prob):
    n = len(obs)
    viterbi = np.zeros((len(states), n))
    path = np.zeros((len(states), n), dtype=int)

    for s in range(len(states)):
        viterbi[s, 0] = start_prob[states[s]] * emit_prob[states[s]][obs[0]]

    for t in range(1, n):
        for s in range(len(states)):
            max_prob, max_state = max(
                (viterbi[s_prev, t - 1] * trans_prob[states[s_prev]][states[s]] * emit_prob[states[s]][obs[t]], s_prev)
                for s_prev in range(len(states))
            )
            viterbi[s, t] = max_prob
            path[s, t] = max_state

    max_final_prob, max_final_state = max((viterbi[s, n - 1], s) for s in range(len(states)))

    optimal_path = [max_final_state]
    for t in range(n - 1, 0, -1):
        optimal_path.insert(0, path[optimal_path[0], t])

    return max_final_prob, [states[state] for state in optimal_path]

In [11]:
# Define start probabilities
start_probabilities = {'H': 0.5, 'L': 0.5}

# Convert observations to indices
observations_indices = observations

# Run the Viterbi algorithm
max_probability, hidden_states = viterbi_algorithm(
    observations_indices, states, start_probabilities, transition_probabilities, emission_probabilities
)

print("Most probable sequence of hidden states:", hidden_states)
print("Probability of the sequence:", max_probability)

Most probable sequence of hidden states: ['H', 'H', 'L', 'L', 'L', 'L', 'L', 'L']
Probability of the sequence: 2.834351999999999e-07


# Summary

Модель CRF показала ймовірності переходів між станами і на основі послідовності спрогнозувала, які стани (зміни чи сталість типу кубика) найімовірніші на кожному кроці. Результат демонструє високу відповідність моделі завданням, пов’язаним із залежностями між мітками. У випадку з ДНК, алгоритм Вітербі передбачив, що більша частина послідовності має стан "L" (низький вміст GC), з початковим високим вмістом GC ("H"). Хоча ймовірність результату низька, це може бути обумовлено природними обмеженнями вибірки та ймовірнісної моделі.


Контрольні запитання
1. У чому суть Conditional Random Fields?
2. Що являє собою алгоритм Viterbi?


1. CRF — це ймовірнісна модель для прогнозування послідовності міток, яка враховує залежність між сусідніми мітками. Вона дозволяє знаходити найбільш імовірну комбінацію міток для заданих даних.
2. Вітербі — це динамічний алгоритм для знаходження найімовірнішої послідовності прихованих станів у задачах із прихованими марковськими моделями (HMM). Він враховує ймовірності переходів і емісій для побудови оптимального шляху.