In [160]:
# data loading
import pandas as pd

# math
import numpy as np

# plot
import matplotlib.pyplot as plt

# ml
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Dense, SimpleRNN, Bidirectional
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier

# ml and data handling
from sklearn.utils import shuffle
from sklearn.model_selection import KFold
from sklearn.model_selection import GridSearchCV

def creat_seq_map(vfile_path):
    """
    (str) -> dict

    Build the symbol -> integer lookup used for sequence encoding.

    The CSV at vfile_path is read with pandas and every entry of its
    "One-Letter Code" column is mapped to its zero-based row position.
    """
    codes = pd.read_csv(vfile_path, index_col=None, skiprows=0)["One-Letter Code"]
    return {code: position for position, code in enumerate(codes)}

def seq2vec(string, outlen, vocab, pad_value=20):
    """
    (str, int, dict, int) -> np.ndarray

    precondition: len(string) <= outlen

    Encode a sequence as integer indices and right-pad it to a fixed length.

    Every character of string is looked up in vocab; the resulting index
    vector is then extended on the right with pad_value until it holds
    exactly outlen entries. Note that the padded positions hold pad_value
    (20 by default), not zero.
    ---
    string: input sequence; every character must be a key of vocab.
    outlen: length of the returned vector.
    vocab: the dictionary mapping each character to its integer index.
    pad_value: index written into the padded positions (default 20).

    returns: 1-D integer np.ndarray with outlen entries.
    raises: ValueError if len(string) > outlen,
            KeyError if a character is not present in vocab.
    """
    if len(string) > outlen:
        raise ValueError(
            "sequence of length %d does not fit into outlen=%d" % (len(string), outlen)
        )

    # explicit integer dtype: an empty sequence would otherwise become a
    # float64 array, because numpy cannot infer a dtype from an empty list
    indices = np.array([vocab[amino_acid] for amino_acid in string], dtype=int)

    # np.pad already returns an ndarray, so no further conversion is needed
    return np.pad(indices, (0, outlen - len(indices)), constant_values=pad_value)

def label_data(data):
    """
    (pd.DataFrame) -> pd.DataFrame

    Return a copy of data with a "label" column derived from the sum of the
    two scores S = "binding score" + "digest score":
    0 if S > 0
    1 if S <= 0

    Rows whose sum is NaN satisfy neither condition and are not assigned a
    label by this function.

    The input frame is not modified. Labelling a copy keeps the caller's
    frame intact and avoids pandas' chained-assignment warning when data is
    itself a filtered slice of another frame.
    """
    labeled = data.copy()
    score_sum = labeled["binding score"] + labeled["digest score"]

    labeled.loc[score_sum > 0, "label"] = 0
    labeled.loc[score_sum <= 0, "label"] = 1

    return labeled

def screen_data(data, threshold):
    """
    (pd.DataFrame, number) -> pd.DataFrame

    Keep only the rows whose summed score is far enough from zero.

    With S = "digest score" + "binding score", a row is
    kept    if abs(S) >= threshold
    dropped if abs(S) <  threshold
    """
    combined_score = data["digest score"] + data["binding score"]
    keep = combined_score.abs() >= threshold
    return data.loc[keep]

def create_model(rnn_layers, rnn_units, fc_layers, fc_units, fc_activations, optimizer):
    """
    (int, sequence(int), int, sequence(int), sequence(str), optimizer) -> tf.keras model

    Assemble and compile the network used in the learning session.

    The model is a Sequential stack of rnn_layers bidirectional SimpleRNN
    layers followed by fc_layers Dense layers.
    ---
    rnn_layers: number of recurrent layers; must equal len(rnn_units).
    rnn_units: units of each recurrent layer.
    fc_layers: number of Dense layers; must equal len(fc_units) and
               len(fc_activations).
    fc_units: units of each Dense layer.
    fc_activations: activation of each Dense layer.
    optimizer: forwarded unchanged to model.compile.

    Raises AssertionError when the layer counts and the per-layer settings
    disagree in length.

    NOTE: Tx and nx are not parameters; they are looked up in the enclosing
    module scope when the function runs, so they must be defined beforehand.
    """
    assert len(rnn_units) == rnn_layers
    assert len(fc_units) == fc_layers
    assert len(fc_activations) == fc_layers

    model = Sequential()

    # recurrent stack: every layer except the last one emits the whole
    # sequence so that the following recurrent layer can consume it
    last_rnn = rnn_layers - 1
    for layer_idx in range(rnn_layers):
        recurrent = SimpleRNN(
            units=rnn_units[layer_idx],
            input_shape=(Tx, nx),
            return_sequences=(layer_idx != last_rnn),
        )
        model.add(Bidirectional(recurrent))

    # fully connected head
    for layer_idx in range(fc_layers):
        dense = Dense(units=fc_units[layer_idx], activation=fc_activations[layer_idx])
        model.add(dense)

    model.compile(
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
        optimizer=optimizer,
    )

    return model

In [117]:
# read the raw sequence table
data = pd.read_csv("family_101F.csv")

# drop rows whose summed score magnitude is below the 1.0 threshold, then
# attach the label derived from the sign of that summed score
data = label_data(screen_data(data, 1.0))

# shuffle the rows reproducibly
data = shuffle(data, random_state=0)

In [118]:
# create dictionary for seq mapping
vocab = creat_seq_map("vocab.csv")

# dataset dimensions:
#   m  - number of samples
#   Tx - largest value of the "len" column (presumably the longest sequence
#        length - TODO confirm), used as the padded sequence length
#   nx - number of symbols in the vocabulary
m = len(data)
Tx = data["len"].max()
nx = len(vocab)

# one-hot encode every sequence.
# num_classes is pinned to nx: without it to_categorical infers the width
# from the largest index present in each individual sequence, so a sequence
# that lacks the highest index would come out narrower and X would be ragged.
# Iterating over the column directly avoids one label lookup per row and
# also works when the index contains duplicate labels.
X = np.array([to_categorical(seq2vec(seq, Tx, vocab), num_classes=nx) for seq in data["seq"]])

In [202]:
# X must be laid out as (samples, time steps, vocabulary size)
assert tuple(X.shape) == (m, Tx, nx)
print(X.shape)

(19215, 84, 21)


In [142]:
# 80-20 split of data set: the first 80% of the rows are used for training,
# the remaining rows for testing
n_train = len(X) * 8 // 10
labels = data["label"]

X_train, X_test = X[:n_train], X[n_train:]
Y_train = labels.iloc[:n_train].values
Y_test = labels.iloc[n_train:].values

In [193]:
# hyper parameters explored by the grid search; every entry is a list of
# candidate values, and tuple candidates hold one value per layer
rnn_layers = [2]
rnn_units = [(16, 16)]

fc_layers = [2]
fc_units = [(16, 2)]
fc_activations = [("relu", "softmax")]

optimizer = ['SGD', 'Adam']

param_grid = {
    "rnn_layers": rnn_layers,
    "rnn_units": rnn_units,
    "fc_layers": fc_layers,
    "fc_units": fc_units,
    "fc_activations": fc_activations,
    "optimizer": optimizer,
}

In [201]:
# display the assembled grid to double-check the candidate values before the search
param_grid

{'rnn_layers': [2],
 'rnn_units': [(16, 16)],
 'fc_layers': [2],
 'fc_units': [(16, 2)],
 'fc_activations': [('relu', 'softmax')],
 'optimizer': ['SGD', 'Adam']}

In [195]:
# apply grid search with 5 fold cross validation
model = KerasClassifier(build_fn = create_model, epochs = 1, batch_size = 320, verbose=0)
grid = GridSearchCV(estimator = model, param_grid = param_grid, n_jobs = -1, cv = 5)
grid_result = grid.fit(X_train, Y_train)

In [200]:
# best cross-validated score and the optimizer that achieved it
best_optimizer = grid_result.best_params_['optimizer']
print("Best: %f using %s" % (grid_result.best_score_, best_optimizer))

# mean and standard deviation of the test score for every candidate
cv_results = grid_result.cv_results_
means = cv_results['mean_test_score']
stds = cv_results['std_test_score']
params = cv_results['params']

for mean, stdev, param in zip(means, stds, params):
    row = (mean, stdev, param['optimizer'])
    print("%f (%f) with: %r" % row)

Best: 0.698803 using Adam
0.608379 (0.001171) with: 'SGD'
0.698803 (0.007416) with: 'Adam'
