In [None]:
import pandas as pd
import numpy as np
from scipy.spatial import KDTree
from statsmodels.stats.multitest import multipletests
from joblib import Parallel, delayed
import itertools
import seaborn as sns
import matplotlib.pyplot as plt
import os

# === Load dataset ===
df = pd.read_csv("CRC_data_cleaned_corrected.csv")

# === Output folder ===
output_dir = "directional_overlap_asymmetry-final"
os.makedirs(output_dir, exist_ok=True)

# === Define cell types ===
cell_types = [
    'tumor cells', 'CD163+ macros', 'smooth muscle', 'granulocytes', 'stroma',
    'CD8+ T cells', 'memory CD4+ T', 'B cells', 'vasculature', 'plasma cells',
    'generic immune', 'CD68+ macros', 'Tregs', 'CD4+ T cells', 'adipocytes', 'NK cells'
]

# === Parameters ===
threshold = 10
n_iterations = 1000
pathologies = df['pathology'].unique()
assert len(pathologies) == 2
p1, p2 = pathologies

# === Helper: Compute directional overlap percentage ===
def directional_overlap(df_path, type1, type2):
    df1 = df_path[df_path['type'] == type1][['X', 'Y']].values
    df2 = df_path[df_path['type'] == type2][['X', 'Y']].values
    if len(df1) == 0 or len(df2) == 0:
        return np.nan, np.nan
    tree1 = KDTree(df1)
    tree2 = KDTree(df2)
    a_to_b = sum(len(hits) > 0 for hits in tree2.query_ball_tree(tree1, r=threshold))
    b_to_a = sum(len(hits) > 0 for hits in tree1.query_ball_tree(tree2, r=threshold))
    return 100 * a_to_b / len(df2), 100 * b_to_a / len(df1)

# === Helper: Permutation function ===
def permuted_asym_diff(df, type1, type2, p1, p2):
    df_perm = df.copy()
    df_perm['pathology'] = np.random.permutation(df_perm['pathology'])
    df_p1 = df_perm[df_perm['pathology'] == p1]
    df_p2 = df_perm[df_perm['pathology'] == p2]
    a_to_b_p1, b_to_a_p1 = directional_overlap(df_p1, type1, type2)
    a_to_b_p2, b_to_a_p2 = directional_overlap(df_p2, type1, type2)
    if np.isnan(a_to_b_p1) or np.isnan(b_to_a_p1) or np.isnan(a_to_b_p2) or np.isnan(b_to_a_p2):
        return None
    asym_p1 = abs(a_to_b_p1 - b_to_a_p1)
    asym_p2 = abs(a_to_b_p2 - b_to_a_p2)
    return abs(asym_p1 - asym_p2)

# === Main loop: Unique unordered pairs only ===
results = []
df_p1 = df[df['pathology'] == p1]
df_p2 = df[df['pathology'] == p2]

for i, type1 in enumerate(cell_types):
    for j in range(i + 1, len(cell_types)):
        type2 = cell_types[j]

        a_to_b_p1, b_to_a_p1 = directional_overlap(df_p1, type1, type2)
        a_to_b_p2, b_to_a_p2 = directional_overlap(df_p2, type1, type2)

        if np.isnan(a_to_b_p1) or np.isnan(b_to_a_p1) or np.isnan(a_to_b_p2) or np.isnan(b_to_a_p2):
            continue

        asym_p1 = abs(a_to_b_p1 - b_to_a_p1)
        asym_p2 = abs(a_to_b_p2 - b_to_a_p2)
        observed_diff = abs(asym_p1 - asym_p2)

        print(f"\n {type1} ↔ {type2}")
        print(f"{p1}: A→B={a_to_b_p1:.2f}%, B→A={b_to_a_p1:.2f}% → Asym={asym_p1:.2f}")
        print(f"{p2}: A→B={a_to_b_p2:.2f}%, B→A={b_to_a_p2:.2f}% → Asym={asym_p2:.2f}")
        print(f"Observed Asymmetry Difference: {observed_diff:.2f}")

        # === Bootstrapping ===
        null_diffs = Parallel(n_jobs=-1)(
            delayed(permuted_asym_diff)(df, type1, type2, p1, p2) for _ in range(n_iterations)
        )
        null_diffs = [d for d in null_diffs if d is not None]
        null_diffs = np.array(null_diffs)
        p_value = np.mean(null_diffs >= observed_diff)

        # === Save null distribution plot ===
        plt.figure(figsize=(6, 4))
        plt.hist(null_diffs, bins=30, color='lightgray', edgecolor='black')
        plt.axvline(observed_diff, color='red', linestyle='--', label=f'Observed = {observed_diff:.2f}')
        plt.title(f"Null Distribution: {type1} ↔ {type2}")
        plt.xlabel("Permuted Asymmetry Difference (%)")
        plt.ylabel("Frequency")
        plt.legend()
        plt.tight_layout()
        plt.savefig(f"{output_dir}/null_distribution_{type1.replace(' ', '_')}_vs_{type2.replace(' ', '_')}.png", dpi=300)
        plt.close()

        results.append({
            "CellType1": type1,
            "CellType2": type2,
            f"{p1}_A→B": round(a_to_b_p1, 2),
            f"{p1}_B→A": round(b_to_a_p1, 2),
            f"{p2}_A→B": round(a_to_b_p2, 2),
            f"{p2}_B→A": round(b_to_a_p2, 2),
            "Asymmetry_P1": round(asym_p1, 2),
            "Asymmetry_P2": round(asym_p2, 2),
            "Observed_Diff": round(observed_diff, 2),
            "Empirical_p_value": round(p_value, 4)
        })

# === FDR Correction ===
p_values = [r["Empirical_p_value"] for r in results]
rejected, fdr_pvals, _, _ = multipletests(p_values, method='fdr_bh')
for i, r in enumerate(results):
    r["FDR_adjusted_p_value"] = round(fdr_pvals[i], 4)
    r["Significant"] = "Yes" if rejected[i] else "No"

# === Save results ===
results_df = pd.DataFrame(results)
results_df.sort_values("FDR_adjusted_p_value", inplace=True)
results_df.to_csv(f"{output_dir}/directional_asymmetry_significance_{p1}_vs_{p2}.csv", index=False)

# === Heatmap of observed asymmetry differences ===
heatmap_matrix = results_df.pivot(index="CellType1", columns="CellType2", values="Observed_Diff")
plt.figure(figsize=(12, 10))
sns.heatmap(heatmap_matrix.astype(float), annot=True, fmt=".2f", cmap="coolwarm", linewidths=0.5)
plt.title(f"Directional Asymmetry Difference Heatmap ({p1} vs {p2})")
plt.xticks(rotation=45, ha='right')
plt.yticks(rotation=0)
plt.tight_layout()
plt.savefig(f"{output_dir}/directional_asymmetry_heatmap_{p1}_vs_{p2}.png", dpi=300)
plt.close()

print(f"\nOptimized directional asymmetry analysis completed for {p1} vs {p2}")