# Individual Modules and Mock

In [5]:
import os
import json
from lxml import etree

def _text_of(parent, *tags):
    """Follow first-child *tags* down from *parent*; return the last element's text, or None."""
    node = parent
    for tag in tags:
        if node is None:
            return None
        node = node.find(tag)
    return node.text if node is not None else None


def parse_newstyle_orphanet(xml_path):
    """
    Parse an Orphanet XML file into a disease -> HPO annotation mapping.

    The expected structure is:
    <JDBOR>
      <HPODisorderSetStatusList>
        <HPODisorderSetStatus>
          <Disorder>
            <OrphaCode>...</OrphaCode>
            <Name>...</Name>
            <HPODisorderAssociationList>...</HPODisorderAssociationList>
          </Disorder>
        </HPODisorderSetStatus>
        ...
      </HPODisorderSetStatusList>
    </JDBOR>

    Every <Disorder> element at any depth is read. A disorder without a
    usable <OrphaCode> (missing or empty) is skipped; a disorder without a
    usable <Name> is labelled "UnknownOrpha_<code>". If two disorders
    produce the same key, the later one replaces the earlier one.

    :param xml_path: Path (or file object) handed to ``etree.parse``.
    :return: A dict shaped like:
    {
      "ORPHA:58 | Alexander disease": {
        "hpo_terms": [...],
        "frequencies": { "HP:0000256": "Very frequent (99-80%)", ... }
      },
      ...
    }
    "hpo_terms" holds each HPO ID once, in the order it first appears in the
    XML. "frequencies" only contains the HPO IDs that carry a frequency name.
    """
    root = etree.parse(xml_path).getroot()

    disease_dict = {}

    # iter() walks the whole tree (root included), i.e. the same elements
    # that the XPath "//Disorder" selects.
    for disorder in root.iter("Disorder"):
        # 1) OrphaCode -- without it there is no key to file the disorder under
        orpha_code = _text_of(disorder, "OrphaCode")
        if not orpha_code:
            continue

        # 2) Disease name, with a placeholder when absent or empty
        disease_name = _text_of(disorder, "Name") or f"UnknownOrpha_{orpha_code}"
        disease_key = f"ORPHA:{orpha_code} | {disease_name}"

        # 3) HPO associations
        hpo_terms = []
        freq_map = {}

        assoc_list_el = disorder.find("HPODisorderAssociationList")
        if assoc_list_el is not None:
            for assoc in assoc_list_el.findall("HPODisorderAssociation"):
                hpo_id = _text_of(assoc, "HPO", "HPOId")
                if not hpo_id:
                    continue
                hpo_terms.append(hpo_id)

                freq_name = _text_of(assoc, "HPOFrequency", "Name")
                if freq_name:
                    freq_map[hpo_id] = freq_name

        disease_dict[disease_key] = {
            # dict.fromkeys drops duplicates but keeps first-seen order, so
            # repeated runs (and the JSON cache built from them) are identical.
            "hpo_terms": list(dict.fromkeys(hpo_terms)),
            "frequencies": freq_map,
        }

    return disease_dict


def load_orphanet_data(json_path, xml_path):
    """
    Return the Orphanet disease dict, using json_path as an on-disk cache.

    If json_path exists, its contents are loaded and returned and xml_path
    is not touched. Otherwise the XML is parsed with
    parse_newstyle_orphanet, the result is saved to json_path, and the
    parsed dict is returned.

    :param json_path: Path of the JSON cache file (read if present, else written).
    :param xml_path: Path of the Orphanet XML, only read on a cache miss.
    :return: The disease dict (see parse_newstyle_orphanet).
    """
    if os.path.exists(json_path):
        # Explicit encoding so the cache reads the same on every platform
        with open(json_path, "r", encoding="utf-8") as f:
            return json.load(f)

    data = parse_newstyle_orphanet(xml_path)

    # Dump to a sibling temp file and rename it into place afterwards, so an
    # interrupted write cannot leave a truncated json_path that would make
    # every later call fail in json.load.
    tmp_path = f"{json_path}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_path, json_path)
    return data


# Test function call (for a .ipynb cell)
def test_orphanet_parser():
    """Load (or build) the Orphanet data and print its size plus the first three entries."""
    data = load_orphanet_data(
        "data/converted_orphanet.json",  # JSON output / cache
        "data/en_product6.xml",          # XML input -- replace with your file
    )
    print(f"Parsed {len(data)} diseases.")

    # Show a small sample: the first three keys in the dict's own order
    for disease_key in list(data)[:3]:
        print(disease_key, data[disease_key])

# Usage:
# test_orphanet_parser()


In [None]:
# Run the Orphanet parser smoke test; it reads or creates files under data/.
test_orphanet_parser()

In [7]:
import os

def load_hpo_terms(file_path):
    """
    Read a tab-separated file of HPO terms: HP:NNNNNNN <tab> PhenotypeName

    Lines that do not split into exactly two tab-separated fields (blank
    lines, extra columns, missing tab) are ignored. If a lowercased name
    occurs more than once, the last line wins.

    :param file_path: Path to the term-name file, read as UTF-8.
    :return: dict of { "phenotypename".lower(): "HP:NNNNNNN" }
    """
    hpo_dict = {}
    # Explicit UTF-8 so the result does not depend on the platform's default
    # encoding (the .obo reader in this file uses UTF-8 as well).
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            parts = line.strip().split("\t")
            if len(parts) != 2:
                continue
            hpo_id, phenotype_name = parts
            hpo_dict[phenotype_name.lower()] = hpo_id
    return hpo_dict

def load_synonyms(file_path):
    """
    Read a tab-separated file of HPO synonyms: Synonym <tab> HP:NNNNNNN

    Lines that do not split into exactly two tab-separated fields (blank
    lines, extra columns, missing tab) are ignored. If a lowercased synonym
    occurs more than once, the last line wins.

    :param file_path: Path to the synonym file, read as UTF-8.
    :return: dict of { "synonym".lower(): "HP:NNNNNNN" }
    """
    synonym_dict = {}
    # Explicit UTF-8 so the result does not depend on the platform's default
    # encoding (the .obo reader in this file uses UTF-8 as well).
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            parts = line.strip().split("\t")
            if len(parts) != 2:
                continue
            synonym, hpo_id = parts
            synonym_dict[synonym.lower()] = hpo_id
    return synonym_dict

def extract_hpo_terms_from_text(text, hpo_dict, synonym_dict):
    """
    Scan a free-text clinical description for known phenotype names and synonyms.

    Matching is a plain case-insensitive substring test: the text is
    lowercased and every key of hpo_dict, then every key of synonym_dict,
    is checked for containment (keys are compared as given).

    Returns a list of (hpo_id, matched_string) tuples -- name matches first,
    synonym matches after, each group in its dict's iteration order.
    """
    haystack = text.lower()

    # Primary phenotype names
    hits = [
        (term_id, name)
        for name, term_id in hpo_dict.items()
        if name in haystack
    ]

    # Synonyms are appended after the primary-name hits
    hits.extend(
        (term_id, alias)
        for alias, term_id in synonym_dict.items()
        if alias in haystack
    )

    return hits

def run_custom_extractor(text, terms_file="data/hpo_term_names.txt", synonyms_file="data/hpo_synonyms.txt"):
    """
    High-level function:
      1) Load HPO terms & synonyms (both files are re-read on every call)
      2) Extract matches from 'text'
      3) Return unique list of HPO IDs

    :param text: Free-text clinical description to scan.
    :param terms_file: Path passed to load_hpo_terms.
    :param synonyms_file: Path passed to load_synonyms.
    :return: List of HPO IDs, each once, in the order they were first matched.
    """
    hpo_dict = load_hpo_terms(terms_file)
    synonym_dict = load_synonyms(synonyms_file)
    matches = extract_hpo_terms_from_text(text, hpo_dict, synonym_dict)

    # dict.fromkeys de-duplicates while keeping first-seen order; a set would
    # hand the IDs back in an order that can change from run to run.
    return list(dict.fromkeys(hpo_id for hpo_id, _ in matches))

# Test function call
def test_custom_extractor():
    """Run the extractor on one fixed sentence and print the sentence and the IDs found."""
    note = "Patient has severe headache and occasional macrocephaly"

    # Uses run_custom_extractor's default file paths; adjust them there or
    # pass terms_file / synonyms_file explicitly if your data lives elsewhere.
    found = run_custom_extractor(note)

    for label, value in (("Text:", note), ("Extracted HPO IDs:", found)):
        print(label, value)

# Usage in .ipynb:
# test_custom_extractor()

In [8]:
# Run the extractor smoke test; it reads the default term and synonym files.
test_custom_extractor()

Text: Patient has severe headache and occasional macrocephaly
Extracted HPO IDs: ['HP:0002315', 'HP:0040283', 'HP:0000256', 'HP:0012828']


In [37]:
def parse_obo_to_dag(obo_path, dag_path):
    """
    Converts a basic .obo file into a DAG file of child->parent edges
    based on 'is_a:' lines.

    Only term data contributes edges: 'id:' and 'is_a:' lines inside any
    other stanza (e.g. [Typedef]) are ignored, so relationship definitions
    do not end up in the DAG. Lines before the first stanza header are
    still scanned, so a bare fragment without a [Term] line also works.

    :param obo_path: Path to the input .obo file (e.g., 'hp.obo')
    :param dag_path: Path to the output DAG file (e.g., 'hp_dag.txt')
    :return: None. The edges are written to dag_path and a one-line summary
             is printed.

    The DAG file will contain lines like:
       HP:0000002 HP:0001507
       HP:0000003 HP:0000107
       ...

    where each line is (child_id parent_id).
    """
    id_tag = "id: "
    is_a_tag = "is_a: "

    edges = []
    current_id = None       # ID of the term currently being parsed
    skip_stanza = False     # True while inside a stanza that is not [Term]

    with open(obo_path, "r", encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()

            # Any stanza header ends the previous stanza; only [Term]
            # stanzas are allowed to contribute edges.
            if line.startswith("["):
                skip_stanza = not line.startswith("[Term]")
                current_id = None
                continue

            if skip_stanza:
                continue

            # e.g. "id: HP:0000003"
            if line.startswith(id_tag):
                # Slice off the prefix only; the rest of the line is the ID
                current_id = line[len(id_tag):].strip()
                continue

            # e.g. "is_a: HP:0000107 ! Renal cyst"
            if line.startswith(is_a_tag):
                # The parent ID is the first space-delimited token after the
                # tag; everything after it ("! comment") is dropped.
                parent_id = line[len(is_a_tag):].split(" ")[0].strip()

                # Only store the edge if both ends are known
                if current_id and parent_id:
                    edges.append((current_id, parent_id))

    # Now write out edges to the DAG file, one "child parent" pair per line
    with open(dag_path, "w", encoding="utf-8") as out:
        out.writelines(f"{child_id} {parent_id}\n" for child_id, parent_id in edges)

    print(f"Parsed {len(edges)} edges from {obo_path} and wrote them to {dag_path}")


def test_obo_to_dag():
    """
    Smoke test for the .obo -> DAG conversion.

    Reads 'data/hp.obo' and writes 'data/hp_dag.txt'; change the two paths
    below if your files live elsewhere.
    """
    parse_obo_to_dag(
        "data/hp.obo",      # input: your .obo file
        "data/hp_dag.txt",  # output: DAG edge list
    )


# Run the .obo -> DAG conversion only when this code is executed directly,
# not when it is imported from another module.
if __name__ == "__main__":
    test_obo_to_dag()


Parsed 17755 edges from data/hp.obo and wrote them to data/hp_dag.txt
