In [31]:
import torch
import numpy as np

# Data Engineering

Suppose we are given amino acid sequence s and class label sequence c. Let n be the length of both sequences. 
For simplicity, we assume there are 20 amino acids. We create a bijection mapping from each amino acid to [1, 20]. We map each amino acid sequence s to a n sized vector $s'\in R^n$, where the $i^{th}$ position is its corresponding numerical mapping from its amino acid.

In a similar fashion, we perform the same process for secondary structures. We assume there are two types of secondary structures alpha helix and beta sheet and optionally no secondary structure. We map each class label sequence c to a n sized vector $c' \in R^n$, where the $i^{th}$ position is its corresponding numerical mapping from its class label.

In [126]:
def aa_to_vec(s):
    amino_acids = list("AGILPVFWYDERHKSTCMNQ")
    amino_acids.sort()
    aa_mapping = {}
    for i, aa in enumerate(amino_acids):
        aa_mapping[aa] = i
    return list(map(lambda aa: aa_mapping[aa], list(s)))
        

def ss_to_vec(c):
    #TODO
    return list(map(lambda lbl: int(lbl) - 1, list(c)))

# shorthand aliases
aa = lambda s: aa_to_vec(s)
ss = lambda c: ss_to_vec(c)

# Model Parameterization

For notion, let $\theta$ := hidden markov model parameters (state transition probabilities, symbol emission probabilities), and let $\phi$ := class emission parameters.

Our objective function attempts to find maximize the conditionally probability of obtaining class label sequence c given amino acid sequence s, hidden markov parameters $\theta$, class emission parameters $\phi$.

The number of hidden states usually requires some expert insights. Here, we adopt the hidden markov model setup introduced in assignment two - which includes two hidden states A and B. Then, we have a 4 state transisition probabilities $(t_{aa}, t_{ab}, t_{ba}, t_{bb})$, 20 symbol emission probabilties for state A $(e_{a1}, ... , e_{a20})$, 20 symbol emission probabilties for state B $(e_{b1}, ... , e_{b20})$. Then, we have 44 parameters total for $\theta$.


In this setup, we also have 3 class emission probabilities for state A $(\phi_{a1}, \phi_{a2} , \phi_{a3})$, 3 class emission probabilities for state B $(\phi_{b1}, \phi_{b2} , \phi_{b3})$. 

For simplicity, we initalize these variables to a uniform distribution.

In [127]:
class HiddenMarkovModelParameters:
    
    
    def __init__(self, theta, phi=None):
        self.update_theta(theta)
        self.class_emiss_probs = np.ones((2, 3)) / 3
        if phi is None:
            self.phi = np.ones((2, 3)) / 3  # Uniform initialization if phi not provided
        else:
            self.update_phi(phi)
        
        
    def update_theta(self, new_theta):
        # Update transition probabilities
        self.trans_probs = np.array(new_theta[:4]).reshape(2, 2)

        # Update emission probabilities for state A and state B
        self.emiss_probs = np.zeros((2, 20))  # Assuming 20 emissions for each state
        self.emiss_probs[0, :] = new_theta[4:24]
        self.emiss_probs[1, :] = new_theta[24:] 
    
    
    def update_phi(self, new_phi):
        self.phi = np.array(new_phi).reshape(2, 3)
        
        
    def get_theta(self):
        return np.concatenate((self.trans_probs.flatten(), self.emiss_probs.flatten()))
    
    
    def get_phi(self):
        return self.phi.flatten()    

# Gradient Calculations

We can convert our objective function into a minimization problem by changing our objective into minimizing the negative log likelihood.

$$\hat{\theta} = \arg \max_{\theta} P(c | s, \theta, \phi) = \frac{P(c, s | \theta, \phi)}{P(s | \theta)}$$
$$\hat{\theta} = \arg \min_{\theta} -\log(\frac{P(c, s | \theta, \phi)}{P(s | \theta)})$$

We can calculate the gradient of this expression into terms that we know from forward-backward algorithms.

$$\frac{dL}{d\theta_k} = -\frac {m_k(c, s) - n_k(s)}{\theta_k}$$

$$n_k(s) := idk$$

$$m_k(c, s) := idk$$

In [136]:
class HiddenMarkovModel(HiddenMarkovModelParameters):
    
    
    def __init__(self, theta, phi=None):
        super().__init__(theta, phi)
    
    
    def log_sum_exp(self, arr):
        # Log-sum-exp trick for numerical stability in log-space
        max_val = np.max(arr)
        return max_val + np.log(np.sum(np.exp(arr - max_val)))
    
    
    def calculate_theta_gradient(self, mk, nk):
        if mk is None or nk is None:
            raise ValueError("mk and nk must be provided and have the correct shape.")

        theta_flat = np.concatenate((self.trans_probs.flatten(), self.emiss_probs.flatten()))
        theta_flat = np.clip(theta_flat, 1e-10, None)
        gradient = -(mk - nk) / theta_flat

        return gradient
    
    
    def calculate_phi_gradient(self, q_k):
        # Calculate gradient for phi using -q_k(c, s) / phi_k
        # Ensure no division by zero
        phi_k_nonzero = np.clip(self.phi, 1e-10, None)
        phi_gradient = -q_k / phi_k_nonzero
        return phi_gradient
    
    
    def forward_backward(self, obs, class_labels=None):
        num_states = 2
        num_observations = len(obs)

        forward = np.zeros((num_states, num_observations))
        backward = np.zeros((num_states, num_observations))
        n_k = np.zeros(4 + 2 * 20)  # Transition + Emission probabilities
        m_k = np.zeros_like(n_k)    # Expected usage of model parameters given class labels
        q_k = np.zeros((2, 3))      # Expected usage of class emission parameters

        # Forward Pass
        forward[:, 0] = self.emiss_probs[:, obs[0]] / num_states
        for t in range(1, num_observations):
            for s in range(num_states):
                forward[s, t] = 0
                for prev_s in range(num_states):
                    forward_prob = forward[prev_s, t-1] * self.trans_probs[prev_s, s] * self.emiss_probs[s, obs[t]]
                    forward[s, t] += forward_prob

                    # Update n_k for transition and emission probabilities
                    n_k_index = prev_s * num_states + s
                    n_k[n_k_index] += forward_prob
                    n_k[4 + s * 20 + obs[t]] += forward_prob

                    # Update m_k if class_labels are provided
                    if class_labels is not None and class_labels[t] == s:
                        m_k[n_k_index] += forward_prob
                        m_k[4 + s * 20 + obs[t]] += forward_prob

                # Update q_k for class emission probabilities
                if class_labels is not None:
                    q_k[s, class_labels[t]] += forward[s, t]

        # Backward Pass
        backward[:, -1] = 1
        for t in range(num_observations - 2, -1, -1):
            for s in range(num_states):
                backward[s, t] = sum(backward[next_s, t+1] * self.trans_probs[s, next_s] * self.emiss_probs[next_s, obs[t+1]] for next_s in range(num_states))

        # Normalize n_k, m_k, and q_k by the total number of observations
        n_k /= num_observations
        m_k /= num_observations
        q_k /= num_observations

        # Update class emission probabilities
        for state in range(num_states):
            self.phi[state] = q_k[state] / q_k[state].sum()

        return forward, backward, n_k, m_k, q_k
    
    
    def predict_class_sequence(self, obs_sequence):
        num_states = self.trans_probs.shape[0]
        num_observations = len(obs_sequence)

        # Initialize Viterbi matrices
        viterbi = np.zeros((num_states, num_observations))
        backpointer = np.zeros((num_states, num_observations), dtype=int)

        # Initialization step
        viterbi[:, 0] = self.emiss_probs[:, obs_sequence[0]] * self.phi[:, 0]

        # Viterbi algorithm - Dynamic Programming
        for t in range(1, num_observations):
            for s in range(num_states):
                transition_probs = viterbi[:, t-1] * self.trans_probs[:, s]
                max_prob = np.max(transition_probs)
                backpointer[s, t] = np.argmax(transition_probs)

                # Incorporate emission probabilities and class probabilities
                viterbi[s, t] = max_prob * self.emiss_probs[s, obs_sequence[t]] * self.phi[s, 0]

        # Backtracking to find the most probable path and class labels
        most_probable_path = np.zeros(num_observations, dtype=int)
        most_probable_classes = np.zeros(num_observations, dtype=int)
        most_probable_path[-1] = np.argmax(viterbi[:, -1])

        for t in range(num_observations - 2, -1, -1):
            most_probable_path[t] = backpointer[most_probable_path[t+1], t+1]
            most_probable_classes[t] = np.argmax(self.phi[most_probable_path[t], :])

        return most_probable_path, most_probable_classes
    
    
    def gradient_descent(self, observations_set, class_labels_set, learning_rate=0.01, iterations=100, convergence_threshold=1e-6):
        prev_theta = self.get_theta()
        prev_phi = self.phi.copy()

        for iteration in range(iterations):
            total_theta_gradient = np.zeros_like(prev_theta)
            total_phi_gradient = np.zeros_like(prev_phi)

            for obs, class_labels in zip(observations_set, class_labels_set):
                # Ensure the class label sequence matches the observation sequence in length
                if len(obs) != len(class_labels):
                    raise ValueError("Length of observation sequence and class label sequence must match.")

                _, _, nk, mk, q_k = self.forward_backward(obs, class_labels)
                theta_gradient = self.calculate_theta_gradient(mk, nk)
                phi_gradient = self.calculate_phi_gradient(q_k)

                total_theta_gradient += theta_gradient
                total_phi_gradient += phi_gradient

            # Average the gradients across all sequences
            avg_theta_gradient = total_theta_gradient / len(observations_set)
            avg_phi_gradient = total_phi_gradient / len(observations_set)

            # Update theta and phi in the direction of the negative average gradients
            updated_theta = prev_theta - learning_rate * avg_theta_gradient
            updated_phi = prev_phi - learning_rate * avg_phi_gradient

            # Constrain updated parameters
            updated_theta[:4] = np.clip(updated_theta[:4], 0, 1)
            updated_phi = np.clip(updated_phi, 0, 1)

            if np.linalg.norm(updated_theta - prev_theta) < convergence_threshold and \
               np.linalg.norm(updated_phi - prev_phi) < convergence_threshold:
                break

            prev_theta = updated_theta
            prev_phi = updated_phi

        self.update_theta(updated_theta)
        self.update_phi(updated_phi)
    



# Gradient Descent

We use gradient descent to minimize our objective function.

We repeat the following operation until convergence. $\theta'=\theta - \alpha \nabla L$. For simplicity, we fix our step size $\alpha$.

In [137]:
class HiddenMarkovModelWithGradientDescent(HiddenMarkovModel):
    
    
    def __init__(self, theta, phi=None):
        super().__init__(theta, phi)
    
    
    def gradient_descent(self, observations_set, class_labels_set, learning_rate=0.01, iterations=100, convergence_threshold=1e-6):
        prev_theta = self.get_theta()
        prev_phi = self.phi.copy()

        for iteration in range(iterations):
            total_theta_gradient = np.zeros_like(prev_theta)
            total_phi_gradient = np.zeros_like(prev_phi)

            for obs, class_labels in zip(observations_set, class_labels_set):
                # Ensure the class label sequence matches the observation sequence in length
                if len(obs) != len(class_labels):
                    raise ValueError("Length of observation sequence and class label sequence must match.")

                _, _, nk, mk, q_k = self.forward_backward(obs, class_labels)
                theta_gradient = self.calculate_theta_gradient(mk, nk)
                phi_gradient = self.calculate_phi_gradient(q_k)

                total_theta_gradient += theta_gradient
                total_phi_gradient += phi_gradient

            # Average the gradients across all sequences
            avg_theta_gradient = total_theta_gradient / len(observations_set)
            avg_phi_gradient = total_phi_gradient / len(observations_set)

            # Update theta and phi in the direction of the negative average gradients
            updated_theta = prev_theta - learning_rate * avg_theta_gradient
            updated_phi = prev_phi - learning_rate * avg_phi_gradient

            # Constrain updated parameters
            updated_theta[:4] = np.clip(updated_theta[:4], 0, 1)
            updated_phi = np.clip(updated_phi, 0, 1)

            if np.linalg.norm(updated_theta - prev_theta) < convergence_threshold and \
               np.linalg.norm(updated_phi - prev_phi) < convergence_threshold:
                break

            prev_theta = updated_theta
            prev_phi = updated_phi

        self.update_theta(updated_theta)
        self.update_phi(updated_phi)

        

# Workflow on Human Data

In [139]:
with open("HUMAN_training_data.txt") as f:
    content = f.read().split("\n")
    observations_set = []
    class_labels_set = []
    for i in range(0, len(content), 2):
        observations_set.append(aa(content[i]))
    for i in range(1, len(content), 2):
        class_labels_set.append(ss(content[i]))
    
    # Initialize transition probabilities uniformly
    transition_probs = [0.5, 0.5, 0.5, 0.5]  # 2 states, so 2x2 transition matrix, each entry is 0.5

    # Initialize emission probabilities uniformly
    emission_probs = [1.0 / 20] * 40  # 20 emissions for each of the 2 states, each probability is 1/20

    # Combine to form initial theta
    theta_initial = transition_probs + emission_probs

    # Initialize class emission probabilities uniformly
    phi_initial = [[1.0 / 3, 1.0 / 3, 1.0 / 3],  # State 1
                   [1.0 / 3, 1.0 / 3, 1.0 / 3]]  # State 2

    
    # Initialize the model
    hmm = HiddenMarkovModelWithGradientDescent(theta_initial, phi_initial)

    
    # Set learning rate, number of iterations, and convergence threshold
    learning_rate = 0.01
    iterations = 100
    convergence_threshold = 1e-4

    # Run gradient descent
    hmm.gradient_descent(observations_set, class_labels_set, learning_rate, iterations, convergence_threshold)

    
    obs_sequence = aa("MENFQKVEKIGEGTYGVVYKARNKLTGEVVALKKIRLDTETEGVPSTAIREISLLKELNHPNIVKLLDVIHTENKLYLVFEFLHQDLKKFMDASALTGIPLPLIKSYLFQLLQGLAFCHSHRVLHRDLKPQNLLINTEGAIKLADFGLARAFGVPVRTYTHEVVTLWYRAPEILLGCKYYSTAVDIWSLGCIFAEMVTRRALFPGDSEIDQLFRIFRTLGTPDEVVWPGVTSMPDYKPSFPKWARQDFSKVVPPLDEDGRSLLSQMLHYDPNKRISAKAALAHPFFQDVTKPVPHLRL")  # Example sequence
    state_sequence, class_sequence = hmm.predict_class_sequence(obs_sequence)
    print("Most probable class sequence:", class_sequence)


Most probable state sequence: [0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0]
Most probable class sequence: [2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2

# Validation Results

Since our dataset if limited, we approximate our validation error using k-cross-folds validation in particular we use LOOCV (leave-one-out-cross-validation). 

In [None]:
with open("HUMAN_training_data.txt") as f:
    content = f.read().split("\n")
    s_seqs = []
    c_seqs = []
    for i in range(0, len(content), 2):
        s_seqs.append(aa(content[i]))
    for i in range(1, len(content), 2):
        c_seqs.append(ss(content[i]))
    
    human_train = zip(s_seqs, c_seqs)
    
    
        
        
        