# In [None]:
# Joint work with Elias Mbarek. This script implements a convolutional network that takes an amino acid sequence as input and predicts
# whether a single amino acid mutation will result in a functional protein. It is baffling that this works at all, given the arcane
# biochemistry underlying protein folding and the relative simplicity of a convolutional neural network.

from pandas import read_csv, DataFrame
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.calibration import CalibratedClassifierCV
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score
from sklearn.naive_bayes import GaussianNB
from sklearn.model_selection import GridSearchCV
from sklearn.dummy import DummyClassifier
from sklearn.metrics import f1_score
from sklearn import neighbors
from sklearn import preprocessing

from keras.models import Sequential, Model
from keras.layers import Dense, Dropout, Activation, Flatten, BatchNormalization, Input
from keras.layers.embeddings import Embedding
from keras.layers import Conv1D, MaxPooling1D, Add, ZeroPadding1D
from keras import backend as K
from keras.optimizers import SGD
from keras import regularizers
from keras.wrappers.scikit_learn import KerasClassifier
from keras.utils.generic_utils import get_custom_objects

from keras.preprocessing import text, sequence
from keras.preprocessing.text import Tokenizer

from imblearn.over_sampling import RandomOverSampler
from keras.utils import to_categorical, plot_model
from keras.callbacks import ModelCheckpoint
from keras.models import load_model


# Read both CSV files up front, then pull the raw "Sequence" column out of each.
data = read_csv("train.csv")
test_data = read_csv('test.csv')

sequences = data["Sequence"]
test_sequences = test_data["Sequence"]

def downsampling(X, Y, Xtest=None, Ytest=None):
    """Balance a binary data set by randomly discarding rows of the larger class.

    Rows labelled 1 and rows labelled 0 are located in ``Y``; the larger of the
    two groups is subsampled without replacement (using NumPy's global random
    state) to the size of the smaller one. Rows with any other label are dropped.

    Args:
        X: Array of samples, indexable by an integer index array along axis 0.
        Y: Array of labels aligned with ``X``.
        Xtest: Unused; kept so existing call sites keep working.
        Ytest: Unused; kept so existing call sites keep working.

    Returns:
        Tuple ``(X_down, Y_down)`` holding the kept rows, positives first,
        followed by negatives.
    """
    pos_idx = np.where(Y == 1)[0]
    neg_idx = np.where(Y == 0)[0]
    tot_pos = len(pos_idx)
    tot_neg = len(neg_idx)

    if tot_pos <= tot_neg:
        # Usual case: thin out the negatives down to the number of positives.
        keep = np.random.choice(np.arange(tot_neg), tot_pos, replace=False)
        neg_idx = neg_idx[keep]
    else:
        # Positives are the majority. Sampling tot_pos negatives without
        # replacement would raise ValueError, so thin out the positives instead.
        keep = np.random.choice(np.arange(tot_pos), tot_neg, replace=False)
        pos_idx = pos_idx[keep]

    down_train_idx = np.concatenate((pos_idx, neg_idx))
    return X[down_train_idx], Y[down_train_idx]
def recall_m(y_true, y_pred):
    """Recall metric computed with Keras backend ops.

    The element-wise product ``y_true * y_pred`` is clipped to [0, 1] and
    rounded to count true positives; ``y_true`` is clipped and rounded the
    same way to count the positives that could have been found.
    ``K.epsilon()`` is added to the denominator.
    """
    hit_count = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    actual_count = K.sum(K.round(K.clip(y_true, 0, 1)))
    return hit_count / (actual_count + K.epsilon())

def precision_m(y_true, y_pred):
    """Precision metric computed with Keras backend ops.

    The element-wise product ``y_true * y_pred`` is clipped to [0, 1] and
    rounded to count true positives; ``y_pred`` is clipped and rounded the
    same way to count everything that was predicted positive.
    ``K.epsilon()`` is added to the denominator.
    """
    hit_count = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    flagged_count = K.sum(K.round(K.clip(y_pred, 0, 1)))
    return hit_count / (flagged_count + K.epsilon())

def f1_m(y_true, y_pred):
    """F1 metric: twice the product of precision and recall over their sum.

    Built from ``precision_m`` and ``recall_m``; ``K.epsilon()`` is added to
    the denominator.
    """
    p = precision_m(y_true, y_pred)
    r = recall_m(y_true, y_pred)
    ratio = (p * r) / (p + r + K.epsilon())
    return 2 * ratio

class Swish(Activation):
    """``Activation`` subclass whose instances carry the name ``'swish'``.

    The constructor forwards ``activation`` and any keyword arguments to
    ``Activation`` unchanged and then sets ``__name__`` on the instance.
    """

    def __init__(self, activation, **kwargs):
        super().__init__(activation, **kwargs)
        # Presumably lets Keras treat the instance like a named function
        # when it is looked up by the string 'swish' -- confirm against Keras.
        self.__name__ = 'swish'

def swish(x, beta=2.5):
    """Return ``x * sigmoid(beta * x)``; ``beta`` defaults to 2.5."""
    gate = K.sigmoid(beta * x)
    return x * gate


# Register the activation under the key 'swish' in Keras' custom-object table,
# so layers can request it with activation='swish'.
get_custom_objects().update(swish=Swish(swish))
# Character-level tokenizer: each character of a sequence gets its own integer index.
# It is fitted on the training and the test sequences so both share one vocabulary.
tokenizer = Tokenizer(char_level=True)
tokenizer.fit_on_texts(sequences)
tokenizer.fit_on_texts(test_sequences)

# Integer-encode both sets.
# NOTE(review): np.array() only produces a 2-D integer matrix if every sequence has
# the same length -- presumably 4, matching the model's Input(shape=(4, 21)); confirm.
X_test_prediction = np.array(tokenizer.texts_to_sequences(test_sequences))
X = np.array(tokenizer.texts_to_sequences(sequences))
# Labels: second column of train.csv, header row skipped.
Y = np.genfromtxt("train.csv", delimiter=',')[1:, 1]

# Hold out 20% of the labelled data with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=42)
print(y_train.shape)

# Balance the training split by resampling the minority class.
oversample = RandomOverSampler(sampling_strategy='minority')
X_train, y_train = oversample.fit_resample(X_train, y_train)
# Keep a reference to the oversampled split as it was before the downsampling step.
X_train2, y_train2 = X_train, y_train
X_train, y_train = downsampling(X_train, y_train)

# One-hot encode every array with the SAME width. Without num_classes,
# to_categorical infers the width from the largest index present in each array,
# so an array that happens to lack the highest token index would come out
# narrower than the others. Tokenizer indices start at 1 (0 is reserved),
# hence the +1.
num_classes = len(tokenizer.word_index) + 1
X_train = to_categorical(X_train, num_classes=num_classes)
X_test = to_categorical(X_test, num_classes=num_classes)
X_test_prediction = to_categorical(X_test_prediction, num_classes=num_classes)
X_train2 = to_categorical(X_train2, num_classes=num_classes)

# Custom objects handed to load_model() so the saved model's f1_m metric can be resolved.
dependencies = {'f1_m': f1_m}

# Network definition (Keras functional API).
# Input: 4 positions with 21 channels each -- presumably the one-hot encoded
# characters of a sequence; confirm against the preprocessing above.
visible = Input(shape=(4, 21))
# Dilated convolution (kernel 2, dilation 2). Assuming Keras' default 'valid'
# padding this should shrink the 4 positions to 2 -- TODO confirm.
# 'swish' is looked up by name among the registered custom objects.
conv1 = Conv1D(64, kernel_size=2, dilation_rate =2, strides=1, activation='swish')(visible) 
# Strided convolution (kernel 2, stride 2); under the same assumption it leaves
# a single position with 64 channels -- TODO confirm.
conv2 = Conv1D(64, kernel_size=2, dilation_rate = 1, strides=2,activation='swish')(conv1)
#conv3 = Conv1D(16, kernel_size=1, dilation_rate =1, strides=1, activation='swish')(conv2)
bat1 = BatchNormalization()(conv2)
#zero1=MaxPooling1D(pool_size=)(bat1)
# Pad one zero step on each side of the sequence axis before flattening.
zero1 = ZeroPadding1D(padding=1)(bat1)
# The string literal below holds a disabled second convolution stage; as a bare
# expression statement it has no effect at runtime.
'''
conv3 = Conv1D(16, kernel_size=1, dilation_rate =1, strides=1, activation='swish')(zero1) 
conv4 = Conv1D(16, kernel_size=1, dilation_rate = 1, strides=1,activation='swish')(conv3)
bat2 = BatchNormalization()(conv4)
pool1 = MaxPooling1D(pool_size= 1)(bat2)
'''
# Classifier head: flatten, one hidden dense layer, then a single sigmoid unit.
flat = Flatten()(zero1)
hidden1 = Dense(32, activation='swish')(flat)

output = Dense(1, activation='sigmoid')(hidden1)

# Checkpoint callback: writes 'best_model.h5' only when the validation F1 improves.
mc = ModelCheckpoint(
    'best_model.h5',
    monitor='val_f1_m',
    mode='max',
    verbose=1,
    save_best_only=True,
)

model = Model(inputs=visible, outputs=output)
# NOTE(review): this SGD optimizer is constructed but never passed to compile();
# training runs with the 'adam' optimizer named below.
opt = SGD(lr=0.1, momentum=0.9)
model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=[f1_m],
)

# Fit the Keras model on the dataset; the held-out split serves as validation data.
weights = {0: 1, 1: 0.07}  # performed best on model (0.047)
model.fit(
    X_train,
    y_train,
    class_weight=weights,
    epochs=70,
    batch_size=500,
    validation_data=(X_test, y_test),
    callbacks=[mc],
)

# Reload the checkpoint written during training; f1_m has to be supplied
# explicitly because it is a custom metric.
saved_model = load_model('best_model.h5', custom_objects=dependencies)

# NOTE(review): X_test/y_test were also the validation data that selected this
# checkpoint, so the figures printed here are likely optimistic.
score = saved_model.evaluate(X_test, y_test)
print('')
print('Test loss:', score[0])
print('Test f1_score:', score[1])

# Predict on the unlabelled sequences and turn the sigmoid outputs into hard
# 0/1 labels with a 0.5 threshold. Comparing the array directly replaces the
# former df.loc[np.where(...)] assignment, which relied on pandas reading the
# index tuple as (rows, columns) and was only correct for a single output
# column. Casting back to the prediction dtype keeps the CSV values unchanged.
active = saved_model.predict(X_test_prediction)
df = DataFrame((active > 0.5).astype(active.dtype))
df.to_csv('prediction_active_final8.csv', index=False)

