In [39]:
import argparse
import os
from itertools import groupby
from hmmlearn import hmm
import gzip as gz
import pandas as pd
import numpy as np
import pickle
import plotly.express as px
import plotly.graph_objects as go

In [2]:
NUM_OF_STATS = 3
STATES = {"H": 0, "S": 1, "L": 2}
AMINO_ACIDS = {"A": 0, "R": 1, "N": 2, "D": 3, "C": 4, "Q": 5, "E": 6, "G": 7, "H": 8, "I": 9,
               "L": 10, "K": 11, "M": 12, "F": 13, "P": 14, "S": 15, "T": 16, "W": 17,
               "Y": 18, "V": 19, "B": 20, "Z": 21, "X": 22, "J": 23, "O": 24, "U": 25, }
BAD_AMINO_ACIDS = "OUBZXJ"
EMISSIONS = pd.read_csv("emissions-Table 1.csv", index_col=0).T.to_numpy() / 100
TRANSITIONS = np.array([[12 / 13, 1 / 39, 2 / 39], [1 / 15, 4 / 5, 2 / 15], [0.4, 0.2, 0.4]])
PATH_TO_DATA = "./parsed_sequences" # Local

In [3]:
def train(train_samples, train_lengths, convergence_threshold: float = 0.01, num_iters: int = 80):
    # Create HMM model
    model = hmm.CategoricalHMM(n_components=NUM_OF_STATS,
                               algorithm='viterbi',
                               n_iter=num_iters,
                               tol=convergence_threshold,
                               verbose=True,
                               params="et",
                               init_params="s")

    # Config emissions
    model.emissionprob_ = EMISSIONS
    model.transmat_ = TRANSITIONS
    # Train th model
    model.fit(np.concatenate(train_samples).reshape((-1, 1)), train_lengths)

    return model

In [4]:
def split_train_test(dir_path: str):
    train_samples, train_lengths = [], []
    test_samples, test_lengths, test_labels = [], [], []

    for file in os.listdir(dir_path):
        flag = np.random.binomial(1, 0.8, 1)[0]
        file = f"{dir_path}/{file}"
        with open(file, 'r') as f:
            content = f.readlines()
            seq, labels = content[0], content[1]
            numeric_seq, numeric_labels = [], []
            for s, l in zip(seq, labels):
                if s not in BAD_AMINO_ACIDS:
                    numeric_seq += [AMINO_ACIDS[s]]
                    numeric_labels += [STATES[l]]
            if flag:
                train_samples.append(np.array(numeric_seq).reshape(-1, 1))
                train_lengths.append(len(numeric_seq))
            else:
                test_samples.append(np.array(numeric_seq).reshape(-1, 1))
                test_lengths.append(len(numeric_seq))
                test_labels.append(numeric_labels)

    return train_samples, train_lengths, test_samples, test_lengths, test_labels

In [7]:
def get_test_seqs(dir_path: str):
    test_samples, test_lengths, test_labels = [], [], []

    for file in os.listdir(dir_path):
        file = f"{dir_path}/{file}"
        with open(file, 'r') as f:
            content = f.readlines()
            seq, labels = content[0], content[1]
            numeric_seq, numeric_labels = [], []
            for s, l in zip(seq, labels):
                if s not in BAD_AMINO_ACIDS:
                    numeric_seq += [AMINO_ACIDS[s]]
                    numeric_labels += [STATES[l]]

            test_samples.append(np.array(numeric_seq).reshape(-1, 1))
            test_lengths.append(len(numeric_seq))
            test_labels.append(numeric_labels)

    return test_samples, test_lengths, test_labels


In [49]:
train_samples, train_lengths, test_samples, test_lengths, test_labels = split_train_test(PATH_TO_DATA)
# train_samples = pickle.load(open(f"./models/train_samples_expresive", 'rb'))
# train_lengths = pickle.load(open(f"./models/train_lengths_expresive", 'rb'))
# test_samples = pickle.load(open(f"./models/test_samples_expresive", 'rb'))
# test_lengths = pickle.load(open(f"./models/test_lengths_expresive", 'rb'))
# test_labels = pickle.load(open(f"./models/test_labels_expresive", 'rb'))
model = train(train_samples, train_lengths, num_iters=100)
# Create pickle file
if not os.path.exists("models"):
    os.mkdir("models")
with open(f"models/model_bar_100", 'wb') as file:
    pickle.dump(model, file)
# model = pickle.load(open(f"./models/model_100_expresive_2", 'rb'))
# model = pickle.load(open(f"./models/model_bar_67", 'rb'))


         1    -2860953.3006             +nan
         2    -2846211.4135      +14741.8871
         3    -2845630.8928        +580.5207
         4    -2845259.6472        +371.2456
         5    -2844989.4130        +270.2341
         6    -2844777.0843        +212.3287
         7    -2844604.2301        +172.8542
         8    -2844461.9117        +142.3185
         9    -2844344.6194        +117.2922
        10    -2844248.0093         +96.6101
        11    -2844168.2498         +79.7595
        12    -2844101.9569         +66.2929
        13    -2844046.2567         +55.7003
        14    -2843998.8101         +47.4466
        15    -2843957.7776         +41.0325
        16    -2843921.7448         +36.0328
        17    -2843889.6375         +32.1074
        18    -2843860.6427         +28.9947
        19    -2843834.1441         +26.4987
        20    -2843809.6707         +24.4734
        21    -2843786.8599         +22.8107
        22    -2843765.4299         +21.4301
        23

# General Error

In [50]:
misclassification_error = 0
for sample, length, label in zip(test_samples, test_lengths, test_labels):
    ll, hidden_states = model.decode(sample, length)
    # hidden_states = np.digitize(hidden_states, bins=[0, 13, 18, 22]) - 1
    misclassification_error += np.count_nonzero(hidden_states - label)
# Save misclassification error
misclassification_error_rate = misclassification_error / np.sum(test_lengths)
print(f"Error rate: {misclassification_error_rate}")

Error rate: 0.593356069419433


In [48]:
model_name = ["Baseline", "Expressive"]
errors = [0.593356069419433, 0.6375903846490537]
total_performence_bar = px.bar(x=model_name, y=errors)
total_performence_bar.update_layout(xaxis_title="Model Type", yaxis_title="Error rate")
total_performence_bar.show()


# Analysis of Three protein Families

In [36]:
# Predict
dir_names = []
errors = []
for dir_name in os.listdir("families"):
    d = os.path.join("families", dir_name)
    if not os.path.isdir(d):
        continue
    dir_names.append(dir_name)
    misclassification_error = 0
    test_samples, test_lengths, test_labels = get_test_seqs(d)
    for sample, length, label in zip(test_samples, test_lengths, test_labels):
        ll, hidden_states = model.decode(sample, length)
        hidden_states = np.digitize(hidden_states, bins=[0, 13, 18, 22]) - 1
        misclassification_error += np.count_nonzero(hidden_states - label)

    misclassification_error_rate = misclassification_error / np.sum(test_lengths)
    errors.append(misclassification_error_rate)

In [33]:
simple_errors = errors

In [37]:
print(dir_names)
print(errors)

['dna_binding', 'antibody', 'membrane']
[0.6689141781466587, 0.7655156561270879, 0.5275201887306052]


In [38]:
bar_fig = px.bar(x=dir_names, y=[simple_errors, errors], title="Error Rate per Type of Protein , Expressive model")
bar_fig.update_layout(xaxis_title="Protein Family", yaxis_title="Error rate")
bar_fig.show()
if not os.path.exists("plots"):
    os.mkdir("plots")
bar_fig.write_image("plots/secondary_type.png")

In [42]:
fig = go.Figure(data=[
    go.Bar(name='Baseline Model', x=dir_names, y=simple_errors),
    go.Bar(name='Expressive Model', x=dir_names, y=errors)
])
# Change the bar mode
fig.update_layout(barmode='group', xaxis_title="Protein Family", yaxis_title="Error rate", title="Comparison Between Expressive and Baseline models Performence on Protein Families" )
fig.show()

In [53]:
expressive_model = pickle.load(open(f"./models/model_100_expresive_2", 'rb'))

In [59]:
np.savetxt('output.csv',expressive_model.transmat_,delimiter=",")