### Load packages

In [None]:
import os
import warnings
import numpy as np
import pandas as pd
import scanpy as sc
import scanpy.external as sce
import seaborn as sns
import anndata as ad
import matplotlib.pyplot as plt
import matplotlib
from scipy.stats import median_abs_deviation
import pyranges as pr

import celltypist
from celltypist import models

warnings.simplefilter('ignore', FutureWarning)
warnings.simplefilter('ignore', UserWarning)
warnings.simplefilter('ignore', RuntimeWarning)

sc.settings.set_figure_params(dpi=50, facecolor="white", figsize=(10, 10))
matplotlib.rcParams['pdf.fonttype'] = 42

### Load data

In [None]:
# set variables to fetch data gene_counts.tsv files per sample

datadir = 'path/to/scnanonseq/results'
sample_prefixes = ['D0', 'Q1', 'Q2', 'Q3'] # populate list with sample names
sample_dirs = [datadir + '/' + x + '/isoquant/' for x in sample_prefixes]
sample_raw_ft_mtx_dirs = [x + f'{y}.gene_counts.tsv/' for x, y in zip(sample_dirs, sample_prefixes)]

In [None]:
# set variables to fetch data DNA barcode count files per sample

dna_counts_dir = 'path/to/scdnalong/deduplicated/barcode counts/'
dna_counts_txt = [dna_counts_dir+i+'_bc_counts.txt' for i in sample_prefixes]

In [None]:
# set variables to fetch data ATAC and RNA CellRanger barcode files to generate mappers

atac_barcodes_path = "path/to/cellranger_arc_atac_barcodes"
rna_barcodes_path = "path/to/cellranger_arc_rna_barcodes"

atac_barcodes_df = pd.read_csv(atac_barcodes_path, sep='\t', header=None)
rna_barcodes_df = pd.read_csv(rna_barcodes_path, sep='\t', header=None)

rna_barcode_mapper = dict(zip(rna_barcodes_df[0], atac_barcodes_df[0]))
atac_barcode_mapper = dict(zip(atac_barcodes_df[0], rna_barcodes_df[0]))

In [None]:
# mapper to match DNA barcode counts to RNA barcodes

file_to_suffix = {
    dna_counts_txt[0]: "-D0",
    dna_counts_txt[1]: "-Q1",
    dna_counts_txt[2]: "-Q2",
    dna_counts_txt[3]: "-Q3"
}

column_to_modify = 1

dna_count_barcode_mapper = {}

for file_path, suffix in file_to_suffix.items():
    with open(file_path, 'r') as f:
        for line in f:
            if line.strip():  # skip empty lines
                parts = line.strip().split()
                if len(parts) < 2:
                    continue  # skip malformed lines

                # Append suffix to the target column
                dna_bc = parts[column_to_modify]
                rna_bc = atac_barcode_mapper[dna_bc]
                rna_bc += suffix

                # Use the other column as key
                value = int(parts[1 - column_to_modify])
                key = rna_bc

                dna_count_barcode_mapper[key] = value

In [None]:
# load gtf file to replace gene IDs with gene names

gtf_file_path = 'path/to/gtf'
gtf_file = pr.read_gtf(gtf_file_path, as_df=True)
gene_id_to_name = dict(zip(gtf_file.gene_id, gtf_file.gene_name))

In [None]:
# load RNA data

adatas = {}

for path, sample_id in zip(sample_raw_ft_mtx_dirs, sample_prefixes):
    sample_adata = sc.read(path, delimiter='\t')
    sample_adata = sample_adata.transpose()
    sample_adata.var_names = sample_adata.var_names.map(gene_id_to_name)
    sample_adata.var_names_make_unique()
    sample_adata.obs['sample'] = sample_id
    sample_adata.obs.index = sample_adata.obs.index + '-' + sample_id
    adatas[sample_id] = sample_adata
    print(sample_id)
    print(sample_adata)

In [None]:
# concatenate adata

adata = sc.concat(adatas, label="sample")
adata.obs_names_make_unique()

In [None]:
# assign DNA counts based on the previosly generated mapper object
# replace NA values with 0

adata.obs["dna_total_counts"] = adata.obs.index.map(dna_count_barcode_mapper)
adata.obs["dna_total_counts"] = adata.obs["dna_total_counts"].fillna(0)

### QC

In [None]:
def qc_metrics(adata):
    
    # Saving count data
    adata.layers["counts"] = adata.X.copy()
    
    adata.var['mt'] = adata.var_names.str.startswith('MT-')
    adata.var['ribo'] = adata.var_names.str.startswith('RPS', 'RPL')
    adata.var['hb'] = adata.var_names.str.startswith('^HB[^(P)')
    sc.pp.calculate_qc_metrics(adata, qc_vars=['mt', 'ribo', 'hb'], inplace=True, percent_top=[20], log1p=True)

    remove = [
        'total_counts_mt',
        'total_counts_ribo',
        'total_counts_hb',
        'log1p_total_counts_mt',
        'log1p_total_counts_ribo',
        'log1p_total_counts_hb'
    ]

    adata.obs = adata.obs[[x for x in adata.obs.columns if x not in remove]]

    return adata

In [None]:
adata = qc_metrics(adata)

In [None]:
# function to generate and save QC plots

def plot_fn_b(adata, qc_step='before'):

    sc.pl.violin(
        adata,
        ["n_genes_by_counts", "total_counts", "pct_counts_mt", "pct_counts_ribo"],
        jitter=0.4,
        multi_panel=True,
        save=f'_n_genes_by_counts_total_counts_pct_counts_mt_pct_counts_ribo_{qc_step}_qc.pdf'
    )

    sc.pl.scatter(
        adata, "total_counts", "n_genes_by_counts", color="pct_counts_mt",
        save=f'_total_counts_n_genes_by_counts_pct_counts_mt_{qc_step}_qc.pdf'
    )

    sc.pl.scatter(
        adata, "total_counts", "n_genes_by_counts", color="pct_counts_ribo",
        save=f'_total_counts_n_genes_by_counts_pct_counts_ribo_{qc_step}_qc.pdf'
    )

In [None]:
# plot QC metrics before filtering

plot_fn_b(adata=adata, qc_step='before')
print('\n')

### Filtering

In [None]:
# functions to identify outliers and doublets

def is_outlier(adata, metric: str, nmads: int, show_threshold=False):
    M = adata.obs[metric]
    outlier = (M < np.median(M) - nmads * median_abs_deviation(M)) | (
        np.median(M) + nmads * median_abs_deviation(M) < M
    )
    if show_threshold == True:
        print(f'{metric} lower threshold: {np.median(M) - nmads * median_abs_deviation(M)}')
        print(f'{metric} upper threshold: {np.median(M) + nmads * median_abs_deviation(M)}')
    else:
        return outlier

def mt_is_outlier(adata, metric: str, nmads: int, show_threshold=False):
    M = adata.obs[metric]
    outlier = (
        np.median(M) + nmads * median_abs_deviation(M) < M
    )
    if show_threshold == True:
        print(f'{metric} upper threshold: {np.median(M) + nmads * median_abs_deviation(M)}')
    else:
        return outlier

def ribo_is_outlier(adata, metric: str, nmads: int, show_threshold=False):
    M = adata.obs[metric]
    outlier = (
        np.median(M) + nmads * median_abs_deviation(M) < M
    )
    if show_threshold == True:
        print(f'{metric} upper threshold: {np.median(M) + nmads * median_abs_deviation(M)}')
    else:
        return outlier

def doublet_outlier(adata):
    outlier = (adata.obs['dna_total_counts'] > 60000) | (adata.obs['total_counts'] > 20000)
    return outlier

In [None]:
adata.obs["outlier"] = (
    is_outlier(adata, "log1p_total_counts", 5)
    | is_outlier(adata, "log1p_n_genes_by_counts", 5)
    | is_outlier(adata, "pct_counts_in_top_20_genes", 5)
)

adata.obs["mt_outlier"] = mt_is_outlier(adata, "pct_counts_mt", 5)

adata.obs["ribo_outlier"] = ribo_is_outlier(adata, "pct_counts_ribo", 5)

adata.obs['doublet_outlier'] = doublet_outlier(adata)

print(adata.obs.outlier.value_counts())
print('\n')
print(adata.obs.mt_outlier.value_counts())
print('\n')
print(adata.obs.ribo_outlier.value_counts())
print('\n')
print(adata.obs.doublet_outlier.value_counts())
print('\n')

In [None]:
print(f"Total number of cells before filtering: {adata.n_obs}")
adata = adata[(~adata.obs.outlier) & (~adata.obs.mt_outlier) & (~adata.obs.ribo_outlier) & (~adata.obs.doublet_outlier)].copy()
print(f"Number of cells after filtering of low quality cells and doublets: {adata.n_obs}")

In [None]:
# plot QC metrics after filtering

plot_fn_b(adata=adata, qc_step='after')
print('\n')

### Normalization

In [None]:
def normalization_fn(adata):
    adata.layers['log_counts'] = sc.pp.log1p(adata, copy=True).X
    # Normalizing to median total counts
    sc.pp.normalize_total(adata, exclude_highly_expressed=True)
    # Logarithmize the data
    sc.pp.log1p(adata)

In [None]:
normalization_fn(adata)

### Feature selection

In [None]:
def feature_selection(adata):
    
    sc.pp.highly_variable_genes(
        adata,
        layer='log_counts',
        n_top_genes=2000,
        flavor='seurat',
        batch_key='sample'
        )
    
    sc.pl.highly_variable_genes(
        adata,
        save='_highly_variable_genes_2000_seurat.pdf'
    )

In [None]:
feature_selection(adata)

### Dimensionality reduction

In [None]:
sc.tl.pca(adata, zero_center=True, n_comps=50, use_highly_variable=True, random_state=0)

In [None]:
sc.pl.pca_variance_ratio(adata, n_pcs=50, log=True)

In [None]:
sc.pp.neighbors(adata, n_neighbors=15, knn=True, random_state=0)

In [None]:
sc.tl.umap(adata, random_state=0)

### Clustering

In [None]:
sc.tl.leiden(adata, flavor="leidenalg", n_iterations=2, random_state=0)

In [None]:
for res in [0.05, 0.5, 0.6, 0.75, 0.8, 0.85, 0.9, 0.95, 1.0, 1.25, 1.5, 2.0, 5.0, 10.0, 20.0]:
    sc.tl.leiden(
        adata, key_added=f"leiden_{res}", resolution=res, flavor="leidenalg", random_state=0
    )

### Cell type annotation

In [None]:
adata_celltypist = adata.copy()  # make a copy of our adata
adata_celltypist.X = adata.layers["counts"].copy()  # set adata.X to raw counts

In [None]:
sc.pp.normalize_total(
    adata_celltypist, target_sum=10**4
)  # normalize to 10,000 counts per cell

In [None]:
adata_celltypist.X = sc.pp.log1p(adata_celltypist.X)  # log-transform

In [None]:
models.download_models(
    force_update=True, model=["Immune_All_Low.pkl", "Immune_All_High.pkl"]
)

In [None]:
model_low = models.Model.load(model="Immune_All_Low.pkl")
model_high = models.Model.load(model="Immune_All_High.pkl")

In [None]:
predictions_high = celltypist.annotate(
    adata_celltypist,
    model=model_high,
    majority_voting=True,
    mode='best match',
    over_clustering=adata.obs['leiden_10.0']
)

In [None]:
predictions_high_adata = predictions_high.to_adata()

In [None]:
adata.obs["celltypist_cell_label_coarse"] = predictions_high_adata.obs.loc[
    adata.obs.index, "majority_voting"
]
adata.obs["celltypist_conf_score_coarse"] = predictions_high_adata.obs.loc[
    adata.obs.index, "conf_score"
]

In [None]:
predictions_low = celltypist.annotate(
    adata_celltypist,
    model=model_low,
    majority_voting=True,
    mode='best match',
    over_clustering=adata.obs['leiden_10.0']
)

In [None]:
predictions_low_adata = predictions_low.to_adata()

In [None]:
adata.obs["celltypist_cell_label_fine"] = predictions_low_adata.obs.loc[
    adata.obs.index, "majority_voting"
]
adata.obs["celltypist_conf_score_fine"] = predictions_low_adata.obs.loc[
    adata.obs.index, "conf_score"
]

### Save final adata object

In [None]:
path_to_save_adata = "path/adata.h5ad"

In [None]:
adata.write(path_to_save_adata)