In [None]:
import pandas as pd

import ncit_utils

#### Load in the Thesaurus


In [None]:
# Load the latest NCIt FLAT (tsv) export and key the table by concept code,
# so that a row can be fetched directly with ``ncit.loc[code]``.
ncit = ncit_utils.load_ncit().set_index("code")

#### Create Node mapping


In [None]:
# Every Node built so far, keyed by concept code. Node.__init__ consults and
# fills this cache while resolving parents, so each concept is built once.
NODES_CACHE: dict[str, "Node"] = {}


def get_row(code) -> pd.Series:
    """Fetch the thesaurus entry for ``code`` from the notebook-level ``ncit`` table.

    The lookup is a plain label lookup (``ncit.loc[code]``), so an unknown
    code propagates whatever error pandas raises for a missing label.
    """
    # ``ncit`` is only read here, so no ``global`` declaration is required.
    return ncit.loc[code]


class Node:
    """One thesaurus concept plus links to its direct parent concepts.

    Constructing a node also constructs (and stores in ``NODES_CACHE``) every
    parent that has not been built yet, fetching the parent's row through
    ``get_row``. Parents are therefore shared objects: two nodes with the same
    parent code reference the same ``Node`` instance.

    Args:
        row: a thesaurus row whose ``name`` is the concept code and which has
            the columns ``"parents"`` and ``"synonyms"`` (pipe-delimited
            strings, or NaN when absent) and ``"display name"``.
    """

    code: str  # concept code, taken from the row's index label
    parents: list["Node"]  # direct parents, in the order listed in the row
    synonyms: set[str]  # distinct entries of the pipe-delimited synonyms cell
    pref_name: str  # value of the row's "display name" column (may be NaN)

    def __init__(self, row: pd.Series) -> None:
        self.code = row.name

        self.parents = []
        if pd.notna(row["parents"]):
            for parent_code in row["parents"].split("|"):
                if parent_code not in NODES_CACHE:
                    # Recursive build: the parent resolves its own parents first.
                    NODES_CACHE[parent_code] = Node(get_row(parent_code))
                self.parents.append(NODES_CACHE[parent_code])

        # A missing synonyms cell is NaN (a float), which has no ``split``;
        # treat it as "no synonyms", mirroring the guard used for parents.
        raw_synonyms = row["synonyms"]
        self.synonyms = (
            set(raw_synonyms.split("|")) if pd.notna(raw_synonyms) else set()
        )

        self.pref_name = row["display name"]

    def __repr__(self) -> str:
        """Render as ``<code <= parent1,parent2>``."""
        return f"<{self.code} <= {','.join(p.code for p in self.parents)}>"

    # ``str(node)`` and ``repr(node)`` intentionally produce the same text.
    __str__ = __repr__

In [None]:
# Build a Node for every thesaurus row. Node.__init__ already caches parents
# recursively, so a row whose node exists is skipped instead of being rebuilt
# and thrown away.
for concept_code, row in ncit.iterrows():
    if concept_code not in NODES_CACHE:
        NODES_CACHE[concept_code] = Node(row)
print(len(NODES_CACHE))
# Test a random code
print(NODES_CACHE["C142799"])

#### Traverse the Node mapping for a certain concept and children


In [None]:
from collections import deque


def get_children_of_code(nodes: dict[str, Node], parent_code: str):
    """Collect the codes of all nodes that have ``parent_code`` as a direct parent.

    Args:
        nodes: mapping of concept code to node; each node exposes ``parents``,
            an iterable of objects carrying a ``code`` attribute.
        parent_code: the code whose direct children are wanted.

    Returns:
        A set of keys of ``nodes``; empty when nothing lists ``parent_code``.
    """
    return {
        code
        for code, node in nodes.items()
        if any(parent.code == parent_code for parent in node.parents)
    }


def bfs(nodes: dict[str, Node], start_code: str, child_overrides=None):
    """Walk the concept hierarchy breadth-first, from ``start_code`` downwards.

    The parent links stored on the nodes are inverted once into a
    parent -> children index, so the walk is linear in the number of links.
    Children are visited in the iteration order of ``nodes``, which makes the
    result reproducible from run to run.

    Args:
        nodes: mapping of concept code to node; each node exposes ``parents``,
            an iterable of objects carrying a ``code`` attribute.
        start_code: code at which the traversal starts.
        child_overrides: optional mapping of code -> explicit sequence of child
            codes to descend into instead of the children found in ``nodes``.
            When omitted it defaults to ``{"C2991": ["C3262"]}``, i.e. below
            C2991 only the C3262 branch is followed.

    Returns:
        ``(visited, tracking_q)`` where ``visited`` is the list of codes in
        visit order (``start_code`` first) and ``tracking_q`` is a deque of
        unique ``(child_code, parent_code)`` pairs, beginning with
        ``(start_code, None)``.
    """
    if child_overrides is None:
        child_overrides = {"C2991": ["C3262"]}

    # Invert the parent links once. The inner dict is used as an
    # insertion-ordered set so each child appears once, in a stable order.
    children_by_parent: dict[str, dict[str, None]] = {}
    for code, node in nodes.items():
        for parent in node.parents:
            children_by_parent.setdefault(parent.code, {})[code] = None

    visited = []  # Visit order, returned to the caller.
    visited_lookup = set()  # Same codes, for constant-time membership tests.
    queue = deque([start_code])
    tracking_q = deque([(start_code, None)])
    tracked_pairs = {(start_code, None)}  # Mirrors tracking_q for fast lookups.

    while queue:
        current_code = queue.popleft()
        if current_code in visited_lookup:
            # Reached earlier through another parent.
            continue
        visited_lookup.add(current_code)
        visited.append(current_code)

        if current_code in child_overrides:
            node_children = child_overrides[current_code]
        else:
            node_children = children_by_parent.get(current_code, ())

        for child_code in node_children:
            if child_code not in visited_lookup:
                queue.append(child_code)
            pair = (child_code, current_code)
            if pair not in tracked_pairs:
                tracked_pairs.add(pair)
                tracking_q.append(pair)

    return visited, tracking_q

In [None]:
# Concept at which the traversal starts. Note that bfs special-cases C2991 and
# only descends into its C3262 child.
root_concept = "C2991"  # disease, disorder, finding
# Alternative starting point:
# root_concept = "C3262"  # neoplasm
# children: visited codes in visit order (root_concept itself comes first).
# children_w_parents: (child_code, parent_code) pairs; the first is (root, None).
children, children_w_parents = bfs(NODES_CACHE, root_concept)

In [None]:
# Sanity check: every (child, parent) pair must occur exactly once, i.e.
# de-duplicating the pairs must not shrink the collection.
combos = set(children_w_parents)
assert len(combos) == len(children_w_parents)

#### Construct the output


In [None]:
# Flatten the traversal into one record per (concept, synonym) combination.
data = []
for idx, (child_code, parent_code) in enumerate(children_w_parents):
    if not parent_code:
        # Only the traversal's starting entry has no parent; it yields no rows.
        assert child_code == root_concept
        continue

    child_node = NODES_CACHE[child_code]
    parent_node = NODES_CACHE.get(parent_code)
    parent_name = parent_node.pref_name if parent_node else None
    parent_id = parent_node.code if parent_node else None

    # NOTE(review): "Level" is filled with the pair's position in
    # children_w_parents (idx + 1), not with a tree depth - confirm intent.
    data.extend(
        (idx + 1, syn, child_node.pref_name, child_node.code, parent_name, parent_id)
        for syn in child_node.synonyms
    )

output = pd.DataFrame(
    data,
    columns=[
        "Level",
        "Term",
        "Preferred Term",
        "Code",
        "Parent Term",
        "Parent Term Code",
    ],
    dtype=str,
)
# Snapshot of the raw table, written before any later cell adds columns.
output.to_csv("disease_codes.csv", index=False)
output

#### Call EVS API to get preferred terms


In [None]:
# Ask the EVS helper for the terms of every visited code, then keep only the
# rows that carry a source.
# NOTE(review): the second argument looks like a cache/output path - confirm
# against ncit_utils.
disease_terms_df = ncit_utils.EVSConceptsApi.load_terms_w_synonyms(
    children, "ncit_output/preferred_terms_diseases.csv"
).dropna(subset=["source"])

#### Apply EVS terms to output

In [None]:
from collections import defaultdict


# (code, synonym) -> set of every source reported for that pairing.
syn_to_sources = defaultdict(set)

for r_code, syn, source in zip(
    disease_terms_df["code"],
    disease_terms_df["synonym"],
    disease_terms_df["source"],
):
    syn_to_sources[(r_code, syn)].add(source)


def get_sources(row: pd.Series):
    """Return the sources recorded for this row's (Code, Term) pair.

    The sources are sorted and joined with commas. A pair that has no entry in
    ``syn_to_sources`` yields an empty string (and, because the mapping is a
    ``defaultdict``, gets an empty entry created for it).
    """
    key = (row["Code"], row["Term"])
    return ",".join(sorted(syn_to_sources[key]))


# Attach the comma-joined source list to every (Term, Code) row.
output["Sources"] = output.loc[:, ["Term", "Code"]].apply(get_sources, axis=1)
# NOTE(review): get_sources always returns a string, so this only guards
# against NaN; rows without any source carry "" and still pass.
assert output["Sources"].notna().all()
display(output.head())

In [None]:
# Fill gaps in the term columns: keep the preferred term NCIt already
# provided, otherwise fall back to the EVS preferred name.
# Memo of code -> EVS name, so each code is looked up at most once.
code_to_term_map = {}


def code_to_term(code: str):
    """Return the EVS ``name`` recorded for ``code``, memoised in ``code_to_term_map``.

    The first row of ``disease_terms_df`` whose ``code`` column equals ``code``
    supplies the name.

    Raises:
        IndexError: if ``disease_terms_df`` holds no row for ``code``.
    """
    if code not in code_to_term_map:
        names = disease_terms_df.loc[disease_terms_df["code"] == code, "name"]
        if names.empty:
            # Same exception type a bare ``.iloc[0]`` raises on an empty
            # selection, but with a message that names the offending code.
            raise IndexError(
                f"No EVS name found for code {code!r} in disease_terms_df"
            )
        code_to_term_map[code] = names.iloc[0]
    return code_to_term_map[code]


def _term_or_evs_name(row, term_col, code_col):
    """Return ``row[term_col]`` unless it is NA, else the EVS name for ``row[code_col]``."""
    current = row[term_col]
    if pd.isna(current):
        return code_to_term(row[code_col])
    return current


output["Preferred Term"] = output.apply(
    lambda row: _term_or_evs_name(row, "Preferred Term", "Code"),
    axis=1,
)
print("Done with Preferred Term.")
output["Parent Term"] = output.apply(
    lambda row: _term_or_evs_name(row, "Parent Term", "Parent Term Code"),
    axis=1,
)
# Both term columns must be fully populated after the fallback.
for column in ("Preferred Term", "Parent Term"):
    assert output[column].hasnans is False

In [None]:
# No two rows of the final table may be identical across all columns.
assert not output.duplicated().any()

#### Save the output

In [None]:
# Options shared by all three tab-separated exports.
tsv_options = {"sep": "\t", "index": False, "encoding": "utf-8"}

# Synonym -> code lookup, one row per distinct pair, ordered by term.
term2code = output[["Term", "Code"]].drop_duplicates().sort_values(by=["Term"])
term2code.to_csv("disease_syn_2_code.tsv", **tsv_options)

# Code -> preferred term lookup, one row per distinct pair, ordered by code.
code2pref_term = (
    output[["Code", "Preferred Term"]].drop_duplicates().sort_values(by=["Code"])
)
code2pref_term.to_csv("disease_code_2_pref_term.tsv", **tsv_options)

# The complete concept table.
output.to_csv("disease_ncit_concepts.tsv", **tsv_options)

#### Check the output

In [None]:
# Memo sets so that each parent code and each concept code is validated once.
# They are kept separate: a parent that was already checked must not cause the
# checks for a *different child* under that parent to be skipped.
parents_checked = set()
codes_checked = set()

# Lookups computed once up front instead of once per row.
visited_codes = set(children)
output_codes = set(output["Code"])
pref_terms_per_code = output.groupby("Code")["Preferred Term"].nunique(dropna=False)
parent_terms_per_parent = output.groupby("Parent Term Code")["Parent Term"].nunique(
    dropna=False
)


def check_output(row: pd.Series):
    """Validate one row of ``output`` against the traversal and the table itself.

    Parent-level checks (run once per distinct parent code):
      * the parent occurs in the ``Code`` column, unless it is ``root_concept``,
        which never appears as a child;
      * all rows with this parent code agree on a single ``Parent Term``.

    Code-level checks (run once per distinct code):
      * the code was visited by the traversal;
      * all rows with this code agree on a single ``Preferred Term``.

    Raises:
        AssertionError: if any of the checks fails.
    """
    r_code = row["Code"]
    r_parent_code = row["Parent Term Code"]

    if r_parent_code not in parents_checked:
        parents_checked.add(r_parent_code)

        if r_parent_code != root_concept:
            assert (
                r_parent_code in output_codes
            ), f"Failed to find {r_parent_code} in output Codes"

        # The offending rows are only gathered when the assertion fails.
        assert parent_terms_per_parent.get(r_parent_code, 0) == 1, (
            f"Parent Code {r_parent_code} should have the same Parent Term "
            f"{output.loc[output['Parent Term Code'] == r_parent_code, 'Parent Term']}"
        )

    if r_code not in codes_checked:
        codes_checked.add(r_code)

        assert r_code in visited_codes, "Every code should have been visited"

        assert pref_terms_per_code.get(r_code, 0) == 1, (
            f"Code {r_code} should have the same Preferred Term "
            f"{output.loc[output['Code'] == r_code, 'Preferred Term']}"
        )


_ = output.apply(check_output, axis=1)