In [1]:
import numpy as np
import pandas as pd
import sys
import torch.nn.functional as F
import os
import random

import torch
import torch.nn as nn ##########
import torch.utils.data as Data
from torch.autograd import Variable
from torch import optim
from sklearn.model_selection import train_test_split
import itertools

from sklearn import metrics
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import roc_curve
from sklearn.metrics import auc
from sklearn.metrics import confusion_matrix,recall_score,matthews_corrcoef,roc_curve,roc_auc_score,auc,precision_recall_curve
# from torch.utils.tensorboard import SummaryWriter
import seaborn as sns

#----->>
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
import pickle
# from scipy import interp
import matplotlib.pyplot as plt


In [3]:
def convert_seq_to_bicoding(seq,wsize=41):
	#return bicoding for a sequence
	seq=seq.replace('U','T') #turn rna seq to dna seq if have
	feat_bicoding=[]
	bicoding_dict={'A':[1,0,0,0],'C':[0,1,0,0],'G':[0,0,1,0],'T':[0,0,0,1],'N':[0,0,0,0]}
	if len(seq) < wsize:
		seq=seq+'N'*(wsize-len(seq))
	for each_nt in seq:
		feat_bicoding+=bicoding_dict[each_nt]
	return feat_bicoding # (404,) list


def load_data_bicoding(Path):
	data = np.loadtxt(Path,dtype=list)
	data_result = []
	for seq in data:
		seq = str(seq.strip('\n'))
		bicoding=convert_seq_to_bicoding(seq)
		data_result.append(bicoding)
	return data_result 



def load_train_val_bicoding(pos_train_fa,neg_train_fa):
	data_pos_train = []
	data_neg_train = []
	
	data_pos_train = load_data_bicoding(pos_train_fa)
	data_neg_train = load_data_bicoding(neg_train_fa)
	
	
	data_train = np.array([_ + [1] for _ in data_pos_train] + [_ + [0] for _ in data_neg_train])
	np.random.seed(42)
	np.random.shuffle(data_train)
	
	X = np.array([_[:-1] for _ in data_train])
	y = np.array([_[-1] for _ in data_train])
	
	X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=1.0/10, random_state=42)
	
	return X_train,y_train,X_test,y_test

def load_test_bicoding(pos_test_fa,neg_test_fa):
	data_pos_test = []
	data_neg_test = []
	
	data_pos_test = load_data_bicoding(pos_test_fa)
	data_neg_test = load_data_bicoding(neg_test_fa)
	
	X_test = np.array([_ for _ in data_pos_test] + [_ for _ in data_neg_test])
	y_test = np.array([1 for _ in data_pos_test] + [0 for _ in data_neg_test])
	
	return X_test, y_test


def load_in_torch_fmt(X_train, y_train,vec_len):
	X_train = X_train.reshape(X_train.shape[0], int(X_train.shape[1]/vec_len), vec_len)
	X_train = torch.from_numpy(X_train).float()
	y_train = torch.from_numpy(y_train).long()

	
	return X_train, y_train

			
def chunkIt(seq, num): 
	avg = len(seq) / float(num)
	out = []
	last = 0.0
	
	while last < len(seq):
		out.append(seq[int(last):int(last + avg)])
		last += avg
		
	return out

def shuffleData(X, y):
	index = [i for i in range(len(X))]
	# np.random.seed(42)
	random.shuffle(index)
	new_X = X[index]
	new_y = y[index]
	return new_X, new_y

def round_pred(pred):
	list_result = []
	for i in pred:
		if i >0.5:
			list_result.append(1)
		elif i <=0.5:
			list_result.append(0)
	return torch.tensor(list_result)

In [4]:

class CNN4_LSTM(nn.Module):
		def __init__(self,kernel_size):
				super(CNN4_LSTM, self).__init__()
				self.conv1 = nn.Sequential(
						nn.Conv1d(
								in_channels=wordvec_len,
								out_channels=256,
								kernel_size = kernel_size,
								padding = int(kernel_size/2)),
					
						nn.ReLU(),
						nn.BatchNorm1d(256),
						nn.Dropout()
				)
				self.conv2 = nn.Sequential(
						nn.Conv1d(
								in_channels=256,
								out_channels=256,
								kernel_size = kernel_size,
						),
						nn.ReLU(),
						nn.BatchNorm1d(256),
						nn.Dropout()
					
				)
				self.conv3 = nn.Sequential(
						nn.Conv1d(
								in_channels=256,
								out_channels=256,
								kernel_size = kernel_size,
						),
						nn.ReLU(),  
						nn.BatchNorm1d(256),
						nn.Dropout()
					
				)
				self.conv4 = nn.Sequential( 
						nn.Conv1d(
								in_channels=256,
								out_channels=256, 
								kernel_size = kernel_size,
						),
						nn.ReLU(),

						nn.BatchNorm1d(256),
						nn.Dropout()
					
				)
				self.lstm = torch.nn.LSTM(256, 32, 2, batch_first=True, bidirectional=True) #
			
				self.fc_task = nn.Sequential(
					nn.Linear(32*2, 32),
					nn.Dropout(0.5),
					nn.ReLU(),
					nn.Linear(32, 2),
				)
				self.classifier = nn.Linear(2, 1)

			
		def forward(self, x):
				h0 = Variable(torch.zeros(2 * 2, x.size(0), 32)).to(device)#----cuda ##
				c0 = Variable(torch.zeros(2 * 2, x.size(0), 32)).to(device)#----cuda #3

				x = x.transpose(1, 2)
				x = self.conv1(x) 
				x = self.conv2(x)
				x = self.conv3(x)
				x = self.conv4(x)
				x = x.transpose(1, 2)
				#rnn layer
				out, _ = self.lstm(x, (h0, c0))
			
				reduction_feature = self.fc_task(torch.mean(out, 1))

				representation = reduction_feature
				logits_clsf = self.classifier(representation)
				logits_clsf1 = logits_clsf
				logits_clsf = torch.sigmoid(logits_clsf)
				return logits_clsf, representation

In [5]:
if __name__ == '__main__':
	# Hyper Parameters------------------>>
	
	device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
	device_cpu = torch.device("cpu") 
	wordvec_len =4
	
	k_folds = 10
	EPOCH = 1
	BATCH_SIZE = 256
	LR = 0.01 
	kernel_size = 10
	
	#!!! Model
	net = CNN4_LSTM(kernel_size =kernel_size).to(device)
	print(net)  # net architecture

	trainning_result = []
	validation_result = []
	testing_result = []
	
	
	train_loss_sum, valid_loss_sum, test_loss_sum= 0, 0, 0
	train_acc_sum , valid_acc_sum , test_acc_sum = 0, 0, 0
	
	test_acc = []  
	test_losses = [] 

	all_index = ['0']
	for index_fold in all_index:  
		species = '880'
		train_pos_fa = './{0}_data/{0}_positive_train.fa'.format(species)
		train_neg_fa = './{0}_data/{0}_negative_train.fa'.format(species)
			
		X_train, y_train, X_valid, y_valid = load_train_val_bicoding(train_pos_fa,train_neg_fa)

		X_train, y_train = load_in_torch_fmt(X_train, y_train,wordvec_len)
		X_valid, y_valid = load_in_torch_fmt(X_valid, y_valid,wordvec_len)
	
		train_loader = Data.DataLoader(Data.TensorDataset(X_train,y_train), BATCH_SIZE, shuffle = False)
		val_loader = Data.DataLoader(Data.TensorDataset(X_valid, y_valid), BATCH_SIZE, shuffle = False)
	
		model = net
	
		criterion = nn.BCELoss(size_average=False)
		optimizer = torch.optim.Adam(params = model.parameters(), lr = LR)

		train_losses = []
		val_losses = []
	
		train_acc = []
		val_acc = []
	
		best_acc = 0
		patience = 0
		patience_limit = 30
	
		epoch_list = [] #统计用到的epoch，用于画图
		for epoch in range(EPOCH):
			
			model.train()
			correct = 0   		
			train_loss = 0
			for step, (train_x, train_y) in enumerate(train_loader):
				
				train_x = Variable(train_x, requires_grad=False).to(device)
				train_y = Variable(train_y, requires_grad=False).to(device)

				fx, presention = model(train_x)

				loss = criterion(fx.squeeze(), train_y.type(torch.FloatTensor).to(device))

				optimizer.zero_grad()
				loss.backward()
				optimizer.step()
				pred = round_pred(fx.data.cpu().numpy()).to(device)
				correct += pred.eq(train_y.view_as(pred)).sum().item()
				
				train_loss += loss.item() * len(train_y)
				pred_prob = fx
				
				if (step+1) % 10 == 0:
				# 每10个batches打印一次loss
					print ('Epoch : %d/%d, Iter : %d/%d,  Loss: %.4f'%(epoch + 1, EPOCH, 
																		step + 1, len(X_train)//BATCH_SIZE, 
																		loss.item()))
					
			
			accuracy = 100.*correct/len(X_train)
			#统计：Epoch: 1, Loss: 0.64163, Training set accuracy: 908/1426 (63.675%)
			print('Epoch: {}, Loss: {:.5f}, Training set accuracy: {}/{} ({:.3f}%)'.format(
				epoch + 1, loss.item(), correct, len(X_train), accuracy))
			train_acc.append(accuracy)
			
			# 每个epoch计算测试集accuracy
			model.eval() #!!! Valid
			val_loss = 0
			correct = 0
			repres_list_valid , label_list_valid = [],[]
			
			with torch.no_grad():
				for step, (valid_x, valid_y) in enumerate(val_loader):     
					valid_x = Variable(valid_x, requires_grad=False).to(device)  
					valid_y = Variable(valid_y, requires_grad=False).to(device) 
					optimizer.zero_grad()
					y_hat_val, presention_valid = model(valid_x)
					loss = criterion(y_hat_val.squeeze(), valid_y.type(torch.FloatTensor).to(device)).item()  #BE
					val_loss += loss * len(valid_y)            									   #Cross
					pred_val = round_pred(y_hat_val.data.cpu().numpy()).to(device)    							   #BE
					correct += pred_val.eq(valid_y.view_as(pred_val)).sum().item()

			val_losses.append(val_loss/len(X_valid)) # all loss / all sample
			accuracy = 100.*correct/len(X_valid)
			#统计：Valid set: Average loss: 410.8327, Accuracy: 148/158 (93.671%)
			print('Valid set: Average loss: {:.4f}, Accuracy: {}/{} ({:.3f}%)\n'.format(
				val_loss/len(X_valid), correct, len(X_valid), accuracy))
			val_acc.append(accuracy)
			

CNN4_LSTM(
  (conv1): Sequential(
    (0): Conv1d(4, 256, kernel_size=(10,), stride=(1,), padding=(5,))
    (1): ReLU()
    (2): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Dropout(p=0.5, inplace=False)
  )
  (conv2): Sequential(
    (0): Conv1d(256, 256, kernel_size=(10,), stride=(1,))
    (1): ReLU()
    (2): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Dropout(p=0.5, inplace=False)
  )
  (conv3): Sequential(
    (0): Conv1d(256, 256, kernel_size=(10,), stride=(1,))
    (1): ReLU()
    (2): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Dropout(p=0.5, inplace=False)
  )
  (conv4): Sequential(
    (0): Conv1d(256, 256, kernel_size=(10,), stride=(1,))
    (1): ReLU()
    (2): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Dropout(p=0.5, inplace=False)
  )
  (lstm): LSTM(256, 32, num_layers=2, batch_first=True, b



Epoch: 1, Loss: 97.34566, Training set accuracy: 757/1425 (53.123%)
Valid set: Average loss: 103.3581, Accuracy: 102/159 (64.151%)

