
# Pytorch Experimentation

    Developed by: Christian Eger
    Würzburg Institute for Systems Immunology, Faculty of Medicine, Julius-Maximilian-Universität Würzburg
    Created: 240321
    Latest version: 240321



## Import Statements

In [30]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torch.utils.data import TensorDataset
import torch.nn.functional as F

import anndata as ad
import numpy as np
import matplotlib.pyplot as plt

## Helper Functions

In [31]:
# This function takes in your adata object as well as the category you want to train your data on as well as parameters for defining the datasets sizes and returns two pytorch datasets, one for training and one for testing
def obs_to_tensor(adata, category=None, training_size=None, test_size=None):
    # This helper function returns a single pytorch dataset from the adata object given a list of observations
    def tensors_to_dataset(adata, obs_list, category, label_to_id):
        tensors = []
        labels = []
        for obs in obs_list:
            tensors.append(
                torch.tensor(
                    adata[adata.obs_names == obs].X
                    .toarray()
                )
            )
            labels.append(
                adata[adata.obs_names == obs].obs[category].iloc[0]
            )
        tensors = torch.squeeze(torch.stack(tensors))
        labels = torch.tensor([label_to_id[label] for label in labels])
        return TensorDataset(tensors, labels)

    # The purpose of these dictionaries is to encode the values of the desired category as integers
    label_to_id = {label: idx for idx, label in enumerate(adata.obs[category].unique())}
    id_to_label = {idx: label for label, idx in label_to_id.items()}

    # Making sure the total dataset size doesnt exceed the number of observations in the adata object
    if test_size + training_size <= adata.shape[0]:
        # Randomly sampling from the observations of the adata object
        random_obs = np.random.choice(
            adata.obs.index,
            size=training_size + test_size,
            replace=False,
        )
        # Creating two subsets of the sampled observations for training and testing purposes
        random_obs_train = random_obs[:training_size]
        random_obs_test = random_obs[-test_size:]

        # Creating the datasets using the helper function from the two subset samples
        training_data = tensors_to_dataset(adata, random_obs_train, category, label_to_id)
        testing_data = tensors_to_dataset(adata, random_obs_test, category, label_to_id)

    return(
        training_data,
        testing_data,
        id_to_label,
    )

## Data loading

In [None]:
adata = ad.read_h5ad('../data/Marburg_cell_states_locked_scANVI_ctl230901.raw.h5ad')
adata

AnnData object with n_obs × n_vars = 97573 × 27208
    obs: 'sex', 'age', 'ethnicity', 'PaCO2', 'donor', 'infection', 'disease', 'SMK', 'illumina_stimunr', 'bd_rhapsody', 'n_genes', 'doublet_scores', 'predicted_doublets', 'batch', 'n_genes_by_counts', 'total_counts', 'total_counts_mt', 'pct_counts_mt', 'total_counts_ribo', 'pct_counts_ribo', 'percent_mt2', 'n_counts', 'percent_chrY', 'XIST-counts', 'S_score', 'G2M_score', 'condition', 'sample_group', 'IAV_score', 'group', 'Viral_score', 'cell_type', 'cell_states', 'leiden', 'cell_compartment', 'seed_labels', '_scvi_batch', '_scvi_labels', 'C_scANVI'
    var: 'mt', 'ribo'
    obsm: 'X_scANVI', 'X_scVI', 'X_umap'

: 

In [None]:
training_data, test_data, labels_map = obs_to_tensor(adata, category='disease', training_size=10000, test_size=200)

: 

In [None]:
train_dataloader = DataLoader(training_data, batch_size=32, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=32, shuffle=False)

: 

In [None]:
if torch.cuda.is_available():
    num_devices = torch.cuda.device_count()
    print("Available CUDA devices:")
    for i in range(num_devices):
        print(f"  {i}: {torch.cuda.get_device_name(i)}")
    device = torch.device("cuda:1")  # Change the index if you want to use a different device
    print(f"Using CUDA device: {device}")
    torch.cuda.set_device(device)
else:
    print("CUDA is not available. Using CPU.")
    device = torch.device("cpu")


Available CUDA devices:
  0: NVIDIA RTX 6000 Ada Generation
  1: NVIDIA RTX 6000 Ada Generation
Using CUDA device: cuda:1


: 

## Model Construction

In [None]:
input_size = adata.shape[1]
num_classes = len(labels_map)

: 

In [None]:
class GeneExpressionClassifier(nn.Module):
    def __init__(self, input_size, num_classes):
        super(GeneExpressionClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, num_classes)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

model = GeneExpressionClassifier(input_size, num_classes)

: 

## Training Loop

In [None]:
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

: 

In [None]:
num_epochs = 20
train_losses, val_losses = [], []

model = GeneExpressionClassifier(input_size, num_classes)
model.to(device)
for i in range(num_epochs):
    model.train()
    running_loss = 0.0
    for expression, label in train_dataloader:
        expression, label = expression.to(device), label.to(device)
        optimizer.zero_grad()
        output = model(expression)
        loss = loss_fn(output, label)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() / expression.size(0)
    train_loss = running_loss / len(train_dataloader.dataset)
    train_losses.append(train_loss)
        # Validation phase
    model.eval()
    running_loss = 0.0
    with torch.no_grad():
        for expression, label in test_dataloader:
            # Move inputs and labels to the device
            expression, label = expression.to(device), label.to(device)
            outputs = model(expression)
            loss = loss_fn(outputs, label)
            running_loss += loss.item() * label.size(0)
    val_loss = running_loss / len(test_dataloader.dataset)
    val_losses.append(val_loss)
    print(f"Epoch {i+1}/{num_epochs} - Train loss: {train_loss}, Validation loss: {val_loss}")

Epoch 1/20 - Train loss: 0.001728820150345564, Validation loss: 1.813855800628662
Epoch 2/20 - Train loss: 0.001734794738329947, Validation loss: 1.813855800628662
Epoch 3/20 - Train loss: 0.001730889684520662, Validation loss: 1.813855800628662
Epoch 4/20 - Train loss: 0.0017277152478694915, Validation loss: 1.813855800628662
Epoch 5/20 - Train loss: 0.001738824024796486, Validation loss: 1.813855800628662
Epoch 6/20 - Train loss: 0.0017314267260953784, Validation loss: 1.813855800628662
Epoch 7/20 - Train loss: 0.001733431640639901, Validation loss: 1.813855800628662
Epoch 8/20 - Train loss: 0.0017282135169953109, Validation loss: 1.813855800628662
Epoch 9/20 - Train loss: 0.0017381363954395055, Validation loss: 1.813855800628662
Epoch 10/20 - Train loss: 0.0017278763400390745, Validation loss: 1.813855800628662
Epoch 11/20 - Train loss: 0.0017326958841644227, Validation loss: 1.813855800628662
Epoch 12/20 - Train loss: 0.0017412024576216936, Validation loss: 1.813855800628662
Epoch 

: 

In [None]:
model.to('cpu')

GeneExpressionClassifier(
  (fc1): Linear(in_features=27208, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=64, bias=True)
  (fc3): Linear(in_features=64, out_features=2, bias=True)
)

: 

In [None]:
model(test_data[2][0])

tensor([-2.1556,  1.4240], grad_fn=<ViewBackward0>)

: 