In [10]:
import obonet
import pandas as pd
from collections import defaultdict
import sys
from pathlib import Path
import os
from datetime import datetime
import json
# from pypalettes import load_cmap

# Import de la fonction
from helpers import load_ontology, create_id_name_mappings, subset_ontology_by_term

In [3]:
# Ontology set-up: full graph, the sub-graph under HP:0000118, and a reversed
# copy of each. The reversed sub-graph `hpors` is the one the later cells walk
# (its predecessors are used as a term's parents, its successors as children).
hpo = load_ontology("../data/hpo_v2025_01_16.obo") #  Load the ontology
hpor = hpo.reverse()
hpos = subset_ontology_by_term(hpo,'HP:0000118') # Subset rooted at HP:0000118 (presumably the term and its descendants; TODO confirm in helpers)
hpors = hpos.reverse() # reverse tree

id2name, name2id = create_id_name_mappings(hpo) # Lookup tables between term IDs and names (exact contents defined in helpers; TODO confirm)

In [None]:
# Reshape the binary matrix into a long table: hpo_id | count
FREQ_LABEL_THRESHOLD = 200  # a term's label is displayed only above this frequency
DATA_PATH = "../data/ohe_20kRennes_EHR_2025_03_10.csv"

ehr_hpo = pd.read_csv(DATA_PATH, index_col=0)
print(ehr_hpo.shape)

# Small safety check: discard the HP:0100021 column when the file contains it
if "HP:0100021" in ehr_hpo.columns:
    ehr_hpo = ehr_hpo.drop(columns=["HP:0100021"])
    print(ehr_hpo.shape)

# Column-wise sum = number of rows flagged for each HPO term
hpo_count = ehr_hpo.sum(axis=0)
hpo_count_df = hpo_count.rename_axis("hpo_id").reset_index(name="count")

(19763, 4085)
(19763, 4084)


In [5]:
# ID of the term used as the root throughout this notebook
ROOT_ID = 'HP:0000118'

def get_path_to_root(term_id, graph=None):
    """Walk up from ``term_id`` through parent links and return the visited path.

    At every step only the first parent reported by the graph is followed, so
    a term with several parents yields a single, arbitrary lineage.

    Args:
        term_id: Identifier of the term to start from.
        graph: Object exposing ``predecessors(node)``, where the predecessors
            of a node are its parents. Defaults to the notebook-level
            ``hpors`` graph, which keeps existing one-argument calls working.

    Returns:
        list: ``[term_id, parent, grandparent, ...]``, ending at the first
        node that has no parent.

    Raises:
        ValueError: If the walk reaches a node that is already on the path
            (a cycle), which would otherwise loop forever.
    """
    if graph is None:
        graph = hpors

    path = [term_id]
    visited = {term_id}
    current = term_id
    while True:
        # Only the first parent matters; no need to materialise the full list.
        parent = next(iter(graph.predecessors(current)), None)
        if parent is None:
            break
        if parent in visited:
            raise ValueError(
                f"Cycle detected while walking up from {term_id!r}: "
                f"{parent!r} was reached twice"
            )
        visited.add(parent)
        path.append(parent)
        current = parent
    return path

# Quick visual check: print the chain of ancestors found for one example term
print(get_path_to_root("HP:0000252"))

['HP:0000252', 'HP:0007364', 'HP:0002060', 'HP:0100547', 'HP:0012443', 'HP:0002011', 'HP:0012639', 'HP:0000707', 'HP:0000118']


In [6]:
import networkx as nx
import warnings

# Root term whose direct children are treated as the main branches
ROOT_ID = "HP:0000118"

# Direct children of the root in the reversed sub-graph
children_of_root = list(hpors.successors(ROOT_ID))

# One colour per main branch; the palette length is the branch limit
custom_colors = [
    "#CA3C66", "#CBEFB6", "#3357FF", "#B36A5E", "#7C9ACC",
    "#33FFF5", "#FFC733", "#8DFF33", "#A7E0E0", "#FEBB5F",
    "#3366FF", "#E3997E", "#FAEDCD", "#33CCFF", "#FFCC33",
    "#A0C6A9", "#EAC3A9", "#805050", "#EED7C5", "#F7AF9D",
    "#66CCFF", "#8CACD3", "#99FF33", "#CC7567", "#FF33FF"
]

# Keep at most one branch per available colour. The overflow test has to look
# at the untruncated list: comparing the already-sliced list against the
# palette can never be true, so extra branches used to be dropped silently.
main_branches = children_of_root[:len(custom_colors)]
if len(children_of_root) > len(custom_colors):
    warnings.warn(
        f"Il y a {len(children_of_root)} branches principales pour "
        f"{len(custom_colors)} couleurs, ajustez la liste de couleurs."
    )

# Pair each retained branch with the colour at the same position
branch_color_mapping = dict(zip(main_branches, custom_colors))

# Dedicated colour for the root itself ("Phenotypic abnormality")
phenotypic_abnormality_color = "#FF6347"  # tomato red
branch_color_mapping[ROOT_ID] = phenotypic_abnormality_color


In [7]:
# Build the column-oriented table (one row per HPO node) behind the sunburst
TRUNCATED_ROOT_ID = "HP:0000118"  # node whose direct children define the main branches

data = {
    "hpo_code": [],
    "hpo_name": [],
    "parent": [],
    "frequency": [],
    "main_branch": [],
    "branch_color": []
}

added_nodes = set()
row_index = {}  # hpo_code -> position of that node's row in the lists of `data`

for term_id, count in zip(hpo_count_df["hpo_id"], hpo_count_df["count"]):
    path = get_path_to_root(term_id)

    # Main branch of the term = the node sitting just below the truncated root
    branch_root = None
    if term_id == ROOT_ID:
        branch_root = ROOT_ID  # the root is its own main branch
    elif TRUNCATED_ROOT_ID in path:
        idx = path.index(TRUNCATED_ROOT_ID)
        if idx > 0:
            branch_root = path[idx - 1]

    for i, node in enumerate(path):
        if node in added_nodes:
            # The node may already exist as a zero-frequency ancestor of an
            # earlier term; when it is the counted term itself, record its
            # count instead of silently dropping it.
            if node == term_id:
                data["frequency"][row_index[node]] = count
            continue

        parent = path[i + 1] if i < len(path) - 1 else ""

        # The root rows must not inherit the branch (and colour) of whichever
        # descendant happens to be processed first.
        if node in (ROOT_ID, TRUNCATED_ROOT_ID):
            node_branch = node
        else:
            node_branch = branch_root if branch_root else node

        row_index[node] = len(data["hpo_code"])
        data["hpo_code"].append(node)
        data["hpo_name"].append(id2name.get(node, node))  # fall back to the ID when no name is known
        data["parent"].append(parent)
        data["frequency"].append(count if node == term_id else 0)  # ancestors carry no count of their own
        data["main_branch"].append(node_branch)
        data["branch_color"].append(branch_color_mapping.get(node_branch, 'gray'))

        added_nodes.add(node)


In [None]:
# Final DataFrame: one row per HPO node
df = pd.DataFrame(data)

# Visible label: the term name when its frequency is strictly above the
# threshold, an empty string otherwise. Vectorised, so it also behaves on an
# empty frame, where a row-wise apply does not return a single column.
df["hpo_name_visible"] = df["hpo_name"].where(
    df["frequency"] > FREQ_LABEL_THRESHOLD, ""
)

In [11]:
# Cohort label embedded in the exported file names
COHORT_NAME = "TEST"

# The timestamp keeps successive exports from overwriting each other
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = Path("../output")
output_dir.mkdir(parents=True, exist_ok=True)

# 1) Save the DataFrame that feeds the sunburst
sunburst_data_path = output_dir.joinpath(f"hpo_sunburst_data_{COHORT_NAME}_{timestamp}.csv")
df.to_csv(sunburst_data_path, index=False)
print(f"Jeu de données pour le sunburst sauvegardé dans : {sunburst_data_path}")

# 2) Save the branch -> colour mapping
branch_colors_path = output_dir.joinpath(f"hpo_sunburst_branch_colors_{COHORT_NAME}_{timestamp}.json")
branch_colors_path.write_text(json.dumps(branch_color_mapping, indent=2))
print(f"Mapping des couleurs sauvegardé dans : {branch_colors_path}")

Jeu de données pour le sunburst sauvegardé dans : ../output/hpo_sunburst_data_TEST_20260119_091642.csv
Mapping des couleurs sauvegardé dans : ../output/hpo_sunburst_branch_colors_TEST_20260119_091642.json
