In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, roc_curve, auc, precision_recall_curve, average_precision_score
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np

#Define the neural network model
class BinaryClassifier(nn.Module):
    def __init__(self, input_dim):
        super(BinaryClassifier, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64,1))
    
    def forward(self, x):
        return self.network(x)

Prediction of PD1 response status

In [None]:
# read datasets for prediction of pd1
df = pd.read_csv("../result/pd1/metadict.csv", index_col=0)
count_metadict = df.to_numpy().T

# read datasets
df0 = pd.read_csv("../result/pd1/conqur.csv", index_col=0)
count_conqur = df0.to_numpy().T

df1 = pd.read_csv("../result/pd1/combatseq.csv", index_col=0)
count_combatseq = df1.to_numpy().T

df2 = pd.read_csv("../result/pd1/percentile.csv", index_col=0)
count_percentile = df2.to_numpy().T

df3 = pd.read_csv("../result/pd1/debiasm.csv", index_col=0)
count_debiasm = df3.to_numpy().T

df4 = pd.read_csv("../result/pd1/plsda.csv", index_col=0)
count_plsda = df4.to_numpy().T

df5 = pd.read_csv("../result/pd1/mmuphin.csv", index_col=0)
count_mmuphin = df5.to_numpy().T

df6 = pd.read_csv("../result/pd1/scanvi.csv", index_col=0)
count_scanvi = df6.to_numpy().T

df7 = pd.read_csv("../result/pd1/count_otu.csv", index_col=0)
count_raw = df7.to_numpy().T

count_dir = {"MetaDICT": count_metadict, "ComBatSeq": count_combatseq, "DEBIAS-M": count_debiasm, "Percentile-Norm": count_percentile, 
             "PLSDA-batch": count_plsda, "MMUPHin": count_mmuphin, "Unprocessed": count_raw, "scANVI": count_scanvi, "ConQuR": count_conqur}

In [None]:
metadf = pd.read_csv("../result/pd1/meta.csv", index_col=0)
Response = metadf["Response"].to_list()
Y = np.array([1 if item == "R" else 0 for item in Response])

In [None]:
# leave-one-out
Dataset = np.array(metadf["Dataset"].to_list())
dataset_unique = np.unique(Dataset)
dataset_unique

In [None]:
batch_size_l = [16,64,64,64,64,64] # predefined batch size
number_epoch = [50,50,50,50,50,50]
roc_auc_curve = {}
roc_auc_score = []
test = []
method_l = []
for i in range(len(dataset_unique)):
    name = dataset_unique[i]
    roc_auc_curve[name] = {}
    for method, count in count_dir.items():
        split_data_training = {dataset_name: count[Dataset != dataset_name,:] for dataset_name in dataset_unique}
        split_label_training = {dataset_name: Y[Dataset != dataset_name] for dataset_name in dataset_unique}

        split_data_test = {dataset_name: count[Dataset == dataset_name,:] for dataset_name in dataset_unique}
        split_label_test = {dataset_name: Y[Dataset == dataset_name] for dataset_name in dataset_unique}
        
        
        # Convert the data to PyTorch tensors
        torch.manual_seed(2025)
        X_train = torch.tensor(split_data_training[name], dtype=torch.float32)
        y_train = torch.tensor(split_label_training[name], dtype=torch.float32).view(-1, 1)
        X_test = torch.tensor(split_data_test[name], dtype=torch.float32)
        y_test = torch.tensor(split_label_test[name], dtype=torch.float32).view(-1, 1)

        # Create PyTorch datasets and loaders
        train_dataset = TensorDataset(X_train, y_train)

        batch_size = batch_size_l[i]
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        model = BinaryClassifier(input_dim=X_train.shape[1])

        #Define the loss function and optimizer
        criterion = nn.BCEWithLogitsLoss() 
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        # Training loop
        num_epochs = number_epoch[i]
        for epoch in tqdm(range(num_epochs),desc = f"processing"):
            model.train()
            running_loss = 0.0
            for inputs, labels in train_loader:
                optimizer.zero_grad()          # Zero the parameter gradients
                outputs = model(inputs)          # Forward pass
                loss = criterion(outputs, labels)  # Compute loss
                loss.backward()                # Backward pass
                optimizer.step()               # Update weights
                running_loss += loss.item() * inputs.size(0)

            epoch_loss = running_loss / len(train_loader.dataset)
        #Evaluation
        model.eval()
        with torch.no_grad():
            outputs = model(X_test)
            # Concatenate predictions and labels from all batches
        fpr, tpr, _ = roc_curve(y_test, outputs)  # Compute ROC curve
        roc_auc = auc(fpr, tpr)  # Compute AUC score
        print(f"{method} ROC-AUC Score: {roc_auc:.4f}")
        results = pd.DataFrame({"fpr": fpr, "tpr": tpr})
        results.to_csv(f"../result/pd1/roc_{name}_{method}.csv", index=False)

        test.append(name)
        method_l.append(method)
        roc_auc_score.append(roc_auc)


        roc_auc_curve[name][method] = [fpr, tpr]


In [None]:
roc_auc_nc = {"Method":method_l, "Test": test, "ROC-AUC":roc_auc_score}
df = pd.DataFrame(roc_auc_nc)
df.to_csv("../result/pd1/roc_pd1_nn.csv")

In [None]:
# Create a new figure
line_styles = ['-', '--', '-.', ':', '-', '--','solid','dashed']
# Iterate over your curves and plot each one
for name, all_curve in roc_auc_curve.items():
    k = 0
    plt.figure(figsize=(10, 8))
    for method, curve in all_curve.items():
        fpr = curve[0]
        tpr = curve[1]
        score = auc(fpr, tpr)
        plt.plot(fpr, tpr, label=f'{method}: AUC = {score: .2f}',linestyle=line_styles[k],linewidth=2)
        k += 1

    
    # Add a diagonal line for reference (random classifier)
    plt.plot([0, 1], [0, 1], color='gray', linestyle='--')

    # Set plot labels and title
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title(f"ROC Curves ({name})")
    plt.legend(loc="lower right")
    plt.grid(True)
    plt.savefig(f"../fig/roc_all_{name}.jpeg",dpi = 300)
    plt.show()


Negative Control experiment

In [None]:
# read datasets for negative control experiments
df = pd.read_csv("../result/pd1/metadict_nc.csv", index_col=0)
count_metadict = df.to_numpy().T

# read datasets
df0 = pd.read_csv("../result/pd1/conqur_nc.csv", index_col=0)
count_conqur = df0.to_numpy().T

df1 = pd.read_csv("../result/pd1/combatseq_nc.csv", index_col=0)
count_combatseq = df1.to_numpy().T

df2 = pd.read_csv("../result/pd1/percentile_nc.csv", index_col=0)
count_percentile = df2.to_numpy().T

df3 = pd.read_csv("../result/pd1/debiasm_nc.csv", index_col=0)
count_debiasm = df3.to_numpy().T

df4 = pd.read_csv("../result/pd1/plsda_nc.csv", index_col=0)
count_plsda = df4.to_numpy().T

df5 = pd.read_csv("../result/pd1/mmuphin_nc.csv", index_col=0)
count_mmuphin = df5.to_numpy().T

df6 = pd.read_csv("../result/pd1/scanvi_nc.csv", index_col=0)
count_scanvi = df6.to_numpy().T

df7 = pd.read_csv("../result/pd1/count_otu.csv", index_col=0)
count_raw = df7.to_numpy().T

count_dir = {"MetaDICT": count_metadict, "ComBatSeq": count_combatseq, "DEBIAS-M": count_debiasm, "Percentile-Norm": count_percentile, 
             "PLSDA-batch": count_plsda, "MMUPHin": count_mmuphin, "Unprocessed": count_raw, "scANVI": count_scanvi, "ConQuR": count_conqur}

In [None]:
metadf1 = pd.read_csv("../result/pd1/meta_nc.csv", index_col=0)
Group = metadf1["Y"].to_list()
Y1 = np.array([1 if item == "Group 1" else 0 for item in Group])

In [None]:
batch_size_l = [16,64,64,64,64,64] # predefined batch size
number_epoch = [50,50,50,50,50,50]
roc_auc_curve = {}
roc_auc_score = []
test = []
method_l = []
for i in range(len(dataset_unique)):
    name = dataset_unique[i]
    roc_auc_curve[name] = {}
    for method, count in count_dir.items():
        split_data_training = {dataset_name: count[Dataset != dataset_name,:] for dataset_name in dataset_unique}
        split_label_training = {dataset_name: Y1[Dataset != dataset_name] for dataset_name in dataset_unique}

        split_data_test = {dataset_name: count[Dataset == dataset_name,:] for dataset_name in dataset_unique}
        split_label_test = {dataset_name: Y1[Dataset == dataset_name] for dataset_name in dataset_unique}
        
        
        # Convert the data to PyTorch tensors
        torch.manual_seed(2025)
        X_train = torch.tensor(split_data_training[name], dtype=torch.float32)
        y_train = torch.tensor(split_label_training[name], dtype=torch.float32).view(-1, 1)
        X_test = torch.tensor(split_data_test[name], dtype=torch.float32)
        y_test = torch.tensor(split_label_test[name], dtype=torch.float32).view(-1, 1)

        # Create PyTorch datasets and loaders
        train_dataset = TensorDataset(X_train, y_train)

        batch_size = batch_size_l[i]
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        model = BinaryClassifier(input_dim=X_train.shape[1])

        #Define the loss function and optimizer
        criterion = nn.BCEWithLogitsLoss() 
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        # Training loop
        num_epochs = number_epoch[i]
        for epoch in tqdm(range(num_epochs),desc = f"processing"):
            model.train()
            running_loss = 0.0
            for inputs, labels in train_loader:
                optimizer.zero_grad()          # Zero the parameter gradients
                outputs = model(inputs)          # Forward pass
                loss = criterion(outputs, labels)  # Compute loss
                loss.backward()                # Backward pass
                optimizer.step()               # Update weights
                running_loss += loss.item() * inputs.size(0)

            epoch_loss = running_loss / len(train_loader.dataset)
        #Evaluation
        model.eval()
        with torch.no_grad():
            outputs = model(X_test)
            # Concatenate predictions and labels from all batches
        fpr, tpr, _ = roc_curve(y_test, outputs)  # Compute ROC curve
        roc_auc = auc(fpr, tpr)  # Compute AUC score
        print(f"{method} ROC-AUC Score: {roc_auc:.4f}")
        results = pd.DataFrame({"fpr": fpr, "tpr": tpr})
        results.to_csv(f"../result/pd1/roc_{name}_{method}_nc.csv", index=False)
        
        test.append(name)
        method_l.append(method)
        roc_auc_score.append(roc_auc)


        roc_auc_curve[name][method] = [fpr, tpr]


In [None]:
roc_auc_nc = {"Method":method_l, "Test": test, "ROC-AUC":roc_auc_score}
df = pd.DataFrame(roc_auc_nc)
df.to_csv("../result/pd1/roc_nc_nn.csv")

In [None]:
# Create a new figure
line_styles = ['-', '--', '-.', ':', '-', '--']
# Iterate over your curves and plot each one
for name, all_curve in roc_auc_curve.items():
    k = 0
    plt.figure(figsize=(10, 8))
    for method, curve in all_curve.items():
        if method in ['MetaDICT', "Unprocessed"]:
            fpr = curve[0]
            tpr = curve[1]
            score = auc(fpr, tpr)
            plt.plot(fpr, tpr, label=f'{method}: AUC = {score: .2f}',linestyle=line_styles[k],linewidth=2)
            k += 1

    
    # Add a diagonal line for reference (random classifier)
    plt.plot([0, 1], [0, 1], color='gray', linestyle='--')

    # Set plot labels and title
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title(f"ROC Curves ({name})")
    plt.legend(loc="lower right")
    plt.grid(True)
    plt.savefig(f"../fig/roc_{name}.jpeg",dpi = 300)
    plt.show()
