# Options

In [None]:
%load_ext autoreload
%autoreload 2

import os
import re
import functools
import numpy as np
import pandas as pd
import corescpy as cr

# Count threshold passed to the marker quantification calls
count_threshold = 1

# Sample identity, annotation column, and input/output locations
libid = "Uninflamed-50336C"
col_assignment = "group"
dir_data = "/mnt/cho_lab/bbdata2/outputs/TUQ97N"
out_dir = ("/mnt/cho_lab/disk2/elizabeth/data/shared-xenium-library/"
           "outputs/TUQ97N/nebraska")
path_ann = "~/corescpy/examples/markers_lineages.csv"

# Clustering column to annotate (swap in one of the alternatives below)
c_t = "leiden_res1pt5_dist0_npc30"  # high resolution
# c_t = "leiden_res0pt75_dist0pt3_npc30"  # medium resolution
# c_t = "leiden_res0pt5_dist0pt5_npc30"  # low resolution

# ToppGene reference sources to query
srcs = [
    "Cells of the human intestinal tract mapped across space and time",
    "Human Ileal Epithelial cells from Crohn’s Disease",
    "Human Ileal Immune cells from Crohn’s Disease",
]

# Widen pandas display limits so long annotation tables print in full
pd.set_option("display.max_colwidth", 1000)
pd.set_option("display.max_columns", 100)
pd.set_option("display.max_rows", 500)

# Load Data

In [None]:
# Annotation Guide File: keep only rows that have a value in the assignment
# column, indexed by the "gene" column (index renamed to "Gene")
anf = pd.read_csv(path_ann)
assign = anf.dropna(subset=col_assignment).set_index(
    "gene").rename_axis("Gene")  # markers

# Find File for Sample: list everything one level below each run directory.
# Entries of dir_data that are not directories are skipped instead of being
# passed to os.listdir (which would raise).
files = [os.path.join(run, entry) for run in os.listdir(dir_data)
         if os.path.isdir(os.path.join(dir_data, run))
         for entry in os.listdir(os.path.join(dir_data, run))]

# A file matches when the third "__"-delimited field of its base name, up to
# the first "-", equals the library ID minus its first "-"-delimited token.
# Names with fewer than three fields cannot match and are ignored.
sample_key = "-".join(libid.split("-")[1:])
matches = []
for x in files:
    fields = os.path.basename(x).split("__")
    if len(fields) > 2 and fields[2].split("-")[0] == sample_key:
        matches.append(x)
if not matches:
    raise FileNotFoundError(
        f"No output under '{dir_data}' matches sample '{sample_key}' "
        f"(library ID '{libid}')")
file_path = matches[0]  # first match wins

# Load Spatial Data: build the object from the raw output, restore the saved
# state from the processed .h5ad, then switch to the "counts" layer
self = cr.Spatial(os.path.join(dir_data, file_path), library_id=libid)
self.update_from_h5ad(os.path.join(out_dir, libid + ".h5ad"))
self.get_layer("counts", inplace=True)

# ToppGene

In [None]:
# Query ToppGene for every cluster in the chosen clustering column
tgdf, mks = self.annotate_clusters(
    None, sources=srcs, col_cell_type=c_t, max_results=10000,
    name_pattern={srcs[0]: "SmallIntestine"}, p_threshold=1e-15,
    lfc_threshold=1, n_top_genes=20, n_top_annotations=40)

# Shorten verbose pieces of the annotation names (pattern -> replacement)
longs = [(" / Per Region, Age_group, Lineage, cell class, cell type", ""),
         ("SmallIntestine", "SmInt")]
for pattern, short in longs:
    tgdf.loc[:, "Name"] = tgdf.Name.apply(
        lambda name: re.sub(pattern, short, name))


def _drop_repeated_suffix(name):
    """Drop the last "|" segment when its prefix already opens the name.

    The segment is removed only if the text before its first "-" occurs in
    the text before the name's first "-", and the name mentions one of the
    listed keywords; otherwise the name is returned unchanged.
    """
    # NOTE(review): a name without any "|" that contains a keyword would be
    # reduced to "" here -- presumably such names never occur; confirm.
    pieces = name.split("|")
    repeated = pieces[-1].split("-")[0] in name.split("-")[0]
    keyed = any(tag in name
                for tag in ["Child", "Adult", "Pediatric", "Trim"])
    if repeated and keyed:
        return "|".join(pieces[:-1])
    return name


tgdf.loc[:, "Name"] = tgdf.Name.apply(_drop_repeated_suffix)
tgdf = tgdf.rename_axis([c_t, "Rank"])

# Add a "<genes in term & query> / <genes in query>" summary column and
# move it from the end to the second position
marker_matches = tgdf.apply(
    lambda row: f"{row['GenesInTermInQuery']} / {row['GenesInQuery']}",
    axis=1).to_frame("Marker Matches")
tgdf = tgdf.join(marker_matches)
columns = list(tgdf.columns)
tgdf = tgdf[columns[:1] + ["Marker Matches"] + columns[1:-1]]
tgdf

# Quantification

## Calculate

In [None]:
%%time

# Run marker quantification per cluster; clusters that raise are collected
# and reported at the end instead of stopping the loop
outs, failed = {}, []  # hold results
clusters = self.rna.obs[c_t].unique()
n_clusters = len(clusters)
annotated = tgdf.reset_index(1).index.values if tgdf is not None else []
rule = "=" * 80
for i, k in enumerate(clusters, start=1):
    if k in annotated:
        # Header plus the first three ToppGene columns for this cluster
        print(f"{rule}\n{k}\n ({i}/{n_clusters})"
              f"\n{rule}\n\n{tgdf.loc[k].iloc[:, :3]}\n\n")
    try:
        outs[k] = self.print_markers(
            k, assign, col_cell_type=c_t, lfc_threshold=2,
            count_threshold=count_threshold, p_threshold=1e-15,
            print_threshold=15, n_top_genes=20)
    except Exception as err:
        failed.append((k, err))
if failed:
    print(f"Failed: {pd.Series(dict(failed))}")

## Format

In [None]:
# Reshape each cluster's marker output into one wide table, then stack the
# per-cluster tables under a "Cluster" index level
quant = {}
for k in outs:
    # Only the 2nd and 3rd elements of the 5-element result are used
    _, percs_exp, n_exp, _, _ = outs[k]

    # 2nd element: columns -> "Cell Types" level, blanks dropped
    quantification = percs_exp.rename_axis("Cell Types", axis=1).stack()
    quantification = quantification.replace("", np.nan).dropna()
    quantification = quantification.to_frame("n_exp")

    # 3rd element: "Cell Types" joins the index; the column named after
    # this cluster gets the generic label "Cluster"
    representation = n_exp.copy().set_index("Cell Types", append=True)
    representation = representation.rename({k: "Cluster"}, axis=1)

    # Long format with a "Measure" level, tagged by origin, then pivoted so
    # origin and measure become columns
    stacked = pd.concat(
        [part.rename_axis("Measure", axis=1).stack()
         for part in (quantification, representation)],
        keys=["quantification", "representation"])
    quant[k] = stacked.unstack(0).unstack(-1).dropna(how="all", axis=1)
quant = pd.concat(quant, names=["Cluster"])

## Write

In [None]:
# Save the formatted quantification table as an Excel workbook under
# <out_dir>/quantification (skipped when out_dir is empty/None)
if out_dir:
    quant_dir = os.path.join(out_dir, "quantification")
    # to_excel does not create missing folders, so make sure it exists
    os.makedirs(quant_dir, exist_ok=True)
    quant.to_excel(os.path.join(
        quant_dir,
        f"{self._library_id}__{c_t}_quantification_annotation.xlsx"))

# Extras

## Specific Genes

In [None]:
%%time

# Marker table for the gene set of interest: index "Gene", with a single
# "Annotation" column set to "Senescence"
ggg = pd.DataFrame(["CDKN1A", "CDKN2A", "TP53", "PLAUR", "IL6ST"])[
    0].to_frame("Gene").assign(Annotation="Senescence").set_index("Gene")

# Quantify these genes in every cluster with all filtering thresholds
# disabled. As in the main quantification loop, a cluster that raises is
# recorded in `failed` and reported at the end rather than aborting the
# remaining clusters.
outs_snc, failed = {}, []  # hold results
for k in self.rna.obs[c_t].unique():
    try:
        outs_snc[k] = self.print_markers(
            k, ggg, col_cell_type=c_t, lfc_threshold=None,
            print_threshold=0, count_threshold=count_threshold,
            p_threshold=None, n_top_genes=None)
    except Exception as err:
        failed.append((k, err))
if failed:
    print(f"Failed: {pd.Series(dict(failed))}")

In [None]:
# Reshape the senescence-gene results (outs_snc) into one wide table per
# cluster, then stack them under a "Cluster" index level. This reads from
# outs_snc and concatenates q_snc; it must not reuse the lineage-marker
# results (`outs`) or the previously built `quant` table.
q_snc = {}
for k in outs_snc:
    # Only the 2nd and 3rd elements of the 5-element result are used
    _, percs_exp, n_exp, _, _ = outs_snc[k]

    # 2nd element: columns -> "Cell Types" level, blanks dropped
    quantification = percs_exp.rename_axis(
        "Cell Types", axis=1).stack().replace(
            "", np.nan).dropna().to_frame("n_exp")

    # 3rd element: "Cell Types" joins the index; the column named after
    # this cluster gets the generic label "Cluster"
    representation = n_exp.copy().set_index(
        "Cell Types", append=True).rename({k: "Cluster"}, axis=1)

    # Long format with a "Measure" level, tagged by origin, then pivoted so
    # origin and measure become columns
    stacked = pd.concat(
        [part.rename_axis("Measure", axis=1).stack()
         for part in (quantification, representation)],
        keys=["quantification", "representation"])
    q_snc[k] = stacked.unstack(0).unstack(-1).dropna(how="all", axis=1)
q_snc = pd.concat(q_snc, names=["Cluster"])
q_snc