In [26]:
import matplotlib.colors as clr
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import scanpy as sc
import shutil
from scipy.spatial import cKDTree
from sklearn.neighbors import NearestNeighbors

import warnings
# Suppress all Python warnings and set scanpy's verbosity to 0 for the rest of the session.
# NOTE(review): the blanket "ignore" filter also hides deprecation and numerical warnings
# raised by the analysis below — confirm this is intended.
warnings.filterwarnings("ignore")
sc.settings.verbosity = 0

In [27]:
# Dataset selection, per-dataset plot settings, and input/output locations
settings = {
    "Xenium_5K_BC": {"coords": ["global_x", "global_y"], "figsize": (5, 8)},
    "Xenium_5K_OC": {"coords": ["global_y", "global_x"], "figsize": (5, 7)},
}

# active dataset; must be one of the keys of `settings`
data = "Xenium_5K_OC"
plot_coords = settings[data]["coords"]
plot_figsize = settings[data]["figsize"]

data_dir = f"../../data/{data}/"
utils_dir = "../../data/utils/"
output_dir = f"../../output/{data}/"

# start every run from an empty plot folder: drop any existing one, then (re)create it
plot_dir = output_dir + "stress_scores/"
if os.path.exists(plot_dir):
    shutil.rmtree(plot_dir)
os.makedirs(plot_dir)

In [28]:
# Colormaps: a custom blue-white-red diverging map (256 levels) and matplotlib's "Reds"
color_cts = clr.LinearSegmentedColormap.from_list(
    name = "bwr",
    colors = ["#3B4CC0", "#4F69C6", "#FFFFFF", "#D24E4E", "#B40426"],
    N = 256,
)
color_reds = plt.get_cmap("Reds")

In [29]:
# Gene panel: first column of genes.csv as a plain list
genes = list(pd.read_csv(data_dir + "processed_data/genes.csv").iloc[:, 0])

# All cells, plus an independent copy restricted to malignant cells
adata = sc.read_h5ad(data_dir + "intermediate_data/adata.h5ad")
is_malignant = adata.obs["cell_type_merged"].isin(["Malignant cell"])
adata_tumor = adata[is_malignant].copy()
adata_tumor.obs["cell_type_merged"] = pd.Categorical(
    adata_tumor.obs["cell_type_merged"],
    categories = ["Malignant cell"],
    ordered = True,
)

# Normalize and log-transform the tumor copy only; `adata` itself is left untouched
sc.pp.normalize_total(adata_tumor, target_sum = 1e4)
sc.pp.log1p(adata_tumor)

adata_tumor

AnnData object with n_obs × n_vars = 160250 × 5101
    obs: 'cell_id', 'global_x', 'global_y', 'transcript_counts', 'control_probe_counts', 'genomic_control_counts', 'control_codeword_counts', 'unassigned_codeword_counts', 'deprecated_codeword_counts', 'total_counts', 'cell_area', 'nucleus_area', 'nucleus_count', 'segmentation_method', 'cell_type', 'cell_type_merged'
    var: 'gene_ids', 'feature_types', 'genome', 'gene'
    uns: 'cell_type_colors', 'cell_type_merged_colors', 'log1p'

In [None]:
# Functions to compute stress scores
def zscore_series(x):
    """Standardize `x` using its NaN-aware mean and standard deviation.

    Parameters
    ----------
    x : array-like supporting NumPy arithmetic
        Values to standardize; NaNs are ignored when estimating mean and std
        and stay NaN in the result.

    Returns
    -------
    ``(x - nanmean(x)) / (nanstd(x) + 1e-8)``. The small constant keeps a
    constant input from dividing by zero (it maps to all zeros instead).
    """
    center = np.nanmean(x)
    scale = np.nanstd(x) + 1e-8
    return (x - center) / scale


def compute_expression_score(adata, key, target_genes, bg_genes, binary_subtyping = True):
    """Score a gene program per cell and store standardized variants in `adata.obs`.

    Parameters
    ----------
    adata : object scored in place by `sc.tl.score_genes` (its `.obs` is modified)
    key : name of the obs column that receives the z-scored program score
    target_genes : candidate genes of the program
    bg_genes : container of allowed genes; targets not found in it are dropped
    binary_subtyping : when True, also add a median-split "Low"/"High" column

    Side effects on `adata.obs`
    ---------------------------
    ``{key}``          z-scored score
    ``{key}_clipped``  the same values clipped to ``[-clip_val, clip_val]``
    ``{key}_subtype``  ordered categorical Low/High (only if `binary_subtyping`)

    Returns
    -------
    (target genes kept after filtering, clip_val, adata)

    Raises
    ------
    AssertionError if the z-scores do not span both negative and positive values.
    """

    # keep only program genes that are part of the background panel
    target_genes = [gene for gene in target_genes if gene in bg_genes]

    # raw program score goes into a temporary obs column
    raw_col = f"{key}_raw"
    sc.tl.score_genes(adata, gene_list = target_genes, ctrl_size = len(target_genes), score_name = raw_col, use_raw = False)

    # z-score the raw score into `key`, then drop the temporary column
    adata.obs[key] = np.nan
    adata.obs.loc[:, key] = zscore_series(adata.obs.loc[:, raw_col].values)
    del adata.obs[raw_col]

    # symmetric clipping: bound both tails by the smaller of |min| and |max|
    scores = adata.obs[key].to_numpy()
    lo = np.nanmin(scores)
    hi = np.nanmax(scores)
    assert lo < 0 and hi > 0, f"Expected vmin < 0 and vmax > 0, got vmin = {lo}, vmax = {hi}"
    clip_val = np.min(np.abs((lo, hi)))
    adata.obs[f"{key}_clipped"] = adata.obs[key].clip(-clip_val, clip_val)

    # binary subtyping: values at or below the median are "Low", the rest "High"
    if binary_subtyping:
        median_score = adata.obs[key].median()
        subtype_labels = np.where(adata.obs[key] <= median_score, "Low", "High")
        adata.obs[f"{key}_subtype"] = pd.Categorical(subtype_labels, categories = ["Low", "High"], ordered = True)

    return target_genes, clip_val, adata


def compute_spatial_score(adata_all, adata_tumor, key, source_col = "cell_type_merged", source_values = ("T cell",), k = 20, radius = 50.0, sigma = None, binary_subtyping = True, neighbor_weight_col = None, alpha = 0.5):
    """Score each tumor cell by its proximity to a set of "source" cells.

    Parameters
    ----------
    adata_all : object whose `.obs` holds `source_col`, "global_x" and "global_y" for
        all cells; when `neighbor_weight_col` is given it is also indexed as
        ``adata_all[rows, neighbor_weight_col].X``
    adata_tumor : object whose `.obs` holds "global_x"/"global_y"; modified in place
    key : prefix of the obs columns written to `adata_tumor.obs`
    source_col : obs column of `adata_all` holding the labels to select on
    source_values : one label (string or other scalar) or any iterable of labels
        (list, tuple, set, NumPy array, pandas Index/Series, ...)
    k : maximum number of nearest source cells used by the weighted score
        (unused when `neighbor_weight_col` is None)
    radius : neighborhood radius, in the units of the coordinates
    sigma : Gaussian kernel width; defaults to ``radius / 2``
    binary_subtyping : when True, also add categorical subtype columns
    neighbor_weight_col : variable name(s) whose mean value per source cell weights
        that cell's contribution; None skips the weighted score entirely
    alpha : exponent applied to the in-radius neighbor count in the denominator of
        the weighted score (1 = mean over neighbors, 0 = plain sum, default 0.5)

    Side effects on `adata_tumor.obs`
    ---------------------------------
    ``{key}_neighbor_counts``      number of source cells within `radius`
    ``log_{key}_neighbor_counts``  log1p of that count
    ``{key}_weighted``             z-scored weighted score (only with weights)
    ``{key}_subtype``              "Away" (count == 0) / "Close" (if `binary_subtyping`)
    ``{key}_weighted_subtype``     median split "Low"/"High" (if both options are on)

    Note that the counts use every source cell within `radius`, whereas the weighted
    score only looks at the `k` nearest source cells and masks those beyond `radius`.

    Returns
    -------
    adata_tumor (the same object that was passed in)
    """

    # select source cells: a string/scalar is a single label, any other iterable is a
    # collection of labels (so arrays and pandas objects are not compared elementwise)
    labels = adata_all.obs[source_col]
    if isinstance(source_values, (str, bytes)) or not np.iterable(source_values):
        src_mask = labels == source_values
    else:
        src_mask = labels.isin(list(source_values))
    src_idx = np.where(np.asarray(src_mask))[0]
    n_src = src_idx.size

    # coordinates
    XY_src = np.c_[adata_all.obs["global_x"].values[src_idx], adata_all.obs["global_y"].values[src_idx]]
    XY_tum = np.c_[adata_tumor.obs["global_x"].values, adata_tumor.obs["global_y"].values]
    n_tum = XY_tum.shape[0]

    # number of neighbors within radius (all zeros when there is no source cell at all)
    if n_src > 0:
        tree = cKDTree(XY_src)
        neighbor_counts = tree.query_ball_point(XY_tum, r = radius, return_length = True)
    else:
        neighbor_counts = np.zeros(n_tum, dtype = np.intp)
    adata_tumor.obs[f"{key}_neighbor_counts"] = neighbor_counts
    adata_tumor.obs[f"log_{key}_neighbor_counts"] = np.log1p(neighbor_counts)

    if neighbor_weight_col is not None:

        if n_src > 0:

            # per-source-cell weight: mean over the requested variables, non-finite -> 0
            expr = adata_all[src_idx, neighbor_weight_col].X
            if not isinstance(expr, np.ndarray):
                expr = expr.toarray()
            W = expr.mean(axis = 1).ravel()
            W = np.where(np.isfinite(W), W, 0.0)

            # kNN search for distances and indices (cannot ask for more neighbors than sources)
            n_neighbors = min(k, n_src)
            nbrs = NearestNeighbors(n_neighbors = n_neighbors, algorithm = "kd_tree").fit(XY_src)
            dist, nn_ind = nbrs.kneighbors(XY_tum, return_distance = True)

            # neighbors beyond the radius contribute nothing
            within = dist <= radius

            # Gaussian distance kernel
            if sigma is None:
                sigma = radius / 2.0
            K = np.exp(-(dist ** 2) / (2.0 * (sigma ** 2))) * within

            # kernel-weighted contribution of each neighbor; W[nn_ind] has shape (n_tumor, n_neighbors)
            contrib = K * W[nn_ind]

            # aggregate: sum of contributions divided by (in-radius neighbor count) ** alpha;
            # the count is floored at 1 so cells without neighbors get a score of 0
            denom = np.maximum(within.sum(axis = 1, keepdims = True), 1) ** alpha
            raw_score = (contrib.sum(axis = 1, keepdims = True) / denom).ravel()

        else:
            # no source cells: nothing can contribute, so every tumor cell scores 0
            raw_score = np.zeros(n_tum, dtype = float)

        # z-score normalization
        adata_tumor.obs[f"{key}_weighted"] = zscore_series(raw_score)

    # binary subtyping
    if binary_subtyping:
        counts = np.array(neighbor_counts)
        adata_tumor.obs[f"{key}_subtype"] = pd.Categorical(np.where(counts == 0, "Away", "Close"), categories = ["Away", "Close"], ordered = True)
        if neighbor_weight_col is not None:
            thr = adata_tumor.obs[f"{key}_weighted"].median()
            adata_tumor.obs[f"{key}_weighted_subtype"] = pd.Categorical(np.where(adata_tumor.obs[f"{key}_weighted"] <= thr, "Low", "High"), categories = ["Low", "High"], ordered = True)

    return adata_tumor


def compute_mechanical_crowding(adata_all, adata_tumor, key, k = 20, binary_subtyping = True):
    """Score tumor cells by local cell density (inverse mean kNN distance).

    Parameters
    ----------
    adata_all : object whose `.obs` holds "global_x"/"global_y" for the reference cells
    adata_tumor : object whose `.obs` holds "global_x"/"global_y"; modified in place
    key : name of the obs column that receives the z-scored crowding score
    k : number of nearest reference cells averaged per tumor cell
    binary_subtyping : when True, also add a median-split "Low"/"High" column

    Side effects on `adata_tumor.obs`
    ---------------------------------
    ``{key}``          z-scored ``1 / mean distance to the k nearest reference cells``
    ``{key}_clipped``  the same values clipped to ``[-clip_val, clip_val]``
    ``{key}_subtype``  ordered categorical Low/High (only if `binary_subtyping`)

    Returns
    -------
    (clip_val, adata_tumor)

    Raises
    ------
    AssertionError if the z-scores do not span both negative and positive values.
    """

    # reference (all cells) and query (tumor cells) coordinates
    coords_all = np.c_[adata_all.obs["global_x"].values, adata_all.obs["global_y"].values]
    coords_tum = np.c_[adata_tumor.obs["global_x"].values, adata_tumor.obs["global_y"].values]

    # k nearest reference cells of every tumor cell
    # NOTE(review): if the tumor cells are also rows of adata_all, each query point is
    # presumably returned as its own neighbor at distance 0 — confirm whether that is intended.
    knn = NearestNeighbors(n_neighbors = k, algorithm = "kd_tree")
    knn.fit(coords_all)
    dist, idx = knn.kneighbors(coords_tum, return_distance = True)

    # mean neighbor distance, floored at 1e-6 so the inverse below stays finite
    mean_dist = np.maximum(dist.mean(axis = 1), 1e-6)

    # crowding = inverse mean distance, kept in a temporary obs column
    raw_col = f"{key}_raw"
    adata_tumor.obs[raw_col] = 1.0 / mean_dist

    # z-score the raw value into `key`, then drop the temporary column
    adata_tumor.obs[key] = np.nan
    adata_tumor.obs.loc[:, key] = zscore_series(adata_tumor.obs.loc[:, raw_col].values)
    del adata_tumor.obs[raw_col]

    # symmetric clipping: bound both tails by the smaller of |min| and |max|
    scores = adata_tumor.obs[key].to_numpy()
    lo = np.nanmin(scores)
    hi = np.nanmax(scores)
    assert lo < 0 and hi > 0, f"Expected vmin < 0 and vmax > 0, got vmin = {lo}, vmax = {hi}"
    clip_val = np.min(np.abs((lo, hi)))
    adata_tumor.obs[f"{key}_clipped"] = adata_tumor.obs[key].clip(-clip_val, clip_val)

    # binary subtyping: values at or below the median are "Low", the rest "High"
    if binary_subtyping:
        median_score = adata_tumor.obs[key].median()
        subtype_labels = np.where(adata_tumor.obs[key] <= median_score, "Low", "High")
        adata_tumor.obs[f"{key}_subtype"] = pd.Categorical(subtype_labels, categories = ["Low", "High"], ordered = True)

    return clip_val, adata_tumor

In [31]:
# def compute_spatial_score(adata_all, adata_tumor, key, source_col = "cell_type_merged", source_values = ("T cell",), k = 20, radius = 50.0, sigma = None, binary_subtyping = True, neighbor_weight_col = None, quantile_thr = 0.05):

#     # select source cells
#     if isinstance(source_values, (list, tuple, set)):
#         src_mask = adata_all.obs[source_col].isin(source_values)
#     else:
#         src_mask = adata_all.obs[source_col] == source_values
#     src_idx = np.where(src_mask.values)[0]

#     # coordinates
#     XY_tum = np.c_[adata_tumor.obs["global_x"].values, adata_tumor.obs["global_y"].values]
    
#     if neighbor_weight_col is not None:
        
#         # retrieve weights for source cells
#         expr = adata_all[src_idx, neighbor_weight_col].X
#         if not isinstance(expr, np.ndarray):
#             expr = expr.toarray()
#         W = expr.mean(axis = 1).ravel()
#         W = np.where(np.isfinite(W), W, 0.0)
        
#         # restrict threshold to non-zero W
#         W_nz = W[W > 0]
#         if W_nz.size == 0:
#             active_idx = src_idx
#         else:
#             thr = np.quantile(W_nz, quantile_thr)
#             active_mask = (W > thr)
#             active_idx = src_idx[active_mask]
        
#         XY_src = np.c_[adata_all.obs.loc[adata_all.obs.index[active_idx], "global_x"].values, adata_all.obs.loc[adata_all.obs.index[active_idx], "global_y"].values]
        
#     else:
#         XY_src = np.c_[adata_all.obs.loc[adata_all.obs.index[src_idx], "global_x"].values, adata_all.obs.loc[adata_all.obs.index[src_idx], "global_y"].values]
    
#     # number of neighbors within radius
#     tree = cKDTree(XY_src)
#     neighbor_counts = tree.query_ball_point(XY_tum, r = radius, return_length = True)
#     adata_tumor.obs[f"{key}_neighbor_counts"] = neighbor_counts
#     adata_tumor.obs[f"log_{key}_neighbor_counts"] = np.log1p(neighbor_counts)
    
#     # binary subtyping
#     if binary_subtyping:
#         counts = np.array(neighbor_counts)
#         adata_tumor.obs[f"{key}_subtype"] = pd.Categorical(np.where(counts == 0, "Away", "Close"), categories = ["Away", "Close"], ordered = True)

#     return adata_tumor

### 1. Expression-based scores, e.g., hypoxia

In [32]:
gene_programs = {
    "hypoxia": ["HIF1A", "EPAS1", "NFE2L2", "CREB1", "RELA", "RELB", "NFKB1", "NFKB2"],
    "heat_shock": ["HSP90AA1", "HSP90AB1", "HSPA1A", "HSPA1B", "HSPA6", "HSPA8", "HSPH1", "DNAJB1", "HSPB1", "HSPD1", "HSPE1"],
}

# program name -> genes of that program that were found in the gene panel
gene_programs_overlap = {}

for key, geneset in gene_programs.items():

    # compute score
    overlap, clip_val, adata_tumor = compute_expression_score(adata_tumor, key = key, target_genes = geneset, bg_genes = genes)
    gene_programs_overlap[key] = overlap

    # one figure per entry: (output file name, scatter arguments specific to that figure)
    panels = [
        (f"{key}.jpeg", dict(color = f"{key}_clipped", color_map = color_cts, size = 1)),   # continuous score
        (f"{key}_subtype.jpeg", dict(color = f"{key}_subtype", size = 0.5)),                # Low/High subtypes
    ]

    for file_name, scatter_kwargs in panels:
        sc.set_figure_params(figsize = plot_figsize)
        ax = sc.pl.scatter(adata_tumor, x = plot_coords[0], y = plot_coords[1], title = " ", show = False, **scatter_kwargs)

        # strip grid, ticks, axis labels and frame so only the tissue is drawn
        ax.grid(False)
        ax.set_xticks([])
        ax.set_yticks([])
        ax.set_xlabel("")
        ax.set_ylabel("")
        for spine in ax.spines.values():
            spine.set_visible(False)

        plt.savefig(plot_dir + file_name, dpi = 500, bbox_inches = "tight")
        plt.close()

### 2. Spatial-based scores, e.g., immune attack

In [33]:
# Proximity stress programs: program name -> source cell types counted around each tumor cell
spatial_programs = {
    "immune_cell_proximity": ["CD4+ T cell", "CD8+ T cell", "T cell", "B cell", "Dendritic cell", "Myeloid cell", "Mast cell"],
    "bcell_proximity": ["B cell"],
    "tcell_proximity": ["CD4+ T cell", "CD8+ T cell", "T cell"],
    "tcell_attack": ["CD4+ T cell", "CD8+ T cell", "T cell"],
}

for key, src_vals in spatial_programs.items():

    # compute score and decide which figures to draw:
    # each panel is (output file name, scatter arguments specific to that figure)
    if key == "tcell_attack":

        # granzyme-weighted score, computed with a larger radius than the plain proximity programs
        adata_tumor = compute_spatial_score(adata, adata_tumor, key = key, source_col = "cell_type_merged", source_values = src_vals, k = 20, radius = 100, binary_subtyping = True, neighbor_weight_col = ["GZMB", "GZMK", "GZMA"])
        panels = [
            (f"{key}_weighted_score.jpeg", dict(color = f"{key}_weighted", color_map = color_cts, size = 1)),
            (f"{key}_weighted_subtype.jpeg", dict(color = f"{key}_weighted_subtype", size = 0.5)),
        ]

    else:

        # plain neighbor counts; the plotted score is the log1p-transformed count
        adata_tumor = compute_spatial_score(adata, adata_tumor, key = key, source_col = "cell_type_merged", source_values = src_vals, k = 20, radius = 50, binary_subtyping = True)
        panels = [
            (f"{key}_neighbor_counts.jpeg", dict(color = f"log_{key}_neighbor_counts", color_map = color_cts, size = 1)),
            (f"{key}_subtype.jpeg", dict(color = f"{key}_subtype", size = 0.5)),
        ]

    for file_name, scatter_kwargs in panels:
        sc.set_figure_params(figsize = plot_figsize)
        ax = sc.pl.scatter(adata_tumor, x = plot_coords[0], y = plot_coords[1], title = " ", show = False, **scatter_kwargs)

        # strip grid, ticks, axis labels and frame so only the tissue is drawn
        ax.grid(False)
        ax.set_xticks([])
        ax.set_yticks([])
        ax.set_xlabel("")
        ax.set_ylabel("")
        for spine in ax.spines.values():
            spine.set_visible(False)

        plt.savefig(plot_dir + file_name, dpi = 500, bbox_inches = "tight")
        plt.close()

In [34]:
# # Define proximity stress programs
# spatial_programs = {"immune_cell_proximity": ["CD4+ T cell", "CD8+ T cell", "T cell", "B cell", "Dendritic cell", "Myeloid cell", "Mast cell"],
#                     "bcell_proximity": ["B cell"],
#                     "tcell_proximity": ["CD4+ T cell", "CD8+ T cell", "T cell"],
#                     "tcell_attack": ["CD4+ T cell", "CD8+ T cell", "T cell"]}

# for key, src_vals in spatial_programs.items():
        
#     # compute score
#     if key == "tcell_attack":
#         adata_tumor = compute_spatial_score(adata, adata_tumor, key = key, source_col = "cell_type_merged", source_values = src_vals, k = 20, radius = 50, binary_subtyping = True, neighbor_weight_col = ["GZMB", "GZMK", "GZMA"])
#     else:
#         adata_tumor = compute_spatial_score(adata, adata_tumor, key = key, source_col = "cell_type_merged", source_values = src_vals, k = 20, radius = 50, binary_subtyping = True)
        
#     # plot score (number of neighbors)
#     sc.set_figure_params(figsize = plot_figsize)
#     ax = sc.pl.scatter(adata_tumor, x = plot_coords[0], y = plot_coords[1], color = f"log_{key}_neighbor_counts", color_map = color_cts, size = 1, title = " ", show = False)
#     ax.grid(False)
#     ax.set_xticks([])
#     ax.set_yticks([])
#     ax.set_xlabel("")
#     ax.set_ylabel("")
#     for spine in ax.spines.values():
#         spine.set_visible(False)
#     plt.savefig(plot_dir + f"{key}_neighbor_counts.jpeg", dpi = 500, bbox_inches = "tight")
#     plt.close()
    
#     # plot subtypes
#     sc.set_figure_params(figsize = plot_figsize)
#     ax = sc.pl.scatter(adata_tumor, x = plot_coords[0], y = plot_coords[1], color = f"{key}_subtype", size = 0.5, title = " ", show = False)
#     ax.grid(False)
#     ax.set_xticks([])
#     ax.set_yticks([])
#     ax.set_xlabel("")
#     ax.set_ylabel("")
#     for spine in ax.spines.values():
#         spine.set_visible(False)
#     plt.savefig(plot_dir + f"{key}_subtype.jpeg", dpi = 500, bbox_inches = "tight")
#     plt.close()

### 3. Mechanical crowding scores, e.g., mechanical stress

In [35]:
key = "mechanical"

# compute score
clip_val, adata_tumor = compute_mechanical_crowding(adata, adata_tumor, key = key, k = 20, binary_subtyping = True)

# one figure per entry: (output file name, scatter arguments specific to that figure)
panels = [
    (f"{key}.jpeg", dict(color = f"{key}_clipped", color_map = color_cts, size = 1)),   # continuous score
    (f"{key}_subtype.jpeg", dict(color = f"{key}_subtype", size = 0.5)),                # Low/High subtypes
]

for file_name, scatter_kwargs in panels:
    sc.set_figure_params(figsize = plot_figsize)
    ax = sc.pl.scatter(adata_tumor, x = plot_coords[0], y = plot_coords[1], title = " ", show = False, **scatter_kwargs)

    # strip grid, ticks, axis labels and frame so only the tissue is drawn
    ax.grid(False)
    ax.set_xticks([])
    ax.set_yticks([])
    ax.set_xlabel("")
    ax.set_ylabel("")
    for spine in ax.spines.values():
        spine.set_visible(False)

    plt.savefig(plot_dir + file_name, dpi = 500, bbox_inches = "tight")
    plt.close()

In [36]:
# Write adata_tumor to the processed-data folder as an .h5ad file
tumor_h5ad_path = data_dir + "processed_data/adata_tumor.h5ad"
adata_tumor.write(tumor_h5ad_path)