In [12]:
adfile_path = r"C:\Users\21lyb\Downloads\PBMC_simulated_cnas_041025.h5ad"

In [13]:
import numpy as np
import pandas as pd
import scanpy as sc
import infercnvpy as cnv
import matplotlib.pyplot as plt
from biomart import BiomartServer
from scipy.stats import norm
from io import StringIO
import anndata as ad
import warnings

warnings.simplefilter("ignore")

sc.settings.set_figure_params(figsize=(5, 5))

sc.logging.print_header()

Package,Version
Component,Info
numpy,1.26.4
pandas,2.2.3
scanpy,1.11.1
infercnvpy,0.6.0
matplotlib,3.10.0
biomart,0.9.2
scipy,1.15.2
anndata,0.11.4
Python,"3.11.6 | packaged by conda-forge | (main, Oct 3 2023, 10:29:11) [MSC v.1935 64 bit (AMD64)]"
OS,Windows-10-10.0.26100-SP0

Dependency,Version
wcwidth,0.2.5
packaging,24.2
typing_extensions,4.13.2
certifi,2025.4.26 (2025.04.26)
pyreadr,0.5.3
jupyter_core,5.7.2
pycparser,2.22
llvmlite,0.44.0
colorama,0.4.6
defusedxml,0.7.1


In [14]:
# Functions
def fetch_positions(adata):
    # Connect to Ensembl Biomart server
    server = BiomartServer("http://grch37.ensembl.org/biomart")
    dataset = server.datasets['hsapiens_gene_ensembl']

    # Query gene names for only missing gene positions
    no_positions = adata[:, adata.var[['start', 'end']].isna().any(axis=1)]
    with_positions = adata[:, ~adata.var[['start', 'end']].isna().any(axis=1)]
    response = dataset.search({
        'filters':{'ensembl_gene_id':list(no_positions.var['gene_ids'])},
        'attributes':['ensembl_gene_id','chromosome_name','start_position','end_position','strand']
    })

    # Convert response to DataFrame and merge with adata.var if response is successful
    if response.status_code == 200:
        print("Request successful!")
        gene_annotations_df = pd.read_csv(StringIO(response.text),sep='\t',header=None)
        gene_annotations_df.columns = ['gene_ids','chromosome','start','end','strand']
    else:
        print(f"Request failed with status code: {response.status_code}")
        print(response.text)

    # Isolate fetched genes from BioMart in no_positions adata
    fetched_positions = no_positions[:, no_positions.var['gene_ids'].isin(gene_annotations_df['gene_ids'])].copy()

    # Sort fetched genes based on ensembl gene IDs
    fetched_positions = fetched_positions[:, fetched_positions.var['gene_ids'].argsort()].copy()

    # Add the fetched gene positions to the adata
    fetched_positions.var['chromosome'] = gene_annotations_df['chromosome'].values
    fetched_positions.var['start'] = gene_annotations_df['start'].values
    fetched_positions.var['end'] = gene_annotations_df['end'].values
    fetched_positions.var['strand'] = gene_annotations_df['strand'].values

    # Concatenate fetched genes with isolated genes already with positions
    adClean = ad.concat([with_positions, fetched_positions], axis=1)

    # Include obs into the cleaned adata
    adClean.obs = with_positions.obs.copy()

    return adClean

def standardize_chromosomes(adata):
    
    adata1 = adata.copy()

    # Add 'chr' prefix to chromosome names
    adata1.var['chromosome'] = 'chr' + adata1.var['chromosome'].astype(str)

    # Define standard chromosome names with 'chr' prefix
    standard_chromosomes = ['chr' + str(i) for i in range(1, 23)] + ['chrX', 'chrY', 'chrMT']

    # Filter adata to include only genes on standard chromosomes
    adata1 = adata1[:, adata1.var['chromosome'].isin(standard_chromosomes)].copy()

    return adata1

def qc(adata,
    mt_threshold_pct=20,
    min_genes=200,
    max_counts=50000,
    min_cells=3):

    adata1 = adata.copy()

    # Find MT genes
    adata1.var['mt'] = adata1.var_names.str.startswith('MT-')
    sc.pp.calculate_qc_metrics(adata1, qc_vars=['mt'],
                            percent_top=None,
                            log1p=False,
                            inplace=True)

    # Filter out cells based on MT genes
    adClean = adata1[adata1.obs['pct_counts_mt']<mt_threshold_pct,:].copy()

    # Filter out cells based on number of genes expressed
    sc.pp.filter_cells(adClean, min_genes=min_genes)

    # Filter out cells based on total counts
    sc.pp.filter_cells(adClean, max_counts=max_counts)

    # Filter out genes expressed in few cells
    sc.pp.filter_genes(adClean, min_cells=min_cells)

    return adClean

def preprocess(adata,
               min_mean=0.0125,
               max_mean=6,
               min_disp=0.25):
    
    adNorm = adata.copy()

    # Normalize and log transform
    adata.layers['counts'] = adata.X.copy()
    sc.pp.normalize_total(adata, target_sum=1e4)
    sc.pp.log1p(adata)

    # Detect highly variable genes
    sc.pp.highly_variable_genes(adata, min_mean=min_mean, max_mean=max_mean, min_disp=min_disp)

    # PCA
    sc.tl.pca(adata, use_highly_variable=True)
    # sc.pl.pca_variance_ratio(adata,50)

    return adNorm

def KNN(adata, n_neighbors=20, n_pcs=10, annotation="cell_type"):
    sc.pp.neighbors(adata,n_neighbors=n_neighbors,n_pcs=n_pcs)
    sc.tl.umap(adata)
    sc.pl.umap(adata, color=annotation)

def downsample(adata,n_cells):
    if adata.n_obs <= n_cells:
        return adata
    else:
        return adata[np.random.choice(adata.obs_names, n_cells, replace=False), :].copy()

In [15]:
adata = sc.read_h5ad(adfile_path)

In [16]:
# Run if needed:
# adata = qc(adata)
adata = downsample(adata,n_cells=500)

In [17]:
adata = fetch_positions(adata)

Request successful!


In [18]:
adata = standardize_chromosomes(adata)

In [19]:
adata = preprocess(adata)

In [20]:
# KNN(adata,n_pcs=10)

In [21]:
def infer_cnvs(adata,reference_key='cell_type',reference_cat=[]):
    cnv_prob_mat = np.zeros((adata.shape[0], adata.shape[1]))

    # Check if reference categories are provided, if not, use unique categories from the adata
    if len(reference_cat) == 0:
        reference_cat = list(adata.obs[reference_key].unique())

    # Get separate adata of cells in reference_cat
    adata_ref = adata[~adata.obs[reference_key].isin(reference_cat),:].copy()

    for gene_idx in range(adata.shape[1]):
        if gene_idx % 10 == 0:
            print(f'------------------ Gene# {gene_idx} / {adata.shape[1]} ------------------')
        # Reference mean and std dev for this gene
        mean_ref = np.mean(adata_ref.X.toarray()[:,gene_idx])
        std_ref = np.std(adata_ref.X.toarray()[:,gene_idx])

        # Observed value
        obs_value = adata.X.toarray()[:,gene_idx]

        if np.all(obs_value)==0:
            continue

        # Assign CNV probability score based on comparing observed value to normal distribution
        cdf_value = norm.cdf(obs_value,mean_ref,std_ref)
        cnv_prob_mat[:][gene_idx] = [cdf_value if x<mean_ref else 1-cdf_value if x>mean_ref else 0 for x in obs_value]

    return cnv_prob_mat

In [22]:
reference_key = 'cell_type'
reference_cat = ['B cell', 'CD8 T cell', 'Dendritic', 'FCGR3A monocyte', 'Megakaryocyte', 'NK cell']

cnv_prob_mat = infer_cnvs(adata,reference_key,reference_cat)

------------------ Gene# 0 / 19852 ------------------
------------------ Gene# 10 / 19852 ------------------
------------------ Gene# 20 / 19852 ------------------
------------------ Gene# 30 / 19852 ------------------
------------------ Gene# 40 / 19852 ------------------
------------------ Gene# 50 / 19852 ------------------
------------------ Gene# 60 / 19852 ------------------
------------------ Gene# 70 / 19852 ------------------
------------------ Gene# 80 / 19852 ------------------
------------------ Gene# 90 / 19852 ------------------
------------------ Gene# 100 / 19852 ------------------
------------------ Gene# 110 / 19852 ------------------
------------------ Gene# 120 / 19852 ------------------
------------------ Gene# 130 / 19852 ------------------
------------------ Gene# 140 / 19852 ------------------
------------------ Gene# 150 / 19852 ------------------
------------------ Gene# 160 / 19852 ------------------
------------------ Gene# 170 / 19852 ------------------
---

IndexError: index 6153 is out of bounds for axis 0 with size 500

In [None]:
print(cnv_prob_mat)

numpy.ndarray

NameError: name 'obs_value' is not defined