In [1]:
# -*- coding: utf-8 -*-
"""
HPO Vector DB Builder via Pronto Graph Extraction

Features:
- Automatic OBO download & refresh (`initialize_hpo_resources`)
- Build metadata directly from the HPO OBO using pronto:
  • labels, definitions, synonyms, ALT IDs, xrefs
  • full lineage (root→term) across all inheritance paths
  • organ system(s) (direct child under phenotypic abnormality)
- Optional SBERT (BioBERT) or BGE embeddings
- Antonym `direction` flags
- Float16 quantization & compressed storage (.npz)
- Optional limit for test extraction
"""
import time
import json
import re
import requests
from pathlib import Path
from collections import deque

import numpy as np
import pandas as pd
import pronto      # pip install pronto
from fastembed import TextEmbedding
from sentence_transformers import SentenceTransformer
from tqdm import tqdm

# ────────── 0. Automatic OBO Download & Load ──────────
_hpo_ontology = None

def initialize_hpo_resources(
    obo_url: str = "https://purl.obolibrary.org/obo/hp.obo",
    obo_path: str = "hp.obo",
    refresh_days: int = 14,
) -> pronto.Ontology:
    """
    Download or refresh the HPO OBO file if older than `refresh_days`,
    then load it via pronto.
    """
    global _hpo_ontology
    if _hpo_ontology is None:
        obo_file = Path(obo_path)
        if not obo_file.exists() or ((time.time() - obo_file.stat().st_mtime) / 86400) > refresh_days:
            print(f"Downloading HPO ontology from {obo_url} …")
            resp = requests.get(obo_url, timeout=30)
            resp.raise_for_status()
            obo_file.write_text(resp.text, encoding="utf-8")
        with open(obo_file, 'rb') as f:
            _hpo_ontology = pronto.Ontology(f)
        print(f"Loaded ontology with {len(list(_hpo_ontology.terms()))} terms")
    return _hpo_ontology

ROOT_ID  = 'HP:0000001'  # “All”
PHENO_ID = 'HP:0000118'  # “Phenotypic abnormality”

# load & cache ontology once
ont = initialize_hpo_resources()

# maps HP_ID → [immediate parent IDs]
parent_map = {
    term.id: [p.id for p in term.superclasses(distance=1)]
    for term in ont.terms()
}

# maps HP_ID → term name
label_map = {term.id: term.name for term in ont.terms()}

# ────────── 1. Multi-Path Lineage Helper ──────────
_lineage_memo = {}

def _build_lineage_paths(hp_id, parent_map, seen=None):
    """
    Return all paths from ROOT_ID → ... → hp_id.
    Each path is a list of HP_ID strings.
    Avoids cycles by tracking `seen`.
    """
    if seen is None:
        seen = set()
    if hp_id in seen:              # cycle guard
        return []
    seen = seen | {hp_id}

    # cached?
    if hp_id in _lineage_memo:
        return _lineage_memo[hp_id]

    # base case: we hit the root
    if hp_id == ROOT_ID:
        paths = [[ROOT_ID]]
    else:
        parents = parent_map.get(hp_id, [])
        if not parents:
            # orphan: just attach root + self
            paths = [[ROOT_ID, hp_id]]
        else:
            paths = []
            for p in parents:
                for ppath in _build_lineage_paths(p, parent_map, seen):
                    paths.append(ppath + [hp_id])

    _lineage_memo[hp_id] = paths
    return paths

# ────────── 2. Build DataFrame from OBO ──────────
CLEAN_ABNORMALITY = re.compile(r'(?i)^Abnormality of(?: the)?\s*')

def _sort_by_numeric(entries: list[str]) -> list[str]:
    def key_fn(e: str):
        digs = ''.join(filter(str.isdigit, e))
        return int(digs) if digs else float('inf')
    return sorted(entries, key=key_fn)

def build_hpo_dataframe(obo_path: str = "hp.obo", limit: int = None) -> pd.DataFrame:
    ont     = pronto.Ontology(Path(obo_path))
    records = []
    terms   = list(ont.terms())[:limit] if limit else list(ont.terms())

    for term in tqdm(terms, desc="Building HPO DataFrame", unit="term"):
        hp_id      = term.id
        label      = term.name
        definition = term.definition or ""
        synonyms   = [syn.description for syn in term.synonyms]

        # ── ALT IDs ──
        alt_ids = _sort_by_numeric(list(term.alternate_ids))

        # ── XREFS ──
        snomedct, umls = [], []
        for xr in term.xrefs:
            txt = str(xr)
            m   = re.search(r"'(.+?:.+?)'", txt)
            ent = m.group(1) if m else txt
            pre, _, _ = ent.partition(':')
            if pre.upper() == 'UMLS':
                umls.append(ent)
            elif pre.upper().startswith('SNOMED'):
                snomedct.append(ent)
        snomedct = _sort_by_numeric(snomedct)
        umls      = _sort_by_numeric(umls)

        # ── LINEAGE ──
        paths = _build_lineage_paths(hp_id, parent_map) or [[ROOT_ID, hp_id]]
        for path_ids in paths:
            lineage_str = " -> ".join(f"{label_map[i]} ({i})" for i in path_ids)

            # ── ORGAN SYSTEM ──
            if PHENO_ID in path_ids:
                idx   = path_ids.index(PHENO_ID)
                organ = label_map.get(path_ids[idx+1], "Other") if idx+1 < len(path_ids) else "Other"
            else:
                organ = "Other"
            organ_system = CLEAN_ABNORMALITY.sub("", organ).title()

            # ── EMIT ROWS ──
            for phrase in [label] + synonyms:
                if not phrase:
                    continue
                # **Only title-case** the phrase; do NOT strip anything from it
                clean_phrase = phrase.title()

                records.append({
                    "hp_id":        hp_id,
                    "phrase":       clean_phrase,
                    "organ_system": organ_system,
                    "lineage":      lineage_str,
                    "definition":   definition,
                    "alt_ids":      ";".join(alt_ids),
                    "snomedct":     ";".join(snomedct),
                    "umls":         ";".join(umls),
                })

    return pd.DataFrame(
        records,
        columns=[
            "hp_id", "phrase", "organ_system", "lineage",
            "definition", "alt_ids", "snomedct", "umls"
        ]
    )
# ────────── 3. Clean Text for Embedding ──────────
PAT = re.compile(r'\s*\([^)]*\)\s*')

def clean_text(txt: str) -> str:
    txt = PAT.sub(' ', txt)
    txt = re.sub(r'\s+', ' ', txt).strip().lower()
    txt = re.sub(r'[^\w\s]+$', '', txt)
    return txt

# ────────── 4. Embedding Model Selector ──────────
def get_embedding_model(
    use_sbert: bool = True,
    sbert_model: str = 'FremyCompany/BioLORD-2023',

    bge_model: str = 'BAAI/bge-small-en-v1.5',
):
    if use_sbert:
        print(f"Loading SBERT model: {sbert_model}")
        return SentenceTransformer(sbert_model)
    print(f"Loading BGE model: {bge_model}")
    return TextEmbedding(model_name=bge_model)

# ────────── 5. Vectorize & Save ──────────
import re
import json
import numpy as np
from tqdm import tqdm

def vectorize_dataframe(
    df: pd.DataFrame,
    meta_out: str,
    vec_out: str,
    use_sbert: bool = True
):
    model = get_embedding_model(use_sbert)

    # ——— New: collect constant metadata once per HP term ———
    constants: dict[str, dict] = {}

    # ——— New: per-embedding metadata (minimal) ———
    entries: list[dict] = []
    embs: list[np.ndarray] = []

    # ——— Compile direction‐detection regexes once per call ———
    NEG_PATTERN = re.compile(
        r'\b(?:decreas(?:e|ed|ing)?|loss(?:es)?|hypo[-]?\w+)\b',
        re.IGNORECASE
    )
    POS_PATTERN = re.compile(
        r'\b(?:increas(?:e|ed|ing)?|gain(?:s|ed)?|hyper[-]?\w+)\b',
        re.IGNORECASE
    )

    for _, row in tqdm(df.iterrows(), total=len(df), desc="Embedding rows", unit="row"):
        info = clean_text(row.phrase)

        # ——— More robust direction detection ———
        direction = 0
        if NEG_PATTERN.search(info):
            direction = -1
        elif POS_PATTERN.search(info):
            direction = 1

        # ——— Embedding call unchanged ———
        if use_sbert:
            vec = model.encode(info, convert_to_numpy=True)
        else:
            vec = np.asarray(list(model.embed([info]))[0], dtype=np.float32)

        # ——— New: store only minimal per-info metadata ———
        entries.append({
            'hp_id':    row.hp_id,
            'info':     info,
            'direction':direction
        })

        # ——— New: record the constant fields once for each hp_id ———
        if row.hp_id not in constants:
            constants[row.hp_id] = {
                'organ_system': row.organ_system,
                'lineage':      row.lineage,
                'definition':   row.definition,
                'alt_ids':      row.alt_ids,
                'snomedct':     row.snomedct,
                'umls':         row.umls
            }

        embs.append(vec.astype(np.float16))

    # ——— Save embeddings as before ———
    emb_matrix = np.vstack(embs)

    # ——— Write out a single JSON with both parts ———
    combined = {
        'constants': constants,
        'entries':   entries
    }
    with open(meta_out, 'w') as f:
        json.dump(combined, f, separators=(',', ':'))

    np.savez_compressed(vec_out, emb=emb_matrix)
    print(f"Saved {len(entries)} embeddings → {meta_out}, {vec_out}")

# ────────── 6. Main Entrypoint ──────────
def main():
    df = build_hpo_dataframe("hp.obo")
    df.to_csv("deeprare_hpo_terms_full.csv", index=False)
    print(f"Built DataFrame with {len(df)} rows.")
    vectorize_dataframe(
        df,
        meta_out='deeprare_hpo_meta.json',
        vec_out='deeprare_hpo_embedded.npz',
        use_sbert=True
    )

def main_test(
    test_limit: int = None
):
    df = build_hpo_dataframe("hp.obo", limit=test_limit)
    out_csv = "deeprare_hpo_terms_full.csv" if test_limit is None else f"hpo_terms_test_{test_limit}.csv"
    df.to_csv(out_csv, index=False)
    print(f"Built DataFrame with {len(df)} rows → {out_csv}")
    if test_limit is None:
        vectorize_dataframe(
            df,
            meta_out='deeprare_hpo_meta.json',
            vec_out='deeprare_hpo_embedded.npz',
            use_sbert=True
        )
    else:
        print("Test run: skipping vectorization.")

if __name__ == '__main__':
    main_test()


  _hpo_ontology = pronto.Ontology(f)


Loaded ontology with 19657 terms


  ont     = pronto.Ontology(Path(obo_path))
Building HPO DataFrame: 100%|████████████████████████████████████████████████████████████████████████████████████| 19657/19657 [00:01<00:00, 10873.26term/s]


Built DataFrame with 245952 rows → deeprare_hpo_terms_full.csv
Loading SBERT model: FremyCompany/BioLORD-2023


modules.json:   0%|          | 0.00/229 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/649 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/438M [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/280 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

Embedding rows: 100%|██████████████████████████████████████████████████████████████████████████████████████████████| 245952/245952 [51:14<00:00, 79.99row/s]


Saved 245952 embeddings → deeprare_hpo_meta.json, deeprare_hpo_embedded.npz


In [None]:
# import time
# import requests
# import random
# from pathlib import Path

# import pronto

# # ——— Your existing initialize_hpo_resources() ———
# _hpo_ontology = None
# def initialize_hpo_resources(
#     obo_url: str = "https://purl.obolibrary.org/obo/hp.obo",
#     obo_path: str = "hp.obo",
#     refresh_days: int = 14,
# ) -> pronto.Ontology:
#     global _hpo_ontology
#     if _hpo_ontology is None:
#         obo_file = Path(obo_path)
#         if (
#             not obo_file.exists()
#             or ((time.time() - obo_file.stat().st_mtime) / 86400) > refresh_days
#         ):
#             print(f"Downloading HPO ontology from {obo_url} …")
#             resp = requests.get(obo_url, timeout=30)
#             resp.raise_for_status()
#             obo_file.write_text(resp.text, encoding="utf-8")
#         with open(obo_file, 'rb') as f:
#             _hpo_ontology = pronto.Ontology(f)
#         print(f"Loaded ontology with {len(list(_hpo_ontology.terms()))} terms")
#     return _hpo_ontology

# def print_random_term():
#     ont = initialize_hpo_resources()
#     term = random.choice(list(ont.terms()))

#     print("\n=== Random HPO Term ===")
#     print(f"ID         : {term.id}")
#     print(f"Name       : {term.name}")
#     print(f"Definition : {term.definition!r}")

#     # Synonyms
#     syns = [syn.description for syn in term.synonyms]
#     print(f"Synonyms   : {syns}")

#     # Xrefs
#     xrs = [str(x) for x in term.xrefs]
#     print(f"Xrefs      : {xrs}")

#     # Try alt_ids or other_ids or whatever exists
#     for attr in ("other_ids", "alt_ids", "ids", "synonyms", "xrefs"):
#         if hasattr(term, attr):
#             val = getattr(term, attr)
#             print(f"{attr!r:12}: {val!r}")

#     # If none of those printed, dump dir()
#     known = {"id","name","definition","synonyms","xrefs","other_ids","alt_ids","ids"}
#     present = set(dir(term))
#     if not any(a in present for a in ("other_ids","alt_ids","ids")):
#         print("\n-- No alt_ids-like attribute found; available attributes: --")
#         print(sorted(present))
#     print("========================\n")

# if __name__ == "__main__":
#     print_random_term()