In [None]:
import scanpy as sc
import pandas as pd

# Reading Datasets

In [None]:
foetal = sc.read_h5ad("/home/andreafabbricatore/thesis/data/foetal_filtered.h5ad")

In [None]:
PBMC = sc.read_h5ad("/home/andreafabbricatore/thesis/data/PBMC.h5ad")

In [None]:
PIC = sc.read_h5ad("/home/andreafabbricatore/thesis/data/PIC.h5ad")

In [None]:
hypoxia = sc.read_h5ad("/home/andreafabbricatore/thesis/data/scsHypoxia_filtered.h5ad")

# Getting Data

### Foetal

In [None]:
foetal.X.shape

In [None]:
foetal.obs

This object contains 4504 cells, each with 29680 genes and their respective counts. In the obs dataframe we can find two types of labels for the data: 'Label' which is the cell subtype adn 'Label_merged' which is the cell-type (macro with respect to the former).

In [None]:
foetal_cell_type_labels = foetal.obs['Label_merged'].tolist()
foetal_cell_subtype_labels = foetal.obs['Label'].tolist()

In [None]:
foetal.obs['Label'].unique().tolist()

### PBMC

In [None]:
PBMC.X.shape

In [None]:
PBMC.obs

In [None]:
PBMC.obs['sample'].value_counts()

This object contains 14039 cells, each with 12762 genes and their respective counts. In the obs dataframe we can find the 'Label' column which represents the cell-type.

In [None]:
PBMC_cell_type_labels = PBMC.obs['Label'].tolist()

In [None]:
PBMC.obs['Label'].unique()

### PIC

In [None]:
PIC.X.shape

In [None]:
PIC.obs

This object contains 6321 cells, each with 34363 genes and their respective counts. In the obs dataframe we can find the 'Label' column which represents the cell-type.

In [None]:
PIC_cell_type_labels = PIC.obs['Label'].tolist()

In [None]:
PIC.obs['Label'].unique()

### Hypoxia

In [None]:
hypoxia.X.shape

In [None]:
hypoxia.obs

This object contains 9324 cells, each with 19046 genes and their respective counts. In the obs dataframe we can find two types of labels for the data: 'Label' which is the cell-state and 'HypoxicState' which is the cell-type (Hypoxia or Normoxia).

In [None]:
hypoxia_cell_state_labels = hypoxia.obs['Label'].tolist()
hypoxia_cell_labels = hypoxia.obs['HypoxicState'].tolist()

In [None]:
hypoxia.obs['Label'].unique()

# Pre-Processing

Let's use the binning procedure, same as the scGPT one. We won't use it just yet, actually we will train the model on the binned version and not binned version for the benchmarks.

In [None]:
from preprocess import Preprocessor

In [None]:
data_is_raw = False
preprocessor = Preprocessor(
    use_key="X",  # the key in adata.layers to use as raw data
    #filter_gene_by_counts=filter_gene_by_counts,  # step 1
    #filter_cell_by_counts=False,  # step 2
    normalize_total=1e4,  # 3. whether to normalize the raw data and to what sum
    result_normed_key="X_normed",  # the key in adata.layers to store the normalized data
    log1p=data_is_raw,  # 4. whether to log1p the normalized data
    result_log1p_key="X_log1p",
    subset_hvg=False,  # 5. whether to subset the raw data to highly variable genes
    hvg_flavor="seurat_v3" if data_is_raw else "cell_ranger",
    binning=51,  # 6. whether to bin the raw data and to what number of bins
    result_binned_key="X_binned",  # the key in adata.layers to store the binned data
)

In [None]:
preprocessor(foetal, batch_key=None)

In [None]:
preprocessor(PBMC, batch_key=None)

In [None]:
preprocessor(PIC, batch_key=None)

In [None]:
preprocessor(hypoxia, batch_key=None)

Let's now produce the labels for each dataset. First, let's encode the labels into categorical variables for our xgboost model.

In [None]:
from sklearn.preprocessing import LabelEncoder

In [None]:
labelencoder = LabelEncoder()

In [None]:
foetal_cell_type_labels = labelencoder.fit_transform(foetal_cell_type_labels)
foetal_cell_subtype_labels = labelencoder.fit_transform(foetal_cell_subtype_labels)

In [None]:
PBMC_cell_type_labels = labelencoder.fit_transform(PBMC_cell_type_labels)

In [None]:
PIC_cell_type_labels = labelencoder.fit_transform(PIC_cell_type_labels)

In [None]:
hypoxia_cell_state_labels = labelencoder.fit_transform(hypoxia_cell_state_labels)
hypoxia_cell_labels = labelencoder.fit_transform(hypoxia_cell_labels)

In [None]:
labelencoder.fit(hypoxia_cell_state_labels)
le_name_mapping = dict(zip(labelencoder.classes_, labelencoder.transform(labelencoder.classes_)))

In [None]:
le_name_mapping

In [None]:
import numpy as np

In [None]:
def create_ohe_matrix(labels):
    unique_labels = np.unique(labels)
    label_matrix = np.zeros((len(labels), len(unique_labels)))

    for i, label in enumerate(labels):
        label_matrix[i, label] = 1

    return label_matrix

In [None]:
foetal_cell_type_labels_matrix = create_ohe_matrix(foetal_cell_type_labels)
foetal_cell_subtype_labels_matrix = create_ohe_matrix(foetal_cell_subtype_labels)

In [None]:
PBMC_cell_type_labels_matrix = create_ohe_matrix(PBMC_cell_type_labels)

In [None]:
PIC_cell_type_labels_matrix = create_ohe_matrix(PIC_cell_type_labels)

In [None]:
hypoxia_cell_state_labels_matrix = create_ohe_matrix(hypoxia_cell_state_labels)
hypoxia_cell_labels_matrix = create_ohe_matrix(hypoxia_cell_labels)

# Models

## XGBoost

In [None]:
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

### BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(foetal.layers['X_binned'], foetal_cell_type_labels, test_size=0.2, random_state=42)
model = xgb.XGBClassifier(objective='multi:softmax', num_class=len(set(foetal_cell_type_labels)))
model.fit(X_train, y_train)
preds = model.predict(X_test)

In [None]:
accuracy_score(y_test, preds)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(foetal.layers['X_binned'], foetal_cell_subtype_labels, test_size=0.2, random_state=42)
model = xgb.XGBClassifier(objective='multi:softmax', num_class=len(set(foetal_cell_subtype_labels)))
model.fit(X_train, y_train)
preds = model.predict(X_test)

In [None]:
accuracy_score(y_test, preds)

### NON BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(foetal.X, foetal_cell_type_labels, test_size=0.2, random_state=42)
model = xgb.XGBClassifier(objective='multi:softmax', num_class=len(set(foetal_cell_type_labels)))
model.fit(X_train, y_train)
preds = model.predict(X_test)

In [None]:
accuracy_score(y_test, preds)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(foetal.X, foetal_cell_subtype_labels, test_size=0.2, random_state=42)
model = xgb.XGBClassifier(objective='multi:softmax', num_class=len(set(foetal_cell_subtype_labels)))
model.fit(X_train, y_train)
preds = model.predict(X_test)

In [None]:
accuracy_score(y_test, preds)

### BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(PBMC.layers['X_binned'], PBMC_cell_type_labels, test_size=0.2, random_state=42)
model = xgb.XGBClassifier(objective='multi:softmax', num_class=len(set(PBMC_cell_type_labels)))
model.fit(X_train, y_train)
preds = model.predict(X_test)

In [None]:
accuracy_score(y_test, preds)

### NON BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(PBMC.X, PBMC_cell_type_labels, test_size=0.2, random_state=42)
model = xgb.XGBClassifier(objective='multi:softmax', num_class=len(set(PBMC_cell_type_labels)))
model.fit(X_train, y_train)
preds = model.predict(X_test)

In [None]:
accuracy_score(y_test, preds)

### BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(PIC.layers['X_binned'], PIC_cell_type_labels, test_size=0.2, random_state=42)
model = xgb.XGBClassifier(objective='multi:softmax', num_class=len(set(PIC_cell_type_labels)))
model.fit(X_train, y_train)
preds = model.predict(X_test)

In [None]:
accuracy_score(y_test, preds)

### NON BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(PIC.X, PIC_cell_type_labels, test_size=0.2, random_state=42)
model = xgb.XGBClassifier(objective='multi:softmax', num_class=len(set(PIC_cell_type_labels)))
model.fit(X_train, y_train)
preds = model.predict(X_test)

In [None]:
accuracy_score(y_test, preds)

### BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(hypoxia.layers['X_binned'], hypoxia_cell_state_labels, test_size=0.2, random_state=42)
model = xgb.XGBClassifier(objective='multi:softmax', num_class=len(set(hypoxia_cell_state_labels)))
model.fit(X_train, y_train)
preds = model.predict(X_test)

In [None]:
counter = 0
total = 0
for i,j in enumerate(y_test):
    if j < 4:
        total += 1
        if j == preds[i]:
            counter += 1

print(counter/total)

### NON BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(hypoxia.X, hypoxia_cell_state_labels, test_size=0.2, random_state=42)
model = xgb.XGBClassifier(objective='multi:softmax', num_class=len(set(hypoxia_cell_state_labels)))
model.fit(X_train, y_train)
preds = model.predict(X_test)

In [None]:
counter = 0
total = 0
for i,j in enumerate(y_test):
    if j < 4:
        total += 1
        if j == preds[i]:
            counter += 1

print(counter/total)

## NN

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# Define the neural network architecture
class Classifier(nn.Module):
    def __init__(self, input_size, output_size):
        super(Classifier, self).__init__()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(input_size, 2048),
            nn.ReLU(),
            nn.BatchNorm1d(2048),
            nn.Linear(2048, 1024),
            nn.ReLU(),
            nn.BatchNorm1d(1024),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Linear(512, output_size)
        )

    def forward(self, x):
        x = self.linear_relu_stack(x)
        x = torch.softmax(x, dim=1)
        return x

def train_model(model, train_data, train_labels, num_epochs, learning_rate, convergence_threshold=1e-5):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    train_data = train_data.to(device)
    train_labels = train_labels.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=learning_rate)

    prev_loss = float('inf')  # Initialize with a large value
    for epoch in range(num_epochs):
        logits = model.forward(train_data)
        loss = criterion(logits, train_labels)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if (epoch+1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item()}')

        # Check convergence
        if abs(prev_loss - loss.item()) < convergence_threshold:
            print(f'Converged at epoch [{epoch+1}/{num_epochs}], Loss: {loss.item()}')
            break
        
        prev_loss = loss.item()

def validate_model(model, test_data, test_labels):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    test_data = test_data.to(device)
    test_labels = test_labels.to(device)
    outputs = model(test_data)
    _, outputs = torch.max(outputs, dim=1)
    _, labels = torch.max(test_labels, dim=1)
    # count = 0
    # total = 0
    # for i,j in enumerate(labels):
    #     if j < 4:
    #         total += 1
    #         if j == outputs[i]:
    #             count += 1
    # print(len(outputs))
    # print(total)
    # print(f"Accuracy: {count/total}")
    print(f"Accuracy: {accuracy_score(labels, outputs)}")

### BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(foetal.layers['X_binned'], foetal_cell_type_labels_matrix, test_size=0.2, random_state=42)
# Convert data to PyTorch tensors
train_input_data = torch.tensor(X_train, dtype=torch.float32)
train_labels = torch.tensor(y_train, dtype=torch.float32)
test_input_data = torch.tensor(X_test, dtype=torch.float32)
test_labels = torch.tensor(y_test, dtype=torch.float32)
# Define hyperparameters
input_size = X_train.shape[1]
output_size = len(np.unique(foetal_cell_type_labels))
num_epochs = 5000
learning_rate = 0.001

# Initialize the model
model = Classifier(input_size, output_size)

# Train the model
train_model(model, train_input_data, train_labels, num_epochs, learning_rate)
validate_model(model, test_input_data, test_labels)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(foetal.layers['X_binned'], foetal_cell_subtype_labels_matrix, test_size=0.2, random_state=42)
# Convert data to PyTorch tensors
train_input_data = torch.tensor(X_train, dtype=torch.float32)
train_labels = torch.tensor(y_train, dtype=torch.float32)
test_input_data = torch.tensor(X_test, dtype=torch.float32)
test_labels = torch.tensor(y_test, dtype=torch.float32)
# Define hyperparameters
input_size = X_train.shape[1]
output_size = len(np.unique(foetal_cell_subtype_labels))
num_epochs = 5000
learning_rate = 0.001

# Initialize the model
model = Classifier(input_size, output_size)

# Train the model
train_model(model, train_input_data, train_labels, num_epochs, learning_rate)
validate_model(model, test_input_data, test_labels)

### NON BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(foetal.X, foetal_cell_type_labels_matrix, test_size=0.2, random_state=42)
# Convert data to PyTorch tensors
train_input_data = torch.tensor(X_train, dtype=torch.float32)
train_labels = torch.tensor(y_train, dtype=torch.float32)
test_input_data = torch.tensor(X_test, dtype=torch.float32)
test_labels = torch.tensor(y_test, dtype=torch.float32)
# Define hyperparameters
input_size = X_train.shape[1]
output_size = len(np.unique(foetal_cell_type_labels))
num_epochs = 5000
learning_rate = 0.001

# Initialize the model
model = Classifier(input_size, output_size)

# Train the model
train_model(model, train_input_data, train_labels, num_epochs, learning_rate)
validate_model(model, test_input_data, test_labels)

In [None]:
X_train, X_test, y_train, y_test = train_test_split(foetal.X, foetal_cell_subtype_labels_matrix, test_size=0.2, random_state=42)
# Convert data to PyTorch tensors
train_input_data = torch.tensor(X_train, dtype=torch.float32)
train_labels = torch.tensor(y_train, dtype=torch.float32)
test_input_data = torch.tensor(X_test, dtype=torch.float32)
test_labels = torch.tensor(y_test, dtype=torch.float32)
# Define hyperparameters
input_size = X_train.shape[1]
output_size = len(np.unique(foetal_cell_subtype_labels))
num_epochs = 5000
learning_rate = 0.001

# Initialize the model
model = Classifier(input_size, output_size)

# Train the model
train_model(model, train_input_data, train_labels, num_epochs, learning_rate)
validate_model(model, test_input_data, test_labels)

### BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(PBMC.layers['X_binned'], PBMC_cell_type_labels_matrix, test_size=0.2, random_state=42)
# Convert data to PyTorch tensors
train_input_data = torch.tensor(X_train, dtype=torch.float32)
train_labels = torch.tensor(y_train, dtype=torch.float32)
test_input_data = torch.tensor(X_test, dtype=torch.float32)
test_labels = torch.tensor(y_test, dtype=torch.float32)
# Define hyperparameters
input_size = X_train.shape[1]
output_size = len(np.unique(PBMC_cell_type_labels))
num_epochs = 5000
learning_rate = 0.001

# Initialize the model
model = Classifier(input_size, output_size)

# Train the model
train_model(model, train_input_data, train_labels, num_epochs, learning_rate)
validate_model(model, test_input_data, test_labels)

### NON BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(PBMC.X, PBMC_cell_type_labels_matrix, test_size=0.2, random_state=42)
# Convert data to PyTorch tensors
train_input_data = torch.tensor(X_train, dtype=torch.float32)
train_labels = torch.tensor(y_train, dtype=torch.float32)
test_input_data = torch.tensor(X_test, dtype=torch.float32)
test_labels = torch.tensor(y_test, dtype=torch.float32)
# Define hyperparameters
input_size = X_train.shape[1]
output_size = len(np.unique(PBMC_cell_type_labels))
num_epochs = 5000
learning_rate = 0.001

# Initialize the model
model = Classifier(input_size, output_size)

# Train the model
train_model(model, train_input_data, train_labels, num_epochs, learning_rate)
validate_model(model, test_input_data, test_labels)

### BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(PIC.layers['X_binned'], PIC_cell_type_labels_matrix, test_size=0.2, random_state=42)
# Convert data to PyTorch tensors
train_input_data = torch.tensor(X_train, dtype=torch.float32)
train_labels = torch.tensor(y_train, dtype=torch.float32)
test_input_data = torch.tensor(X_test, dtype=torch.float32)
test_labels = torch.tensor(y_test, dtype=torch.float32)
# Define hyperparameters
input_size = X_train.shape[1]
output_size = len(np.unique(PIC_cell_type_labels))
num_epochs = 5000
learning_rate = 0.001

# Initialize the model
model = Classifier(input_size, output_size)

# Train the model
train_model(model, train_input_data, train_labels, num_epochs, learning_rate)
validate_model(model, test_input_data, test_labels)

### NON BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(PIC.X, PIC_cell_type_labels_matrix, test_size=0.2, random_state=42)
# Convert data to PyTorch tensors
train_input_data = torch.tensor(X_train, dtype=torch.float32)
train_labels = torch.tensor(y_train, dtype=torch.float32)
test_input_data = torch.tensor(X_test, dtype=torch.float32)
test_labels = torch.tensor(y_test, dtype=torch.float32)
# Define hyperparameters
input_size = X_train.shape[1]
output_size = len(np.unique(PIC_cell_type_labels))
num_epochs = 5000
learning_rate = 0.001

# Initialize the model
model = Classifier(input_size, output_size)

# Train the model
train_model(model, train_input_data, train_labels, num_epochs, learning_rate)
validate_model(model, test_input_data, test_labels)

### BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(hypoxia.layers['X_binned'], hypoxia_cell_state_labels_matrix, test_size=0.2, random_state=42)
# Convert data to PyTorch tensors
train_input_data = torch.tensor(X_train, dtype=torch.float32)
train_labels = torch.tensor(y_train, dtype=torch.float32)
test_input_data = torch.tensor(X_test, dtype=torch.float32)
test_labels = torch.tensor(y_test, dtype=torch.float32)
# Define hyperparameters
input_size = X_train.shape[1]
output_size = len(np.unique(hypoxia_cell_state_labels))
num_epochs = 5000
learning_rate = 0.001

# Initialize the model
model = Classifier(input_size, output_size)

# Train the model
train_model(model, train_input_data, train_labels, num_epochs, learning_rate)
validate_model(model, test_input_data, test_labels)

In [None]:
validate_model(model, test_input_data, test_labels)

### NON BINNED

In [None]:
X_train, X_test, y_train, y_test = train_test_split(hypoxia.X, hypoxia_cell_state_labels_matrix, test_size=0.2, random_state=42)
# Convert data to PyTorch tensors
train_input_data = torch.tensor(X_train, dtype=torch.float32)
train_labels = torch.tensor(y_train, dtype=torch.float32)
test_input_data = torch.tensor(X_test, dtype=torch.float32)
test_labels = torch.tensor(y_test, dtype=torch.float32)
# Define hyperparameters
input_size = X_train.shape[1]
output_size = len(np.unique(hypoxia_cell_state_labels))
num_epochs = 5000
learning_rate = 0.001

# Initialize the model
model = Classifier(input_size, output_size)

# Train the model
train_model(model, train_input_data, train_labels, num_epochs, learning_rate)
validate_model(model, test_input_data, test_labels)