In [1]:
from os import path
import torch

print(torch.__version__)
print('using gpu : ', torch.cuda.is_available())
accelerator = 'cu80' if torch.cuda.is_available() else 'cpu'
print('accelerator : ', accelerator)

1.10.2
using gpu :  True
accelerator :  cu80


In [2]:
codetest = True

In [3]:
num_motif = 16
bases = 'ACGT' # DNA bases
# basesRNA = 'ACGU' # RNA bases
batch_size = 64
dictReverse = {'A':'T','C':'G','G':'C','T':'A','N':'N'} #dictionary to implement reverse-complement mode
reverse_mode=False

In [4]:
import numpy as np
import csv
import math 
import random
import gzip
from scipy.stats import bernoulli

In [5]:
# only supporting DNA bases
def seq2pad(sequence, motiflen):
    rows = len(sequence) + 2*motiflen - 2
    S = np.empty([rows, 4])
    base = bases
    for i in range(rows):
        for j in range(4):
            if i-motiflen+1<len(sequence) and sequence[i-motiflen+1] == 'N' or i<motiflen-1 or i>len(sequence)+motiflen-2:
                S[i,j] = np.float32(0.25)
            elif sequence[i-motiflen+1] == base[j]:
                S[i,j] = np.float32(1)
            else:
                S[i,j] = np.float32(0)
    return np.transpose(S)

In [6]:
# testing seq2pad

if(codetest):
    seq_test = 'ATGGATGG'
    motiflen_test = 3
    s = seq2pad(seq_test, motiflen_test)
    print(s)

[[0.25 0.25 1.   0.   0.   0.   1.   0.   0.   0.   0.25 0.25]
 [0.25 0.25 0.   0.   0.   0.   0.   0.   0.   0.   0.25 0.25]
 [0.25 0.25 0.   0.   1.   1.   0.   0.   1.   1.   0.25 0.25]
 [0.25 0.25 0.   1.   0.   0.   0.   1.   0.   0.   0.25 0.25]]


In [7]:
def dinuc_shuffle(sequence):
    b = [sequence[i:i+2] for i in range(0, len(sequence), 2)]
    random.shuffle(b)
    d = ''.join([str(x) for x in b])
    return d

In [8]:
# testing dinuc_shuffle

if(codetest):
    print(dinuc_shuffle(seq_test))

GGATGGAT


In [9]:
def complement(sequence):
    complement = {'A': 'T', 'C': 'G', 'G': 'C', 'T': 'A', 'N': 'N'}
    complement_sequence = [complement[base] for base in sequence]
    return complement_sequence

In [10]:
# testing complement

if(codetest):
    print(complement(seq_test))
    print(list(seq_test))

['T', 'A', 'C', 'C', 'T', 'A', 'C', 'C']
['A', 'T', 'G', 'G', 'A', 'T', 'G', 'G']


In [11]:
def reverse_complement(sequence):
    sequence = list(sequence)
    sequence.reverse()
    return ''.join(complement(sequence))

In [12]:
# testing reverse_complement

if(codetest):
    print(reverse_complement(seq_test))

CCATCCAT


In [40]:
class Chip():
    def __init__(self, filename, motiflen=24, reverse_complement_mode = reverse_mode):
        self.file = filename
        self.motiflen = motiflen
        self.reverse_complement_mode = reverse_complement_mode
    
    def openFile(self):
        train_dataset = []
        with gzip.open(self.file, 'rt') as data:
            next(data)
            reader = csv.reader(data, delimiter = '\t')
            if not self.reverse_complement_mode:
                for row in reader:
                    train_dataset.append([seq2pad(row[2], self.motiflen), [1]]) # target = 1
                    train_dataset.append([seq2pad(dinuc_shuffle(row[2]), self.motiflen), [0]]) # target = 0
            else:
                for row in reader:
                      train_dataset.append([seq2pad(row[2],self.motiflen),[1]])
                      train_dataset.append([seq2pad(reverse_complement(row[2]),self.motiflen),[1]])
                      train_dataset.append([seq2pad(dinuc_shuffle(row[2]),self.motiflen),[0]])
                      train_dataset.append([seq2pad(dinuc_shuffle(reverse_complement(row[2])),self.motiflen),[0]])
        
        train_dataset_pad = train_dataset

        size = int(len(train_dataset_pad)/3)
        firstvalid = train_dataset_pad[:size]
        secondvalid = train_dataset_pad[size:2*size]
        thirdvalid = train_dataset_pad[2*size:]
        firsttrain = secondvalid+thirdvalid
        secondtrain = firstvalid+thirdvalid
        thirdtrain = firstvalid+secondvalid

        return firsttrain, firstvalid, secondtrain, secondvalid, thirdtrain, thirdvalid, train_dataset_pad

In [41]:
chipseq = Chip('data/encode/ELK1_GM12878_ELK1_(1277-1)_Stanford_AC.seq.gz')

In [42]:
train1, valid1, train2, valid2, train3, valid3, all_data = chipseq.openFile()

In [17]:
# testing Chip

if(codetest):
    dataset = []
    with gzip.open('data/encode/ELK1_GM12878_ELK1_(1277-1)_Stanford_AC.seq.gz', 'rt') as data:
        next(data)
        reader = csv.reader(data, delimiter='\t')
        for row in reader:
            print(row[2], " : ", len(row[2]))

In [18]:
from torch.utils.data import Dataset, DataLoader

class chipseq_dataset(Dataset):
    '''diabetes dataset'''

    def __init__(self, xy = None):
        self.x_data = np.asarray([element[0] for element in xy],dtype=np.float32)
        self.y_data = np.asarray([element[1] for element in xy], dtype=np.float32)
        self.x_data = torch.from_numpy(self.x_data)
        self.y_data = torch.from_numpy(self.y_data)
        self.len = len(self.x_data)
    
    def __getitem__(self, index):
        return self.x_data[index], self.y_data[index]
    
    def __len__(self):
        return self.len

In [19]:
train1_dataset=chipseq_dataset(train1)
train2_dataset=chipseq_dataset(train2)
train3_dataset=chipseq_dataset(train3)
valid1_dataset=chipseq_dataset(valid1)
valid2_dataset=chipseq_dataset(valid2)
valid3_dataset=chipseq_dataset(valid3)
all_data_dataset=chipseq_dataset(all_data)

In [53]:
if(codetest):
    train2_test = []
    with gzip.open('data/encode/ELK1_GM12878_ELK1_(1277-1)_Stanford_AC.seq.gz', 'rt') as data:
        next(data)
        reader = csv.reader(data, delimiter = '\t')
        for row in reader:
            train2_test.append([row[2], [1]]) # target = 1
            train2_test.append([dinuc_shuffle(row[2]), [0]]) # target = 0
    
    print(train2_test[0]) # -> the first data on 'data/encode/ELK1_GM12878_ELK1_(1277-1)_Stanford_AC.seq.gz'
    # to see the data, please refer to the copy of it, 'data/encode/ELK1_GM12878_ELK1_(1277-1)_Stanford_AC.seq copy'

# portion of the output of train2_dataset.__gettiem__(0) -> 'CCCG'...
    # 0.0000, 0.0000, 0.0000, 0.0000
    # 1.0000, 1.0000, 1.0000, 0.0000
    # 0.0000, 0.0000, 0.0000, 1.0000
    # 0.0000, 0.0000, 0.0000, 0.0000

# please note that the output tensor is transposed

['CCCGCTCCCTAGCAACGATTGGTTAACGAGGCCTGGTTTCCAGAAATGGGCACTATTTCCGGACGATGTTTGTAGAAGGGAGATCGCTGGGCGGGCGGACT', [1]]


In [55]:
if reverse_mode:
  train_loader1 = DataLoader(dataset=train1_dataset,batch_size=batch_size,shuffle=False)
  train_loader2 = DataLoader(dataset=train2_dataset,batch_size=batch_size,shuffle=False)
  train_loader3 = DataLoader(dataset=train3_dataset,batch_size=batch_size,shuffle=False)
  valid1_loader = DataLoader(dataset=valid1_dataset,batch_size=batch_size,shuffle=False)
  valid2_loader = DataLoader(dataset=valid2_dataset,batch_size=batch_size,shuffle=False)
  valid3_loader = DataLoader(dataset=valid3_dataset,batch_size=batch_size,shuffle=False)
  alldataset_loader=DataLoader(dataset=all_data_dataset,batch_size=batch_size,shuffle=False)
else:
  train_loader1 = DataLoader(dataset=train1_dataset,batch_size=batch_size,shuffle=True)
  train_loader2 = DataLoader(dataset=train2_dataset,batch_size=batch_size,shuffle=True)
  train_loader3 = DataLoader(dataset=train3_dataset,batch_size=batch_size,shuffle=True)
  valid1_loader = DataLoader(dataset=valid1_dataset,batch_size=batch_size,shuffle=False)
  valid2_loader = DataLoader(dataset=valid2_dataset,batch_size=batch_size,shuffle=False)
  valid3_loader = DataLoader(dataset=valid3_dataset,batch_size=batch_size,shuffle=False)
  alldataset_loader=DataLoader(dataset=all_data_dataset,batch_size=batch_size,shuffle=False)

train_dataloader = [train_loader1, train_loader2, train_loader3]
valid_dataloader = [valid1_loader, valid2_loader, valid3_loader]

In [56]:
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import torch.nn.functional as F

# Device Configuration
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Hyperparameters
epochs = 4
num_classes = 10
batch_size = 100
lr = 0.001

In [57]:
def logsampler(a,b):
    x = np.random.uniform(low = 0, high = 1)
    y = 10**((math.log10(b)-math.log10(a))*x + math.log10(a))
    return y

In [59]:
def sqrtsampler(a,b):
    x = np.random.uniform(low = 0, hight = 1)
    y = (b-a)*math.sqrt(x) + a
    return y

In [62]:
class ConvNet(nn.Module):
    def __init__(self, num_motif, motif_len, poolType, neuType, mode, droprate, lr, momentum_rate, 
                sigmaConv, sigmaNeu, beta1, beta2, beta3, reverse_complement_mode = reverse_mode):
        super(ConvNet, self).__init__()
        self.poolType = poolType
        self.neuType = neuType
        self.mode = mode
        self.reverse_complement_mode = reverse_complement_mode
        self.droprate = droprate
        self.lr = lr
        self.momentum_rate = momentum_rate
        self.sigmaConv = sigmaConv
        self.sigmaNeu = sigmaNeu
        self.beta1 = beta1
        self.beta2 = beta2
        self.beta3 = beta3

        self.wConv = torch.randn(num_motif, 4, motif_len).to(device)
        torch.nn.init.normal_(self.wConv, mean = 0, std = self.sigmaConv)
        self.wConv.requires_grad = True

        self.wRect = torch.randn(num_motif).to(device)
        torch.nn.init.normal_(self.wRect)
        self.wRect = -self.wRect
        self.wRect.requires_grad = True

        if neuType == 'nohidden':
            if poolType == 'maxavg':
                self.wNeu = torch.randn(2*num_motif, 1).to(device)
            else:
                self.wNeu = torch.randn(num_motif, 1).to(device)
            self.wNeuBias = torch.randn(1).to(device)
            torch.nn.init.normal_(self.wNeu, mean=0, std = self.sigmaNeu)
            torch.nn.init.normal_(self.wNeuBias, mean=0, std = self.sigmaNeu)
        
        else:
            if poolType == 'maxavg':
                self.wHidden = torch.randn(2*num_motif, 32).to(device)
            else:
                self.wHidden = torch.randn(num_motif, 32).to(device)
            self.wHiddenBias = torch.randn(32).to(device)
            self.wNeu = torch.randn(32, 1).to(device)
            self.wNeuBias = torch.randn(1).to(device)
            torch.nn.init.normal_(self.wNeu, mean=0, std = self.sigmaNeu)
            torch.nn.init.normal_(self.wNeuBias, mean=0, std = self.sigmaNeu)
            torch.nn.init.normal_(self.wHidden, mean=0, std = 0.3)
            torch.nn.init.normal_(self.wHiddenBias, mean=0, std = 0.3)

            self.wHidden.requires_grad = True
            self.wHiddenBias.requires_grad = True
        
        self.wNeu.requires_grad = True
        self.wNeuBias.requires_grad = True
    
    def divide_two_tensors(self, x):
        l = torch.unbind(x)

        list1 = [l[2*i] for i in range(int(x.shape[0]/2))]
        list2 = [l[2*i + 1] for i in range(int(x.shape[0]/2))]
        x1 = torch.stack(list1, 0)
        x2 = torch.stack(list2, 0)
        return x1, x2
    
    def forward_pass(self, x, mask=None, use_mask=False):
        conv = F.conv1d(x, self.wConv, bias = self.wRect, stride = 1, padding=0)
        rect = conv.clamp(min=0) # -> rectification
        maxPool, _ = torch.max(rect, dim=2)
        if self.poolType == 'maxavg':
            avgPool = torch.mean(rect, dim=2)
            pool = torch.cat((maxPool, avgPool), 1) # -> pool 순서가 논문과 다르지만 상관 없을것 같다!
        else:
            pool = maxPool
        if(self.neuType == 'nohidden'):
            if self.mode == 'training':
                if not use_mask:
                    mask = bernoulli.rvs(self.droprate, size=len(pool[0]))
                    mask = torch.from_numpy(mask).float().to(device)
                pooldrop = pool*mask
                out = pooldrop @ self.wNeu
                out.add_(self.wNeuBias)
            else:
                out = self.droprate*(pool @ self.wNeu)
                out.add_(self.wNeuBias)
        else:
            hid = pool @ self.wHidden
            hid.add_(self.wHiddenBias)
            hid = hid.clamp(min = 0) # rectification
            if self.mode == 'training':
                if not use_mask:
                    mask = bernoulli.rvs(self.droprate, size = len(hid[0]))
                    mask = torch.from_numpy(mask).float().to(device)
                hid_drop = hid*mask
                out = self.droprate*(hid@self.wNeu)
                out.add_(self.wNeuBias)
        
        return out, mask
    
    def forward(self, x):
        if not self.reverse_complement_mode:
            out, _ = self.forward_pass(x)
        else:
            x1, x2 = self.divide_two_tensors(x)
            out1, mask = self.forward_pass(x1)
            out2, _ = self.forward_pass(x2, mask, True)
            out = torch.max(out1, out2)
        
        return out
