### 1. Imports + paths

In [None]:
from pathlib import Path
import json
import numpy as np
import pandas as pd
import scanpy as sc

from src.paths import project_paths

# Report the scanpy version in use so the run can be traced later.
print("Scanpy:", sc.__version__)

# Resolve the project directory layout starting from the current working directory.
P = project_paths(Path.cwd())
PROJECT_ROOT, CONFIG_DIR, DATA_DIR, RESULTS_DIR, FIGURES_DIR = (
    P[_name]
    for _name in ("PROJECT_ROOT", "CONFIG_DIR", "DATA_DIR", "RESULTS_DIR", "FIGURES_DIR")
)

# Outputs of this notebook are written only under results/.
OUT_DIR = RESULTS_DIR / "summary_tables" / "conv_t_other_cleanup"
for _directory in (RESULTS_DIR, OUT_DIR):
    _directory.mkdir(parents=True, exist_ok=True)

# Echo the resolved locations.
for _label, _location in (
    ("PROJECT_ROOT:", PROJECT_ROOT),
    ("CONFIG_DIR  :", CONFIG_DIR),
    ("DATA_DIR    :", DATA_DIR),
    ("RESULTS_DIR :", RESULTS_DIR),
    ("OUT_DIR     :", OUT_DIR),
):
    print(_label, _location)

### 2. Leer config + parámetros

In [None]:
def _strip_inline_comment(value: str) -> str:
    """Return *value* without a trailing YAML comment (``# ...``)."""
    # Quoted scalar: a '#' inside the quotes is data, so only text that
    # follows the closing quote may be dropped as a comment.
    if value[:1] in ('"', "'"):
        closing = value.find(value[0], 1)
        if closing != -1:
            tail = value[closing + 1:].lstrip()
            if not tail or tail.startswith("#"):
                return value[:closing + 1]
        # Unbalanced or unusual quoting: leave untouched.
        return value
    # Plain scalar: a comment starts at a '#' that opens the value or is
    # preceded by whitespace.
    for pos, char in enumerate(value):
        if char == "#" and (pos == 0 or value[pos - 1].isspace()):
            return value[:pos].rstrip()
    return value


def load_simple_yaml(path: Path) -> dict:
    """Parse a flat ``key: value`` YAML file into a dict of strings.

    This is a minimal line-based reader, not a full YAML parser: every line
    is stripped and split on its first ``:``, so nesting, lists and typed
    scalars are not interpreted. Blank lines, full-line comments and lines
    without a colon are skipped. Trailing inline comments are removed from
    the value, and surrounding single or double quotes are stripped.

    Args:
        path: Location of the UTF-8 encoded config file.

    Returns:
        Mapping from each key to its value as a string. When a key is
        repeated, the last occurrence wins.
    """
    cfg = {}
    for raw_line in path.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, _, value = line.partition(":")
        value = _strip_inline_comment(value.strip())
        cfg[key.strip()] = value.strip('"').strip("'")
    return cfg

# The config file is mandatory: load it, or stop if it is absent.
cfg_path = CONFIG_DIR / "config.yaml"
if cfg_path.exists():
    CFG = load_simple_yaml(cfg_path)
else:
    raise FileNotFoundError(f"Falta {cfg_path}")

# Input: output of NB10 (main_filtered_for_analysis).
IN_NAME = CFG.get(
    "main_filtered_for_analysis_h5ad_filename",
    "TFM_CIRRHOSIS_main_filtered_for_analysis.h5ad",
)
IN_PATH = RESULTS_DIR / IN_NAME

# Expected obs keys, each overridable from the config.
LEVEL1_REFINED_KEY = CFG.get("level1_refined_key", "Level1_refined")
LEVEL2_KEY = CFG.get("level2_key", "Level2")
PATIENT_KEY = CFG.get("patient_id_key", "patientID")

# Expression layer (keeps the historical default).
LAYER = CFG.get("analysis_layer", "log1p_10k")

# Echo the effective parameters.
for _label, _value in (
    ("IN_PATH            :", IN_PATH),
    ("LEVEL1_REFINED_KEY :", LEVEL1_REFINED_KEY),
    ("LEVEL2_KEY         :", LEVEL2_KEY),
    ("PATIENT_KEY        :", PATIENT_KEY),
    ("LAYER              :", LAYER),
):
    print(_label, _value)

# Fail early when the input object is not on disk.
if not IN_PATH.exists():
    raise FileNotFoundError(f"No existe IN_PATH:\n{IN_PATH}")

### 3. Helpers: mapping símbolo -> varname

In [None]:
def symbol_to_varname(adata, symbol: str):
    """Resolve a gene symbol to the matching entry of ``adata.var_names``.

    The symbol is returned unchanged when it is already a var name.
    Otherwise, if ``adata.var`` has a ``symbol`` column, the var name of the
    first row whose symbol equals *symbol* is returned. ``None`` means the
    symbol could not be resolved.
    """
    # Case 1: var_names are already gene symbols.
    if symbol in adata.var_names:
        return symbol

    # Case 2: look the symbol up in var['symbol'], when that column exists.
    if "symbol" not in adata.var.columns:
        return None
    matches = np.nonzero(adata.var["symbol"].astype(str).to_numpy() == symbol)[0]
    if matches.size == 0:
        return None
    return adata.var_names[matches[0]]

def symbols_to_varnames(adata, symbols):
    """Map each symbol through ``symbol_to_varname``, preserving input order.

    The returned list has one entry per input symbol; entries are ``None``
    for symbols that could not be resolved.
    """
    return [symbol_to_varname(adata, sym) for sym in symbols]

### 4. Cargar T cells × panel genes

In [None]:
# Gene panel (signatures) used to decide what Conv_T_other is.
signatures = {
    "Naive_Memory_like": ["IL7R", "CCR7", "LTB", "TCF7", "LEF1", "MALAT1"],
    "Cytotoxic_like": ["NKG7", "GNLY", "PRF1", "GZMB", "FGFBP2"],
    "Treg_like": ["FOXP3", "IL2RA", "CTLA4", "IKZF2", "TNFRSF18"],
    "Exhausted_like": ["PDCD1", "LAG3", "TIGIT", "TOX", "HAVCR2"],
    "Proliferative_like": ["MKI67", "TOP2A", "STMN1", "HMGB2", "TYMS"],
    "ISG_like": ["ISG15", "IFIT1", "IFIT3", "MX1", "OAS1"],
}

# Additional markers included in the panel alongside the signature genes.
markers_extra = [
    "TRAC", "TRBC1", "TRBC2", "CD3D", "CD3E",
    "CD4", "CD8A", "CD8B",
    "KLRD1", "FCGR3A",
    "JUN", "FOS", "IL32", "HLA-DRA",
]

# Signature genes first (in declaration order), then the extra markers;
# dict.fromkeys drops duplicates while keeping first-seen order.
panel_symbols = list(
    dict.fromkeys(
        [gene for sig_genes in signatures.values() for gene in sig_genes]
        + markers_extra
    )
)

# Open the input in backed read mode; only the T cells x panel genes subset
# is pulled into memory further below.
adata_b = sc.read_h5ad(IN_PATH, backed="r")

try:
    # All three obs columns are required by the rest of the notebook.
    needed = [PATIENT_KEY, LEVEL1_REFINED_KEY, LEVEL2_KEY]
    missing = [c for c in needed if c not in adata_b.obs.columns]
    if missing:
        raise KeyError(f"Faltan columnas en obs: {missing}")

    # RBC-out sanity check (informational only: the count is printed, not enforced)
    rbc_n = int((adata_b.obs[LEVEL2_KEY].astype(str) == "RBC").sum())
    print("RBC cells (should be 0):", rbc_n)

    # Work on a copy of the three annotation columns, normalised to str so the
    # equality comparisons below behave the same whatever the stored dtype.
    obs = adata_b.obs[[PATIENT_KEY, LEVEL1_REFINED_KEY, LEVEL2_KEY]].copy()
    obs[PATIENT_KEY]        = obs[PATIENT_KEY].astype(str)
    obs[LEVEL1_REFINED_KEY] = obs[LEVEL1_REFINED_KEY].astype(str)
    obs[LEVEL2_KEY]         = obs[LEVEL2_KEY].astype(str)

    # Subset: ONLY T cells (Level1_refined == "T")
    mask_T = (obs[LEVEL1_REFINED_KEY].values == "T")
    nT = int(mask_T.sum())
    print("T cells:", nT)
    if nT == 0:
        raise ValueError("No hay células T en Level1_refined; esto no debería pasar.")

    # Is there any Conv_T_other among the T cells? If not, this notebook does not apply.
    n_conv = int((obs.loc[mask_T, LEVEL2_KEY].values == "Conv_T_other").sum())
    print("Conv_T_other in T:", n_conv)
    if n_conv == 0:
        raise ValueError(
            "No hay Conv_T_other en el objeto. Si ya lo eliminaste/renombraste, este notebook no aplica."
        )

    # Map symbol -> var name (robust to Ensembl-style var_names)
    panel_varnames = symbols_to_varnames(adata_b, panel_symbols)
    # Split the panel into resolved (symbol, varname) pairs and unresolved symbols.
    kept = [(s, v) for s, v in zip(panel_symbols, panel_varnames) if v is not None]
    missing_genes = [s for s, v in zip(panel_symbols, panel_varnames) if v is None]

    if len(kept) == 0:
        raise RuntimeError(
            "No se pudo mapear ningún gen del panel a var_names (ni por var['symbol']). "
            "Revisa adata.var_names / adata.var['symbol']."
        )

    # Parallel lists plus the reverse lookup used later to rename columns back to symbols.
    present_symbols = [s for s, _ in kept]
    present_varnames = [v for _, v in kept]
    varname_to_symbol = {v: s for s, v in kept}

    print("Panel genes mapped:", len(present_symbols), "| missing:", len(missing_genes))
    if missing_genes:
        # Unresolved genes are tolerated; they are simply left out of the panel.
        print("[INFO] Missing genes (ok):", missing_genes)

    # Load into memory ONLY T cells x panel genes
    idx_T = np.where(mask_T)[0]
    adata_small = adata_b[idx_T, present_varnames].to_memory()

finally:
    # Always close the backed file handle; closing is best-effort and any
    # error raised by close() is deliberately ignored.
    try:
        adata_b.file.close()
    except Exception:
        pass

# The preferred layer must exist in the in-memory subset.
if LAYER not in adata_small.layers.keys():
    raise KeyError(f"No existe layer {LAYER} en el objeto; layers={list(adata_small.layers.keys())}")

print("adata_small:", adata_small)

### 5. Tabla de expresión + medias por Level2

In [None]:
# Per-cell expression table for the panel: fetched by var name, with columns
# renamed to gene symbols and the Level2 / patient annotations appended.
df_expr = (
    sc.get.obs_df(adata_small, keys=present_varnames, layer=LAYER, use_raw=False)
    .rename(columns=varname_to_symbol)
    .assign(
        **{
            LEVEL2_KEY: adata_small.obs[LEVEL2_KEY].astype(str).values,
            PATIENT_KEY: adata_small.obs[PATIENT_KEY].astype(str).values,
        }
    )
)

# Mean expression per Level2 label (within T cells) for the panel markers.
means_by_l2 = df_expr.groupby(LEVEL2_KEY)[present_symbols].mean()

# For every Level2 label keep its 25 highest-mean genes, in long format.
top_rows = [
    [l2, gene, float(mean_value)]
    for l2 in means_by_l2.index
    for gene, mean_value in means_by_l2.loc[l2]
    .sort_values(ascending=False)
    .head(25)
    .items()
]

top_df = pd.DataFrame(top_rows, columns=[LEVEL2_KEY, "gene", "mean_log1p"])
out_top = OUT_DIR / "QA_ConvT_other_topmarkers_meanlog1p.csv"
top_df.to_csv(out_top, index=False)
print("Saved:", out_top)

### 6. Scores de firmas por Level2 + guardado CSV

In [None]:
# Genes of each signature that are actually present in the means table.
_genes_present = {
    sig: [g for g in genes if g in means_by_l2.columns]
    for sig, genes in signatures.items()
}

# One row per (Level2 label, signature): the score is the mean of the
# per-gene means, or NaN when none of the signature genes is available.
score_rows = [
    [
        l2,
        sig,
        float(means_by_l2.loc[l2, genes_ok].mean()) if genes_ok else np.nan,
        len(genes_ok),
    ]
    for l2 in means_by_l2.index
    for sig, genes_ok in _genes_present.items()
]

scores = pd.DataFrame(
    score_rows,
    columns=[LEVEL2_KEY, "signature", "score_meanlog1p", "n_genes_used"],
)
# Wide layout: Level2 labels as rows, signatures as columns.
scores_pivot = scores.pivot(index=LEVEL2_KEY, columns="signature", values="score_meanlog1p")

out_scores = OUT_DIR / "QA_ConvT_other_signature_scores.csv"
scores_pivot.to_csv(out_scores)
print("Saved:", out_scores)

print("\n=== Signature scores (Conv_T_other row) ===")
if "Conv_T_other" not in scores_pivot.index:
    raise RuntimeError("No se encontró fila 'Conv_T_other' en scores_pivot (esto no debería pasar).")
print(scores_pivot.loc[["Conv_T_other"]])

### 7. Decisión automática reproducible + guardado

In [None]:
# Signature scores of Conv_T_other, ignoring signatures without a score.
conv_scores = scores_pivot.loc["Conv_T_other"].dropna()
if conv_scores.empty:
    raise RuntimeError("Conv_T_other tiene todos los scores NaN; no se puede decidir mapping.")

# The winning signature is the one with the highest score.
best_sig = conv_scores.sort_values(ascending=False).index[0]
best_val = float(conv_scores.loc[best_sig])

# Final mapping, ONLY for Conv_T_other: winning signature -> new Level2 label.
# Entries marked "merge" are tagged that way in the original if/elif chain.
_signature_to_label = {
    "Naive_Memory_like": "CD4_Memory",
    "Cytotoxic_like": "CD8_Effector_Cytotoxic",  # merge
    "Treg_like": "Treg",                         # merge
    "Exhausted_like": "Exhausted_T",             # merge
    "Proliferative_like": "Proliferative_T",     # merge
    "ISG_like": "ISG_T",
}
new_label = _signature_to_label.get(best_sig, "T_Other")

level2_map = {"Conv_T_other": new_label}

# Persist the mapping as JSON.
out_map = OUT_DIR / "Level2_final_map.json"
with open(out_map, "w", encoding="utf-8") as f:
    f.write(json.dumps(level2_map, indent=2))

# Persist a plain-text record of the decision.
out_dec = OUT_DIR / "ConvT_other_decision.txt"
with open(out_dec, "w", encoding="utf-8") as f:
    f.writelines(
        [
            "Conv_T_other decision\n",
            f"- best_signature: {best_sig}\n",
            f"- best_score_meanlog1p: {best_val:.6f}\n",
            f"- new_label: {new_label}\n",
            f"- n_cells_Conv_T_other (in T): {n_conv}\n",
        ]
    )

print("\n=== DECISIÓN FINAL ===")
for _label, _value in (
    ("best_signature:", best_sig),
    ("best_score_meanlog1p:", best_val),
    ("Conv_T_other ->", new_label),
    ("Saved:", out_map),
    ("Saved:", out_dec),
):
    print(_label, _value)

### 8. QA final

In [None]:
# Final QA banner summarising the inputs, counts and produced artefacts.
_rule = "=" * 80
print("\n" + _rule)
print("QA FINAL — Conv_T_other_cleanup")
print(_rule)
for _label, _value in (
    ("Input:", IN_PATH),
    ("Output dir:", OUT_DIR),
    ("n_T_cells:", int(nT)),
    ("n_Conv_T_other_in_T:", int(n_conv)),
    ("Mapping JSON:", out_map),
    ("Decision TXT:", out_dec),
):
    print(_label, _value)
print(_rule)
print("\n[OK] Conv_T_other caracterizado + mapping final generado.")