#Import Data

Importing the datafiles from github

In [None]:
import os
! git clone https://git.wur.nl/dijk097/dl_bif_project_data.git
os.chdir('dl_bif_project_data')
print(os.listdir("project_datasets"))

Cloning into 'dl_bif_project_data'...
remote: Enumerating objects: 22, done.[K
remote: Total 22 (delta 0), reused 0 (delta 0), pack-reused 22
Unpacking objects: 100% (22/22), 8.74 MiB | 6.03 MiB/s, done.
['len200_500_n5000nr3.pos', 'GO_3A0043066.annotprot', 'len200_500_n1000.pos', 'len100_200_n1000.pos', 'len200_500_n5000nr3.seq', 'len200_500_n5000nr2.pos', 'len100_200_n1000.seq', 'GO_3A0055085.annotprot', 'GO_3A0007165.annotprot', 'GO_3A0005576.annotprot', 'len200_500_n5000nr4.seq', 'len200_500_n5000nr2.seq', 'len200_500_n5000nr1.seq', 'len200_500_n5000nr1.pos', 'expr5Tseq_filtGO_100-1000.lis', 'GO_3A0005739.annotprot', 'test_set_filt.f', 'len200_500_n5000nr4.pos', 'len200_500_n1000.seq']


Installing d2l

In [None]:
!pip install d2l==0.16 --quiet

Importing required modules

In [None]:
from d2l import torch as d2l
import torch
import torch.nn as nn
from torch.nn import functional as F

#1D-CNN on the simulated datasets

In [None]:
def read(seqfile,posfile):
  '''Parsing two files in text format.

  Keyword arguments:
  seqfile -- file with sequences
  posfile -- file with positive cases (annotated with function)
  Returns:
  datalist -- list containing all sequences
  labellist -- list containing labels in binary format (0 = negative, 
  1 = positive)
  '''
  datalist = []
  with open(seqfile, 'r') as seq:
    count = 0
    for line in seq:
      line_spl = line.strip().split("\t")
      datalist.append(line_spl[1])
      count += 1
    labellist = [0] * count #create list with zeros
  with open(posfile, 'r') as pos:
    for line in pos:
      pos_str = line.strip()
      pos_ind = pos_str[3:]
      pos_num = int(pos_ind)
      labellist[pos_num - 1] = 1
  return datalist, labellist

def read_halfpos(seqfile,posfile):
  '''Parsing two files in text format. Reduce positives by 50%!

  Keyword arguments:
  seqfile -- file with sequences
  posfile -- file with positive cases (annotated with function)
  Returns:
  datalist -- list containing all sequences
  labellist -- list containing labels in binary format (0 = negative, 
  1 = positive) !50% less positives.
  '''
  datalist = []
  pos_lst = []
  with open(seqfile, 'r') as seq:
    count = 0
    for line in seq:
      line_spl = line.strip().split("\t")
      datalist.append(line_spl[1])
      count += 1
    labellist = [0] * count #create list with zeros
  with open(posfile, 'r') as pos:
    for line in pos:
      pos_lst.append(line.strip())
    half_pos = pos_lst[1::2] #exclude 50% of the positives
    for item in half_pos:
      pos_ind = item[3:]
      pos_num = int(pos_ind)
      labellist[pos_num - 1] = 1
  return datalist, labellist

def generate_train_test(datalist, labellist):
  '''Split data into a training set (70%) and test set (30%). 

  Keyword arguments:
  datalist -- list containing all sequences obtained from the read function
  labellist -- list containing labels in binary format obtained from the 
  read function
  Returns:
  traindatalist -- list containing 70% of the data for the trainset
  trainlabellist -- list containing the pos labels for the trainset
  testdatalist -- list containing 30% of the data for testset
  testlabellist -- list containing the pos labels for the testset
  ! Data and labels are put into a tuple.
  '''
  length_seq = len(datalist)
  len_70 = round(length_seq * 0.7)
  traindatalist = []
  trainlabellist = []
  testdatalist = []
  testlabellist= []
  for count, seq in enumerate(datalist):
    if count < len_70:
      traindatalist.append(seq)
    else:
      testdatalist.append(seq)
  for count, pos in enumerate(labellist):
    if count < len_70:
      trainlabellist.append(pos)
    else:
      testlabellist.append(pos)
  return (traindatalist, trainlabellist), (testdatalist, testlabellist)

def load_data(batch_size, num_steps, dataset):
    '''Load data, tokenize and convert to one-hot encoding.

    Keyword arguments:
    batch_size -- the batch size for data loading
    num_steps -- the number of steps for each sequence
    dataset -- the dataset containing sequences and labels obtained 
    from the generate_train_test function.
    Returns:
    data_iter -- data iterator with batched one-hot encoded sequences
    '''
    mapaa2num = {aa: i for (i, aa) in enumerate(list("ACDEFGHIKLMNPQRSTVWY"))}
    seq, lab = dataset
    seq = tokenize(seq, mapaa2num)
    seq_array = build_seq_array(seq, num_steps)
    # Convert to one-hot encoding
    one_hot_seq_array = F.one_hot(seq_array, num_classes=21).float()
    # Transpose the dimensions
    one_hot_seq_array = one_hot_seq_array.transpose(1, 2)
    # Add extra dimension to the target tensor and change data type to float
    data_arrays = (one_hot_seq_array, torch.tensor(lab, dtype=torch.long))
    data_iter = d2l.load_array(data_arrays, batch_size)
    return data_iter

def tokenize(dat, map2num,non_aa_num=20):
  '''Tokenize the input data by mapping amino acids to numbers.

  Keyword arguments:
  dat -- input data as list of sequences
  map2num -- dictionary mapping amino acids to numbers
  non_aa_num -- number to replace non-amino acid characters (default: 20)
  Returns:
  seq -- list of tokenized sequences
  '''
  seq = []
  for count, i in enumerate(dat):
    seq.append([map2num.get(j,non_aa_num) for j in list(i)])
  return seq

def build_seq_array(lines, num_steps,non_aa_num=20):
  '''Build a sequence array from the input data.

  Keyword arguments:
  lines -- input data as list of sequences
  num_steps -- the number of steps for each sequence
  non_aa_num -- number to replace non-amino acid characters (default: 20)
  Returns:
  array -- tensor containing the padded or truncated sequences
  '''
  array = torch.tensor([
  truncate_pad(l, num_steps, non_aa_num) for l in lines])
  return array

def truncate_pad(line, num_steps, padding_token):
  '''Truncate or pad a sequence based on the specified number of steps.

  Keyword arguments:
  line -- input sequence as a list of tokens
  num_steps -- the number of steps for each sequence
  padding_token -- token to pad sequences shorter than num_steps
  Returns:
  truncated or padded sequence
  '''
  if len(line) > num_steps:
    return line[:num_steps] # Truncate the sequence if needed
  return line + [padding_token] * (num_steps - len(line)) # Padding


def train_ch6(net, train_iter, test_iter, num_epochs, lr, momentum,
              device=d2l.try_gpu()):
    '''Train the 1D-CNN model.

    Keyword arguments:
    net -- model to train
    train_iter -- training data iterator
    test_iter -- test data iterator
    num_epochs -- number of training epochs
    lr -- learning rate
    momentum -- momentum for the optimizer
    device -- device to run the training on (default: GPU if available)
    '''
    def init_weights(m):
        if type(m) == nn.Linear or type(m) == nn.Conv1d:
            nn.init.xavier_uniform_(m.weight)
    net.apply(init_weights)
    print('training on', device)
    net.to(device)
    optimizer = torch.optim.SGD(net.parameters(), lr=lr, momentum=momentum)
    #optimizer = torch.optim.Adam(net.parameters(), lr = lr)
    loss = nn.CrossEntropyLoss()
    animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs],
                            legend=['train loss', 'train acc', 'test acc'])
    timer, num_batches = d2l.Timer(), len(train_iter)
    for epoch in range(num_epochs):
        # Sum of training loss, sum of training accuracy, no. of examples
        metric = d2l.Accumulator(3)
        net.train()
        for i, (X, y) in enumerate(train_iter):
            timer.start()
            optimizer.zero_grad()
            X, y = X.to(device), y.to(device)
            y_hat = net(X)
            l = loss(y_hat, y)
            l.backward()
            optimizer.step()
            with torch.no_grad():
                metric.add(l * X.shape[0], d2l.accuracy(y_hat, y), X.shape[0])
            timer.stop()
            train_l = metric[0] / metric[2]
            train_acc = metric[1] / metric[2]
            if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
              animator.add(epoch + (i + 1) / num_batches,
              (train_l, train_acc, None))
        test_acc = evaluate_accuracy_gpu(net, test_iter)
        animator.add(epoch + 1, (None, None, test_acc))
    print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
          f'test acc {test_acc:.3f}')
    print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec '
          f'on {str(device)}')


def evaluate_accuracy_gpu(net, data_iter, device=None):  
    '''Evaluate the accuracy of the model using a GPU.

    Keyword arguments:
    net -- model to evaluate
    data_iter -- data iterator for the dataset to evaluate on
    device -- device to run the evaluation on (default: None)
    Returns:
    accuracy -- proportion of correct predictions
    '''
    if isinstance(net, torch.nn.Module):
        net.eval()  # Set the model to evaluation mode
        if not device:
            device = next(iter(net.parameters())).device
    # No. of correct predictions, no. of predictions
    metric = d2l.Accumulator(2)
    for X, y in data_iter:
        X = X.to(device)
        y = y.to(device)
        metric.add(d2l.accuracy(net(X), y), y.numel())
    return metric[0] / metric[1]

#Parsing files
'''
datalist, labellist = read("project_datasets/len100_200_n1000.seq",
                           "project_datasets/len100_200_n1000.pos"
'''
datalist, labellist = read("project_datasets/len200_500_n1000.seq",
                           "project_datasets/len200_500_n1000.pos")
'''
datalist, labellist = read("project_datasets/len200_500_n5000nr1.seq",
                           "project_datasets/len200_500_n5000nr1.pos")
datalist, labellist = read("project_datasets/len200_500_n5000nr2.seq",
                           "project_datasets/len200_500_n5000nr2.pos")
datalist, labellist = read("project_datasets/len200_500_n5000nr3.seq",
                           "project_datasets/len200_500_n5000nr3.pos")
datalist, labellist = read("project_datasets/len200_500_n5000nr4.seq",
                           "project_datasets/len200_500_n5000nr4.pos")
'''

#Parsing files and decreasing positives by 50%
'''
datalist, labellist = read_halfpos("project_datasets/len100_200_n1000.seq",
                                   "project_datasets/len100_200_n1000.pos")
'''

#splitting datset into trainset and testset
traindataset,testdataset =  generate_train_test(datalist, labellist)


#Set batch size and number of steps (maximum sequence length) 
#change num_steps when using the len 100_200
train_iter=load_data(20,200,traindataset) 
test_iter=load_data(20,200,testdataset)

#The 1D-CNN network architecture
net = nn.Sequential(
    # Convolutional layer
    nn.Conv1d(in_channels=21, out_channels=32, kernel_size=9, stride=1, padding=4),
    # ReLU activation function
    nn.ReLU(),
    # Max pooling layer
    nn.MaxPool1d(kernel_size=2),

    # Flatten layer
    nn.Flatten(),

    # First fully connected layer, outputs 120
    nn.Linear(32 * 100, 120),
    # ReLU activation function
    nn.ReLU(),
    # First dropout layer
    nn.Dropout(0.8),

    # Second fully connected layer, outputs 84
    nn.Linear(120, 84),
    # ReLU activation function
    nn.ReLU(),
    # Second dropout layer
    nn.Dropout(0.5),

    # Final fully connected layer, outputs 2 (for two classes)
    nn.Linear(84, 2),
)

#Training the model
train_ch6(net, train_iter, test_iter, num_epochs=40, lr=0.01, momentum = 0.9)

Manually look at the predicted output of one iteration

In [None]:
x,y = next(iter(test_iter))
device=d2l.try_gpu()
X, y = x.to(device), y.to(device)
y_hat = net(X)
print(y_hat.argmax(axis=1))
print(y)

#1D-CNN on the biological dataset

In [None]:
def read(seqfile, *go_files):
    '''Parse a sequence file and multiple Gene Ontology files.

    Keyword arguments:
    seqfile -- file containing sequences and labels
    *go_files -- variable-length list of files containing Gene Ontology annotations
    Returns:
    datalist -- list containing all sequences
    labellist -- list containing labels as integers (0 for not found,
    1-n for corresponding GO files)
    '''
    datalist = []
    labels = []
    labellist = []
    with open(seqfile, 'r') as seq:
        for line in seq:
            line_spl = line.strip().split("\t")
            datalist.append(line_spl[1])
            labels.append(line_spl[0])
    go_lists = []
    for go_file in go_files:
        go_list = []
        with open(go_file, 'r') as f:
            for line in f:
                go_list.append(line.strip())
        go_lists.append(go_list)
    for item in range(len(labels)):
        found = False
        for i, go_list in enumerate(go_lists):
            if labels[item] in go_list:
                labellist.append(i + 1)
                found = True
                break
        if not found:
          labellist.append(0)
    return datalist, labellist

def generate_train_test(datalist, labellist):
  '''Split data into a training set (70%) and test set (30%). 

  Keyword arguments:
  datalist -- list containing all sequences obtained from the read function
  labellist -- list containing labels in binary format obtained from the 
  read function
  Returns:
  traindatalist -- list containing 70% of the data for the trainset
  trainlabellist -- list containing the pos labels for the trainset
  testdatalist -- list containing 30% of the data for testset
  testlabellist -- list containing the pos labels for the testset
  ! Data and labels are put into a tuple.
  '''
  length_seq = len(datalist)
  len_70 = round(length_seq * 0.7)
  traindatalist = []
  trainlabellist = []
  testdatalist = []
  testlabellist= []
  for count, seq in enumerate(datalist):
    if count < len_70:
      traindatalist.append(seq)
    else:
      testdatalist.append(seq)
  for count, pos in enumerate(labellist):
    if count < len_70:
      trainlabellist.append(pos)
    else:
      testlabellist.append(pos)
  return (traindatalist, trainlabellist), (testdatalist, testlabellist)

def load_data(batch_size, num_steps, dataset):
    '''Load data, tokenize and convert to one-hot encoding.

    Keyword arguments:
    batch_size -- the batch size for data loading
    num_steps -- the number of steps for each sequence
    dataset -- the dataset containing sequences and labels obtained 
    from the generate_train_test function.
    Returns:
    data_iter -- data iterator with batched one-hot encoded sequences
    '''
    mapaa2num = {aa: i for (i, aa) in enumerate(list("ACDEFGHIKLMNPQRSTVWY"))}
    seq, lab = dataset
    seq = tokenize(seq, mapaa2num)
    seq_array = build_seq_array(seq, num_steps)
    # Convert to one-hot encoding
    one_hot_seq_array = F.one_hot(seq_array, num_classes=21).float()
    # Transpose the dimensions
    one_hot_seq_array = one_hot_seq_array.transpose(1, 2)
    # Add extra dimension to the target tensor and change data type to float
    data_arrays = (one_hot_seq_array, torch.tensor(lab, dtype=torch.long))
    data_iter = d2l.load_array(data_arrays, batch_size)
    return data_iter

def tokenize(dat, map2num,non_aa_num=20):
  '''Tokenize the input data by mapping amino acids to numbers.

  Keyword arguments:
  dat -- input data as list of sequences
  map2num -- dictionary mapping amino acids to numbers
  non_aa_num -- number to replace non-amino acid characters (default: 20)
  Returns:
  seq -- list of tokenized sequences
  '''
  seq = []
  for count, i in enumerate(dat):
    seq.append([map2num.get(j,non_aa_num) for j in list(i)])
  return seq

def build_seq_array(lines, num_steps,non_aa_num=20):
  '''Build a sequence array from the input data.

  Keyword arguments:
  lines -- input data as list of sequences
  num_steps -- the number of steps for each sequence
  non_aa_num -- number to replace non-amino acid characters (default: 20)
  Returns:
  array -- tensor containing the padded or truncated sequences
  '''
  array = torch.tensor([
  truncate_pad(l, num_steps, non_aa_num) for l in lines])
  return array

def truncate_pad(line, num_steps, padding_token):
  '''Truncate or pad a sequence based on the specified number of steps.

  Keyword arguments:
  line -- input sequence as a list of tokens
  num_steps -- the number of steps for each sequence
  padding_token -- token to pad sequences shorter than num_steps
  Returns:
  truncated or padded sequence
  '''
  if len(line) > num_steps:
    return line[:num_steps] # Truncate the sequence if needed
  return line + [padding_token] * (num_steps - len(line)) # Padding


def train_ch6(net, train_iter, test_iter, num_epochs, lr, momentum,
              device=d2l.try_gpu()):
    '''Train the 1D-CNN model.

    Keyword arguments:
    net -- model to train
    train_iter -- training data iterator
    test_iter -- test data iterator
    num_epochs -- number of training epochs
    lr -- learning rate
    momentum -- momentum for the optimizer
    device -- device to run the training on (default: GPU if available)
    '''
    def init_weights(m):
        if type(m) == nn.Linear or type(m) == nn.Conv1d:
            nn.init.xavier_uniform_(m.weight)
    net.apply(init_weights)
    print('training on', device)
    net.to(device)
    optimizer = torch.optim.SGD(net.parameters(), lr=lr, momentum=momentum)
    #optimizer = torch.optim.Adam(net.parameters(), lr = lr)
    loss = nn.CrossEntropyLoss()
    animator = d2l.Animator(xlabel='epoch', xlim=[1, num_epochs],
                            legend=['train loss', 'train acc', 'test acc'])
    timer, num_batches = d2l.Timer(), len(train_iter)
    for epoch in range(num_epochs):
        # Sum of training loss, sum of training accuracy, no. of examples
        metric = d2l.Accumulator(3)
        net.train()
        for i, (X, y) in enumerate(train_iter):
            timer.start()
            optimizer.zero_grad()
            X, y = X.to(device), y.to(device)
            y_hat = net(X)
            l = loss(y_hat, y)
            l.backward()
            optimizer.step()
            with torch.no_grad():
                metric.add(l * X.shape[0], d2l.accuracy(y_hat, y), X.shape[0])
            timer.stop()
            train_l = metric[0] / metric[2]
            train_acc = metric[1] / metric[2]
            if (i + 1) % (num_batches // 5) == 0 or i == num_batches - 1:
                animator.add(epoch + (i + 1) / num_batches,
                             (train_l, train_acc, None))
        test_acc = evaluate_accuracy_gpu(net, test_iter)
        animator.add(epoch + 1, (None, None, test_acc))
    print(f'loss {train_l:.3f}, train acc {train_acc:.3f}, '
          f'test acc {test_acc:.3f}')
    print(f'{metric[2] * num_epochs / timer.sum():.1f} examples/sec '
          f'on {str(device)}')


def evaluate_accuracy_gpu(net, data_iter, device=None):  
    '''Evaluate the accuracy of the model using a GPU.

    Keyword arguments:
    net -- model to evaluate
    data_iter -- data iterator for the dataset to evaluate on
    device -- device to run the evaluation on (default: None)
    Returns:
    accuracy -- proportion of correct predictions
    '''
    if isinstance(net, torch.nn.Module):
        net.eval()  # Set the model to evaluation mode
        if not device:
            device = next(iter(net.parameters())).device
    # No. of correct predictions, no. of predictions
    metric = d2l.Accumulator(2)
    for X, y in data_iter:
        X = X.to(device)
        y = y.to(device)
        metric.add(d2l.accuracy(net(X), y), y.numel())
    return metric[0] / metric[1]


#Parsing files
datalist, labellist = read("project_datasets/expr5Tseq_filtGO_100-1000.lis",
                           "project_datasets/GO_3A0055085.annotprot",
                           "project_datasets/GO_3A0043066.annotprot",
                           "project_datasets/GO_3A0007165.annotprot",
                           "project_datasets/GO_3A0005739.annotprot",
                           "project_datasets/GO_3A0005576.annotprot")

#Calculate number of output classes in the network
out_classes = len(("project_datasets/GO_3A0055085.annotprot",
                   "project_datasets/GO_3A0043066.annotprot",
                   "project_datasets/GO_3A0007165.annotprot",
                   "project_datasets/GO_3A0005739.annotprot",
                   "project_datasets/GO_3A0005576.annotprot")) + 1


#splitting datset into trainset and testset
traindataset,testdataset =  generate_train_test(datalist, labellist)


#Set batch size and number of steps (maximum sequence length) 
train_iter=load_data(20,1000,traindataset) 
test_iter=load_data(20,1000,testdataset)

#The 1D-CNN network architecture
net = nn.Sequential(
    # Convolutional layer
    nn.Conv1d(in_channels=21, out_channels=32, kernel_size=9, stride=1, padding=4),
    # ReLU activation function
    nn.ReLU(),
    # Max pooling layer
    nn.MaxPool1d(kernel_size=2),

    # Flatten layer
    nn.Flatten(),

    # First fully connected layer, outputs 120
    nn.Linear(32 * 500, 120),
    # ReLU activation function
    nn.ReLU(),
    # First dropout layer
    nn.Dropout(0.8),

    # Second fully connected layer, outputs 84
    nn.Linear(120, 84),
    # ReLU activation function
    nn.ReLU(),
    # Second dropout layer
    nn.Dropout(0.5),

    # Final fully connected layer, outputs=out classes (6 for used dataset)
    nn.Linear(84, out_classes),
)

#Training the model
train_ch6(net, train_iter, test_iter, num_epochs=40, lr=0.01, momentum = 0.9)

Manually look at the predicted output of one iteration

In [None]:
x,y = next(iter(test_iter))
device=d2l.try_gpu()
X, y = x.to(device), y.to(device)
y_hat = net(X)
print(y_hat.argmax(axis=1))
print(y)
