In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import anndata as ad
import numpy as np
import pandas as pd

In [2]:
def embedding_label_gen(X_train, adata_train): 
    
    # Replace NaN values in the 'cell_type' column with "undifferentiated"
    adata_train.obs["cell_type"] = adata_train.obs["cell_type"].cat.add_categories("undifferentiated").fillna("undifferentiated")

    # Verify the replacement
    print(adata_train.obs["cell_type"].unique())

    train_labels = adata_train.obs["clone_id"].to_numpy()
    print("train_labels.shape:", train_labels.shape)

    ### generate the labels
    adata_28 = adata_train[adata_train.obs["reprogramming_day"] == "28"]
    print("adata_28.shape:", adata_28.shape)

    # Initialize an empty dictionary to store the cell type distributions
    clone_cell_type_distribution = {}

    # Get the unique clone IDs
    unique_clone_ids = adata_28.obs["clone_id"].unique()

    # Loop through each unique clone_id
    for clone_id in unique_clone_ids:
        # Filter the data to get only rows with the current clone_id
        clone_data = adata_28.obs[adata_28.obs["clone_id"] == clone_id]
        
        # Get the distribution of cell types in the current clone_id
        cell_type_distribution = clone_data["cell_type"].value_counts(normalize=True)
        
        # Round each percentage to 4 decimal places and convert to a dictionary
        cell_type_distribution = cell_type_distribution.round(4).to_dict()
        
        # Store the rounded distribution in the main dictionary
        clone_cell_type_distribution[clone_id] = cell_type_distribution

    # Print the resulting dictionary for verification
    i = 0
    for clone_id, distribution in clone_cell_type_distribution.items():
        print(f"Clone ID: {clone_id}, Cell Type Distribution: {distribution}")
        i+=1
        if i ==3:
            break


    # Step 1: Get embeddings for Day 12 cells
    day12_mask = adata_train.obs["reprogramming_day"] == "12" 
    X_train_day12 = X_train[day12_mask.values] 
    print(f"Day 12 embeddings shape: {X_train_day12.shape}")

    # Step 2: Get the clone labels for Day 12 cells
    clone_labels_day12 = adata_train.obs.loc[day12_mask, "clone_id"].to_numpy()

    # Step 3: Initialize y_train_prob matrix to store the probabilities
    # There are now 4 possible cell types: ['iEP', 'undifferentiated', 'Ambiguous', 'Fibroblast']
    n_classes = len(adata_train.obs["cell_type"].unique())
    y_train_prob = np.zeros((X_train_day12.shape[0], n_classes))

    # Step 4: Assign the distributions from clone_cell_type_distribution to each cell based on its clone_id
    for i, clone_id in enumerate(clone_labels_day12):
        if clone_id in clone_cell_type_distribution:
            # Get the distribution for the clone
            distribution = clone_cell_type_distribution[clone_id]
            
            # Ensure the order of cell types matches ['iEP', 'undifferentiated', 'Ambiguous', 'Fibroblast']
            y_train_prob[i, 0] = distribution.get('iEP', 0)  # Default to 0 if not present
            y_train_prob[i, 1] = distribution.get('undifferentiated', 0)  # Default to 0 if not present
            y_train_prob[i, 2] = distribution.get('Ambiguous', 0)  # Default to 0 if not present
            y_train_prob[i, 3] = distribution.get('Fibroblast', 0)  # Default to 0 if not present

    # Print the shape and first few examples of y_train_prob
    print(f"y_train_prob shape: {y_train_prob.shape}")
    print(f"First 5 rows of y_train_prob:\n{y_train_prob[:5]}")


    X_train_day12 = torch.tensor(X_train_day12, dtype=torch.float32)

    # Example soft labels: 5 samples, each with a probability distribution over 3 classes
    y_train_prob = torch.tensor(y_train_prob, dtype=torch.float32)

    return X_train_day12, y_train_prob

In [3]:
input_dir = "/Users/apple/Desktop/KB/data"
# Load dataset

X_train = np.load(input_dir+'/feat_train_test/biddy/bs30_sf0025/scBaseEncoderFeat_Z_bs30_tau0.5.npy')
print(X_train.shape)

adata_train = ad.read_h5ad(input_dir+'/BiddyData/Biddy_train.h5ad')
X_train_day12, y_train_prob = embedding_label_gen(X_train, adata_train)

(9220, 64)
['iEP', 'undifferentiated', 'Ambiguous', 'Fibroblast']
Categories (4, object): ['Ambiguous', 'Fibroblast', 'iEP', 'undifferentiated']
train_labels.shape: (9220,)
adata_28.shape: (3239, 2000)
Clone ID: 493.0, Cell Type Distribution: {'iEP': 0.6573, 'undifferentiated': 0.2571, 'Ambiguous': 0.0631, 'Fibroblast': 0.0226}
Clone ID: 2352.0, Cell Type Distribution: {'Fibroblast': 0.3806, 'undifferentiated': 0.2985, 'Ambiguous': 0.2463, 'iEP': 0.0746}
Clone ID: 487.0, Cell Type Distribution: {'iEP': 0.4872, 'undifferentiated': 0.2821, 'Ambiguous': 0.2179, 'Fibroblast': 0.0128}
Day 12 embeddings shape: (1477, 64)
y_train_prob shape: (1477, 4)
First 5 rows of y_train_prob:
[[0.6573 0.2571 0.0631 0.0226]
 [0.6573 0.2571 0.0631 0.0226]
 [0.6573 0.2571 0.0631 0.0226]
 [0.6573 0.2571 0.0631 0.0226]
 [0.6573 0.2571 0.0631 0.0226]]


In [4]:
X_test = np.load(input_dir+'/feat_train_test/biddy/bs30_sf0025/test_embedding.npy')
print(X_test.shape)

adata_test = ad.read_h5ad(input_dir+'/BiddyData/Biddy_test.h5ad')

X_test_day12, y_test_prob = embedding_label_gen(X_test, adata_test)

(995, 64)
['undifferentiated', 'iEP', 'Ambiguous', 'Fibroblast']
Categories (4, object): ['Ambiguous', 'Fibroblast', 'iEP', 'undifferentiated']
train_labels.shape: (995,)
adata_28.shape: (367, 2000)
Clone ID: 493.0, Cell Type Distribution: {'iEP': 0.6032, 'undifferentiated': 0.3016, 'Ambiguous': 0.0794, 'Fibroblast': 0.0159}
Clone ID: 2352.0, Cell Type Distribution: {'Fibroblast': 0.4737, 'undifferentiated': 0.3158, 'Ambiguous': 0.1579, 'iEP': 0.0526}
Clone ID: 487.0, Cell Type Distribution: {'Ambiguous': 0.5, 'iEP': 0.5, 'Fibroblast': 0.0, 'undifferentiated': 0.0}
Day 12 embeddings shape: (140, 64)
y_train_prob shape: (140, 4)
First 5 rows of y_train_prob:
[[0.6032 0.3016 0.0794 0.0159]
 [0.6032 0.3016 0.0794 0.0159]
 [0.6032 0.3016 0.0794 0.0159]
 [0.6032 0.3016 0.0794 0.0159]
 [0.6032 0.3016 0.0794 0.0159]]


In [5]:
class SoftLabelNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(SoftLabelNN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)  # Raw output before softmax
        return out


class Trainer:
    def __init__(self, model, optimizer, criterion, X_train, y_train_prob, num_epochs=10000, lr=0.01):
        self.model = model
        self.optimizer = optimizer
        self.criterion = criterion
        self.X_train = X_train
        self.y_train_prob = y_train_prob
        self.num_epochs = num_epochs
        self.lr = lr

    def train(self):
        for epoch in range(self.num_epochs):
            # Forward pass
            outputs = self.model(self.X_train)
            
            # Apply log_softmax to get log probabilities
            outputs_log_prob = torch.log_softmax(outputs, dim=1)
            
            # Calculate the KL divergence loss
            loss = self.criterion(outputs_log_prob, self.y_train_prob)
            
            # Backward pass and optimization
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            
            # Print loss every 50 epochs
            if (epoch+1) % 50 == 0:
                print(f'Epoch [{epoch+1}/{self.num_epochs}], Loss: {loss.item():.4f}')
    
    def predict(self, X_test):
        # Set the model to evaluation mode
        self.model.eval()
        with torch.no_grad():
            outputs = self.model(X_test)
            # Apply softmax to get predicted probabilities
            probabilities = torch.softmax(outputs, dim=1)
        return probabilities

    def accuracy(self, X_test, y_test):
        # Get the predicted probabilities
        predicted_probabilities = self.predict(X_test)
        
        # Get the predicted labels (argmax of probabilities)
        _, predicted_labels = torch.max(predicted_probabilities, dim=1)
        
        # Get the true labels (argmax of true probabilities)
        _, true_labels = torch.max(y_test, dim=1)
        
        # Calculate accuracy
        accuracy = (predicted_labels == true_labels).sum().item() / len(true_labels)
        return accuracy


In [6]:
# Initialize the model, optimizer, and KLDivLoss function
input_size = X_train_day12.shape[1]
hidden_size = 10
output_size = y_train_prob.shape[1]

model = SoftLabelNN(input_size, hidden_size, output_size)
criterion = nn.KLDivLoss(reduction='batchmean')  # KLDivLoss for comparing distributions
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Instantiate the Trainer class and start training
trainer = Trainer(model, optimizer, criterion, X_train_day12, y_train_prob)
trainer.train()

Epoch [50/10000], Loss: 0.0501
Epoch [100/10000], Loss: 0.0147
Epoch [150/10000], Loss: 0.0079
Epoch [200/10000], Loss: 0.0056
Epoch [250/10000], Loss: 0.0046
Epoch [300/10000], Loss: 0.0040
Epoch [350/10000], Loss: 0.0036
Epoch [400/10000], Loss: 0.0032
Epoch [450/10000], Loss: 0.0029
Epoch [500/10000], Loss: 0.0027
Epoch [550/10000], Loss: 0.0025
Epoch [600/10000], Loss: 0.0023
Epoch [650/10000], Loss: 0.0023
Epoch [700/10000], Loss: 0.0021
Epoch [750/10000], Loss: 0.0019
Epoch [800/10000], Loss: 0.0018
Epoch [850/10000], Loss: 0.0017
Epoch [900/10000], Loss: 0.0016
Epoch [950/10000], Loss: 0.0015
Epoch [1000/10000], Loss: 0.0015
Epoch [1050/10000], Loss: 0.0014
Epoch [1100/10000], Loss: 0.0013
Epoch [1150/10000], Loss: 0.0013
Epoch [1200/10000], Loss: 0.0013
Epoch [1250/10000], Loss: 0.0012
Epoch [1300/10000], Loss: 0.0012
Epoch [1350/10000], Loss: 0.0011
Epoch [1400/10000], Loss: 0.0010
Epoch [1450/10000], Loss: 0.0010
Epoch [1500/10000], Loss: 0.0009
Epoch [1550/10000], Loss: 0.00

In [7]:
# After training, use the model to predict on the test set and calculate accuracy
test_accuracy = trainer.accuracy(X_test_day12, y_test_prob)
print(f'Test Accuracy: {test_accuracy:.4f}')

Test Accuracy: 0.4000
