# Program 1: Peptide Classification - using Pytorch


In [1]:
import numpy as np

np.random.seed(400)

## Import Data

We import the training and test data from the given files

In [2]:
def parse_training_data(file_path):
    x_train = []
    y_train = []
    
    with open(file_path, 'r') as file:
        lines = file.readlines()
        for line in lines:
            line = line.strip()
            line = line.split("\t")
            #print(line)
            y_train.append(int(line[0]))
            x_train.append(line[1])
    return x_train, y_train

def parse_testing_data(file_path):
    x_test = []
    
    with open(file_path, 'r') as file:
        lines = file.readlines()
        for line in lines:
            line = line.strip()
            #print(line)
            x_test.append(line)
    
    return x_test 

In [3]:
X_train_input, Y_train_input = parse_training_data('train.dat')
X_test_input = parse_testing_data('test.dat')

## Generate KMERs

Generate kmers for train and test data

In [4]:
def kmer(seq, k):
    return [seq[i:i+k] for i in range(0, len(seq)-k+1)]

def kmers(seq, k):
    kmers = []
    for i in range(1,k):
        kmers.extend(kmer(seq, k=i))
    return kmers

In [5]:
X_train_kmers = [kmers(seq, k=3) for seq in X_train_input]
X_test_kmers = [kmers(seq, k=3) for seq in X_test_input]
#print(X_train_encoded)

## Encode KMERs

Encode kmers into matrix

In [6]:
def generate_kmer_mapping(train_kmers, test_kmers):
    mapping = {}
    for peptide in train_kmers+test_kmers:
        for amino_acid in peptide:
            if amino_acid not in mapping:
                mapping[amino_acid] = len(mapping)
    return mapping

def encoding_matrix(data, mapping):
    num_rows = len(data)
    num_cols = len(mapping)
    
    matrix = np.zeros((num_rows, num_cols), dtype=float)
    
    for i, peptide in enumerate(data):
        unique_elements, counts = np.unique(peptide, return_counts=True)
        
        for kmer, count in zip(unique_elements, counts):
            if kmer in mapping:
                matrix[i, mapping[kmer]] = count
    
    return matrix

In [7]:
mapping = generate_kmer_mapping(X_train_kmers, X_test_kmers)

X_train_encoded = encoding_matrix(X_train_kmers, mapping)
X_test_encoded = encoding_matrix(X_test_kmers, mapping)

X_train_encoded = np.array(X_train_encoded)
X_test_encoded = np.array(X_test_encoded)
Y_train_input = np.array(Y_train_input)

## Balance Dataset

There is an imbalance of 1 and -1 samples
Need to make copies of 1 samples to balance the dataset

In [8]:
ones = X_train_encoded[Y_train_input == 1]
neg_ones = X_train_encoded[Y_train_input == -1]

#create copies
num_copies = int((neg_ones.shape[0] - ones.shape[0]) / 2)
print(num_copies)
copies = ones[np.random.randint(ones.shape[0], size=num_copies)]

#add copies to labels and values
balanced_X = np.vstack((X_train_encoded, copies))
balanced_Y = np.hstack((Y_train_input, np.ones(num_copies)))

641


## Shuffle Data

In [9]:
shuffle = np.arange(balanced_X.shape[0])
np.random.shuffle(shuffle)

X_train_shuffled = balanced_X[shuffle]
Y_train_shuffled = balanced_Y[shuffle]

## Create Validation Set

In [10]:
half = int(X_train_shuffled.shape[0] / 2)

#Training Set
X_train = X_train_shuffled[half:]
Y_train = Y_train_shuffled[half:]

#Validation Set
X_test_validation = X_train_shuffled[:half]
Y_test_validation = Y_train_shuffled[:half]

#Test Set
X_test = X_test_encoded

## Methods for Model Definition

In [30]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import torch.optim as optim

# Hyper-parameters 
epochs = 350
learning_rate = 0.01

In [56]:
class FeedForwardTorch:
    def __init__(self):
        self.layers = nn.ModuleList()
        self.loss_function = nn.MSELoss()
        #self.loss_derivative = nn.MSELoss

    def train(self, X_train, Y_train, epochs, learning_rate):
        optimizer = optim.SGD(self.parameters(), lr=learning_rate)

        for epoch in range(epochs):
            losses = 0
            for i in range(len(X_train)):
                output = X_train[i]
                for layer in self.layers:
                    output = layer(output)

                #loss
                loss = self.loss_function(output, Y_train[i].unsqueeze(0))
                losses += loss.item()

                # Backwards
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            average_loss = losses / len(X_train)
            if epoch % 20 == 0:
                print(f"Epoch {epoch}, Loss {average_loss:.8f}")

    #def train(self, X_train, Y_train, epochs, learning_rate):
    #    optimizer = optim.SGD(self.parameters(), lr=learning_rate)
        
    #    for epoch in range(epochs):
    #        total_loss = 0

    #        for i in range(len(X_train)):
                # Forwards
    #            output = X_train[i]
    #            for layer in self.layers:
    #                output = layer(output)

                #loss
    #            loss = self.loss_function(output, Y_train[i].unsqueeze(0))
    #            total_loss += loss.item()

                # Backwards
     #           optimizer.zero_grad()
     #           loss.backward()
     #           optimizer.step()

      #      average_loss = total_loss / len(X_train)
      #      if epoch % 20 == 0:
      #          print(f"Epoch {epoch}, Loss {average_loss:.8f}")

    def predict(self, X):
        outputs = []
        y_hat = []

        for i in range(len(X)):
            output = X[i]
            for layer in self.layers:
                output = layer(output)
            outputs.append(output)

        outputs = torch.stack([output[0] for output in outputs]).detach()
        y_hat = torch.where(outputs >= 0.5, 1, -1)

        return y_hat

    def parameters(self):
        return [p for layer in self.layers for p in layer.parameters()]

In [57]:
class Layer(nn.Module):
    def __init__(self, input_size, output_size):
        super(Layer, self).__init__()
        self.linear = nn.Linear(input_size, output_size)

    def forward(self, x):
        return self.linear(x)

# Instantiate the model
model = FeedForwardTorch()
model.layers.append(Layer(436, 64))
model.layers.append(Layer(64, 32))
model.layers.append(Layer(32, 16))
model.layers.append(Layer(16, 1))

ModuleList(
  (0): Layer(
    (linear): Linear(in_features=436, out_features=64, bias=True)
  )
  (1): Layer(
    (linear): Linear(in_features=64, out_features=32, bias=True)
  )
  (2): Layer(
    (linear): Linear(in_features=32, out_features=16, bias=True)
  )
  (3): Layer(
    (linear): Linear(in_features=16, out_features=1, bias=True)
  )
)

## Main Method

Create model layers, train model, and make predictions

In [58]:
if __name__ == "__main__":

    #feed in each sample one at a time -> add dimension to data sets
    X_train_matrix = np.expand_dims(X_train, axis=1)
    Y_train_matrix = np.expand_dims(Y_train, axis=1)

    X_test_validation_matrix = np.expand_dims(X_test_validation, axis=1)

    X_test_matrix = np.expand_dims(X_test, axis=1)

    X_tensor = torch.from_numpy(X_train_matrix).float()
    Y_tensor = torch.from_numpy(Y_train_matrix).float()

    X_test_validation_tensor = torch.from_numpy(X_test_validation_matrix).float()

    X_test_tensor = torch.from_numpy(X_test_matrix).float()

    
    #print(X_tensor)
    #print(Y_tensor)

    
    #ff = FeedForward()
    #ff.layers.append(DenseLayer(436, 64))
    #ff.layers.append(ActivationFunctionLayer(tanh, tanh_derivative)) 
    #ff.layers.append(DenseLayer(64, 32))
    #ff.layers.append(ActivationFunctionLayer(tanh, tanh_derivative))
    #ff.layers.append(DenseLayer(32, 16))
    #ff.layers.append(ActivationFunctionLayer(tanh, tanh_derivative))
    #ff.layers.append(DenseLayer(16, 1))
    #ff.layers.append(ActivationFunctionLayer(tanh, tanh_derivative))
    
    #ff.train(X_train_matrix, Y_train_matrix, epochs=200, learning_rate=0.01)

    model.train(X_tensor, Y_tensor, epochs, learning_rate)

    #prediction on validation set
    Y_hat_test_validation = model.predict(X_test_validation_tensor)


Epoch 0, Loss 0.40114225
Epoch 20, Loss 0.16741812
Epoch 40, Loss 0.15037435
Epoch 60, Loss 0.13609270
Epoch 80, Loss 0.11664521
Epoch 100, Loss 0.10003806
Epoch 120, Loss 0.09154387
Epoch 140, Loss 0.08565791
Epoch 160, Loss 0.08253271
Epoch 180, Loss 0.07942265
Epoch 200, Loss 0.07709282
Epoch 220, Loss 0.07573644
Epoch 240, Loss 0.07438206
Epoch 260, Loss 0.07378084
Epoch 280, Loss 0.07349369
Epoch 300, Loss 0.07115531
Epoch 320, Loss 0.06967966
Epoch 340, Loss 0.06842111


In [61]:
def mcc(y, y_hat):

    TP = np.sum((y == 1) & (y_hat == 1)) / len(y)
    #print(TP)
    TN = np.sum((y == -1) & (y_hat == -1)) / len(y)
    #print(TN)
    FP = np.sum((y == -1) & (y_hat == 1)) / len(y)
    #print(FP)
    FN = np.sum((y == 1) & (y_hat == -1)) / len(y)
    #print(FN)

    #print(((TP * TN) - (FP * FN)))

    #print(np.sqrt((TP + FP) * (TP + FN) * (TN + FP) * (TN + FN)))
    
    return ((TP * TN) + (FP * FN)) / np.sqrt((TP + FP) * (TP + FN) * (TN + FP) * (TN + FN))

Y_hat_array = Y_hat_test_validation.numpy()

#Y_test_validation_matrix = np.expand_dims(Y_test_validation, axis=1)
#Y_test_validation_tensor = torch.from_numpy(Y_test_validation_matrix).float()

print("MCC: ", mcc(Y_test_validation, Y_hat_array))

MCC:  0.4163026153212167


## Generate Test Set Predictions

In [None]:
# predict
#Y_hat_test = ff.predict(X_test_tensor)

#generate test predictions
#np.savetxt("predictions.dat", Y_hat_test, fmt="%d")