In [None]:
import os
from pathlib import Path
from typing import Annotated

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import scanpy as sc
import seaborn as sns
import tifffile

from sklearn.cluster import KMeans
from skimage.color import label2rgb
from sklearn.neighbors import radius_neighbors_graph
from sklearn.neighbors import NearestNeighbors

from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import euclidean_distances
from scipy.stats import wilcoxon
from statsmodels.stats.multitest import multipletests
from scipy.stats import entropy, chi2_contingency
from matplotlib.backends.backend_pdf import PdfPages
from statannotations.Annotator import Annotator

from scipy import sparse
import matplotlib as mpl

# Figure export settings: keep text as text in SVG output and embed
# TrueType (type 42) fonts in PDF output.
plt.rcParams.update({
    'svg.fonttype': 'none',
    'pdf.fonttype': 42,  # make text editable in pdf
})

# Relative output paths used later (the *.pdf files) resolve against this directory.
os.chdir('/diskmnt/Projects/myeloma_scRNA_analysis/MMY_IRD/Xenium/analysis/compare_ct_abundance')
os.getcwd()


In [None]:
# Load the merged dataset from disk; later cells read per-row metadata from `merged.obs`.
merged = sc.read_h5ad('/diskmnt/Projects/myeloma_scRNA_analysis/MMY_IRD/Xenium/analysis/merged.h5ad')
# Dimensions of the loaded object (presumably cells x genes — TODO confirm).
print(merged.shape)
# Cell output: number of distinct values in the 'Sample' column.
len(merged.obs['Sample'].unique())

In [None]:
def p_to_stars(p):
    """Convert a p-value into a significance-star label.

    Thresholds (lower bound inclusive, upper bound exclusive):

        p >= 0.05           -> "ns"
        0.01  <= p < 0.05   -> "*"
        0.001 <= p < 0.01   -> "**"
        1e-4  <= p < 0.001  -> "***"
        p < 1e-4            -> "****"

    A NaN p-value is reported as "ns". Every comparison against NaN is False,
    so without the explicit guard a NaN would fall through all thresholds and
    be labelled "****".

    Parameters
    ----------
    p : float
        The p-value to label.

    Returns
    -------
    str
        One of "ns", "*", "**", "***", "****".
    """
    # NaN must be handled first: it compares False against every threshold.
    if np.isnan(p) or p >= 0.05:
        return "ns"
    if p >= 0.01:
        return "*"
    if p >= 0.001:
        return "**"
    if p >= 1e-4:
        return "***"
    return "****"

In [None]:
# Cell output: number of rows in merged.obs for each 'ct' label (presumably one row per cell).
merged.obs['ct'].value_counts()

In [None]:
# Hex colour for each cell-type label.
# NOTE(review): keys presumably match the values of merged.obs['ct'] — confirm.
ct_palette = {
    "HSPC": "#d6e376",
    "Erythroid": "#cfcfcf" ,
    "Megakaryocyte": "#8f8f8f",
    "GMP": "#88cf46",
    "Late Myeloid": "#4ab300",
    "Neutrophil": "#95ad74",
    "Ba/Eo/Ma": "#618038",
    "cDC": "#3bff8c",
    "Monocyte": "#3dd49f",
    "Macrophage": "#03ab70" ,
    "pDC": "#a5c3c4",
    "CD4 T": "#ff8400",
    "CD8 T": "#ff0000",
    "NK": "#9302d1",
    "Early B": "#7cb2e6" ,
    "Mature B": "#045eb5",
    "PC": "#ffbafd",
    "MSC": "#cfc10a",
    "Fibro/Osteo": "#ba9e00",
    "Adipocyte": "#ffe600",
    "Endothelial": "#cc7e7e",
    "vSMC/Pericyte": "#ad4b8e",
    "Low Confidence": "#FFFFFF"
}

# Hex colour for each Collection level; used as the boxplot hue palette below.
timecols = {"NBM": "#0C7515", "NDMM": "#E619B9", "PT": "#CF99C3"} 



In [None]:
# Add a new obs column that combines UPN and Collection as "<UPN>|<Collection>".
# Missing values on either side are written as an empty string (na_rep='').
merged.obs['UPN_Collection'] = merged.obs['UPN'].astype('string').str.cat(merged.obs['Collection'].astype('string'), sep='|', na_rep='')

# Lookup tables built from the per-row metadata. Keys repeat across rows, so
# each dict keeps the value from the last row seen for a given key.
sample_to_collection = merged.obs.set_index('Sample')['Collection'].to_dict()
uc_to_collection = merged.obs.set_index('UPN_Collection')['Collection'].to_dict()
uc_to_upn =  merged.obs.set_index('UPN_Collection')['UPN'].to_dict()

In [None]:
def remove_outliers_iqr(df, col='frac_ct', k=1.5):
    """Return the rows of ``df`` whose ``col`` value lies inside the IQR fences.

    The fences are ``[Q1 - k*IQR, Q3 + k*IQR]`` with both ends inclusive. The
    quartiles are taken over the whole ``col`` column (no grouping). Rows where
    ``col`` is NaN never satisfy the range check and are therefore dropped.

    Parameters
    ----------
    df : pandas.DataFrame
        Input table; not modified.
    col : str
        Name of the numeric column to filter on.
    k : float
        Fence multiplier applied to the inter-quartile range.

    Returns
    -------
    pandas.DataFrame
        The subset of ``df`` within the fences, original index preserved.
    """
    values = df[col]
    first_q, third_q = (values.quantile(q) for q in (0.25, 0.75))
    spread = third_q - first_q
    # Series.between is inclusive on both ends by default.
    within_fences = values.between(first_q - k * spread, third_q + k * spread)
    return df[within_fences]

In [None]:
collection_order = ['NBM', 'NDMM', 'PT']

def plot_fraction_boxplot(df_allpts, title, pdf, min_width=5,):
    """Draw a grouped boxplot of cell-type fractions and append it to a PDF.

    One box per (cell type, Collection) is drawn with individual points
    overlaid. Pairwise Mann-Whitney tests between Collections within each
    cell type are annotated with BH-corrected significance stars
    (non-significant comparisons are hidden).

    Parameters
    ----------
    df_allpts : pandas.DataFrame
        Must contain the columns ``ct``, ``frac_ct`` and ``Collection``.
    title : str
        Axes title.
    pdf : matplotlib.backends.backend_pdf.PdfPages
        Open PDF the figure is saved into.
    min_width : float
        Minimum figure width in inches; the width otherwise grows with the
        number of cell types present.

    Notes
    -----
    * The boxes and points are drawn from the IQR-filtered table returned by
      ``remove_outliers_iqr``, whereas the statistics are computed on the
      unfiltered ``df_allpts``.
      NOTE(review): the IQR fences are computed over all cell types pooled
      together, not per group — confirm this is intended.
    * The x-axis order is fixed explicitly and shared by the boxplot, the
      stripplot and the Annotator, so unused categorical levels do not create
      empty slots and annotations cannot drift from the boxes they refer to.
    """
    # Cell types actually present, in categorical order when available,
    # otherwise in order of first appearance.
    ct_col = df_allpts["ct"]
    if isinstance(ct_col.dtype, pd.CategoricalDtype):
        present = set(ct_col.dropna().unique())
        ct_levels = [c for c in ct_col.cat.categories if c in present]
    else:
        ct_levels = list(pd.unique(ct_col.dropna()))

    width = len(ct_levels)
    fig, ax = plt.subplots(figsize=(max(min_width, width - 1), 5))

    df = remove_outliers_iqr(df_allpts)

    sns.boxplot(
        data=df,
        x="ct", y="frac_ct",
        order=ct_levels,
        hue="Collection",
        hue_order=collection_order,
        palette=timecols,
        fliersize=0, linewidth=1, ax=ax
    )

    sns.stripplot(
        data=df,
        x="ct", y="frac_ct",
        order=ct_levels,
        hue="Collection",
        hue_order=collection_order,
        dodge=True, alpha=1, size=2,
        palette="dark:black", ax=ax
    )

    # remove duplicate legend entries (boxplot and stripplot each add a set)
    handles, labels = ax.get_legend_handles_labels()
    ax.legend(handles[:len(collection_order)],
              labels[:len(collection_order)],
              title="Collection",
              bbox_to_anchor=(1.05, 1), loc="upper left", frameon=False)

    # Groups that have at least one non-missing observation; a comparison is
    # only requested when both of its groups have data, otherwise the test
    # cannot be computed.
    group_sizes = (
        df_allpts.dropna(subset=["frac_ct"])
        .groupby(["ct", "Collection"], observed=True)
        .size()
    )
    has_data = {key for key, n in group_sizes.items() if n > 0}

    # Only annotate cell types that are still drawn after outlier filtering.
    shown = set(df["ct"].dropna().unique())
    comparisons = [("NBM", "NDMM"), ("NDMM", "PT"), ("NBM", "PT")]

    # significance pairs
    pairs = [
        ((ct, first), (ct, second))
        for ct in ct_levels
        if ct in shown
        for first, second in comparisons
        if (ct, first) in has_data and (ct, second) in has_data
    ]

    if pairs:
        annotator = Annotator(
            ax, pairs, data=df_allpts, x="ct", y="frac_ct",
            order=ct_levels,
            hue="Collection", hue_order=collection_order
        )
        annotator.configure(
            test="Mann-Whitney", text_format="star", loc="inside",
            comparisons_correction="BH", hide_non_significant=True, verbose=0
        )
        annotator.apply_and_annotate()

    ax.set_ylabel("Fraction of cells")
    ax.set_xlabel("Cell Type")
    ax.set_title(title)
    sns.despine(ax=ax)
    # Rotate the tick labels before laying out so the layout accounts for them.
    ax.tick_params(axis="x", labelrotation=90)
    fig.tight_layout()
    pdf.savefig(fig, bbox_inches="tight")
    plt.close(fig)


In [None]:
# get PC abundance
obs = merged.obs[['UPN_Collection', 'ct']]

# Count rows per (UPN_Collection, cell type). observed=False keeps cell types
# with zero rows for a given UPN_Collection so they get a fraction of 0.
counts = (
    obs.groupby(["UPN_Collection", "ct"], observed=False)
        .size()
        .reset_index(name="n_cells")
)

# Denominator: total rows per UPN_Collection.
totals = (
    counts.groupby(["UPN_Collection"], observed=True)["n_cells"]
          .sum()
          .reset_index(name="total_cells")
)

frac_df = counts.merge(totals, on=["UPN_Collection"])
frac_df["frac_ct"] = frac_df["n_cells"] / frac_df["total_cells"]
# Recover the two components of the "<UPN>|<Collection>" key.
frac_df[['UPN', 'Collection']] = frac_df['UPN_Collection'].str.split('|', expand=True)

pc = frac_df[frac_df['ct']=='PC'].copy()


pdf_out = "PC_abundance.pdf"
with PdfPages(pdf_out) as pdf:
    # Title describes this PC-only figure (was a copy-pasted
    # "High-abundance subsets" title).
    plot_fraction_boxplot(pc,
                          "PC abundance", pdf, min_width=1)

In [None]:
# Single-page PDF with every cell type in the current `frac_df`.
# Significance stars are BH-corrected inside plot_fraction_boxplot.
pdf_out = "allCT_abundance_FDR.pdf"
with PdfPages(pdf_out) as pdf:
    plot_fraction_boxplot(frac_df, "All CT", pdf)

In [None]:
# Recompute cell-type fractions with PCs (and multiplets) removed from the denominator.
exclude = ["PC", "Multiplet"]   # excluded from denominator
obs = merged.obs[['UPN_Collection', 'ct']]
# Explicit copy: the next line assigns a column, which must not be done on a
# filtered view of merged.obs (SettingWithCopyWarning / chained assignment).
obs = obs[~obs['ct'].isin(exclude)].copy()
# Drop the excluded labels from the categories so the observed=False groupby
# below does not emit all-zero rows for them.
obs['ct'] = obs['ct'].cat.remove_unused_categories()

# Count rows per (UPN_Collection, cell type); zero-count cell types are kept.
counts = (
    obs.groupby(["UPN_Collection", "ct"], observed=False)
        .size()
        .reset_index(name="n_cells")
)

# Denominator: total remaining rows per UPN_Collection.
totals = (
    counts.groupby(["UPN_Collection"], observed=True)["n_cells"]
          .sum()
          .reset_index(name="total_cells")
)

frac_df = counts.merge(totals, on=["UPN_Collection"])
frac_df["frac_ct"] = frac_df["n_cells"] / frac_df["total_cells"]
# Recover the two components of the "<UPN>|<Collection>" key.
frac_df[['UPN', 'Collection']] = frac_df['UPN_Collection'].str.split('|', expand=True)

# get order of subsets based on PT abundance (highest median fraction first)
pt = frac_df[frac_df['Collection'] == 'PT']
median_pt = (
    pt.groupby('ct', observed=False)['frac_ct']
        .median()
        .sort_values(ascending=False)
)
ct_order = median_pt.index.tolist()

# Make 'ct' an ordered categorical so plots follow the PT-abundance order.
frac_df["ct"] = pd.Categorical(
    frac_df["ct"],
    categories=ct_order,
    ordered=True
)

median_pt

In [None]:
# Split the cell types into abundance tiers so each tier gets its own y-axis.
# Unused categories are dropped from every tier (previously only from
# high_subsets): seaborn gives each categorical level an x-axis slot, so stale
# categories would otherwise show up as empty positions on the tier plots.
high_subsets = frac_df[frac_df['ct'].isin(['Erythroid', 'Late Myeloid', 'Low Confidence', 'Neutrophil', 'GMP'])].copy()
high_subsets['ct'] = high_subsets['ct'].cat.remove_unused_categories()
mid_subsets_1 = frac_df[frac_df['ct'].isin(['MSC', 'Macrophage', 'Endothelial', 'Mature B', 'Adipocyte', 'Monocyte', 'CD8 T'])].copy()
mid_subsets_1['ct'] = mid_subsets_1['ct'].cat.remove_unused_categories()
mid_subsets_2 = frac_df[frac_df['ct'].isin(['Early B', 'Megakaryocyte', 'Fibro/Osteo', 'Ba/Eo/Ma', 'CD4 T', 'vSMC/Pericyte'])].copy()
mid_subsets_2['ct'] = mid_subsets_2['ct'].cat.remove_unused_categories()
low_subsets = frac_df[frac_df['ct'].isin(['pDC', 'cDC', 'NK', 'HSPC'])].copy()
low_subsets['ct'] = low_subsets['ct'].cat.remove_unused_categories()
high_subsets['ct'].unique()

In [None]:
pdf_out = "ct_relative_abundance_split_yaxes_exclPC.pdf"

# One PDF page per abundance tier, written in the order listed here.
abundance_pages = [
    (high_subsets, "High-abundance subsets"),
    (mid_subsets_1, "Mid-abundance subsets"),
    (mid_subsets_2, "Mid-abundance subsets"),
    (low_subsets, "Low-abundance subsets"),
]

with PdfPages(pdf_out) as pdf:
    for tier_df, page_title in abundance_pages:
        plot_fraction_boxplot(tier_df, page_title, pdf)
    

In [None]:
# get paired comparisons of abundance in non-PC fraction

# x-axis order; also fixes which sample of a pair is plotted at x=0 and x=1.
paired_order = ["NDMM", "PT"]

# UPNs that have both an NDMM and a PT collection.
n_collections = (
    frac_df[frac_df['Collection'].isin(paired_order)]
    .groupby('UPN', observed=True)['Collection']
    .nunique()
)
paired_upns = n_collections[n_collections == 2].index.tolist()

# Keep only the NDMM/PT rows of paired UPNs. Any other Collection of the same
# UPN would otherwise add an x category and break the paired test.
paired = frac_df[
    frac_df['UPN'].isin(paired_upns) & frac_df['Collection'].isin(paired_order)
].copy()

paired['Collection'] = pd.Categorical(paired['Collection'], categories=paired_order)
paired['ct'] = paired['ct'].astype('category').cat.remove_unused_categories()
# The paired Wilcoxon test matches observations by position within each
# Collection, so make the per-UPN row order explicit.
paired = paired.sort_values(['ct', 'UPN', 'Collection'])

# one figure per ct:
# NOTE(review): these per-cell-type p-values are not corrected for multiple
# testing across cell types (no comparisons_correction is configured) — confirm
# this is intended.
pdf_out = "ct_relative_abundance_exclPCs_paired_boxplots.pdf"
with PdfPages(pdf_out) as pdf:
    for ct in paired['ct'].cat.categories:
        sub = paired[paired['ct'] == ct].copy()

        # One row per UPN with its NDMM and PT fraction side by side.
        wide = pd.concat(
            {
                coll: sub.loc[sub['Collection'] == coll].set_index('UPN')['frac_ct']
                for coll in paired_order
            },
            axis=1,
        ).dropna()

        fig, ax = plt.subplots(figsize=(2, 3))

        sns.boxplot(
            data=sub,
            x="Collection",
            y="frac_ct",
            order=paired_order,
            hue="Collection",
            palette=timecols,
            fliersize=0,
            linewidth=1,
            ax=ax,
        )

        sns.stripplot(
            data=sub,
            x="Collection",
            y="frac_ct",
            order=paired_order,
            hue="Collection",
            alpha=1,
            size=3,
            palette="dark:black",
            ax=ax,
        )

        # Paired lines: connect each UPN's NDMM (x=0) and PT (x=1) values.
        for y_ndmm, y_pt in wide[paired_order].itertuples(index=False, name=None):
            ax.plot([0, 1], [y_ndmm, y_pt], color='gray', alpha=1, linewidth=1)

        # Skip the test when there is nothing to compare: no complete pairs,
        # or every paired difference is exactly zero (e.g. a cell type absent
        # from all samples), for which the Wilcoxon statistic is undefined.
        diffs = wide["NDMM"] - wide["PT"]
        if len(diffs) > 0 and (diffs != 0).any():
            annot = Annotator(
                ax,
                [("NDMM", "PT")],
                data=sub,
                x="Collection",
                y="frac_ct",
                order=paired_order,
            )
            annot.configure(
                test='Wilcoxon',  # paired Wilcoxon
                text_format='star',
                loc='inside',
                hide_non_significant=True
            )
            annot.apply_and_annotate()

        ax.set_ylabel("Fraction of cells")
        # The x-axis shows the Collection (was mislabelled "Cell Type").
        ax.set_xlabel("Collection")
        ax.set_title(f"{ct}")
        sns.despine(ax=ax)
        fig.tight_layout()

        pdf.savefig(fig, bbox_inches="tight")
        plt.close(fig)
