In [None]:
import os
import pandas as pd
import numpy as np
import sys
from pathlib import Path
 
from keras.models import Sequential
from keras.layers import Dense, Embedding
from keras.layers import SimpleRNN, GRU, LSTM, Bidirectional
from keras.layers import Conv1D, MaxPooling1D, Flatten
from keras.utils.np_utils import to_categorical
from keras.preprocessing.sequence import pad_sequences
from keras.callbacks import ModelCheckpoint, EarlyStopping
 
from sklearn.metrics import balanced_accuracy_score
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
 
sys.path.append(YOUR PATH)
import confusion_matrix_plot
import train_validation_plots
 
sys.path.append(YOUR PATH)
import encode_biological_sequence
 
# *******************************************************************
             
def cnn_one_layer(xtrainraw, ytrain, xtestraw, ytest, num_classes,
                  type_of_sequence, type_of_encoding, embedding_dimension,
                  num_epochs, batchsize,
                  conv_window_size, pooling_size,
                  verbosity, results_dir, class_names,
                  xprodraw, yprod):
     
    model_name = 'CNN_Conv_Window_Size_' + str(conv_window_size) + '_' + type_of_encoding.title()
     
    vocab_size = 50  # > number of possible unique words
    maximum_length = 0
    if type_of_encoding == 'embedding':
        # integer encode the peptides
                 
        peptides_integer_encoded_train = []
        for pep in xtrainraw:
            pie = encode_biological_sequence.encode_sequence(pep,
                  type_of_sequence, type_of_encoding)
            peptides_integer_encoded_train.append(pie)
         
        peptides_integer_encoded_test = []
        for pep in xtestraw:
            pie = encode_biological_sequence.encode_sequence(pep,
                  type_of_sequence, type_of_encoding)
            peptides_integer_encoded_test.append(pie)
         
        peptides_integer_encoded_prod = []
        for pep in xprodraw:
            pie = encode_biological_sequence.encode_sequence(pep,
                  type_of_sequence, type_of_encoding)
            peptides_integer_encoded_prod.append(pie)
                         
        # add padding so that all have the same length
        max_length_train = max(len(p) for p in peptides_integer_encoded_train)
        max_length_test = max(len(p) for p in peptides_integer_encoded_test)
        maximum_length = max(max_length_train, max_length_test)
        assert(vocab_size > maximum_length)
         
        xtrain_cnn = pad_sequences(peptides_integer_encoded_train, 
                                   maxlen=maximum_length, padding='post')
        xtest_cnn = pad_sequences(peptides_integer_encoded_test, 
                                  maxlen=maximum_length, padding='post')
        xprod_cnn = pad_sequences(peptides_integer_encoded_prod, 
                                  maxlen=maximum_length, padding='post')
    else:
        raise NameError
                   
    # create the cnn
    num_output_nodes = num_classes
    layer_1_units = 32
    model_cnn = Sequential()
    model_cnn.add(Embedding(vocab_size, embedding_dimension, 
                           input_length=maximum_length))
    model_cnn.add(Conv1D(filters=layer_1_units, kernel_size=conv_window_size, activation='relu'))
    model_cnn.add(MaxPooling1D(pool_size=pooling_size))
    model_cnn.add(Flatten())
    model_cnn.add(Dense(num_output_nodes, activation='softmax'))
    # compile the model
    model_cnn.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['acc'])
    # summarize the model
    print('\n** results for simple cnn:')
    print(model_cnn.summary())
    # fit the model
    h = model_cnn.fit(xtrain_cnn, ytrain, epochs=num_epochs, 
                            batch_size=batchsize,
                            validation_data=(xtest_cnn, ytest), 
                            verbose=verbosity)
     
    start_epoch=10
    # plot loss
    plot_title = model_name.replace('_',' ') + ' Loss'
    train_validation_plots.plot_train_validation(h.history['loss'], 'Training Loss',  
              h.history['val_loss'], 'Validation Loss',
              start_epoch,
              'Epochs', 'Loss',
              plot_title, results_dir, False)
     
    # plot accuracy
    plot_title = model_name.replace('_',' ') + ' Accuracy'
    train_validation_plots.plot_train_validation(h.history['acc'], 'Training Accuracy',  
              h.history['val_acc'], 'Validation Accuracy',
              start_epoch,
              'Epochs', 'Accuracy',
              plot_title, results_dir, False)
   
    y_test = np.argmax(ytest, axis=1)
    y_test_predict = np.argmax(model_cnn.predict(xtest_cnn), axis=1)
     
    y_prod = np.argmax(yprod, axis=1)
    y_prod_predict = np.argmax(model_cnn.predict(xprod_cnn), axis=1)
        
    # compute and print error metrics to model_name.txt
    stdout_default = sys.stdout
    sys.stdout = open(results_dir + model_name + '.txt','w')
    print(model_cnn.summary(),'\n***\n\n')
    # test metrics
    balanced_accuracy = balanced_accuracy_score(y_test, y_test_predict)
    accuracy = accuracy_score(y_test, y_test_predict, normalize=True)  # fractions
    print('test metrics: balanced accuracy =', balanced_accuracy)
    print('test metrics: accuracy =',accuracy)
    print('test classification report:\n',classification_report(y_test, y_test_predict))
     
    # production metrics
    balanced_accuracy = balanced_accuracy_score(y_prod, y_prod_predict)
    accuracy = accuracy_score(y_prod, y_prod_predict, normalize=True)  # fractions
    print('\n\nproduction metrics: balanced accuracy =', balanced_accuracy)
    print('production metrics: accuracy =',accuracy)
    print('production classification report:\n',classification_report(y_prod, y_prod_predict))
    sys.stdout = stdout_default
        
    # compute confusion matrices for production predictions
    confusion_matrix_plot.cm_plot(y_prod, y_prod_predict, class_names, results_dir,
                                  False, False, 30)  # raw
 
    confusion_matrix_plot.cm_plot(y_prod, y_prod_predict, class_names, results_dir,
                                  False, True, 30)  # normalized
     
# end of cnn_one_layer()
 
# *******************************************************************
             
def cnn_one_layer_check_early_stop_cv(xcalibraw, ycalib, num_classes,
                  type_of_sequence, type_of_encoding, embedding_dimension,
                  num_epochs, batchsize,
                  conv_window_size, pooling_size,
                  verbosity, results_dir, class_names,
                  xprodraw, yprod, stop_epochs, folds):
     
    model_name = 'CNN_1_Conv_Window_Size_' + str(conv_window_size) + '_' + type_of_encoding.title()
    
    vocab_size = 50  # > number of possible unique words
    maximum_length = 0
    if type_of_encoding == 'embedding':
        # integer encode the peptides               
        peptides_integer_encoded_calib = []
        for pep in xcalibraw:
            pie = encode_biological_sequence.encode_sequence(pep,
                  type_of_sequence, type_of_encoding)
            peptides_integer_encoded_calib.append(pie)
         
        peptides_integer_encoded_prod = []
        for pep in xprodraw:
            pie = encode_biological_sequence.encode_sequence(pep,
                  type_of_sequence, type_of_encoding)
            peptides_integer_encoded_prod.append(pie)
                        
        # add padding so that all have the same length
        maximum_length = max(len(p) for p in peptides_integer_encoded_calib)
        assert(vocab_size > maximum_length)
         
        xcalib_cnn = pad_sequences(peptides_integer_encoded_calib, 
                                   maxlen=maximum_length, padding='post')
        xprod_cnn = pad_sequences(peptides_integer_encoded_prod, 
                                  maxlen=maximum_length, padding='post')
    else:
        raise NameError
                 
    # set up the cnn
    metric = 'acc'
    model_file = model_name + '.h5'
    callbacks_list = [
            EarlyStopping(monitor='val_loss', patience=stop_epochs),
            ModelCheckpoint(filepath=model_file, monitor='val_loss', save_best_only=True)]
    num_output_nodes = num_classes
    layer_1_units = 32
     
    list_cv_metrics = []
    list_cv_losses = []
    list_cv_epochs = []
    cv_num = int(xcalib_cnn.shape[0]/folds)
    for fold in range(folds):
        # create the cnn
        model_cnn = Sequential()
        model_cnn.add(Embedding(vocab_size, embedding_dimension, 
                               input_length=maximum_length))
        model_cnn.add(Conv1D(filters=layer_1_units, kernel_size=conv_window_size, 
                             activation='relu'))
        model_cnn.add(MaxPooling1D(pool_size=pooling_size))
        model_cnn.add(Flatten())
        model_cnn.add(Dense(num_output_nodes, activation='softmax'))
        # compile the model
        model_cnn.compile(optimizer='adam', loss='categorical_crossentropy', 
                          metrics=[metric])
         
        # get train/valid
        x_train = np.vstack((xcalib_cnn[:cv_num*fold], xcalib_cnn[cv_num*(fold+1):]))
        y_train = np.vstack((ycalib[:cv_num*fold], ycalib[cv_num*(fold+1):]))
         
        x_valid = xcalib_cnn[cv_num*fold:cv_num*(fold+1)]
        y_valid = ycalib[cv_num*fold:cv_num*(fold+1)]
         
        # fit the model
        h = model_cnn.fit(x_train, y_train, 
                          epochs=num_epochs, 
                          batch_size=batchsize,
                          validation_data=(x_valid, y_valid), 
                          callbacks=callbacks_list,
                          verbose=verbosity)
         
        # collect cv stats
        cv_loss, cv_metric = model_cnn.evaluate(x_valid, y_valid, verbose=0)
        list_cv_metrics.append(cv_metric)
        list_cv_losses.append(cv_loss)
        list_cv_epochs.append(len(h.history['val_loss']))
            
    # redo the model with the maximum epochs from the cv and all calib data, no early stopping
    model_cnn_final = Sequential()
    model_cnn_final.add(Embedding(vocab_size, embedding_dimension, 
                           input_length=maximum_length))
    model_cnn_final.add(Conv1D(filters=layer_1_units, kernel_size=conv_window_size, 
                         activation='relu'))
    model_cnn_final.add(MaxPooling1D(pool_size=pooling_size))
    model_cnn_final.add(Flatten())
    model_cnn_final.add(Dense(num_output_nodes, activation='softmax'))
    # compile the model
    model_cnn_final.compile(optimizer='adam', loss='categorical_crossentropy', 
                            metrics=[metric])
    # summarize the model
    print('\n** final results for',model_name)
    # fit the model
    h = model_cnn_final.fit(xcalib_cnn, ycalib, 
                      epochs=np.max(list_cv_epochs), 
                      batch_size=batchsize,
                      verbose=verbosity)
         
    y_prod = np.argmax(yprod, axis=1)
    y_prod_predict = np.argmax(model_cnn_final.predict(xprod_cnn), axis=1)
         
    # compute and print error metrics to model_name.txt
    stdout_default = sys.stdout
    sys.stdout = open(results_dir + model_name + '.txt','w')
    print(model_cnn_final.summary(),'\n***\n\n')
    print('\ncross validation metrics:')
    for i in range(folds):
        print('fold:',i+1,'\terror:',list_cv_metrics[i],'\tloss:',list_cv_losses[i],
              '\tepochs:',list_cv_epochs[i])
         
    print('\nmax cross validation metric =',np.max(list_cv_metrics))
    print('min cross validation metric =',np.min(list_cv_metrics))
    print('mean cross validation error =',np.mean(list_cv_metrics))
     
    # production metrics
    balanced_accuracy = balanced_accuracy_score(y_prod, y_prod_predict)
    accuracy = accuracy_score(y_prod, y_prod_predict, normalize=True)  # fractions
    print('\n\nproduction metrics: balanced accuracy =', balanced_accuracy)
    print('production metrics: accuracy =',accuracy)
    print('production classification report:\n',classification_report(y_prod, y_prod_predict))
    sys.stdout = stdout_default
     
     
    # compute confusion matrices for production predictions
    confusion_matrix_plot.cm_plot(y_prod, y_prod_predict, class_names, results_dir,
                                  False, False, 30)  # raw
 
    confusion_matrix_plot.cm_plot(y_prod, y_prod_predict, class_names, results_dir,
                                  False, True, 30)  # normalized
     
# end of cnn_one_layer_check_early_stop_cv()
 
# *******************************************************************
 
if __name__ == '__main__':
     
    code_dir = YOUR PATH
    data_dir = code_dir + 'data/'
     
    df_train = pd.read_pickle(data_dir + 'df_train.pkl')
    df_test = pd.read_pickle(data_dir + 'df_test.pkl')
    df_production = pd.read_pickle(data_dir + 'df_production.pkl')
     
    number_of_classes = df_train['label_num'].value_counts().shape[0]
    sequence_type = 'amino_acid_20'
    encoding_type = 'embedding'
         
    dimension_of_embedding = 32
     
    epochs = 150
    batch_size = 128
    verbose = 0
     
    x_train = df_train['peptide'].values
    y_train = to_categorical(df_train['label_num'].values)
     
    x_test = df_test['peptide'].values
    y_test = to_categorical(df_test['label_num'].values)
     
    x_production = df_production['peptide'].values
    y_production = to_categorical(df_production['label_num'].values)
     
    df_calib = pd.concat([df_train, df_test])
    x_calib = df_calib['peptide'].values
    y_calib = to_categorical(df_calib['label_num'].values)
     
    class_labels = ['Non Binder','Weak Binder','Strong Binder']
     
    # cnn - single layer, no regularization, no cross valid, no early stopping
    pool_size = 3
    for convolution_window_size in [5,7]:
        results_directory = code_dir + 'cnn_1/conv_window_size_' + str(convolution_window_size) + '/'
        if not Path(results_directory).is_dir():
            os.mkdir(results_directory)
             
        cnn_one_layer(x_train, y_train, x_test, y_test, number_of_classes,
                      sequence_type, encoding_type, dimension_of_embedding,
                      epochs, batch_size, convolution_window_size, pool_size,
                      verbose, results_directory, class_labels,
                      x_production, y_production)
     
    # cnn - 1 layer, early stopping, checkpoint, cross valid, no regularization
    cv_folds = 5
    checkpoint_epochs_stop = 10
    pool_size = 3
    cnn_1_directory = code_dir + 'cnn_1_cv/'
    if not Path(cnn_1_directory).is_dir():
        os.mkdir(cnn_1_directory)
    for convolution_window_size in [5,7]:
        results_directory = cnn_1_directory + 'conv_window_size_' + str(convolution_window_size) + '/'
        if not Path(results_directory).is_dir():
            os.mkdir(results_directory)
                         
        cnn_one_layer_check_early_stop_cv(x_calib, y_calib, number_of_classes,
                      sequence_type, encoding_type, dimension_of_embedding,
                      epochs, batch_size, convolution_window_size, pool_size,
                      verbose, results_directory, class_labels,
                      x_production, y_production, checkpoint_epochs_stop, cv_folds)

In [1]:
#%% Loss and accuracy plotting code:

import matplotlib.pyplot as plt
 
def plot_train_validation(train_data, train_label,  
                          valid_data, valid_label,
                          start_num,
                          xlabel, ylabel,
                          title, directory, show_plot):
     
    plt.ioff()
    fig, ax = plt.subplots()
     
    plt.plot(train_data[start_num:], 'bo', label=train_label)
    plt.plot(valid_data[start_num:], 'ro', label=valid_label)
    ax.set_xlim(start_num, )
    plt.xlabel(xlabel, fontsize=15)
    plt.ylabel(ylabel, fontsize=15)
    plt.legend()
    plt.title(title, fontsize=15)
    plt.grid()
     
    plt.savefig(directory + title + '.png')
    plt.clf()
    plt.close('all')

#%% Confusion matrix plotting code:

import numpy as np
import matplotlib.pyplot as plt
 
from sklearn.metrics import confusion_matrix
 
def cm_plot(y_true, y_pred, class_names, plot_dir,
            show_plot=False, normalize=False, xangle=30):
     
    # set color map and precision (for normalized cm)
    cmap=plt.cm.Blues
    np.set_printoptions(precision=3)
     
    # Compute confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        title = 'Normalized Confusion Matrix'
    else:
        title = 'Confusion Matrix'
 
    plt.ioff()
    fig, ax = plt.subplots()
    im = ax.imshow(cm, interpolation='nearest', cmap=cmap)
    ax.figure.colorbar(im, ax=ax)
    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           xticklabels=class_names, yticklabels=class_names,
           title=title,
           ylabel='True Label',
           xlabel='Predicted Label')
 
    # Rotate the tick labels and set their alignment.
    plt.setp(ax.get_xticklabels(), rotation=xangle, ha="right",
             rotation_mode="anchor")
 
    # Loop over data dimensions and create text annotations.
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], fmt),
                    ha="center", va="center",
                    color="white" if cm[i, j] > thresh else "black")
    fig.tight_layout()
         
    #save plot
    plt.savefig(plot_dir + title + '.png')
    plt.clf()
    plt.close(fig)