In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import torch
from torch import nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

from sklearn.model_selection import train_test_split

### 1.   Prepare input data
-----

In [None]:
# Function: determine PHA-L read cut-offs for binary classification
def categorize_lectin(data_all, quantile_high, quantile_low, ref_col_loc):
    """Compute high/low cut-offs on one column and the boolean masks they define.

    Parameters
    ----------
    data_all : pandas.DataFrame
        Table whose column at position ``ref_col_loc`` holds the reference values.
    quantile_high, quantile_low : float
        Quantiles passed to ``np.quantile``; the first yields the "high"
        cut-off, the second the "low" cut-off.
    ref_col_loc : int
        Positional index of the reference column (used with ``.iloc``).

    Returns
    -------
    cutoff : list
        ``[high_cutoff, low_cutoff]`` taken with the "nearest" quantile method.
    masks : list of numpy.ndarray
        ``[high_indices, low_indices, high_low_indices]``: rows ``>=`` the high
        cut-off, rows ``<`` the low cut-off, and the element-wise OR of both.
    counts : list
        ``[high_count, low_count]``, the number of True entries in the first
        two masks.
    """
    ref_values = data_all.iloc[:, ref_col_loc]
    quantiles = [quantile_high, quantile_low]
    try:
        # ``method=`` is the NumPy >= 1.22 spelling; ``interpolation=`` is deprecated there.
        cutoff = np.quantile(ref_values, quantiles, method="nearest").tolist()
    except TypeError:
        # Older NumPy releases only accept the legacy keyword.
        cutoff = np.quantile(ref_values, quantiles, interpolation="nearest").tolist()
    print(f"Cut-off for PHA-L high: {cutoff[0]}; Cut-off for PHA-L low: {cutoff[1]}")

    # The high class includes its cut-off value, the low class excludes its own.
    high_indices = np.array(ref_values >= cutoff[0])
    low_indices = np.array(ref_values < cutoff[1])
    high_low_indices = np.logical_or(high_indices, low_indices)

    high_count = high_indices.sum()
    low_count = low_indices.sum()

    return cutoff, [high_indices, low_indices, high_low_indices], [high_count, low_count]

In [None]:
# Read the input table from CSV into a DataFrame
input_csv = 'TIL_transformed_data.csv'
input_df = pd.read_csv(input_csv)

In [None]:
# Process data: turn the PHA-L column into a binary class label
quantile_high = 0.75
quantile_low = 0.25
cutoff, indices, count = categorize_lectin(
    data_all=input_df,
    quantile_high=quantile_high,
    quantile_low=quantile_low,
    ref_col_loc=-1,
)

# indices holds three boolean row masks: high, low, and their union
high_mask, low_mask, kept_mask = indices
input_df.loc[high_mask, "PHA-L"] = 1
input_df.loc[low_mask, "PHA-L"] = 0

# Keep only the rows that received a label
input_df = input_df.loc[kept_mask, :]

In [None]:
# X: transcript data array (every column except the first and the last)
feature_frame = input_df.iloc[:, 1:-1]
X = feature_frame.values
# y: class array taken from the PHA-L column
label_series = input_df['PHA-L']
y = label_series.values

In [None]:
# Split training, validation and test set
# Step 1: hold out 10% of all samples as the test set, stratified on the label
X_train_val, X_test, y_train_val, y_test = train_test_split(
    X,
    y,
    test_size=0.1,
    random_state=342,
    stratify=y,
)

# Step 2: split the remainder 80/20 into training and validation, again stratified
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val,
    y_train_val,
    test_size=0.2,
    random_state=2,
    stratify=y_train_val,
)

In [None]:
# PHA-L high and PHA-L low counts in each set, stored as [sum of labels, remaining samples]
count_train, count_val, count_test = (
    [labels.sum(), len(labels) - labels.sum()]
    for labels in (y_train, y_val, y_test)
)

In [None]:
# Define class of SingleCellDataset
class SingleCellDataset(Dataset):
    """Dataset pairing each row of transcript values with its lectin label.

    Both inputs are converted to float tensors at construction time and are
    exposed as the ``transcript`` and ``lectin`` attributes.
    """

    def __init__(self, rna, lectin):
        float_dtype = torch.float32
        self.transcript = torch.tensor(rna, dtype=float_dtype)
        self.lectin = torch.tensor(lectin, dtype=float_dtype)

    def __len__(self):
        # Number of rows (cells) in the transcript tensor
        return self.transcript.size(0)

    def __getitem__(self, idx):
        # Return the (transcript row, label) pair stored at position idx
        return self.transcript[idx, :], self.lectin[idx]

In [None]:
# Create datasets: wrap each (features, labels) split in a SingleCellDataset
TrainDataSet, ValDataSet, TestDataSet = (
    SingleCellDataset(features, labels)
    for features, labels in ((X_train, y_train), (X_val, y_val), (X_test, y_test))
)

In [None]:
# Create dataloaders
batch_size = 128

# Training and validation data are served in shuffled mini-batches
train_data_loader = DataLoader(
    dataset=TrainDataSet,
    batch_size=batch_size,
    shuffle=True,
)
val_data_loader = DataLoader(
    dataset=ValDataSet,
    batch_size=batch_size,
    shuffle=True,
)
# The test loader uses one batch as large as the whole test split
test_data_loader = DataLoader(
    dataset=TestDataSet,
    batch_size=len(X_test),
    shuffle=True,
)

### 2.   Model training
-----

In [None]:
# Define the model class
class NeuralNetwork(nn.Module):
    """Fully connected network mapping ``input_size`` features to one sigmoid output.

    Hidden stages are Linear -> LeakyReLU -> (Dropout) -> BatchNorm1d with
    widths 128, 64, 16 and 8; the last hidden stage has no dropout. The layers
    live in ``linear_relu_stack`` in that order.
    """

    def __init__(self, input_size):
        super().__init__()
        # (output width, dropout probability or None) for each hidden stage
        hidden_stages = [(128, 0.4), (64, 0.4), (16, 0.2), (8, None)]

        layers = []
        in_features = input_size
        for out_features, drop_p in hidden_stages:
            layers.append(nn.Linear(in_features, out_features))
            layers.append(nn.LeakyReLU())
            if drop_p is not None:
                layers.append(nn.Dropout(drop_p))
            layers.append(nn.BatchNorm1d(out_features))
            in_features = out_features

        # Output head: a single unit squashed by a sigmoid
        layers.append(nn.Linear(in_features, 1))
        layers.append(nn.Sigmoid())

        self.linear_relu_stack = nn.Sequential(*layers)

    def forward(self, x):
        return self.linear_relu_stack(x)

In [None]:
# function: call label based on preset probability cutoff
def call_label(pred, prob_cutoff):
    """Convert predicted probabilities into hard 0/1 labels.

    Args:
        pred: Tensor of predicted probabilities, e.g. of shape ``(N, 1)`` or ``(N,)``.
        prob_cutoff: Values ``>= prob_cutoff`` are labelled 1, all others 0.

    Returns:
        1-D ``int64`` tensor with one label per element of ``pred``, on the
        same device as ``pred``.
    """
    # Vectorised threshold: the result stays on pred's device, so it can be
    # compared with targets there, and reshape(-1) keeps a one-element input
    # 1-D instead of collapsing it to a 0-d tensor.
    return (pred.reshape(-1) >= prob_cutoff).long()

In [None]:
# Use the GPU when PyTorch reports CUDA as available, otherwise the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [None]:
# Training loop
def train(dataloader, model, loss_fn, optimizer, scheduler, prob_cutoff, count):
    """Train ``model`` for one epoch and report loss and per-class accuracy.

    Batches are moved to the module-level ``device`` before the forward pass.

    Args:
        dataloader: Iterable yielding ``(X, y)`` batches; ``len(dataloader.dataset)``
            is only used in the progress printout.
        model: Network to optimise; switched to training mode here.
        loss_fn: Loss applied as ``loss_fn(pred, y.unsqueeze(1))``.
        optimizer: Optimizer stepped once per batch.
        scheduler: Learning-rate scheduler stepped once, after the last batch.
        prob_cutoff: Threshold passed to ``call_label`` to obtain 0/1 labels.
        count: Sequence whose first two items are the totals of class-1
            ("high") and class-0 ("low") samples; used as accuracy denominators.

    Returns:
        Tuple ``(mean batch loss, high-class accuracy in %, low-class accuracy in %)``.
    """
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.train()
    train_loss, correct_high, correct_low = 0, 0, 0
    high_count, low_count = count[0], count[1]

    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        # reshape(-1) keeps a one-sample batch 1-D; squeeze would collapse it
        # to 0-d and the unsqueeze(1) below would then fail.
        y = y.reshape(-1)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y.unsqueeze(1))
        batch_loss = loss.item()
        train_loss += batch_loss

        # Hard labels are bookkeeping only: detach them from the graph and put
        # them on the same device as the targets before comparing.
        pred_label = call_label(pred.detach(), prob_cutoff).to(y.device)
        correct_high += (torch.logical_and(pred_label == 1, y == 1)).type(torch.float).sum().item()
        correct_low += (torch.logical_and(pred_label == 0, y == 0)).type(torch.float).sum().item()

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 10 == 0:
            current = batch * len(X)
            print(f"loss: {batch_loss:>7f}  [{current:>5d}/{size:>5d}]")

    train_loss /= num_batches
    accuracy_high = 100*correct_high/high_count
    accuracy_low = 100*correct_low/low_count
    print(f"Avg loss of training set: {train_loss:.4f} \n")
    print(f"Accuracy for 'PHA-L high' class of training set: {correct_high}/{high_count} ({accuracy_high:.4f}%)")
    print(f"Accuracy for 'PHA-L low' class of training set: {correct_low}/{low_count} ({accuracy_low:.4f}%)")
    print(f"Overall accuracy: {correct_high+correct_low}/{high_count+low_count} ({100*(correct_high+correct_low)/(high_count+low_count):.4f}%)\n")

    # One scheduler step per epoch
    scheduler.step()

    return train_loss, accuracy_high, accuracy_low

In [None]:
# Validation loop
def val(dataloader, model, loss_fn, prob_cutoff, count):
    """Evaluate ``model`` on a dataloader without updating its weights.

    Batches are moved to the module-level ``device``; the model is put in
    evaluation mode and gradients are disabled.

    Args:
        dataloader: Iterable yielding ``(X, y)`` batches.
        model: Network to evaluate.
        loss_fn: Loss applied as ``loss_fn(pred, y.unsqueeze(1))``.
        prob_cutoff: Threshold passed to ``call_label`` to obtain 0/1 labels.
        count: Sequence whose first two items are the totals of class-1
            ("high") and class-0 ("low") samples; used as accuracy denominators.

    Returns:
        Tuple ``(mean batch loss, high-class accuracy in %, low-class accuracy in %)``.
    """
    num_batches = len(dataloader)
    model.eval()
    val_loss, correct_high, correct_low = 0, 0, 0
    high_count, low_count = count[0], count[1]

    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            # reshape(-1) keeps a one-sample batch 1-D; squeeze would collapse
            # it to 0-d and the unsqueeze(1) below would then fail.
            y = y.reshape(-1)

            pred = model(X)
            # Compare labels and targets on the same device
            pred_label = call_label(pred, prob_cutoff).to(y.device)
            val_loss += loss_fn(pred, y.unsqueeze(1)).item()
            correct_high += (torch.logical_and(pred_label == 1, y == 1)).type(torch.float).sum().item()
            correct_low += (torch.logical_and(pred_label == 0, y == 0)).type(torch.float).sum().item()

    val_loss /= num_batches
    accuracy_high = 100*correct_high/high_count
    accuracy_low = 100*correct_low/low_count
    # Message now names the validation set, matching the accuracy lines below
    print(f"Avg loss of validation set: {val_loss:.4f} \n")
    print(f"Accuracy for 'PHA-L high' class of validation set: {correct_high}/{high_count} ({accuracy_high:.4f}%)")
    print(f"Accuracy for 'PHA-L low' class of validation set: {correct_low}/{low_count} ({accuracy_low:.4f}%)")
    print(f"Overall accuracy: {correct_high+correct_low}/{high_count+low_count} ({100*(correct_high+correct_low)/(high_count+low_count):.4f}%)\n")

    return val_loss, accuracy_high, accuracy_low

In [None]:
# Start training
# Seed PyTorch's global RNG so weight initialisation, dropout masks and
# DataLoader shuffling are repeatable every time this cell is re-run.
torch.manual_seed(342)

gene_number = X.shape[1]

model = NeuralNetwork(input_size=gene_number).to(device)
print(model)

# Binary cross-entropy on the sigmoid output; labels are called at probability 0.5
loss_fn = nn.BCELoss()
prob_cutoff = 0.5
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
epochs = 30
# train() steps the scheduler once per epoch, so T_max is the number of epochs
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)

# Per-epoch history, one entry appended per epoch
train_loss = []
train_accuracy_high = []
train_accuracy_low = []

val_loss = []
val_accuracy_high = []
val_accuracy_low = []

for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    loss_train_epoch, acc_train_epoch_high, acc_train_epoch_low = train(train_data_loader, model, loss_fn, optimizer, scheduler, prob_cutoff, count_train)
    train_loss.append(loss_train_epoch)
    train_accuracy_high.append(acc_train_epoch_high)
    train_accuracy_low.append(acc_train_epoch_low)

    loss_val_epoch, acc_val_epoch_high, acc_val_epoch_low = val(val_data_loader, model, loss_fn, prob_cutoff, count_val)
    val_loss.append(loss_val_epoch)
    val_accuracy_high.append(acc_val_epoch_high)
    val_accuracy_low.append(acc_val_epoch_low)

print("Training finished.")

In [None]:
# Plot training set and validation set loss
epoch_axis = np.arange(1, epochs+1)
plt.plot(epoch_axis, train_loss, label="train_loss")
# This curve is the validation loss collected during training, so label it as such
plt.plot(epoch_axis, val_loss, label="val_loss")
plt.xlabel('Epoch')
# The y-axis label is the string form of the loss object
plt.ylabel(str(loss_fn))
plt.legend(loc=1)
plt.xticks(np.arange(0, epochs+2, step=2))
plt.xlim(1, epochs)
plt.grid()
plt.show()

In [None]:
# Plot training set and validation set accuracy, one curve per class and split
epoch_axis = np.arange(1, epochs+1)
accuracy_curves = {
    "train_accuracy_high": train_accuracy_high,
    "train_accuracy_low": train_accuracy_low,
    "val_accuracy_high": val_accuracy_high,
    "val_accuracy_low": val_accuracy_low,
}
for curve_label, curve_values in accuracy_curves.items():
    plt.plot(epoch_axis, curve_values, label=curve_label)

plt.xlabel('Epoch')
plt.ylabel('Accuracy(%)')
plt.legend(loc=4)

# Ticks every 2 epochs on x and every 10% on y
plt.xticks(np.arange(0, epochs+2, step=2))
plt.yticks(np.arange(0, 110, step=10))
plt.xlim(1, epochs)
plt.ylim(0,105)

plt.grid()
plt.show()