In [None]:
import numpy as np
import json
import os
os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"   # see issue #152
os.environ["CUDA_VISIBLE_DEVICES"]="0"

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import *
from tensorflow.keras.utils import to_categorical
import tensorflow.keras.backend as K
from tensorflow import keras

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
from tqdm.notebook import tqdm

In [None]:
def map_to_digit(string):
    if string == 'A':
        return 1
    elif string == 'T':
        return 2
    elif string == 'G':
        return 3
    elif string == 'C':
        return 4
    return 0

In [None]:
file_data = np.load('./training_data.npz',allow_pickle=True)
# data from file A as training data & valid data
# see 
# Y. Gao, X. Chen, H. Qiao, Y. Ke, and H. Qi, 
# “Low-bias manipu- lation of DNA oligo pool for robust data storage,” 
# ACS Synthetic Biology, vol. 9, no. 12, pp. 3344–3352, 2020, pMID: 33185422.
# for original data.
# './training_data.npz' is preprocessed. 

In [None]:
data_seq = file_data['data_seq']
# data_seq is a list whose elements are DNA-SEQs like 'AACCGGTTTAGCGT....'.
label_list = file_data['label_list']
# label_list is a list whose elements are list of COPY ERROR LABELS, for instance, 
# [[0,0,1,0,0,...],[0,1,0,0,...]] indicate
# two kinds of copies with errors at position 2 and 1
freq_list = file_data['freq_list']
# freq_list is a list whose elements are list of frequencies of COPIES, corresponds to label_list.

In [None]:
len(label_list[0][0])

In [None]:
training_data = []
training_label = []
for dna_seq, labels, copy_freq_list in zip(data_seq,label_list,freq_list):
    xx_label = np.zeros_like(labels[0])
    for a,b in zip(labels,copy_freq_list):
        xx_label += (np.array(a)==1) * b
    xx_label = xx_label/np.sum(copy_freq_list)
    training_label.append(xx_label)
    training_data.append([map_to_digit(letter) for letter in dna_seq[5:]])

In [None]:
plt.imshow(np.array(training_label[:100]))

In [None]:
training_data = np.array(training_data)
training_label = (np.array(training_label)).astype(np.float)

In [None]:
def global_attention(states, name=0):
    QUERY_KEY_DIMENTION = 16
    VALUE_DIMENTION = 16
    values = Dense(VALUE_DIMENTION,use_bias=False,activation='relu',kernel_regularizer='l2',name='values_dense_{}'.format(name))(states)
    queries = Dense(QUERY_KEY_DIMENTION,use_bias=False,activation='relu',kernel_regularizer='l2',name='queries_dense_{}'.format(name))(states)
    keys = Dense(QUERY_KEY_DIMENTION,use_bias=False,activation='relu',kernel_regularizer='l2',name='keys_dense_{}'.format(name))(states)
    weights = dot([keys,queries],[2,2],name='weights_{}'.format(name))
    weights = weights/np.sqrt(SEQ_LENGTH)
    weights = Activation('softmax',name='attention_weights_{}'.format(name))(weights)
    attention = dot([weights,values],[2,1])
    print(attention)
    attention = Dropout(0.3)(attention)
    return attention
def local_attention(states, name=0):
    QUERY_KEY_DIMENTION = 32
    VALUE_DIMENTION = 16
    values = Dense(VALUE_DIMENTION,use_bias=False,activation='relu',kernel_regularizer='l2',name='local_values_dense_{}'.format(name))(states)
    attention = Conv1D(16,kernel_size=3,strides=1,
                       padding='same',activation='relu',kernel_regularizer='l2',name='local_conv1_{}'.format(name))(values)
    attention = Conv1D(16,kernel_size=3,strides=1,
                       padding='same',activation='relu',kernel_regularizer='l2',name='local_conv2_{}'.format(name))(attention)
    attention = Dropout(0.3)(attention)
    print(attention)
    return attention

In [None]:
latent_dim = 128
num_encoder_tokens = 1
num_decoder_tokens = 4
SEQ_LENGTH = 15
NUMBER_HEADS = 5
NUMBER_LOCAL_HEADS = 5
ATTENTION_MODEL_NUM = 1
SEED = 0

encoder_inputs = Input(shape=(SEQ_LENGTH, num_encoder_tokens),name='seq_input')
c1 = Conv1D(filters=32,kernel_size=1,strides=1,padding='same',activation='relu',kernel_regularizer='l2')(encoder_inputs)
c2 = Conv1D(filters=32,kernel_size=2,strides=1,padding='same',activation='relu')(encoder_inputs)
c3 = Conv1D(filters=32,kernel_size=3,strides=1,padding='same',activation='relu',kernel_regularizer='l2')(encoder_inputs)
c4 = Conv1D(filters=32,kernel_size=4,strides=1,padding='same',activation='relu')(encoder_inputs)
c5 = Conv1D(filters=32,kernel_size=5,strides=1,padding='same',activation='relu',kernel_regularizer='l2')(encoder_inputs)
source_hidden_states = concatenate([c1,c2,c3,c4,c5],axis=-1)

multi_head = source_hidden_states
print(multi_head)

for L in range(ATTENTION_MODEL_NUM):
    attentions = []
    multi_head_saver = multi_head
    for _ in range(NUMBER_HEADS):
        attentions.append(global_attention(multi_head,name=_+L*100))
    for _ in range(NUMBER_LOCAL_HEADS):
        attentions.append(local_attention(multi_head,name=_+L*100))

    multi_head = concatenate(attentions,axis=-1)
    print(multi_head)
    multi_head = Add()([multi_head,multi_head_saver])
    multi_head = BatchNormalization()(multi_head)
    multi_head = Dense(64,activation='relu')(multi_head)
    multi_head = Dense(32,activation='relu')(multi_head)
    print(multi_head)

print(multi_head)


outputs = Flatten()(multi_head)

print(outputs)
outputs = Dense(64,activation='relu',kernel_regularizer='l2')(outputs)
outputs = Dense(32,activation='relu',kernel_regularizer='l2')(outputs)
outputs = Dropout(0.3)(outputs)
outputs = Flatten()(outputs)
outputs = Dense(2,activation='softmax')(outputs)
print(outputs)

model = Model(inputs=[encoder_inputs], outputs=[outputs])

In [None]:
def recall_neg(y_true, y_pred):
    thsh = 0.5
    pred = K.cast(y_pred>thsh,'float32')
    diff = y_true - pred
    diff1 = K.cast(diff<K.epsilon(),'float32')
    diff2 = K.cast(diff>-K.epsilon(),'float32')
    diff = diff1*diff2
    y_1_acc = K.sum((diff) * y_true) / (K.sum(y_true) + K.epsilon())
    
    y_true_flip = K.cast((y_true-0.8) < 0,'float32')
    y_0_acc = K.sum((diff) * y_true_flip) / (K.sum(y_true_flip) + K.epsilon())
    
    return y_0_acc

def recall_pos(y_true, y_pred):
    thsh = 0.5
    pred = K.cast(y_pred > thsh,'float32')
    diff = y_true - pred
    diff1 = K.cast(diff<K.epsilon(),'float32')  # (neg to pos) and (pos to pos)
    diff2 = K.cast(diff>-K.epsilon(),'float32') # (pos to neg) and (neg to neg)
    diff = diff1*diff2
    y_1_acc = K.sum((diff) * y_true) / (K.sum(y_true) + K.epsilon())
    
    y_true_flip = K.cast((y_true-0.8) < 0,'float32')
    y_0_acc = K.sum((diff) * y_true_flip) / (K.sum(y_true_flip) + K.epsilon())
    
    return y_1_acc

optimizer = tf.keras.optimizers.Adam(lr=0.0001)
loss = tf.losses.categorical_crossentropy
metrics = ['acc',recall_neg,recall_pos]
model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
model.summary()

In [None]:
from sklearn.utils import shuffle
train_data, train_label = training_data,training_label
_train_data = np.array(train_data)[:,:,np.newaxis]
_train_label = np.array(train_label)

N = SEQ_LENGTH
train_data_pos = []
train_label_pos = []
train_data_neg = []
train_label_neg = []
cnt = 0
POS_THSH = 0.05
np.random.seed(SEED)
for _data,_label, in zip(_train_data,_train_label):
    for _ in range(len(_data)-N):
        if np.sum(_label[_+N//2-1:_+N//2+2]) <= 0:
            train_data_neg.append(_data[_:_+N])
            train_label_neg.append(0)
        elif _label[_+N//2] >= POS_THSH:
            cnt += 1
            train_data_pos.append(_data[_:_+N])
            train_label_pos.append(1)

train_data_pos = np.array(train_data_pos)
train_label_pos = np.array(train_label_pos)
train_data_pos,train_label_pos = shuffle(train_data_pos,train_label_pos)
valid_data_pos = train_data_pos[-1000:]
valid_label_pos = train_label_pos[-1000:]
train_data_pos = train_data_pos[:-1000]
train_label_pos = train_label_pos[:-1000]

In [None]:
train_data_neg,train_label_neg = shuffle(train_data_neg,train_label_neg)

In [None]:
M = (len(train_data_neg)-len(valid_data_pos))//len(train_data_pos)//5

print(M)
train_data_combine = np.concatenate([train_data_pos]*(M)+[train_data_neg[:M*train_data_pos.shape[0]]])
train_label_combine = np.concatenate([train_label_pos]*(M)+[train_label_neg[:M*train_label_pos.shape[0]]])

valid_data = np.concatenate([valid_data_pos,train_data_neg[-valid_data_pos.shape[0]:]])
valid_label = np.concatenate([valid_label_pos,train_label_neg[-valid_label_pos.shape[0]:]])

valid_data,valid_label = shuffle(valid_data,valid_label)
train_data_combine,train_label_combine = shuffle(train_data_combine,train_label_combine)

In [None]:
hist = model.fit(train_data_combine,
                 to_categorical(train_label_combine),
                 batch_size=256, 
                 validation_data=(valid_data,to_categorical(valid_label)), shuffle=True, 
                 epochs=10)

In [None]:
file_data_test = np.load('./testing_data.npz',allow_pickle=True)
# Prepare testing data. 
# data from file B as testing data
# see 
# Y. Gao, X. Chen, H. Qiao, Y. Ke, and H. Qi, 
# “Low-bias manipu- lation of DNA oligo pool for robust data storage,” 
# ACS Synthetic Biology, vol. 9, no. 12, pp. 3344–3352, 2020, pMID: 33185422.
# for original data.
# './testing_data.npz' is preprocessed. 

In [None]:
data_seq_test = file_data_test['data_seq']
label_list_test = file_data_test['label_list']
freq_list_test = file_data_test['freq_list']

In [None]:
testing_data = []
testing_label = []
for dna_seq, labels, copy_freq_list in zip(data_seq_test,label_list_test,freq_list_test):
    xx_label = np.zeros_like(labels[0])
    for a,b in zip(labels,copy_freq_list):
        xx_label += (np.array(a)==1) * b
    xx_label = xx_label/np.sum(copy_freq_list)
    testing_label.append(xx_label)
    testing_data.append([map_to_digit(letter) for letter in dna_seq[5:]])
    
testing_data = np.array(testing_data).astype(np.float)
testing_label = np.array(testing_label).astype(np.float)

SEQ_LENGTH = SEQ_LENGTH
SEED = SEED
POS_THSH = POS_THSH
VALID_NUM = 1000
test_data_pos = []
test_data_neg = []
test_label_pos = []
test_label_pos_float = []
test_label_neg = []
for data,label in zip(testing_data,testing_label):
    for _ in range(len(data)-N):
        if label[_+N//2] <= 0:
            test_data_neg.append(data[_:_+N])
            test_label_neg.append(0)
        elif label[_+N//2] >= POS_THSH:
            test_data_pos.append(data[_:_+N])
            test_label_pos.append(1)
            test_label_pos_float.append(label[_+N//2])
test_data_pos = np.array(test_data_pos)[:,:,np.newaxis]
test_data_neg = np.array(test_data_neg)[:,:,np.newaxis]
test_label_pos_float = np.array(test_label_pos_float)

In [None]:
positive_ans = model.predict(test_data_pos,batch_size=512)
negative_ans = model.predict(test_data_neg,batch_size=512)

In [None]:
from sklearn.metrics import roc_curve,auc

In [None]:
fpr,tpr,threshold = roc_curve([1]*len(positive_ans)+[0]*len(negative_ans),
                              np.concatenate([positive_ans[:,1],negative_ans[:,1]]))
auc_class = auc(fpr,tpr)

In [None]:
plt.figure()
lw = 2
plt.plot(fpr, tpr, color='darkorange',
         lw=lw, label='ROC curve (area = %0.2f)'%(auc_class))
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.0])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver operating characteristic example')
plt.legend(loc="lower right")
plt.show()