In [2]:
from pathlib import Path

import pyarrow.dataset as ds
import pyarrow as pa
import pandas as pd
import numpy as np
import umap
import matplotlib.pyplot as plt
from scipy import stats
from sklearn.linear_model import LinearRegression

from lib.phenotype.constants import DEFAULT_METADATA_COLS
from lib.shared.file_utils import load_parquet_subset

  from .autonotebook import tqdm as notebook_tqdm
2025-08-19 16:59:04.143732: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1755637144.401794 2730524 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1755637144.476614 2730524 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1755637145.248828 2730524 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1755637145.248867 2730524 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1755637145.248870 2730524

In [3]:
aligned_data_path = "/lab/ops_analysis/cheeseman/denali-analysis/analysis/brieflow_output/aggregate/parquets/CeCl-all_ChCo-DAPI_CENPA__aligned.parquet"
# aligned_data = ds.dataset(aligned_data_path)
# aligned_data

# aligned_data = load_parquet_subset(aligned_data_path, n_rows=10000000)
# aligned_data

In [5]:
GENE = "RPS8"  # high perturbation score example
# GENE = "EFNB3" # low perturbation score example

In [6]:
import pyarrow.compute as pc

# Create dataset
dataset = ds.dataset(aligned_data_path)

# First, get gene rows
gene_filter = pc.equal(ds.field("gene_symbol_0"), GENE)
gene_table = dataset.to_table(filter=gene_filter)
gene_count = len(gene_table)

: 

In [None]:
GENE = "RPS8"  # high perturbation score example
# GENE = "EFNB3" # low perturbation score example

# perturbation_col = aligned_data.to_table(columns=["gene_symbol_0"]).to_pandas()[
#     "gene_symbol_0"
# ]

# gene_indices = perturbation_col.str.contains(GENE, na=False).to_numpy().nonzero()[0]
# nontargeting_indices = (
#     perturbation_col.str.contains("nontargeting", na=False).to_numpy().nonzero()[0]
# )
# nontargeting_indices = np.random.choice(
#     nontargeting_indices, size=len(gene_indices), replace=False
# )
# combined_indices = np.union1d(gene_indices, nontargeting_indices)

# subset_df = aligned_data.scanner().take(pa.array(combined_indices))
# subset_df = subset_df.to_pandas(use_threads=True, memory_pool=None).reset_index(
#     drop=True
# )

# Get gene rows
gene_rows = aligned_data[aligned_data["gene_symbol_0"] == GENE]

# Get nontargeting rows and sample equal number
nontargeting_rows = aligned_data[
    aligned_data["gene_symbol_0"].str.startswith("nontargeting")
]
sampled_nontargeting_rows = nontargeting_rows.sample(n=len(gene_rows), replace=False)

# Combine the dataframes
subset_df = pd.concat([gene_rows, sampled_nontargeting_rows], ignore_index=True)

metadata_cols = DEFAULT_METADATA_COLS + ["class", "confidence", "batch_values"]
feature_cols = subset_df.columns.difference(metadata_cols, sort=False)

In [None]:
def get_top_differential_features(cell_data, feature_cols, gene, n_features=100):
    """Get top differentially expressed features between control and gene using t-test"""
    results = []

    control_cells = cell_data[cell_data["gene_symbol_0"].str.startswith("nontargeting")]
    gene_cells = cell_data[cell_data["gene_symbol_0"] == gene]

    for feature in feature_cols:
        if feature not in control_cells.columns or feature not in gene_cells.columns:
            continue

        control_values = control_cells[feature].dropna()
        gene_values = gene_cells[feature].dropna()

        if len(control_values) < 2 or len(gene_values) < 2:
            continue

        t_stat, p_value = stats.ttest_ind(control_values, gene_values, equal_var=False)

        results.append(
            {
                "feature": feature,
                "pvalue": p_value,
                "control_mean": control_values.mean(),
                "gene_mean": gene_values.mean(),
            }
        )

    results_df = pd.DataFrame(results).sort_values("pvalue")
    return results_df.head(n_features)["feature"].tolist()


diff_exp_features = get_top_differential_features(
    subset_df,
    feature_cols,
    GENE,
    n_features=100,
)

## Derive Perturbation Score

We aim to implement a modified version of the perturbation score methodology from [Li et al., 2025](https://www.nature.com/articles/s41556-025-01626-9) on our phenotypic features.
Our implementation is:
1) Identify top n differential features with a two‑sample t‑test (perturbation vs. nontargeting controls).
2) Compute the perturbation signature β = mean(feature values in gene‑perturbed cells) − mean(feature values in controls) for those features.
3) Project and scale:
    - Project every cell onto β to obtain a scalar score.
    - Fit a 1‑D linear model to map this projection onto the binary gene label, then rescale the prediction to the [0, 1] interval to yield the final perturbation score.

In [None]:
def get_perturbation_score(cell_data, gene, diff_exp_features):
    """Compute perturbation scores for a given gene using a projection-based method."""

    feature_data = cell_data[diff_exp_features]
    gene_mask = cell_data["gene_symbol_0"] == gene
    control_mask = cell_data["gene_symbol_0"].str.startswith("nontargeting")

    gene_features = feature_data[gene_mask]
    control_features = feature_data[control_mask]

    # Compute signature vector (beta)
    beta = (gene_features.mean() - control_features.mean()).to_numpy()
    norm_squared = beta @ beta

    # Project all cells onto the signature
    projection = (feature_data @ beta) / norm_squared

    # Fit linear model to binary label
    binary_labels = gene_mask.astype(int)
    lin_model = LinearRegression().fit(projection.values.reshape(-1, 1), binary_labels)
    raw_scores = lin_model.predict(projection.values.reshape(-1, 1))

    # Scale scores to [0, 1]
    scaled_scores = (raw_scores - raw_scores.min()) / (
        raw_scores.max() - raw_scores.min()
    )
    return scaled_scores


perturbation_scores = get_perturbation_score(subset_df, GENE, diff_exp_features)
subset_df["perturbation_score"] = perturbation_scores
subset_df

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

plot_df = pd.concat(
    [
        subset_df.assign(group="all cells"),
        subset_df[subset_df["gene_symbol_0"] != GENE].assign(group="control cells"),
        subset_df[subset_df["gene_symbol_0"] == GENE].assign(group=f"{GENE} cells"),
    ]
)

plt.figure(figsize=(6, 4))
sns.histplot(
    data=plot_df,
    x="perturbation_score",
    hue="group",
    bins=50,
    element="step",
    fill=True,
    stat="count",
    common_norm=False,
    alpha=0.5,
)
plt.xlabel("Perturbation score")
plt.ylabel("Cell count")
sns.despine()
plt.tight_layout()
plt.show()

In [None]:
# Prepare data
X = subset_df_scaled[diff_exp_features]
control_X = subset_df_scaled[
    subset_df_scaled["gene_symbol_0"].str.startswith("nontargeting")
][diff_exp_features]
is_gene = subset_df_scaled["gene_symbol_0"] == GENE

# UMAP
embedding = umap.UMAP(n_jobs=-1).fit_transform(X)

# Plot
plt.figure(figsize=(8, 5))
plt.scatter(
    embedding[~is_gene, 0],
    embedding[~is_gene, 1],
    color="gray",
    s=5,
    alpha=0.3,
    label="nontargeting",
)
plt.scatter(
    embedding[is_gene, 0],
    embedding[is_gene, 1],
    color="orange",
    s=5,
    alpha=0.9,
    label=GENE,
)
plt.legend()
plt.title(f"UMAP of Cells Colored by Gene ({GENE})")
plt.xlabel("UMAP1")
plt.ylabel("UMAP2")
plt.tight_layout()
plt.show()

plt.figure(figsize=(8, 5))
plt.scatter(
    embedding[~is_gene, 0],
    embedding[~is_gene, 1],
    c="gray",
    s=5,
    alpha=0.3,
    label="nontargeting",
)
plt.scatter(
    embedding[is_gene, 0],
    embedding[is_gene, 1],
    c=subset_df_scaled[is_gene]["perturbation_score"],
    cmap="coolwarm",
    s=5,
    alpha=0.9,
    label=GENE,
)
plt.legend()
plt.title(f"UMAP Colored by Projection-Norm Perturbation Score ({GENE})")
plt.xlabel("UMAP1")
plt.ylabel("UMAP2")
plt.colorbar(label="Perturbation Score (0-1)")
plt.tight_layout()
plt.show()