### Import the necessary library

In [20]:
import torch.utils.data
import scanpy as sc
from sklearn.cluster import KMeans
from scipy.spatial import distance
import scipy.sparse
import scib 
import numpy as np
import random
import pandas as pd
from torch.distributions import  kl_divergence as kl
torch.backends.cudnn.benchmark = True
from sklearn.neighbors import KDTree
from sklearn import preprocessing
import scipy
import scib
import louvain

In [21]:
import scanpy as sc
import numpy as np
import pandas as pd
import time
import scvi
import anndata
import pandas as pd
from scipy.io import mmread
from scipy.sparse import csr_matrix
import umap
import matplotlib.pyplot as plt
import os
import tempfile
import seaborn as sns
import torch
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import f1_score

import warnings
warnings.filterwarnings("ignore")

In [22]:
import warnings

sc.set_figure_params(figsize=(6, 6), frameon=False)
sns.set_theme()
torch.set_float32_matmul_precision("high")
save_dir = tempfile.TemporaryDirectory()

%config InlineBackend.print_figure_kwargs={"facecolor": "w"}
%config InlineBackend.figure_format="retina"

import warnings
warnings.filterwarnings("ignore")

# Suppress specific ImportWarning
warnings.filterwarnings("ignore", category=ImportWarning, message=".*AltairImportHook.find_spec() not found; falling back to find_module.*")

In [36]:
import rpy2.robjects as robjects
from rpy2.robjects import pandas2ri
pandas2ri.activate()

#Perhaps require the library relocation (to the library contains kBET and lisi library)
robjects.r('.libPaths(c("current path", "library location"))')

rscript = '''
library(kBET)
library(lisi)
'''
robjects.r(rscript)
kbet = robjects.r('kBET')
lisi = robjects.r['compute_lisi']

### read the data

In [24]:
# load the prediction results to adata
import anndata
adata = anndata.read_h5ad("../human_pancreas_norm.h5ad")
adata

AnnData object with n_obs × n_vars = 16382 × 19093
    obs: 'tech', 'celltype', 'size_factors'
    layers: 'counts'

In [25]:
adata.obs['cell_type'] = adata.obs['celltype']
adata.obs['batch'] = adata.obs['tech']
adata

AnnData object with n_obs × n_vars = 16382 × 19093
    obs: 'tech', 'celltype', 'size_factors', 'cell_type', 'batch'
    layers: 'counts'

In [26]:
adata

AnnData object with n_obs × n_vars = 16382 × 19093
    obs: 'tech', 'celltype', 'size_factors', 'cell_type', 'batch'
    layers: 'counts'

## Print the normal metrics (for scib)

In [27]:
import pandas as pd
import scib

def calculate_metrics_for_embeddings(file_path, adata, embedding_keys, batch_key='batch', label_key='cell_type'):
    # Read the CSV file into a DataFrame
    combined_embeddings = pd.read_csv(file_path, index_col=0)

    # Ensure indices match
    combined_embeddings = combined_embeddings.loc[adata.obs_names]

    # Split the DataFrame into separate DataFrames for each embedding
    embeddings_dict = {key: combined_embeddings.filter(like=key) for key in embedding_keys}

    # Assign the embeddings back to adata.obsm
    for key, df in embeddings_dict.items():
        adata.obsm[key] = df.values

    # Initialize an empty list to store results
    results = []

    # Loop through each embedding and calculate metrics
    for key in embedding_keys:
        print(f"File: {file_path}, Embedding: {key}")
        sc.pp.neighbors(adata, use_rep=key)
        scib.me.cluster_optimal_resolution(adata, cluster_key="cluster", label_key=label_key)
        metrics = scib.me.metrics(
            adata, adata_int=adata, 
            silhouette_=True, graph_conn_=True, 
            ari_=True, nmi_=True, isolated_labels_=True, 
            isolated_labels_f1_=True,
            isolated_labels_asw_=True, 
            batch_key=batch_key, label_key=label_key, 
            embed=key
        )
        
        for metric, score in metrics.items():
            # Store results with identifying information
            result_entry = {
                'file_path': file_path,
                'embedding_key': key,
                'metric': metric,
                'score': score
            }
            results.append(result_entry)
            
            # Print the file path, embedding key, and result
            print(f"File: {file_path}, Embedding: {key}, Metric: {metric}, Score: {score}")

    return results

In [28]:
file_paths = [
    # "../embeddings/full_annotated_supervised_human_pancreas_new.csv",
    # "../embeddings/cellassign_human_pancreas_new.csv",
    "../embeddings/azimuth_human_pancreas_new.csv"
    # "../embeddings/singleR_human_pancreas_new.csv",
    # "../embeddings/partially_annotated_batches_human_pancreas_new_30.csv",
    # "../embeddings/partially_annotated_batches_human_pancreas_new_50.csv",
    # "../embeddings/partially_annotated_batches_human_pancreas_new_70.csv",
    # "../embeddings/mixing_at_edge_human_pancreas_new_30.csv",
    # "../embeddings/mixing_at_edge_human_pancreas_new_50.csv"
    # "../embeddings/mixing_at_edge_human_pancreas_new_70.csv",
    # "../embeddings/randomly_missing_human_pancreas_new_30.csv",
    # "../embeddings/randomly_missing_human_pancreas_new_50.csv",
    # "../embeddings/randomly_missing_human_pancreas_new_70.csv"
]

# Initialize the list of embedding keys to use
embedding_keys = ["X_scDREAMER", "X_ItClust"]

# Prepare a list to collect all results
all_results = []

# Loop through each file and calculate metrics
for file_path in file_paths:
    results = calculate_metrics_for_embeddings(file_path, adata, embedding_keys)
    all_results.extend(results)

# Now `all_results` contains metrics for all embedding files with identifying information

File: ../embeddings/azimuth_human_pancreas_new.csv, Embedding: X_scDREAMER
resolution: 0.1, nmi: 0.9074867042404949
resolution: 0.2, nmi: 0.9074098652502435
resolution: 0.3, nmi: 0.9126533242771944
resolution: 0.4, nmi: 0.9126533242771944
resolution: 0.5, nmi: 0.9130408917607411
resolution: 0.6, nmi: 0.9131643696638835
resolution: 0.7, nmi: 0.9131643696638835
resolution: 0.8, nmi: 0.9143679335240709
resolution: 0.9, nmi: 0.8452541929197618
resolution: 1.0, nmi: 0.8476517375981478
resolution: 1.1, nmi: 0.8418740534459841
resolution: 1.2, nmi: 0.8093199635380132
resolution: 1.3, nmi: 0.786862176171211
resolution: 1.4, nmi: 0.7863415161919314
resolution: 1.5, nmi: 0.7661431395331473
resolution: 1.6, nmi: 0.7587116880104486
resolution: 1.7, nmi: 0.7328886339873156
resolution: 1.8, nmi: 0.7306989748905441
resolution: 1.9, nmi: 0.7111661823089871
resolution: 2.0, nmi: 0.7032570561166525
optimised clustering against cell_type
optimal cluster resolution: 0.8
optimal score: 0.9143679335240709
N

KeyboardInterrupt: 

In [None]:
# Flatten the nested dictionaries and extract desired scores
results = []
for entry in all_results:
    file_path = entry['file_path']
    embedding_key = entry['embedding_key']
    score_series = entry['score']
    for metric, value in score_series.items():
        # Create an entry for each score
        results.append({
            'file_path': file_path,
            'embedding_key': embedding_key,
            'metric': metric,
            'score': value
        })

# Convert results list to DataFrame
results_df = pd.DataFrame(results)

In [None]:
results_df

In [12]:
results_df.to_csv('../metrics/human_pancreas_scib_metrics_new_3.csv', index=True)

## Print the overall metrics for TPT

In [29]:
# Positive and true positive cells defined in iMAP
def positive_true_positive(adata, batch_key='batch', celltype_key='cell_type', use_raw=False,k1=20, k2=100, tp_thr=3., distance='cosine', embed='X_pca'):
    celltype_list = adata.obs[celltype_key]
    batch_list = adata.obs[batch_key]

    temp_c = adata.obs[celltype_key].value_counts()
    temp_b = pd.crosstab(adata.obs[celltype_key], adata.obs[batch_key])
    temp_b_prob = temp_b.divide(temp_b.sum(1), axis=0)
    
    if use_raw:
        if isinstance(adata.X, scipy.sparse.csr.csr_matrix):
            X = adata.X.todense()
        else:
            X = adata.X
    else:
        X = adata.obsm[embed]
    if distance == 'cosine':
        X = preprocessing.normalize(X, axis=1)

    t1 = KDTree(X)

    p_list = []
    tp_list = []

    for cell in range(len(X)):

        # Discriminate positive cells
        neig1 = min(k1, temp_c[celltype_list[cell]])
        NNs = t1.query(X[cell].reshape(1,-1), neig1+1, return_distance=False)[0, 1:]
        c_NN = celltype_list[NNs]
        true_rate = sum(c_NN == celltype_list[cell])/neig1
        if true_rate > 0.5:
            p_list.append(True)
        else:
            p_list.append(False)

        # Discriminate true positive cells
        if p_list[cell] == True:
            neig2 = min(k2, temp_c[celltype_list[cell]])
            NNs = t1.query(X[cell].reshape(1,-1), neig2, return_distance=False)[0]
            NNs_c = celltype_list[NNs]
            NNs_i = NNs_c == celltype_list[cell]
            NNs = NNs[NNs_i] # get local neighbors that are from the same cell type
            neig2 = len(NNs)
            NNs_b = batch_list[NNs]

            max_b = 0
            b_prob = temp_b_prob.loc[celltype_list[cell]]
            for b in set(batch_list):
                if b_prob[b] > 0 and b_prob[b] < 1:
                    p_b = sum(NNs_b == b)
                    stat_b = abs(p_b - neig2*b_prob[b]) / np.sqrt(neig2*b_prob[b]*(1-b_prob[b]))
                    max_b = max(max_b, stat_b)
            if max_b <= tp_thr:
                tp_list.append(True)
            else:
                tp_list.append(False)
        else:
            tp_list.append(False)

    pos_rate = sum(p_list)/len(p_list)
    truepos_rate = sum(tp_list)/len(tp_list)
    return pos_rate, truepos_rate


In [30]:
import pandas as pd
import scanpy as sc

def evaluate_embeddings(adata, file_paths, embedding_keys, batch_key='batch', celltype_key='cell_type', use_raw=False, k1=20, k2=100, tp_thr=3.0, distance='cosine'):
    results = []  # Initialize results as a list
    
    for file_path in file_paths:
        # Read the CSV file into a DataFrame
        combined_embeddings = pd.read_csv(file_path, index_col=0)
        
        # Ensure indices match
        combined_embeddings = combined_embeddings.loc[adata.obs_names]
        
        # Split the DataFrame into separate DataFrames for each embedding
        embeddings_dict = {key: combined_embeddings.filter(like=key) for key in embedding_keys}
        
        # Assign the embeddings back to adata.obsm
        for key, df in embeddings_dict.items():
            adata.obsm[key] = df.values
            
            # Evaluate embedding
            pos_rate, truepos_rate = positive_true_positive(
                adata,
                batch_key=batch_key,
                celltype_key=celltype_key,
                use_raw=use_raw,
                k1=k1,
                k2=k2,
                tp_thr=tp_thr,
                distance=distance,
                embed=key
            )
            
            # Store results with identifying information
            result_entry = {
                'file_path': file_path,
                'embedding_key': key,
                'pos_rate': pos_rate,
                'truepos_rate': truepos_rate
            }
            results.append(result_entry)
            
            # Print the file path, embedding key, and result
            print(f"File Path: {file_path}, Embedding Key: {key}, pos_rate: {pos_rate}, truepos_rate: {truepos_rate}")
    
    # Convert results list to DataFrame
    results_df = pd.DataFrame(results)
    
    return results_df

In [31]:
file_paths = [
    # "../embeddings/full_annotated_supervised_human_pancreas_new.csv",
    # "../embeddings/cellassign_human_pancreas_new.csv",
    "../embeddings/azimuth_human_pancreas_new.csv"
    # "../embeddings/singleR_human_pancreas_new.csv",
    # "../embeddings/partially_annotated_batches_human_pancreas_new_30.csv",
    # "../embeddings/partially_annotated_batches_human_pancreas_new_50.csv",
    # "../embeddings/partially_annotated_batches_human_pancreas_new_70.csv",
    # "../embeddings/mixing_at_edge_human_pancreas_new_30.csv",
    # "../embeddings/mixing_at_edge_human_pancreas_new_50.csv"
    # "../embeddings/mixing_at_edge_human_pancreas_new_70.csv",
    # "../embeddings/randomly_missing_human_pancreas_new_30.csv",
    # "../embeddings/randomly_missing_human_pancreas_new_50.csv",
    # "../embeddings/randomly_missing_human_pancreas_new_70.csv"
]

# Initialize the list of embedding keys to use
embedding_keys = ["X_scDREAMER"]


results_df = evaluate_embeddings(adata, file_paths, embedding_keys)

# Save the DataFrame to a CSV file
# results_df.to_csv('../metrics/human_pancreas_true_positive_true_scores_new.csv', index=False)

File Path: ../embeddings/azimuth_human_pancreas_new.csv, Embedding Key: X_scDREAMER, pos_rate: 0.9702722500305213, truepos_rate: 0.6226345989500671


In [32]:
results_df

Unnamed: 0,file_path,embedding_key,pos_rate,truepos_rate
0,../embeddings/azimuth_human_pancreas_new.csv,X_scDREAMER,0.970272,0.622635


## Print the overall metrics for kBET

In [39]:
import pandas as pd
import scanpy as sc
import numpy as np
import scib  

def evaluate_embeddings(adata, file_paths, embedding_keys, batch_key='batch', celltype_key='cell_type'):
    results = []  # Initialize results as a list

    for file_path in file_paths:
        # Read the CSV file into a DataFrame
        combined_embeddings = pd.read_csv(file_path, index_col=0)

        # Ensure indices match
        combined_embeddings = combined_embeddings.loc[adata.obs_names]

        # Split the DataFrame into separate DataFrames for each embedding
        embeddings_dict = {key: combined_embeddings.filter(like=key) for key in embedding_keys}

        # Assign the embeddings back to adata.obsm
        for key, df in embeddings_dict.items():
            adata.obsm[key] = df.values

            # Calculate kBET score
            kbet_score = scib.metrics.kBET(
                adata,
                batch_key=batch_key,
                label_key=celltype_key,
                type_=None,
                embed=key,
                scaled=True,
                verbose=False,
            )

            # Store results with identifying information
            result_entry = {
                'file_path': file_path,
                'embedding_key': key,
                'kbet_score': kbet_score
            }
            results.append(result_entry)

            # Print the file path, embedding key, and result
            print(f"File Path: {file_path}, Embedding Key: {key}, kBET Score: {kbet_score}")

    # Convert results list to DataFrame
    results_df = pd.DataFrame(results)

    return results_df

In [12]:
file_paths = [
    "../embeddings/full_annotated_supervised_human_pancreas_new.csv",
    "../embeddings/cellassign_human_pancreas_new.csv",
    "../embeddings/azimuth_human_pancreas_new.csv",
    "../embeddings/singleR_human_pancreas_new.csv",
    "../embeddings/partially_annotated_batches_human_pancreas_new_30.csv",
    "../embeddings/partially_annotated_batches_human_pancreas_new_50.csv",
    "../embeddings/partially_annotated_batches_human_pancreas_new_70.csv",
    "../embeddings/mixing_at_edge_human_pancreas_new_30.csv",
    "../embeddings/mixing_at_edge_human_pancreas_new_50.csv",
    "../embeddings/mixing_at_edge_human_pancreas_new_70.csv",
    "../embeddings/randomly_missing_human_pancreas_new_30.csv",
    "../embeddings/randomly_missing_human_pancreas_new_50.csv",
    "../embeddings/randomly_missing_human_pancreas_new_70.csv"
]

# Initialize the list of embedding keys to use
embedding_keys = ["X_scDREAMER", "X_ItClust"]


results_df = evaluate_embeddings(adata, file_paths, embedding_keys)

# Save the DataFrame to a CSV file
results_df.to_csv('../metrics/human_pancreas_kbet_scores_new.csv', index=False)

OMP: Info #276: omp_set_nested routine deprecated, please use omp_set_max_active_levels instead.


Adding diffusion to step 4
t_cell consists of a single batch or is too small. Skip.
File Path: ../embeddings/full_annotated_supervised_human_pancreas_new.csv, Embedding Key: X_scDREAMER, kBET Score: 0.7717152844454236
Adding diffusion to step 4
Adding diffusion to step 4
Adding diffusion to step 4
Adding diffusion to step 5
t_cell consists of a single batch or is too small. Skip.
File Path: ../embeddings/full_annotated_supervised_human_pancreas_new.csv, Embedding Key: X_ItClust, kBET Score: 0.661290290489024
Adding diffusion to step 4
Adding diffusion to step 4
Adding diffusion to step 5
Adding diffusion to step 6
Adding diffusion to step 4
Adding diffusion to step 4
Adding diffusion to step 5
t_cell consists of a single batch or is too small. Skip.
File Path: ../embeddings/cellassign_human_pancreas_new.csv, Embedding Key: X_scDREAMER, kBET Score: 0.5840168834240113
Adding diffusion to step 4
Adding diffusion to step 5
Adding diffusion to step 4
Adding diffusion to step 5
Adding diffus

## Print the overall metrics for LISI

In [33]:
# LISI
def calculate_LISI(adata, labels=None, total_cells=None, batch_key='batch', celltype_key='celltype', embed = 'X_pca'):
    if celltype_key is None:
        # Calculate bLISI
        lisi_b = 0
        lisi_c = lisi_f1 = np.nan
        if adata.shape[0] < 90:
            perplexity = int(adata.shape[0]/6)
        else:
            perplexity = 30
        lisi_res = lisi(adata.obsm[embed], adata.obs, batch_key, perplexity=perplexity)
        lisi_b = (np.mean(lisi_res)[batch_key]-1.)/(len(set(adata.obs[batch_key]))-1.)
    else:
        # Calculate 1-cLISI
        lisi_res = lisi(adata.obsm[embed], adata.obs, celltype_key)
        lisi_c = 1 - (np.mean(lisi_res)-1.)/(len(set(adata.obs[celltype_key]))-1.)

        # Calculate bLISI
        lisi_b = 0
        for label in labels:
            adata_sub = adata[adata.obs[celltype_key] == label]
            if adata_sub.shape[0] < 90:
                perplexity = int(adata_sub.shape[0]/6)
            else:
                perplexity = 30
            lisi_res = lisi(adata_sub.obsm[embed], adata_sub.obs, batch_key, perplexity=perplexity)
            lisi_batch = (np.mean(lisi_res)-1.)/(len(set(adata_sub.obs[batch_key]))-1.)
            lisi_b += lisi_batch*adata_sub.shape[0]
        lisi_b /= total_cells
        lisi_c = lisi_c.item()
        lisi_b = lisi_b.item()  # This will print only the value without the data type

        # Calcualte F1 score
        lisi_f1 = (2*lisi_c*lisi_b)/(lisi_c + lisi_b)
    
    return lisi_c, lisi_b, lisi_f1

In [34]:
def evaluate_embeddings(adata, file_paths, embedding_keys, batch_key='batch', celltype_key='cell_type'):
    results = []  # Initialize results as a list
    
    for file_path in file_paths:
        # Read the CSV file into a DataFrame
        combined_embeddings = pd.read_csv(file_path, index_col=0)
        
        # Ensure indices match
        combined_embeddings = combined_embeddings.loc[adata.obs_names]
        
        # Split the DataFrame into separate DataFrames for each embedding
        embeddings_dict = {key: combined_embeddings.filter(like=key) for key in embedding_keys}
        
        # Assign the embeddings back to adata.obsm
        for key, df in embeddings_dict.items():
            adata.obsm[key] = df.values
            
            # Evaluate embedding
            lisi_c, lisi_b, lisi_f1 = calculate_LISI(
                adata, labels=adata.obs[celltype_key].unique(), total_cells=adata.shape[0], batch_key=batch_key, celltype_key=celltype_key, embed=key
            )
            
            # Store results with identifying information
            result_entry = {
                'file_path': file_path,
                'embedding_key': key,
                'lisi_c': lisi_c,
                'lisi_b': lisi_b,
                'lisi_f1': lisi_f1
            }
            results.append(result_entry)
            
            # Print the file path, embedding key, and result
            print(f"File Path: {file_path}, Embedding Key: {key}, LISI_c: {lisi_c}, LISI_b: {lisi_b}, LISI_f1: {lisi_f1}")
    
    # Convert results list to DataFrame
    results_df = pd.DataFrame(results)
    
    return results_df

In [37]:
file_paths = [
    # "../embeddings/full_annotated_supervised_human_pancreas_new.csv",
    # "../embeddings/cellassign_human_pancreas_new.csv",
    "../embeddings/azimuth_human_pancreas_new.csv",
    # "../embeddings/singleR_human_pancreas_new.csv",
    # "../embeddings/partially_annotated_batches_human_pancreas_new_30.csv",
    # "../embeddings/partially_annotated_batches_human_pancreas_new_50.csv",
    # "../embeddings/partially_annotated_batches_human_pancreas_new_70.csv",
    # "../embeddings/mixing_at_edge_human_pancreas_new_30.csv",
    "../embeddings/mixing_at_edge_human_pancreas_new_50.csv"
    # "../embeddings/mixing_at_edge_human_pancreas_new_70.csv",
    # "../embeddings/randomly_missing_human_pancreas_new_30.csv",
    # "../embeddings/randomly_missing_human_pancreas_new_50.csv",
    # "../embeddings/randomly_missing_human_pancreas_new_70.csv"
]

# Initialize the list of embedding keys to use
embedding_keys = ["X_scDREAMER"]

results_df = evaluate_embeddings(adata, file_paths, embedding_keys)

# Save the DataFrame to a CSV file
# results_df.to_csv('../metrics/human_pancreas_lisi_scores_new_2.csv', index=False)

File Path: ../embeddings/azimuth_human_pancreas_new.csv, Embedding Key: X_scDREAMER, LISI_c: 0.9953925246237143, LISI_b: 0.4059995542819925, LISI_f1: 0.5767535401633267
File Path: ../embeddings/mixing_at_edge_human_pancreas_new_50.csv, Embedding Key: X_scDREAMER, LISI_c: 0.9959673040677928, LISI_b: 0.4088728242144107, LISI_f1: 0.5797442089547263


In [38]:
results_df

Unnamed: 0,file_path,embedding_key,lisi_c,lisi_b,lisi_f1
0,../embeddings/azimuth_human_pancreas_new.csv,X_scDREAMER,0.995393,0.406,0.576754
1,../embeddings/mixing_at_edge_human_pancreas_ne...,X_scDREAMER,0.995967,0.408873,0.579744


# Combine the results for each settings

In [1]:
import pandas as pd

# 读入Excel文件
file_path = './human_pancreas_metrics.csv'  # 替换为你的文件路径
df = pd.read_csv(file_path)

# 处理数据，对每一个setting生成一个表
settings = df['type'].unique()
output_file = pd.ExcelWriter('./human_pancreas_metrics_grouped.xlsx')

# 创建一个Excel writer对象，利用openpyxl引擎
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
    for setting in settings:
        setting_df = df[df['type'] == setting]

        # 创建透视表，将行设为method，列设为metric，score为值
        pivot_table = setting_df.pivot(index='method', columns='metric', values='score')

        # 写入新的Excel sheet中
        pivot_table.to_excel(writer, sheet_name=setting)