In [1]:
list_efo = []
with open("/home/mouren/Data/variants/gwas/ld_pipe/gwas_catalog_v1.0.2-associations_e113_r2025-02-18.tsv") as f:
    first = True
    for line in f:
        if first:
            first = False
            continue
        val = line.strip().split("\t")[35].split("/")[-1]
        if val not in list_efo:
            list_efo.append(val)

In [2]:
### Function for unique ancestor 
import requests
from collections import deque
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache

##############################################################################
# 1) Utility: get direct parent IDs (for ANY prefix) from OLS
##############################################################################
dic_onto = {"EFO":"efo","MONDO":"mondo","HP":"hp","OBA":"oba","GO":"go","Orphanet":"ordo","HANCESTRO":"hancestro",
            "NCIT":"ncit","PATO":"pato","MP":"mp","BFO":"bfo","OBI":"obi","OGMS":"ogms","ORDO":"ordo","IAO":"iao",
            "CHEBI":"chebi", "DOID":"doid", "CL":"cl","UBERON":"uberon"}

@lru_cache(None)
def get_parent_ids(obo_id):
    id_url = obo_id.replace(":", "_")
    onto = dic_onto[id_url.split("_")[0]]
    
    terms_url = f"https://www.ebi.ac.uk/ols/api/ontologies/{onto}/terms?iri={id_url}"

    try:
        resp = requests.get(terms_url)
        resp.raise_for_status()
        data = resp.json()
        terms = data.get("_embedded", {}).get("terms", [])
        if not terms:
            return []

        # First term object
        t0 = terms[0]
        parents_link = t0["_links"].get("parents", {}).get("href", None)
        if not parents_link:
            return []

        # Now fetch parents
        p_resp = requests.get(parents_link)
        p_resp.raise_for_status()
        p_data = p_resp.json()

        parent_terms = p_data.get("_embedded", {}).get("terms", [])
        parent_ids = []
        for pterm in parent_terms:
            pid = pterm.get("obo_id")
            if not pid:
                ann = pterm.get("annotation", {})
                has_obo = ann.get("has_obo_id", [])
                if has_obo:
                    pid = has_obo[0]
            if pid:
                parent_ids.append(pid)

        return parent_ids

    except Exception as ex:
        print(f"[ERROR] get_parent_ids({obo_id}): {ex}")
        return []


##############################################################################
# 2) Filtering and BFS logic for EFO-only
##############################################################################

def get_efo_parents(obo_id):
    """
    Return ONLY the EFO parents of a term (by prefix check).
    """
    all_parents = get_parent_ids(obo_id)
    efo_parents = [p for p in all_parents if p.startswith("EFO:")]
    return efo_parents

##############################################################################
# 3) Check if a term has "disease" (EFO:0000408) or "disease staging" (EFO:0000410)
#    in its EFO ancestry. If so, return whichever one we find.
##############################################################################
def find_special_disease_ancestor(obo_id, targets=None):
    """
    BFS up the EFO hierarchy to see if we eventually find any ID in `targets`.
    By default, targets = {"EFO:0000408", "EFO:0000410"}.
    
    If found, returns that ID; else returns None.
    """
    if targets is None:
        targets = {"EFO:0000408", "EFO:0000410","MONDO:0000001","OGMS:0000031","MONDO:0042489","GO:0008150","EFO:0001444","EFO:0000651"} #all disease related except avant dernier which is biological process and last one which is measurement last last one is phenotype
    
    visited = set()
    queue = deque([obo_id])
    
    while queue:
        current = queue.popleft()
        if current in visited:
            continue
        visited.add(current)
        
        # If this node itself is a target
        if current in targets:
            return current
        
        # Otherwise fetch its EFO parents and keep going up
        #parents = get_efo_parents(current)
        parents = get_parent_ids(current)
        for p in parents:
            if p not in visited:
                queue.append(p)
    
    return None

### 4) find single efo parent with bfs fallback
def find_unique_efo_parent(obo_id):
    """
    Implements the new rules:
      1) If no parents => None
      2) If exactly 1 parent => return that
      3) If multiple parents => keep only EFO parents
         a) if 0 => return None
         b) if 1 => return that one
         c) if multiple => attempt BFS up the EFO tree to find a unique common ancestor
    """
    # 0) Check for "disease" or "disease staging" ancestor
    special_ancestor = find_special_disease_ancestor(obo_id)
    if special_ancestor is not None:
        return special_ancestor
        
    all_parents = get_parent_ids(obo_id)

    # 1) no parents
    if len(all_parents) == 0:
        return None

    # 2) exactly 1 parent
    if len(all_parents) == 1:
        return all_parents[0]

    # 3) multiple parents => keep only EFO
    efo_only = [p for p in all_parents if p.startswith("EFO:")]
    if len(efo_only) == 0:
        return None

    if len(efo_only) == 1:
        return efo_only[0]

    # If we still have multiple EFO parents, do BFS up the EFO tree
    return find_unique_efo_common_ancestor(efo_only)


def find_unique_efo_common_ancestor(efo_parents):
    """
    Given multiple EFO parents, climb upward *only among EFO parents* to see if they converge
    to exactly 1 common EFO ancestor. If found, return that ID; else None.
    This logic is similar to the BFS intersection approach from before, but we only fetch EFO parents.

    efo_parents: list of strings like ["EFO:0001645", "EFO:0001360", ...]
    """
    # Initialize BFS states
    parent_states = []
    for pid in efo_parents:
        parent_states.append({
            "current": {pid},
            "visited": set([pid])
        })

    while True:
        # Intersection among all current sets
        all_current_sets = [st["current"] for st in parent_states]
        if not all_current_sets:
            return None

        common_now = set.intersection(*all_current_sets)

        # If exactly 1 => unique common parent found
        if len(common_now) == 1:
            return list(common_now)[0]

        # If any frontier is empty => cannot unify => None
        if any(len(st["current"]) == 0 for st in parent_states):
            return None

        # Expand BFS upward (EFO parents only)
        for st in parent_states:
            next_level = set()
            for node in st["current"]:
                # get EFO parents
                efo_up = get_efo_parents(node)
                for p in efo_up:
                    if p not in st["visited"]:
                        st["visited"].add(p)
                        next_level.add(p)
            st["current"] = next_level


##############################################################################
# 3) Threaded processing
##############################################################################

def process_ids_in_parallel(obo_ids, max_workers=18):
    """
    For each ID in obo_ids, apply find_unique_efo_parent in parallel.
    Returns a dict { obo_id: parent_or_none }.
    """
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_map = {executor.submit(find_unique_efo_parent, oid): oid for oid in obo_ids}
        for future in as_completed(future_map):
            oid = future_map[future]
            try:
                res = future.result()
                results[oid] = res
            except Exception as ex:
                print(f"[ERROR] {oid}: {ex}")
                results[oid] = None
    return results



In [3]:
import requests
import urllib.parse

def get_label_from_ols(term_id):
    try:
        id_url = term_id.replace(":", "_")
    except AttributeError:
        return None
    onto = dic_onto[id_url.split("_")[0]]
    
    url = f"https://www.ebi.ac.uk/ols/api/ontologies/{onto}/terms?iri={id_url}"

    r = requests.get(url)
    r.raise_for_status()  # Raise an error if the request failed
    
    response_data = r.json()
    
    # Results (terms) are in the JSON path: _embedded -> terms -> [0] -> label
    # Make sure we have at least one term
    if "_embedded" in response_data and "terms" in response_data["_embedded"]:
        terms = response_data["_embedded"]["terms"]
        if terms:
            return terms[0].get("label")
   
    return None

In [4]:
out_map = process_ids_in_parallel(list_efo, max_workers=18) 

targets = {"EFO:0000408", "EFO:0000410","MONDO:0000001","OGMS:0000031","MONDO:0042489"}
dic_res = {}
for k, v in out_map.items():
    if v in targets:
        dic_res[k] = "Disease"
    elif v == "EFO:0001444":
        dic_res[k] = "Measurement"
    elif v == "GO:0008150":
        dic_res[k] = "Biological process"
    elif v == "EFO:0000651":
        dic_res[k] = "Phenotype"
    else:
        dic_res[k] = get_label_from_ols(v)

with open("/home/mouren/Data/variants/gwas/ld_pipe/dic_traits_to_parents_gwas.tsv", 'w') as file:
    for key, value in dic_res.items():
        file.write(f'{key}\t{value}\n')

[ERROR] : ''
[ERROR] get_parent_ids(OBA_2042058): HTTPSConnectionPool(host='www.ebi.ac.uk', port=443): Max retries exceeded with url: /ols4/api/ontologies/oba/terms/http%253A%252F%252Fpurl.obolibrary.org%252Fobo%252FOBA_2042058/parents (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7f3c445c37c0>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution'))
[ERROR] get_parent_ids(OBA_2041632): HTTPSConnectionPool(host='www.ebi.ac.uk', port=443): Max retries exceeded with url: /ols4/api/ontologies/oba/terms/http%253A%252F%252Fpurl.obolibrary.org%252Fobo%252FOBA_2041632/parents (Caused by NewConnectionError('<urllib3.connection.VerifiedHTTPSConnection object at 0x7f3c445ff1f0>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution'))
[ERROR] get_parent_ids(OBA_2041631): HTTPSConnectionPool(host='www.ebi.ac.uk', port=443): Max retries exceeded with url: /ols/api/ontologies/oba/terms?iri=OBA_