# UMLS Mapping from survey to HPO 

In [None]:
import pandas as pd
# Location of the survey CSV to load
csv_file_path = "test.csv"
# Only these columns are needed. The first physical row of the file is
# skipped, so the row after it is the one pandas uses as the header.
wanted_columns = ['question', 'question_HPO_code', 'question_HPO_label']
df = pd.read_csv(csv_file_path, skiprows=1, usecols=wanted_columns)
# Add empty placeholder columns for the results
for placeholder in ('hpo_codes', 'confidence'):
    df[placeholder] = None
print(df.head())

In [None]:
# Smoke-test the mapping graph on a single example before the full batch run
from src.graph.builder import umls_mapping_graph

sample_input = {'text': 'Atlanto-axial instability (neck spine is weak or unstable)'}
result = umls_mapping_graph.invoke(sample_input)

print(result)

In [3]:
# Import the required libraries and project modules
from src.graph.builder import umls_mapping_graph
import requests
import pandas as pd
import logging
from src.config.agents import AGENT_LLM_MAP
from src.prompts.template import apply_prompt_template


# Base URL of the HTTP service that the CUI search and Wu-Palmer similarity
# requests in this notebook are sent to
API_BASE_URL = "http://52.43.228.165:8000"

In [None]:
# Run the mapping graph on every survey question and record the top mapping
# (code, term, confidence) in the DataFrame.
for idx, row in df.iterrows():
    question_text = row['question']

    try:
        result = umls_mapping_graph.invoke({
            "text": question_text,
            "retries": 0,
        })

        validated_mappings = result.get("validated_mappings", [])

        # The three outputs we want; they stay empty when no mapping comes back
        best_match_code = ""
        best_match_term = ""
        confidence = ""

        # Only the first validated mapping is used
        if validated_mappings and isinstance(validated_mappings, list) and validated_mappings[0]:
            best_mapping = validated_mappings[0]
            best_match_code = best_mapping.get("best_match_code", "")
            best_match_term = best_mapping.get("best_match_term", "")
            confidence = best_mapping.get("confidence", "")

        df.at[idx, 'best_match_code'] = best_match_code
        df.at[idx, 'best_match_term'] = best_match_term
        df.at[idx, 'confidence'] = confidence

    except Exception as e:
        # Defensive skipping on error so one bad row does not abort the batch
        print(f"❌ Error processing row {idx + 1}: {e}")
        df.at[idx, 'best_match_code'] = ""
        df.at[idx, 'best_match_term'] = ""
        df.at[idx, 'confidence'] = ""

    else:
        # Report success only when the row really was processed; previously
        # this message was also printed right after the error message.
        print(f"✅ Processed row {idx + 1}")



In [5]:
# Write the mapping results to a CSV file (row index omitted) for inspection
df.to_csv("output_file.csv", index=False)

# Get more data for model performance evaluation

1. Get CUI code data by looking up each term

In [17]:
# Use an extracted term to search for matching CUIs
def search_cui(term, timeout=30):
    """Search for CUIs matching *term* using the API.

    Args:
        term: Query string sent as the ``query`` parameter to ``/cuis``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on an unresponsive
            host.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.RequestException: On connection failure, timeout, or an
            HTTP error status.
        ValueError: If the response body is not valid JSON.
    """
    response = requests.get(
        f"{API_BASE_URL}/cuis",
        params={"query": term},
        timeout=timeout,
    )
    # Fail loudly on 4xx/5xx instead of parsing an error page as a result
    response.raise_for_status()
    return response.json()

# Look up a term through the API and keep only the first CUI that comes back
def extract_first_cui_from_term(term):
    """
    Return the first CUI the API reports for *term*.

    A missing (None/NaN) or blank term short-circuits to None without any
    API call. None is also returned when the API yields no CUIs or when the
    lookup raises; in the latter case the error is printed.
    """
    term_is_blank = pd.isna(term) or not str(term).strip()
    if term_is_blank:
        return None

    first_cui = None
    try:
        matches = search_cui(term).get('cuis', [])
        if matches:
            first_cui = matches[0]['cui']
    except Exception as e:
        print(f"Error extracting CUI for term '{term}': {e}")
    return first_cui

In [None]:
df['True CUI'] = None
df['Predicted CUI'] = None
# Fill in the true and predicted CUI for every row
for row_label, record in df.iterrows():
    true_label = record['question_HPO_label']
    true_code = record['question_HPO_code']
    predicted_label = record['best_match_term']
    predicted_code = record['best_match_code']

    # True CUI: looked up from the ground-truth HPO label
    true_cui = extract_first_cui_from_term(true_label)
    df.at[row_label, 'True CUI'] = true_cui

    # Predicted CUI: when the predicted HPO code equals the true one, reuse
    # the true CUI; otherwise look up the predicted term.
    predicted_cui = (
        true_cui
        if true_code == predicted_code
        else extract_first_cui_from_term(predicted_label)
    )
    df.at[row_label, 'Predicted CUI'] = predicted_cui

# Check the results
print(df[['question_HPO_label', 'True CUI', 'Predicted CUI']].head())

2. Get Wu-Palmer similarity data

In [None]:
# Configure the root logger to emit records at INFO level and above
logging.basicConfig(level=logging.INFO)

def is_null(value):
    """Return a truthy result when *value* is None, NaN/NA, or blank once stripped."""
    if value is None or pd.isna(value):
        return True
    return not str(value).strip()

def compute_wu_palmer_similarity(actual_output, expected_output, timeout=30):
    """Compute the Wu-Palmer similarity between two CUIs via the API.

    Args:
        actual_output: First CUI placed in the request path.
        expected_output: Second CUI placed in the request path.
        timeout: Seconds to wait for the server. Without a timeout the
            request could block indefinitely and the ``requests.Timeout``
            handler below would never fire.

    Returns:
        None when either CUI is missing or blank; 1.0 when the two CUIs are
        equal (no request is made); the ``similarity`` field of the response
        rounded to 4 decimals on HTTP 200; 0 on a non-200 status, a timeout,
        or any other error.
    """
    if is_null(actual_output) or is_null(expected_output):
        return None

    # Identical CUIs need no API round-trip
    if actual_output == expected_output:
        return 1.0

    try:
        response = requests.get(
            f"{API_BASE_URL}/cuis/{actual_output}/{expected_output}/similarity/wu-palmer",
            timeout=timeout,
        )
        logging.info(f"Wu-Palmer API response: {response.status_code} - {response.text}")

        if response.status_code == 200:
            data = response.json()
            similarity = round(data.get("similarity", 0), 4)
            return similarity
        else:
            logging.error(f"API returned non-200: {response.status_code}")
            return 0

    except requests.Timeout:
        logging.error("Timeout error while calling Wu-Palmer API")
        return 0

    except Exception as e:
        logging.error(f"Error computing Wu-Palmer similarity: {e}")
        return 0

# First confirm the column names
print(df.columns.tolist())


def _similarity_for_row(row):
    """Return the Wu-Palmer similarity for one row's 'True CUI' / 'Predicted CUI' pair."""
    # NOTE(review): the True CUI is passed in the `actual_output` position and
    # the Predicted CUI in `expected_output`, the reverse of what the
    # parameter names suggest - confirm the endpoint is order-independent.
    return compute_wu_palmer_similarity(row['True CUI'], row['Predicted CUI'])


# Compute the Wu-Palmer similarity for every row
df['Wu-Palmer Similarity'] = df.apply(_similarity_for_row, axis=1)

3. Get the LLM score

In [9]:
# Ask the LLM for a numeric score comparing the predicted and true mapping
def rank_evaluate_with_llm(predicted_label, true_label, predicted_code, true_code, agent_llm_map):
    """Score a predicted mapping against the true mapping using an LLM.

    Args:
        predicted_label: Predicted label, passed to the prompt template.
        true_label: True label, passed to the prompt template.
        predicted_code: Predicted code, passed to the prompt template.
        true_code: True code, passed to the prompt template.
        agent_llm_map: Mapping whose ``"rank_evaluate_with_llm"`` entry is an
            object exposing ``invoke(prompt)``.

    Returns:
        The score as a float between 1 and 10 inclusive, or None when either
        code is missing, the response has neither ``choices`` nor
        ``content``, the text is not a number, the number is outside the
        range (including NaN), or any exception is raised along the way.
    """
    if is_null(predicted_code) or is_null(true_code):
        return None

    try:
        llm = agent_llm_map["rank_evaluate_with_llm"]
        prompt = apply_prompt_template(
            "rank_evaluate_with_llm",
            {
                "predicted_label": predicted_label,
                "true_label": true_label,
                "predicted_code": predicted_code,
                "true_code": true_code,
            },
        )
        response = llm.invoke(prompt)

        # Two response shapes are supported: one with `.choices`, one with `.content`
        if hasattr(response, "choices"):
            score_text = response.choices[0].message.content.strip()
        elif hasattr(response, "content"):
            score_text = response.content.strip()
        else:
            logging.error("LLM response structure not recognized.")
            return None

        try:
            score = float(score_text)
        except ValueError:
            logging.error(f"LLM returned invalid score: {score_text}")
            return None

        # Written as a positive range test so that NaN (for which both
        # `score < 1` and `score > 10` are False) is rejected as well.
        if not 1 <= score <= 10:
            logging.warning(f"LLM returned out-of-range score: {score_text}")
            return None
        return score

    except Exception as e:
        logging.error(f"Error calling LLM agent for ranking evaluation: {e}")
        return None

In [10]:
from functools import partial

# Pre-bind the agent-to-LLM mapping so only the per-row values remain to pass
evaluate_func = partial(rank_evaluate_with_llm, agent_llm_map=AGENT_LLM_MAP)

# Keep only the rows where both the predicted and the true CUI are present
filtered_df = df.dropna(subset=['Predicted CUI', 'True CUI'])


def _llm_score_for_row(row):
    """Return the LLM score for one row's predicted/true label and CUI pair."""
    return evaluate_func(
        row['best_match_term'],
        row['question_HPO_label'],
        row['Predicted CUI'],
        row['True CUI'],
    )


# Score the kept rows; assignment aligns on the index, so rows that were
# dropped above end up with a missing value in 'LLM Score'
df['LLM Score'] = filtered_df.apply(_llm_score_for_row, axis=1)

4. Get specificity by using LLM

In [12]:
# Ask the LLM to classify how specific the predicted mapping is
def evaluate_specificity_with_llm(predicted_label, true_label, predicted_code, true_code, agent_llm_map):
    """Classify the predicted mapping's specificity relative to the true one.

    Args:
        predicted_label: Predicted label, passed to the prompt template.
        true_label: True label, passed to the prompt template.
        predicted_code: Predicted code, passed to the prompt template.
        true_code: True code, passed to the prompt template.
        agent_llm_map: Mapping whose ``"evaluate_specificity_with_llm"``
            entry is an object exposing ``invoke(prompt)``.

    Returns:
        One of ``"exact match"``, ``"too specific"``, ``"too general"``,
        ``"related but not a match"`` or ``"incorrect"``; ``"unknown"`` when
        the LLM answers with anything else; None when either code is
        missing, the response has neither ``choices`` nor ``content``, or
        any exception is raised along the way.
    """
    if is_null(predicted_code) or is_null(true_code):
        return None

    try:
        # Diagnostic only: kept at DEBUG level so it no longer prints once
        # per evaluated row.
        logging.debug("agent_llm_map keys: %s", list(agent_llm_map.keys()))

        llm = agent_llm_map["evaluate_specificity_with_llm"]

        prompt = apply_prompt_template(
            "evaluate_specificity_with_llm",
            {
                "predicted_label": predicted_label,
                "true_label": true_label,
                "predicted_code": predicted_code,
                "true_code": true_code,
            },
        )

        response = llm.invoke(prompt)

        # Two response shapes are supported: one with `.choices`, one with `.content`
        if hasattr(response, "choices"):
            raw_text = response.choices[0].message.content
        elif hasattr(response, "content"):
            raw_text = response.content
        else:
            logging.error("LLM response structure not recognized.")
            return None

        # Normalise case and drop surrounding whitespace, quotes and a
        # trailing period so answers like '"Exact match."' are still accepted.
        specificity_text = raw_text.strip().lower().strip(" \t\r\n\"'.")

        valid_outputs = ("exact match", "too specific", "too general", "related but not a match", "incorrect")
        if specificity_text not in valid_outputs:
            logging.warning(f"LLM returned unexpected category: {specificity_text}")
            return "unknown"

        return specificity_text

    except Exception as e:
        logging.error(f"Error calling LLM agent for specificity evaluation: {e}")
        return None


In [None]:

# Pre-bind the agent-to-LLM mapping so only the per-row values remain to pass
specificity_func = partial(evaluate_specificity_with_llm, agent_llm_map=AGENT_LLM_MAP)
# Keep only the rows where both the predicted and the true CUI are present
filtered_df = df.dropna(subset=['Predicted CUI', 'True CUI'])


def _specificity_for_row(row):
    """Return the specificity category for one row's predicted/true pair."""
    return specificity_func(
        row['best_match_term'],
        row['question_HPO_label'],
        row['Predicted CUI'],
        row['True CUI'],
    )


# Start with an empty column, then fill in only the rows kept above
df['Specificity Category'] = None
df.loc[filtered_df.index, 'Specificity Category'] = filtered_df.apply(
    _specificity_for_row,
    axis=1,
)

Output the results

In [16]:
# Write the final DataFrame, including the evaluation columns added above,
# to a CSV file (row index omitted)
df.to_csv("output_file_cui_final.csv", index=False)