# Find Description

In [15]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union
from urllib.parse import quote

import requests

@dataclass
class OrphaDisease:
    """Structured summary of one Orphanet disorder record.

    Instances are built by ``fetch_orphanet_short_description`` from the
    first result returned by the Orphadata name lookup.
    """

    # Numeric ORPHAcode identifying the disorder.
    orphacode: int
    # Value of the "Preferred term" field (empty string when the API gave none).
    preferred_term: str
    # First non-empty "Definition" found under "SummaryInformation", if any.
    definition: Optional[str]
    # Value of the "OrphanetURL" field, or None when absent/empty.
    orphanet_url: Optional[str]
    # Stripped, non-empty entries of the "Synonym" field.
    synonyms: List[str]

def fetch_orphanet_short_description(
    disease_name: str,
    *,
    lang: str = "en",
    timeout_s: float = 15.0,
    session: "Optional[requests.Session]" = None,
) -> "Union[OrphaDisease, str]":
    """Look up a disease by name in the Orphadata API and summarise the first hit.

    Args:
        disease_name: Disease name to search for; surrounding whitespace is
            ignored.
        lang: Language code sent as the ``lang`` query parameter. Blank or
            empty values fall back to ``"en"``.
        timeout_s: Timeout, in seconds, passed to the HTTP request.
        session: Optional session to reuse. When omitted, a temporary session
            is created and closed before the function returns.

    Returns:
        An ``OrphaDisease`` built from the first result, or a human-readable
        message string when nothing matched (blank name, HTTP 404, missing or
        empty ``results``) or when ``results`` has an unexpected type.

    Raises:
        RuntimeError: On HTTP 403, or when the ORPHAcode cannot be parsed as
            an integer.
        requests.HTTPError: On any other HTTP error status.
    """
    no_match = f"No Orphanet match found for: {disease_name!r}"

    # A blank (or non-string, e.g. NaN from pandas) name cannot match anything;
    # skip the network round-trip instead of querying a malformed URL.
    name = disease_name.strip() if isinstance(disease_name, str) else ""
    if not name:
        return no_match

    lang = (lang or "").strip().lower() or "en"
    base = "https://api.orphadata.com"
    # safe="" also percent-encodes "/", which would otherwise split the
    # disease name across several URL path segments.
    path = f"/rd-cross-referencing/orphacodes/names/{quote(name, safe='')}"
    url = f"{base}{path}"

    # Only close the session if it was created here; a caller-supplied
    # session stays under the caller's control.
    owns_session = session is None
    http = requests.Session() if owns_session else session
    try:
        resp = http.get(url, params={"lang": lang}, timeout=timeout_s)
        if resp.status_code == 403:
            raise RuntimeError(
                "Access denied (403) from Orphadata API. "
                "If this persists, check Orphadata/Orphanet access conditions."
            )
        if resp.status_code == 404:
            return no_match
        resp.raise_for_status()
        payload: Dict[str, Any] = resp.json()
    finally:
        if owns_session:
            http.close()

    results: Union[None, Dict[str, Any], List[Dict[str, Any]]] = (
        payload.get("data", {}) or {}
    ).get("results")

    # The OpenAPI schema often shows `results` as an object, but be robust anyway.
    candidates: List[Dict[str, Any]]
    if results is None:
        return no_match
    if isinstance(results, list):
        candidates = results
    elif isinstance(results, dict):
        candidates = [results]
    else:
        return f"Unexpected `results` type: {type(results)}"

    # An empty result list is a "no match", not an IndexError.
    if not candidates:
        return no_match
    best = candidates[0]

    orphacode = best.get("ORPHAcode")
    preferred_term = best.get("Preferred term") or ""
    orphanet_url = best.get("OrphanetURL")
    synonyms = best.get("Synonym") or []
    if not isinstance(synonyms, list):
        synonyms = [str(synonyms)]

    # Definition is typically under SummaryInformation: [{"Definition": "..."}]
    definition: Optional[str] = None
    summary_info = best.get("SummaryInformation") or []
    if isinstance(summary_info, dict):
        summary_info = [summary_info]
    if isinstance(summary_info, list):
        for item in summary_info:
            if isinstance(item, dict) and item.get("Definition"):
                definition = str(item["Definition"]).strip()
                break

    if not isinstance(orphacode, int):
        # Some APIs serialize numbers as strings; try to coerce.
        try:
            orphacode = int(str(orphacode))
        except (TypeError, ValueError) as e:
            raise RuntimeError(f"Could not parse ORPHAcode: {orphacode!r}") from e

    stripped_synonyms = (str(x).strip() for x in synonyms)
    return OrphaDisease(
        orphacode=orphacode,
        preferred_term=str(preferred_term).strip(),
        definition=definition,
        orphanet_url=str(orphanet_url).strip() if orphanet_url else None,
        synonyms=[syn for syn in stripped_synonyms if syn],
    )

def search_by_name(disease_name: str, *, lang: str = "en") -> Optional[Dict[str, Any]]:
    """Return the Orphanet summary for ``disease_name`` as a plain dict.

    Args:
        disease_name: Disease name to look up.
        lang: Language code forwarded to ``fetch_orphanet_short_description``.

    Returns:
        A JSON-serialisable dict with the keys ``preferred_term``,
        ``orphacode``, ``definition``, ``orphanet_url`` and ``synonyms``, or
        ``None`` when the lookup produced a message string instead of a
        record (the message is printed).
    """
    d = fetch_orphanet_short_description(disease_name, lang=lang)
    # A string result is a diagnostic message ("no match", unexpected payload),
    # not a disease record.
    if isinstance(d, str):
        print(d)
        return None
    return {
        "preferred_term": d.preferred_term,
        "orphacode": d.orphacode,
        "definition": d.definition,
        "orphanet_url": d.orphanet_url,
        "synonyms": d.synonyms,
    }


In [13]:
import pandas as pd
# Load the test, train and validation splits (in that order) from ./raw_data.
test_set, train_set, val_set = (
    pd.read_csv(csv_path)
    for csv_path in (
        "./raw_data/test.csv",
        "./raw_data/train.csv",
        "./raw_data/val.csv",
    )
)

In [None]:
import json
from tqdm import tqdm

# Look up every distinct diagnosis of the test split and keep the ones for
# which a record was found.
all_test_labels = test_set["final_diagnosis"].unique()
all_test_lables_with_info = {}
for label in tqdm(all_test_labels):
    info = search_by_name(label)
    if not info:
        # No record for this label (search_by_name already printed why).
        continue
    all_test_lables_with_info[label] = info

# Persist the label -> info mapping so later cells can reload it without
# repeating the lookups.
with open(
    "./raw_data/all_test_labels_with_info.json", "w", encoding="utf-8"
) as f:
    json.dump(all_test_lables_with_info, f, ensure_ascii=False)

# Relevant Disease

In [5]:
from __future__ import annotations

from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Tuple
from collections import Counter
import requests


# Root URL prepended to every request path issued by UMLSClient._get.
UTS_BASE = "https://uts-ws.nlm.nih.gov/rest"


@dataclass
class UMLSClient:
    """Minimal client for the UMLS REST API rooted at ``UTS_BASE``.

    Attributes:
        api_key: API key sent as the ``apiKey`` query parameter on every call.
        timeout: Per-request timeout in seconds.
    """

    api_key: str
    timeout: int = 30

    @staticmethod
    def _join_labels(labels: Iterable[str]) -> str:
        """Return ``labels`` as one comma-separated string.

        A plain string is passed through unchanged; joining it would split it
        into single characters ("PAR" -> "P,A,R").
        """
        if isinstance(labels, str):
            return labels
        return ",".join(labels)

    def _get(self, path: str, params: Optional[dict] = None) -> dict:
        """GET ``UTS_BASE + path`` and return the decoded JSON body.

        Args:
            path: Request path, starting with "/".
            params: Query parameters; the dict is copied, never modified.

        Raises:
            requests.HTTPError: If the response has an error status.
        """
        # Work on a copy so the API key is not injected into the caller's dict.
        query = dict(params or {})
        query["apiKey"] = self.api_key
        url = f"{UTS_BASE}{path}"
        r = requests.get(url, params=query, timeout=self.timeout)
        r.raise_for_status()
        return r.json()

    def search_cuis(
        self,
        term: str,
        *,
        search_type: str = "exact",
        page_size: int = 25,
        sabs: Optional[str] = None,
    ) -> List[Tuple[str, str]]:
        """
        Returns a list of (cui, name) pairs.
        Uses /search/current?string=... (default returnIdType is concept/CUI).

        Entries whose ``ui`` is not a string starting with "C", or whose
        ``name`` is not a string, are dropped.
        """
        params = {
            "string": term,
            "pageSize": page_size,
        }
        if search_type:
            params["searchType"] = search_type
        if sabs:
            params["sabs"] = sabs

        data = self._get("/search/current", params=params)
        # Tolerate a missing or null "result"/"results" instead of crashing.
        results = (data.get("result") or {}).get("results") or []

        cuis: List[Tuple[str, str]] = []
        for item in results:
            ui = item.get("ui")
            name = item.get("name")
            # UI for CUIs typically starts with "C"
            if isinstance(ui, str) and ui.startswith("C") and isinstance(name, str):
                cuis.append((ui, name))
        return cuis

    def concept_relations(
        self,
        cui: str,
        *,
        page_size: int = 200,
        page_number: int = 1,
        sabs: Optional[str] = None,
        include_relation_labels: Optional[Iterable[str]] = None,
        include_additional_relation_labels: Optional[Iterable[str]] = None,
        include_obsolete: bool = False,
        include_suppressible: bool = False,
    ) -> List[dict]:
        """
        Returns raw relation objects from:
          /content/current/CUI/{cui}/relations

        You can filter by:
          - sabs
          - includeRelationLabels
          - includeAdditionalRelationLabels

        Only the single page ``page_number`` is fetched. An HTTP 404 is
        reported as an empty list; other HTTP errors propagate as
        ``requests.HTTPError``.
        """
        params = {
            "pageSize": page_size,
            "pageNumber": page_number,
            "includeObsolete": str(include_obsolete).lower(),
            "includeSuppressible": str(include_suppressible).lower(),
        }
        if sabs:
            params["sabs"] = sabs
        if include_relation_labels:
            params["includeRelationLabels"] = self._join_labels(include_relation_labels)
        if include_additional_relation_labels:
            params["includeAdditionalRelationLabels"] = self._join_labels(
                include_additional_relation_labels
            )

        try:
            data = self._get(f"/content/current/CUI/{cui}/relations", params=params)
        except requests.HTTPError as err:
            # NOTE(review): the UTS API appears to answer 404 when a concept has
            # no relations matching the filters; treat that as "no relations"
            # rather than aborting the caller's whole lookup - confirm against
            # the API documentation.
            if err.response is not None and err.response.status_code == 404:
                return []
            raise
        return data.get("result", []) or []


def find_related_diseases(
    disease_name: str,
    *,
    api_key: str,
    max_seed_cuis: int = 3,
    max_return: int = 30,
    # If you want to restrict to clinical vocabularies, set e.g. "SNOMEDCT_US,ICD10CM"
    sabs: Optional[str] = None,
    # Relation label filtering; leave None to include all.
    # Common hierarchical-ish labels include PAR/CHD; RO is "other relationship".
    relation_labels: Optional[Iterable[str]] = ("PAR", "CHD", "RO", "RB", "RN"),
    additional_relation_labels: Optional[Iterable[str]] = None,
    # Search behavior
    search_type_primary: str = "exact",
    search_type_fallback: str = "words",
) -> List[str]:
    """
    Return names of concepts that UMLS relates to ``disease_name``.

    The name is resolved to at most ``max_seed_cuis`` seed CUIs (primary
    search type first, fallback search type if that finds nothing). The
    first page of relations (up to 200) of every seed is fetched, and each
    name found on either side of a relation is counted unless it equals the
    query or a seed name (compared case-insensitively).

    Returns at most ``max_return`` names, most frequent first, ties broken
    alphabetically; an empty list when no seed or no neighbor is found.

    Notes:
    - UMLS "relations" are ontology relationships (not guaranteed to be true differential diagnoses).
    - Tune sabs/relation_labels to bias toward the desired notion of "related".
    """
    umls = UMLSClient(api_key=api_key)

    # Step 1: resolve the name to seed concepts.
    matches = umls.search_cuis(
        disease_name, search_type=search_type_primary, page_size=25, sabs=sabs
    )
    if not matches and search_type_fallback:
        matches = umls.search_cuis(
            disease_name, search_type=search_type_fallback, page_size=25, sabs=sabs
        )
    if not matches:
        return []
    matches = matches[:max_seed_cuis]

    # Names that must not be reported as "related": the seeds and the query.
    excluded = {name.strip().lower() for _, name in matches}
    excluded.add(disease_name.strip().lower())

    # Step 2: tally the names on both ends of every relation of every seed.
    tally = Counter()
    for cui, _seed_name in matches:
        relations = umls.concept_relations(
            cui,
            sabs=sabs,
            include_relation_labels=relation_labels,
            include_additional_relation_labels=additional_relation_labels,
            page_size=200,
        )
        for relation in relations:
            for field in ("relatedFromIdName", "relatedIdName"):
                raw = relation.get(field)
                if not isinstance(raw, str):
                    continue
                neighbor = raw.strip()
                if neighbor and neighbor.lower() not in excluded:
                    tally[neighbor] += 1

    if not tally:
        return []

    # Step 3: most frequent first; equal counts ordered case-insensitively.
    ordered = sorted(tally, key=lambda label: (-tally[label], label.lower()))
    return ordered[:max_return]


# ---- Example usage ----
if __name__ == "__main__":
    import os

    # Read the key from the environment only: credentials must never live in
    # source. Any key that was ever committed here should be treated as
    # leaked and revoked.
    API_KEY = os.environ.get("UMLS_API_KEY")
    if not API_KEY:
        raise SystemExit(
            "Set the UMLS_API_KEY environment variable to your UMLS API key."
        )

    related = find_related_diseases(
        "Pyoderma gangrenosum",
        api_key=API_KEY,
        sabs="SNOMEDCT_US",          # optional; try None for broader coverage
        max_seed_cuis=2,
        max_return=25,
    )
    print(related)


['Skin structure', 'Bullous pyoderma gangrenosum', 'Infectious process', 'Pustular pyoderma gangrenosum', 'Superficial vegetating pyoderma gangrenosum', 'Parastomal pyoderma gangrenosum', 'Ulcerative pyoderma gangrenosum', 'Above reference range', 'Anatomical or acquired body structure', 'Bacterium', 'Chronic superficial ulcer', 'Chronic ulcer of skin', 'Hidradenitis suppurativa pyoderma gangrenosum complex', 'Infected ulcer of skin', 'Infectious agent', 'Infectious disease', 'Infiltration', 'Inflammation', 'Neutrophil count', 'Neutrophilic dermatosis', 'Neutrophilic infiltration', 'PAPASH syndrome', 'PASS syndrome', 'PsAPASH syndrome', 'Pyoderma']


# Evaluation Data

In [1]:
import pandas as pd
# Reload the test split; the evaluation cells below only use this split.
test_set = pd.read_csv("./raw_data/test.csv")

In [None]:
import json
# Load the label -> info mapping from the JSON file under ./raw_data
# (same path the "Find Description" section writes to).
all_test_label_info_path = "./raw_data/all_test_labels_with_info.json"
with open(all_test_label_info_path, "r", encoding="utf-8") as f:
    all_test_label_info = json.load(f)

In [12]:
import copy
# Keep only the labels whose info carries a definition; each entry is copied
# so later edits cannot alter `all_test_label_info`.
valid_labels = {
    label: copy.deepcopy(info)
    for label, info in all_test_label_info.items()
    if info["definition"] is not None
}

valid_label_list = list(valid_labels)

# Restrict the test split to cases whose diagnosis has a definition.
valid_evaluation_data = test_set[test_set["final_diagnosis"].isin(valid_label_list)]
print(len(valid_evaluation_data))


600


In [14]:
import copy
import random
# Fixed seed so the sampled distractors are reproducible across runs.
random.seed(42)

# Number of distractor diagnoses sampled for every evaluation case.
# random.sample raises ValueError if fewer valid labels are available.
NUM_CANDIDATES = 200

evaluation_data = []
for _, row in valid_evaluation_data.iterrows():
    # `valid_labels` is keyed by the label exactly as it appears in the data,
    # so use the raw value for lookups and the stripped one for output.
    raw_label = row["final_diagnosis"]
    groundtruth = raw_label.strip()

    # Distractor pool: every valid label except the ground truth itself.
    choices = [x for x in valid_label_list if x != groundtruth and x != raw_label]
    candidates = random.sample(choices, NUM_CANDIDATES)

    evaluation_data.append(
        {
            "case_prompt": row["case_prompt"].strip(),
            "reasoning": row["diagnostic_reasoning"].strip(),
            "groundtruth_diagnosis": groundtruth,
            "candidates": candidates,
            "description": {
                "gt_description": valid_labels[raw_label]["definition"],
                # Same order as `candidates`.
                "candidate_descriptions": [
                    valid_labels[candidate]["definition"] for candidate in candidates
                ],
            },
        }
    )


In [15]:
import json
# Write the assembled evaluation cases to disk as UTF-8 JSON
# (ensure_ascii=False keeps non-ASCII characters unescaped).
with open("./processed_data/evaluation_data.json", "w", encoding="utf-8") as f:
    json.dump(evaluation_data, f, ensure_ascii=False)
