# Small Neural Network that takes as input both the aptamer features and the peptide features to predict affinity.

## Generate features for both aptamers and peptides + construct training/test sets

In [4]:
import json
import random
import numpy as np
from sklearn import linear_model, metrics
from sklearn.svm import SVC
from scipy import stats
import random
import re
import os
import torch
from torch.utils.data import Dataset, DataLoader

In [5]:
# Hyperparameters
lr = 0.005
d = 200
samples = 10000
split = 8000
k_apt = 4
k_pep = 4
random.seed(42)
device = torch.device('cuda')
torch.cuda.get_device_properties(0)


_CudaDeviceProperties(name='TITAN Xp', major=6, minor=1, total_memory=12196MB, multi_processor_count=30)

In [6]:
'''
Function to classify binding affinity of a sample. 
'''
def classify_affinity(affinity):
    if float(affinity) <= 9:
        return 0
    elif float(affinity) <= 50:
        return 1
    elif float(affinity) <= 400:
        return 2
    return 3

In [7]:
dataset_file = "../data/mhcflurry_dataset.json"
'''
Constructs a dataset that has 10,000 pairs for every class of binding affinity. 
'''
def construct_dataset():
    with open(dataset_file, 'r') as f:
        mhcflurry_data = json.load(f)
    
    # Full dataset. The index of the list corresponds to the binding affinity class
    full_dataset = [[], [], [], []]
    for allele in mhcflurry_data:
        peptides = mhcflurry_data[allele]
        for p, b in peptides:
            affinity_class = classify_affinity(b)
            full_dataset[affinity_class].append((allele, p))
    
    subsampled_dataset = [[], [], [], []]
    
    for i in range(len(full_dataset)):
        full_class = np.asarray(full_dataset[i])
        # Sample the hardcoded number of samples pairs randomly
        subsampled_dataset[i] = np.copy(full_class[np.random.choice(full_class.shape[0], samples, replace=False), :])
    
    subsampled_dataset = np.asarray(subsampled_dataset)    
    return subsampled_dataset

In [8]:
subsampled_dataset = construct_dataset()

In [9]:
'''
Extracts features from the subsampled dataset
'''
def extract_features(dataset, d, k_apt, k_pep):
    # Number of features
    aptamer_features = [[], [], [], []]
    peptide_features = [[], [], [], []]

    for i in range(dataset.shape[0]):
        flattened = dataset[i].flatten('F')
        all_aptamers = flattened[:samples]
        all_peptides = flattened[samples:]

        split = int(0.8*len(all_aptamers))
        all_aptamers = all_aptamers[:split]
        all_peptides = all_peptides[:split]

        # Generate the aptamer features randomly
        for j in range(d):
            # Find a random aptamer
            apt = random.choice(all_aptamers)

            # Find a random subsection of k elements from this sequence and the quartile
            start = random.randint(0, len(apt)-k_apt)
            quartile_pctg = (start + 1)/float(len(apt))
            if quartile_pctg <= 0.25:
                quartile = 1
            elif quartile_pctg > 0.25 and quartile_pctg <= 0.5:
                quartile = 2
            elif quartile_pctg > 0.5 and quartile_pctg <= 0.75:
                quartile = 3
            else:
                quartile = 4
            
            aptamer_features[i].append((apt[start:start+k_apt], quartile))

        # Generate the peptide features randomly
        for j in range(d):
            # Find a random aptamer
            pep = random.choice(all_peptides)

            # Find a random subsection of k elements from this sequence
            start = random.randint(0, len(pep)-k_pep)
            quartile_pctg = (start + 1)/float(len(pep))
            if quartile_pctg <= 0.25:
                quartile = 1
            elif quartile_pctg > 0.25 and quartile_pctg <= 0.5:
                quartile = 2
            elif quartile_pctg > 0.5 and quartile_pctg <= 0.75:
                quartile = 3
            else:
                quartile = 4
            
            peptide_features[i].append((pep[start:start+k_pep], quartile))


    return aptamer_features, peptide_features, split
  

In [10]:
aptamer_features, peptide_features, split = extract_features(subsampled_dataset, d=d, k_apt=k_apt, k_pep=k_pep)

In [11]:
'''
Generates training and testing sets. Training is the first 8000 samples, test is the last 2000 samples. 
'''
def construct_train_test_sets(aptamer_features, peptide_features, split):
    train_pairs = [[], [], [], []]
    test_pairs = [[], [], [], []]
    
    for c in range(len(subsampled_dataset)):
        train_pairs[c] = subsampled_dataset[c][:split]
        test_pairs[c] = subsampled_dataset[c][split:]
    
    train_pairs = np.asarray(train_pairs)
    test_pairs = np.asarray(test_pairs)
    
    train_aptamers = [[], [], [], []]
    test_aptamers = [[], [], [], []]
    
    train_peptides = [[], [], [], []]
    test_peptides = [[], [], [], []]
    
    # Make a 0/1 matrix for the training aptamers/peptides
    for i in range(len(train_aptamers)):
        pairs = train_pairs[i]
        apt_features = aptamer_features[i]
        pep_features = peptide_features[i]
        
        for j in range(len(pairs)):
            a, p = pairs[j]
            matrix_aptamer_train = []
            matrix_peptide_train = []
            
            for k in range(len(apt_features)):
                feat, quartile = apt_features[k]
                starts = [m.start() for m in re.finditer(feat, a)]
                if len(starts) == 0:
                    matrix_aptamer_train.append(0)
                    continue
                exists = False
                for s in starts:
                    pctg = (s + 1) / len(a)
                    if pctg <= 0.25 and quartile == 1:
                        exists = True
                        break
                    elif (pctg > 0.25 and pctg <= 0.5) and quartile == 2:
                        exists = True
                        break
                    elif (pctg > 0.5 and pctg <= 0.75) and quartile == 3:
                        exists = True
                        break
                    elif pctg > 0.75 and quartile == 4:
                        exists = True
                        break
                    else:
                        exists = False
                if exists:
                    matrix_aptamer_train.append(1)
                if not exists:
                    matrix_aptamer_train.append(0)
            
            train_aptamers[i].append(matrix_aptamer_train)
            
            for k in range(len(pep_features)):
                feat, quartile = pep_features[k]
                starts = [m.start() for m in re.finditer(feat, p)]
                if len(starts) == 0:
                    matrix_peptide_train.append(0)
                    continue
                exists = False
                for s in starts:
                    pctg = (s + 1) / len(p)
                    if pctg <= 0.25 and quartile == 1:
                        exists = True
                        break
                    elif (pctg > 0.25 and pctg <= 0.5) and quartile == 2:
                        exists = True
                        break
                    elif (pctg > 0.5 and pctg <= 0.75) and quartile == 3:
                        exists = True
                        break
                    elif pctg > 0.75 and quartile == 4:
                        exists = True
                        break
                    else:
                        exists = False
                
                if exists:
                    matrix_peptide_train.append(1)
                if not exists:
                    matrix_peptide_train.append(0)
                    
            train_peptides[i].append(matrix_peptide_train)
    
    train_aptamers = np.asarray(train_aptamers)
    train_peptides = np.asarray(train_peptides)
    
    # Make a 0/1 matrix for the testing aptamers/peptides

    for i in range(len(test_aptamers)):
        pairs = test_pairs[i]
        apt_features = aptamer_features[i]
        pep_features = peptide_features[i]
        
        for j in range(len(pairs)):
            a, p = pairs[j]
            matrix_aptamer_test = []
            matrix_peptide_test = []
            
            for k in range(len(apt_features)):
                feat, quartile = apt_features[k]
                starts = [m.start() for m in re.finditer(feat, a)]
                if len(starts) == 0:
                    matrix_aptamer_test.append(0)
                    continue
                exists = False
                for s in starts:
                    # Each s is an index of the beginning of this features
                    # If one of them appears in the correct quartile, then this is 1
                    pctg = (s + 1) / len(a)
                    if pctg <= 0.25 and quartile == 1:
                        exists = True
                        break
                    elif (pctg > 0.25 and pctg <= 0.5) and quartile == 2:
                        exists = True
                        break
                    elif (pctg > 0.5 and pctg <= 0.75) and quartile == 3:
                        exists = True
                        break
                    elif pctg > 0.75 and quartile == 4:
                        exists = True
                        break
                    else:
                        exists = False
                if exists:
                    matrix_aptamer_test.append(1)
                if not exists:
                    matrix_aptamer_test.append(0)
                        
            test_aptamers[i].append(matrix_aptamer_test)
            
            for k in range(len(pep_features)):
                feat, quartile = pep_features[k]
                starts = [m.start() for m in re.finditer(feat, p)]
                if len(starts) == 0:
                    matrix_peptide_test.append(0)
                    continue
                exists = False
                for s in starts:
                    pctg = (s + 1) / len(p)
                    if pctg <= 0.25 and quartile == 1:
                        exists = True
                        break
                    elif (pctg > 0.25 and pctg <= 0.5) and quartile == 2:
                        exists = True
                        break
                    elif (pctg > 0.5 and pctg <= 0.75) and quartile == 3:
                        exists = True
                        break
                    elif pctg > 0.75 and quartile == 4:
                        exists = True
                        break
                    else:
                        exists = False
                if exists:
                    matrix_peptide_test.append(1)
                else:
                    matrix_peptide_test.append(0)

            test_peptides[i].append(matrix_peptide_test)
                
    test_aptamers = np.asarray(test_aptamers)
    test_peptides = np.asarray(test_peptides)
    
    
    return train_aptamers, train_peptides, test_aptamers, test_peptides
    
    
    

In [12]:
train_aptamers, train_peptides, test_aptamers, test_peptides = construct_train_test_sets(aptamer_features, peptide_features, split)

In [13]:
print(str(train_aptamers.shape))
print(str(train_peptides.shape))
print(str(test_aptamers.shape))
print(str(test_peptides.shape))

(4, 8000, 200)
(4, 8000, 200)
(4, 2000, 200)
(4, 2000, 200)


## Construct a Pytorch DataLoader

In [14]:
class AptamerPeptideDataset(Dataset):
    '''
    @param: peptides = n*m
    @param: aptamers = n*m
    @param: affinities = n*1
    '''
    def __init__(self, peptides, aptamers, affinities):
        self.peptides = peptides
        self.aptamers = aptamers
        affinities = np.reshape(affinities, (affinities.shape[0], 1))
        self.affinities = affinities
    
    def __len__(self):
        return self.peptides.shape[0]
    
    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
        
        pep = self.peptides[idx]
        apt = self.aptamers[idx]        
        aff_class = self.affinities[idx]
        
        sample = {'peptide': pep, 'aptamer': apt, 'affinity': aff_class}
        
        return sample
        
        

In [15]:
# Reshape the dataset to fit the dataset class
def reshape_dataset(train_aptamers, test_aptamers, train_peptides, test_peptides):
    all_train_aptamers = []
    all_test_aptamers = []
    for i in range(len(train_aptamers)):
        all_train_aptamers.extend(train_aptamers[i])
        all_test_aptamers.extend(test_aptamers[i])

    # n * m
    all_train_aptamers = np.array(all_train_aptamers)
    all_test_aptamers = np.array(all_test_aptamers)

    all_train_peptides = []
    all_test_peptides = []
    for i in range(len(train_peptides)):
        all_train_peptides.extend(train_peptides[i])
        all_test_peptides.extend(test_peptides[i])

    # n * m
    all_train_peptides = np.array(all_train_peptides)
    all_test_peptides = np.array(all_test_peptides)


    # n * 1
    train_affinity_classes = np.repeat(np.array([[0, 1], [2, 3]]), split)
    test_affinity_classes = np.repeat(np.array([[0, 1], [2, 3]]), samples-split)

    return all_train_peptides, all_train_aptamers, all_test_peptides, all_test_aptamers, train_affinity_classes, test_affinity_classes

In [16]:
train_pep, train_apt, test_pep, test_apt, train_aff, test_aff = reshape_dataset(train_aptamers, test_aptamers, train_peptides, test_peptides)

In [17]:
print("Train pep shape: ", train_pep.shape)
print("Train apt shape: ", train_apt.shape)
print("Test apt shape: ", test_apt.shape)
print("Test_pep shape: ", test_pep.shape)

Train pep shape:  (32000, 200)
Train apt shape:  (32000, 200)
Test apt shape:  (8000, 200)
Test_pep shape:  (8000, 200)


In [18]:
train_dataset = AptamerPeptideDataset(train_pep, train_apt, train_aff)
test_dataset = AptamerPeptideDataset(test_pep, test_apt, test_aff)

In [19]:
trainloader = DataLoader(train_dataset, batch_size=1, shuffle=True, num_workers=1)
testloader = DataLoader(test_dataset, batch_size=1, shuffle=True, num_workers=1)

## Construct a small neural network

In [20]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam

In [21]:
# Define the network
class SmallNN(nn.Module):
    def __init__(self, d_value):
        super(SmallNN, self).__init__()
        self.fc1 = nn.Linear(d_value, 1024)
        self.prelu1 = nn.PReLU(num_parameters=1)
        self.fc2 = nn.Linear(1024, 512)
        self.prelu2 = nn.PReLU(num_parameters=1)
        self.lin1 = nn.Linear(512, 250)
        self.prelu3 = nn.PReLU(num_parameters=1)
        self.lin2 = nn.Linear(250, 100)
        self.fc3 = nn.Linear(250, 4)
        self.sequential = nn.Sequential(self.fc1, self.prelu1, self.fc2, self.prelu2, self.lin1, self.prelu3, self.fc3)
        self.fc4 = nn.Linear(8, 4)
       
    def forward(self, apt, pep):
        apt = apt.type(torch.FloatTensor)
        pep = pep.type(torch.FloatTensor)
        apt = self.sequential(apt)
        pep = self.sequential(pep)
        x = torch.cat((apt, pep), 1)
        x = self.fc4(x)
        x = F.log_softmax(x, dim=1)
        return x
    
    def loss(self, prediction, label):
        loss = nn.MSELoss()
        label = label.type(torch.FloatTensor)
        label = np.reshape(label, (1, 4))
        return loss(prediction, label)

In [22]:
model = SmallNN(d_value=d)
optimizer = Adam(model.parameters(), lr=lr)

In [23]:
# Training loop
for epoch in range(10):
    print("Epoch: ", epoch)
    model.train()
    for i, data in enumerate(trainloader):
        pep = data['peptide']
        apt = data['aptamer']
        label = data['affinity'].item()
        one_hot_label = [0] * 4
        one_hot_label[label] = 1
        one_hot_label = torch.tensor(one_hot_label)
        
        optimizer.zero_grad()
        output = model(pep, apt)
        loss = model.loss(output, one_hot_label)
        loss.backward()
        optimizer.step()
    
print('Finished Training')

Epoch:  0
Epoch:  1
Epoch:  2
Epoch:  3
Epoch:  4
Epoch:  5
Epoch:  6
Epoch:  7
Epoch:  8
Epoch:  9
Finished Training


In [24]:
save_path = 'small_nn.pth'
torch.save(model.state_dict(), save_path)

### Test the performance of the neural network

In [25]:
correct = 0
total = 0
with torch.no_grad():
    for i, data in enumerate(testloader):
        pep = data['peptide']
        apt = data['aptamer']
        label = data['affinity'].item()
        
        output = model(pep, apt)
        pred = torch.argmax(output).item()
        
        total += 1
        correct += (pred == label)

print('Accuracy of the network on the test samples: %d %%' % (100 * correct / total))

Accuracy of the network on the test samples: 25 %


## Experiments

In [None]:
def experimental_loop(d, k_apt, k_pep):
    # Construct the dataset
    print("Constructing the dataset")
    aptamer_features, peptide_features, split = extract_features(subsampled_dataset, d=d, k_apt=k_apt, k_pep=k_pep)
    train_aptamers, train_peptides, test_aptamers, test_peptides = construct_train_test_sets(aptamer_features, peptide_features, split)
    train_pep, train_apt, test_pep, test_apt, train_aff, test_aff = reshape_dataset(train_aptamers, test_aptamers, train_peptides, test_peptides)
    train_dataset = AptamerPeptideDataset(train_pep, train_apt, train_aff)
    test_dataset = AptamerPeptideDataset(test_pep, test_apt, test_aff)

    trainloader = DataLoader(train_dataset, batch_size=1, shuffle=True, num_workers=1)
    testloader = DataLoader(test_dataset, batch_size=1, shuffle=True, num_workers=1)

    # Construct the model
    print("Constructing the model")
    model = SmallNN(d_value=d)
    optimizer = Adam(model.parameters(), lr=lr)
    # Training loop
    for epoch in range(1):
        print("Epoch: ", epoch)
        model.train()
        for i, data in enumerate(trainloader):
            pep = data['peptide']
            apt = data['aptamer']
            label = data['affinity'].item()
            one_hot_label = [0] * 4
            one_hot_label[label] = 1
            one_hot_label = torch.tensor(one_hot_label)

            optimizer.zero_grad()
            output = model(pep, apt)
            loss = model.loss(output, one_hot_label)
            loss.backward()
            optimizer.step()

    print('Finished Training')

    # Testing loop
    correct = 0
    total = 0
    with torch.no_grad():
        for i, data in enumerate(testloader):
            pep = data['peptide']
            apt = data['aptamer']
            label = data['affinity'].item()

            output = model(pep, apt)
            pred = torch.argmax(output).item()

            total += 1
            correct += (pred == label)
    return correct, total

In [None]:
def experimental_testing_loop(d, k_apt, k_pep):
    # Construct the dataset
    aptamer_features, peptide_features, split = extract_features(subsampled_dataset, d=d, k_apt=k_apt, k_pep=k_pep)
    train_aptamers, train_peptides, test_aptamers, test_peptides = construct_train_test_sets(aptamer_features, peptide_features, split)
    train_pep, train_apt, test_pep, test_apt, train_aff, test_aff = reshape_dataset(train_aptamers, test_aptamers, train_peptides, test_peptides)
    train_dataset = AptamerPeptideDataset(train_pep, train_apt, train_aff)
    test_dataset = AptamerPeptideDataset(test_pep, test_apt, test_aff)

    trainloader = DataLoader(train_dataset, batch_size=1, shuffle=True, num_workers=1)
    testloader = DataLoader(test_dataset, batch_size=1, shuffle=True, num_workers=1)

    # Construct the model
    model = SmallNN(d_value=d)
    optimizer = Adam(model.parameters(), lr=lr)
    # Training loop
    for epoch in range(3):
        print("Epoch: ", epoch)
        model.train()
        for i, data in enumerate(trainloader):
            pep = data['peptide']
            apt = data['aptamer']
            label = data['affinity'].item()
            one_hot_label = [0] * 4
            one_hot_label[label] = 1
            one_hot_label = torch.tensor(one_hot_label)

            optimizer.zero_grad()
            output = model(pep, apt)
            loss = model.loss(output, one_hot_label)
            loss.backward()
            optimizer.step()


        # Testing loop
        correct = 0
        total = 0
        with torch.no_grad():
            for i, data in enumerate(testloader):
                pep = data['peptide']
                apt = data['aptamer']
                label = data['affinity'].item()

                output = model(pep, apt)
                pred = torch.argmax(output).item()

                total += 1
                correct += (pred == label)
        print('Accuracy of the network after ' + str(epoch) + ' epoch on the test samples: %d %%' % (100* correct/total))    

In [None]:
d_values = [1800, 2200, 2600, 3000, 3400]
for d in d_values:
    correct, total = experimental_loop(d=d, k_apt=4, k_pep=4)
    print('D-value', d)
    print('Accuracy of the network on the test samples: %d %%' % (100* correct/total))


In [None]:
k_apt_values = [4, 6, 8, 10]
k_pep_values = [2, 3, 4, 5, 6, 7]

for a in k_apt_values:
    for p in k_pep_values:
        correct, total = experimental_loop(d=1800, k_apt=a, k_pep=p)
        print("D-value " + str(d) + " K_apt " + str(a) + " K_pep " + str(p))
        print('Accuracy of the network on the test samples: %d %%' % (100* correct/total))

In [None]:
experimental_testing_loop(1800, 2, 5)

In [None]:
correct, total = experimental_loop(1800, 2, 5)
print('Accuracy of the network on the test samples: %d %%' % (100* correct/total))