# Basic Pre-processing of 10X scRNA-seq data

Load the processed dataset from the snakemake pipeline

In [None]:
# Load params
import os

# Required setting: the Cellranger output directory has no usable default.
tenx_outdir = os.environ.get("SNAKEMAKE_TENX_OUTPUT_DIR")
if tenx_outdir is None:
    raise ValueError("SNAKEMAKE_TENX_OUTPUT_DIR environment variable is not set.")

# Optional settings: environment values arrive as strings, so convert them
# to the numeric types the later cells expect.
outlier_threshold = float(os.environ.get("SNAKEMAKE_OUTLIER_THRESHOLD", "5"))
n_hvgs = int(os.environ.get("SNAKEMAKE_N_HVGS", "5000"))
processed_filename = os.environ.get(
    "SNAKEMAKE_PROCESSED_FILENAME", "processed_adata.h5ad"
)

# Echo the effective configuration so it is recorded in the notebook output.
print("Cellranger output directory:", tenx_outdir)
print("Outlier threshold:", outlier_threshold)
print("Number of highly variable genes to compute:", n_hvgs)
print("Processed filename:", processed_filename)

Convert into a `Scanpy` object

In [None]:
import scanpy as sc
from glob import glob

# Find the 10X output directory containing the `filtered_feature_bc_matrix` directory
# by searching recursively below `tenx_outdir`.
# `glob` returns matches in arbitrary order, so sort them: when several
# directories match, "the first one" is then the same on every run.
tenx_dirs = sorted(
    glob(os.path.join(tenx_outdir, "**/filtered_feature_bc_matrix"), recursive=True)
)

if not tenx_dirs:
    raise FileNotFoundError("No 10X output directory found with 'filtered_feature_bc_matrix'.")

if len(tenx_dirs) > 1:
    print("Multiple 10X output directories found. Using the first one:", tenx_dirs[0])
else:
    print("Using 10X output directory:", tenx_dirs[0])


# gex_only=False keeps all feature types found in the matrix rather than
# restricting the object to gene-expression features.
adata = sc.read_10x_mtx(
    tenx_dirs[0],
    gex_only=False
)

print("Loaded AnnData object successfully.")
print("Shape: ", adata.shape)

# Perform basic QC analysis and filtering

First, we will evaluate low-quality cells. Much of this is based on the [single-cell best practices workflow](https://www.sc-best-practices.org/preprocessing_visualization/quality_control.html).

In [None]:
# Flag gene families by symbol so per-cell percentages can be computed for each.
gene_names = adata.var_names
adata.var["mt"] = gene_names.str.startswith("MT-")  # mitochondrial genes
adata.var["ribo"] = gene_names.str.startswith(("RPS", "RPL"))  # ribosomal genes
adata.var["hb"] = gene_names.str.contains("^HB[^(P)]")  # hemoglobin genes

# The log1p_* and pct_counts_in_top_20_genes columns produced here are the
# inputs to the outlier detection further down.
qc_flags = ["mt", "ribo", "hb"]
sc.pp.calculate_qc_metrics(
    adata,
    qc_vars=qc_flags,
    percent_top=[20],
    log1p=True,
    inplace=True,
)
adata

Evaluate and filter cells

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Histogram of total counts per cell (100 bins, no KDE overlay).
sns.displot(adata.obs["total_counts"], bins=100, kde=False)

# Label the axes that displot just made current.
hist_ax = plt.gca()
hist_ax.set_xlabel("Total counts per cell")
hist_ax.set_ylabel("Number of cells")
hist_ax.set_title("Distribution of total counts per cell")
plt.show()

In [None]:
# Violin plot of the three per-cell percentage metrics on one shared axis.
pct_columns = ["pct_counts_mt", "pct_counts_ribo", "pct_counts_hb"]
fig, ax = plt.subplots()
# show=False hands the axis back so it can be labelled before rendering.
ax = sc.pl.violin(adata, pct_columns, jitter=0.4, ax=ax, show=False)
ax.set_title("Percentage of counts in mitochondrial, ribosomal, and hemoglobin genes")
ax.set_ylabel("Percentage of counts")
plt.show()

In [None]:
# Library size against detected genes, coloured by mitochondrial percentage.
fig, ax = plt.subplots()
ax = sc.pl.scatter(
    adata,
    "total_counts",
    "n_genes_by_counts",
    color="pct_counts_mt",
    show=False,
    ax=ax,
)
ax.set_title("Total counts vs. number of genes per cell")
ax.set_xlabel("Total counts per cell")
ax.set_ylabel("Number of genes per cell")
plt.show()

Compute the median absolute deviation (MAD), $MAD = \text{median}(|x_i - \text{median}(x)|)$, to automatically threshold outliers

In [None]:
from scipy.stats import median_abs_deviation
import numpy as np

# Save a copy in .raw: the object as loaded and QC-annotated above, taken
# before the outlier columns are added to .obs in this cell.
adata.raw = adata.copy()

def evaluate_outlier(observations: np.ndarray, mad_threshold: float) -> np.ndarray:
    """
    Evaluate outliers based on the median absolute deviation (MAD) method.

    An observation is an outlier when it lies more than ``mad_threshold`` MADs
    below or above the median of ``observations``.

    :param observations: The observations to evaluate.
    :param mad_threshold: The number of MADs away from the median, in either
        direction, beyond which an observation is considered an outlier.
    :return: A boolean array indicating which observations are outliers.
    """
    med = np.median(observations)
    # Compute the MAD once and reuse it for both the lower and upper bound.
    max_deviation = mad_threshold * median_abs_deviation(observations)
    outlier = (observations < med - max_deviation) | (observations > med + max_deviation)
    return outlier

# A cell is a general outlier if any one of these QC metrics is flagged.
count_flags = evaluate_outlier(adata.obs["log1p_total_counts"], outlier_threshold)
gene_flags = evaluate_outlier(adata.obs["log1p_n_genes_by_counts"], outlier_threshold)
top20_flags = evaluate_outlier(adata.obs["pct_counts_in_top_20_genes"], outlier_threshold)
adata.obs["outlier"] = count_flags | gene_flags | top20_flags
adata.obs.outlier.value_counts()

In [None]:
# MT-outliers are slightly different: the MAD band is 2 MADs tighter than for
# the other metrics, and any cell above 10% mitochondrial counts is flagged
# regardless of the MAD result.
mt_mad_threshold = outlier_threshold - 2
if mt_mad_threshold <= 0:
    # A zero or negative multiplier collapses (or inverts) the band around the
    # median, which would flag essentially every cell. Fail with a clear
    # message instead of silently filtering the whole dataset.
    raise ValueError(
        "Outlier threshold must be greater than 2 to derive the mitochondrial "
        f"MAD threshold; got {outlier_threshold}."
    )
adata.obs["mt_outlier"] = evaluate_outlier(adata.obs["pct_counts_mt"], mt_mad_threshold) | (
    adata.obs["pct_counts_mt"] > 10
)
adata.obs.mt_outlier.value_counts()

In [None]:
# Drop a cell if either the general or the mitochondrial check flagged it.
to_filter_out = adata.obs["outlier"] | adata.obs["mt_outlier"]
n_flagged = to_filter_out.sum()
n_cells = adata.shape[0]
print("Number of cells to filter out:", n_flagged)
print("Out of total cells:", n_cells)

# Copy the subset so later cells operate on a standalone object.
keep_mask = ~to_filter_out
adata = adata[keep_mask].copy()
adata

In [None]:
# Same scatter as before filtering, now on the retained cells only.
fig, ax = plt.subplots()
ax = sc.pl.scatter(
    adata,
    "total_counts",
    "n_genes_by_counts",
    color="pct_counts_mt",
    show=False,
    ax=ax,
)
ax.set_title("Total counts vs. number of genes per cell (after filtering)")
ax.set_xlabel("Total counts per cell")
ax.set_ylabel("Number of genes per cell")
plt.show()

We have skipped ambient RNA removal and doublet detection since cellranger generally does well on its own and it's hard to do autonomously.

# Basic Pre-processing analysis

We will call HVGs using the Pearson-residual method, which acts on raw counts

In [None]:
# Rank genes with the Pearson-residuals flavor and mark the top `n_hvgs`
# in adata.var["highly_variable"].
sc.experimental.pp.highly_variable_genes(
    adata,
    flavor="pearson_residuals",
    n_top_genes=n_hvgs,
    layer=None,
)

n_selected = adata.var["highly_variable"].sum()
print("Number of highly variable genes:", n_selected)

Compute the number of PCs needed using the kneedle method

In [None]:
# Number of PCs to compute (at most 100). The "arpack" solver needs n_comps
# strictly below both the number of cells and the number of genes entering the
# PCA (the highly variable ones), so clamp it for very small datasets.
n_pcs = min(100, adata.n_obs - 1, int(adata.var["highly_variable"].sum()) - 1)

# Create a logX layer: a normalized, log1p-transformed copy of X, so that X
# itself is left untouched.
adata.layers["logX"] = adata.X.copy()
sc.pp.normalize_total(adata, target_sum=1e4, layer="logX", inplace=True)
sc.pp.log1p(adata, layer="logX", copy=False)

# PCA on the logX layer, restricted to the highly variable genes.
sc.pp.pca(adata, mask_var='highly_variable', layer="logX", n_comps=n_pcs, svd_solver="arpack")
adata

In [None]:
# Running total of the per-PC explained-variance ratios; this is the curve
# the knee search operates on.
pca_info = adata.uns["pca"]
var_ratio = pca_info["variance_ratio"]
cum_var = np.cumsum(var_ratio)
cum_var

In [None]:
from kneed import KneeLocator
# x-axis is the 1-based count of PCs, y-axis the cumulative variance explained.
pc_counts = range(1, len(cum_var) + 1)
kl = KneeLocator(pc_counts, cum_var, curve="concave", direction="increasing", S=1.0)

if kl.knee:
    # Use the knee as the number of PCs for the downstream steps.
    n_pcs = int(kl.knee)
    print(f"Number of PCs to use: {n_pcs}")
else:
    # No knee found: n_pcs keeps the value it had before this cell.
    print("Warning: no knee point. Using all PCs.")

In [None]:
# Plot the knee.
# pca_variance_ratio shows only 30 PCs by default, so widen the plot when the
# chosen n_pcs is larger; otherwise the marker line falls outside the axes.
n_computed = len(adata.uns["pca"]["variance_ratio"])
n_shown = min(n_computed, max(30, n_pcs))
sc.pl.pca_variance_ratio(adata, n_pcs=n_shown, log=True, show=False)
ax = plt.gca()
# NOTE(review): scanpy's ranking plot appears to place PC k at x = k - 1;
# confirm whether the marker should sit at n_pcs - 1 instead.
ax.axvline(n_pcs, color="red", linestyle="--", label=f"n_pcs={n_pcs}")
ax.set_xlabel("Number of PCs")
ax.set_ylabel("Variance explained (log scale)")
ax.set_title("Variance explained by PCs (with knee point)")
# Draw the legend so the n_pcs label on the marker line is actually shown.
ax.legend()
plt.show()

# Finally, some basic clustering and visualization

In [None]:
# Neighbor graph restricted to the first `n_pcs` principal components, then
# a UMAP embedding and Leiden clusters.
sc.pp.neighbors(adata, n_pcs=n_pcs)
sc.tl.umap(adata, spread=1.0, min_dist=0.5)
sc.tl.leiden(adata, resolution=0.5)

# Show the clusters next to the main QC metrics on the same embedding.
umap_colors = ["leiden", "n_genes_by_counts", "total_counts", "pct_counts_mt"]
sc.pl.umap(adata, color=umap_colors, ncols=2, frameon=False)

# Saving the processed data

In [None]:
# Write the processed object to the configured filename, gzip-compressed.
adata.write(
    processed_filename,
    compression="gzip",
)