In [1]:
import csv
from collections import Counter

file_path = "KG.txt"

with open(file_path, encoding="utf-8") as f:
    reader = csv.DictReader(f)
    props = [row["property"].strip() for row in reader]

counter = Counter(props)

for prop, n in counter.most_common():
    print(f"{prop}: {n}")


用药目的: 116071
适应证: 116071
相关检查: 88963
临床表现: 54136
科室: 47902
禁忌证: 36654
治疗方式: 29134
症状: 10933
禁忌人群: 9817
等同于: 7332
体征: 3287
适用人群: 3149
慎用: 2445
常见症状: 1906
慎用人群: 956
诊断依据: 7
住院/门诊期间必需检查: 1
选择用药: 1


In [1]:
ALLOWED_TRANS = {
    "用药目的": {"适应证"},          # 药物 → 疾病/症状
    "适应证":   {"临床表现", "症状", "体征", "相关检查", "治疗方式", "禁忌证"},
    "临床表现": {"相关检查", "科室"},
    "症状":     {"相关检查", "科室"},
    "体征":     {"相关检查", "科室"},
    "相关检查": {"科室"},
    "治疗方式": {"相关检查", "科室"},
    "禁忌证":   {"慎用人群", "禁忌人群"},   # 可选分支
    "慎用":     {"慎用人群"},
}
TERMINAL_RELS = {"科室"}             # 到科室就收尾


In [2]:
# 允许的 “当前关系 → 下一跳可选关系” 映射
import os, json, random, pandas as pd, networkx as nx
from openai import OpenAI

# # ---------- 0. OpenAI 配置 ----------
# os.environ["OPENAI_API_KEY"] = "sk-OlimLcefr3MBSt08IrcZ9LrhP94qqni4w3u4qkOPFtAULcDD"
# os.environ["OPENAI_BASE_URL"] = "https://api.chatanywhere.tech"
# client = OpenAI()

# ---------- 1. 读取 KG ----------
cols = ["entityId","entity","entityType","property",
        "valueId","value","valueType","group","source"]

df = pd.read_csv("KG.txt", sep=",", header=0, names=cols,
                 dtype=str, encoding="utf-8")
df.columns = df.columns.str.strip()
df = df.applymap(lambda x: x.strip() if isinstance(x,str) else x)

# ---------- 2. 构图 ----------
# ---------- 2. 构图（推荐明确节点实体信息）----------
G = nx.MultiDiGraph()
for e_id, e, e_type, prop, v_id, v, v_type, grp, src in df.itertuples(False):
    G.add_node(e, type=e_type)
    G.add_node(v, type=v_type)
    G.add_edge(e, v,
               predicate=prop, head=e, head_type=e_type,
               tail=v, tail_type=v_type, group=grp, source=src)

print(f"节点数: {G.number_of_nodes():,} 边数: {G.number_of_edges():,}")

# 采样逻辑: 直接用节点的实体值（name）进行判断

def sample_entity_path(min_hops=3, max_hops=6, retry=10_000):
    all_edges = [
        (u, v, d)
        for u, v, _, d in G.edges(keys=True, data=True)
        if d["predicate"] in ALLOWED_TRANS
    ]

    for _ in range(retry):
        edge = random.choice(all_edges)
        path = [edge]
        used_preds = {edge[2]["predicate"]}
        visited_nodes = {edge[0], edge[1]}

        while len(path) < max_hops:
            if path[-1][2]["predicate"] in TERMINAL_RELS:
                break

            tail_node = path[-1][1]  # 使用实体名
            curr_pred = path[-1][2]["predicate"]
            next_rel_set = ALLOWED_TRANS.get(curr_pred, set())

            if not next_rel_set:
                break

            candidates = [
                (tail_node, v2, d2)
                for _, v2, _, d2 in G.out_edges(tail_node, keys=True, data=True)
                if d2["predicate"] in next_rel_set
                and d2["predicate"] not in used_preds
                and v2 not in visited_nodes  # 避免实体重复访问
            ]
            if not candidates:
                break

            nxt = random.choice(candidates)
            path.append(nxt)
            used_preds.add(nxt[2]["predicate"])
            visited_nodes.add(nxt[1])

            if len(path) >= min_hops and random.random() < 0.3:
                break

        if len(path) >= min_hops:
            return path

    return None 

# 显示采样结果（保持原有的方式即可）
def triples_to_str_entity(path):
    return "\n".join(f"{d['head']} --{d['predicate']}→ {d['tail']}" 
                     for _, _, d in path)



  df = df.applymap(lambda x: x.strip() if isinstance(x,str) else x)


节点数: 151,026 边数: 528,765


In [3]:
path = sample_entity_path(min_hops=3, max_hops=6, retry=50_000)
print(triples_to_str_entity(path))

益肝灵滴丸 --适应证→ 慢性肝炎
慢性肝炎 --临床表现→ 肝损害
肝损害 --相关检查→ 尿三胆


In [7]:
import random

# ------------------------------------------
# 1) 建立 “tail → 边列表” 反向索引
#    每条记录都是 (head_val, tail_val, data_dict)
# ------------------------------------------
tail2edges = {}
for _, _, _, d in G.edges(keys=True, data=True):
    h, t = d["head"], d["tail"]
    tail2edges.setdefault(h, []).append((h, t, d))   # 以后要找 "h 做完 tail, 谁再做 head"

# ------------------------------------------
# 2) 深度优先在“边序列”空间里采样
# ------------------------------------------
def sample_entity_path(min_hops=3, max_hops=4, retry=5_000):
    # 随机多次尝试不同起始边，增加成功概率
    all_edges = [(d["head"], d["tail"], d) for _, _, _, d in G.edges(keys=True, data=True)]
    for _ in range(retry):
        edge = random.choice(all_edges)         # 随机起点
        path = [edge]

        while len(path) < max_hops:
            _, tail_cur, _ = path[-1]           # 当前链最后一条的 tail 实体
            candidates = tail2edges.get(tail_cur, [])
            if not candidates:
                break                           # 接不上就终止本条

            nxt = random.choice(candidates)
            # 为简单起见允许 predicate 重复；如需避免重复，可额外判断
            path.append(nxt)

            # 满足 min_hops 后给 30% 概率提前停止
            if len(path) >= min_hops and random.random() < 0.3:
                return path

        if len(path) >= min_hops:
            return path
    raise RuntimeError("在实体层面仍未采到符合要求的链，\
可调大 retry 或放宽约束。")

# ------------------------------------------
# 3) 打印结果
# ------------------------------------------
def triples_to_str_entity(path):
    return "\n".join(f"{h} --{d['predicate']}→ {t}" for h, t, d in path)

path = sample_entity_path(min_hops=3, max_hops=6, retry=20_000)
print(triples_to_str_entity(path))


头孢特仑新戊酯胶囊 --适应证→ 喉炎
喉炎 --临床表现→ 喉部异物感
喉部异物感 --科室→ 耳鼻喉科


In [4]:
"""
generate_medical_qa.py
———————————————
随机采样知识图谱推理链 → 调 GPT 生成中文 Q&A → 保存 JSON 数据集
"""
import os, json, time, random, textwrap
from openai import OpenAI
# 假设已有 G, sample_entity_path, triples_to_str_entity
# from your_sampling_module import G, sample_entity_path, triples_to_str_entity
# ---------- 0. OpenAI 配置 ----------
os.environ["OPENAI_API_KEY"] = "sk-4jnd9yjoIXnQRQ5SXR2b3bVO1d3sHtuyegGMzAl6awSWDRNn"
os.environ["OPENAI_BASE_URL"] = "https://api2.aigcbest.top/v1"
client = OpenAI()

def build_prompt(triples_str: str):
    """
    根据带因果逻辑的多跳三元组链，生成【需要完整链条推理】才能定位末端实体的
    中文医学问答。答案必须是【最后一个 tail 实体】，并用简洁语句说明推理脉络。
    """
    sys_msg = (
        "你是一位资深临床药理学与循证医学专家。"
        "系统将给出一条医学知识图谱推理链（药物 → 疾病 → 症状/检查 → 科室 等）。"
        "请基于整条链条设计一个 **必须通过多跳专业推理** 才能锁定最终实体的中文问题，"
        "并给出对应答案。答案应准确指出【链条最后一个实体】并用一句话概括推理逻辑。"
        "只能返回 JSON，键为 question 与 answer。"
    )

    user_msg = textwrap.dedent(f"""
        以下推理链每行格式为：〈head 实体〉 --〈predicate〉→ 〈tail 实体〉

        ```
        {triples_str}
        ```

        生成要求：
        1. **问题**应从首实体切入，隐含链中每一跳的医学关联或因果，
           使读者只有充分理解整条关系才能确定答案。
           - 可用“哪一科室应负责诊疗”“最终应做哪项检查”之类专业提问。
        2. **答案**必须是最后一个 tail 实体，并用 1 句简洁医学表述
           点明推理核心（勿展开整条链）。
           例： "答案：心内科；因为复方丹参含片主要治疗心绞痛 … 最终应由心内科诊治。"
        3. 不得直接暴露三元组文本；不得生成可由单跳回答的简单事实型问题。
        4. 仅输出符合示例格式的 JSON，无其他内容：
           {{
             "question": "…",
             "answer": "…"
           }}
    """)
    return [{"role": "system", "content": sys_msg},
            {"role": "user",   "content": user_msg}]



# ---------- 2. 调 GPT 生成 Q&A ----------
def generate_qa(triples_str: str, model="gpt-4o-mini", temp=0.5, max_retry=3):
    messages = build_prompt(triples_str)
    for _ in range(max_retry):
        try:
            res = client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temp,
                timeout=60,
            )
            qa = json.loads(res.choices[0].message.content)
            assert {"question", "answer"} <= qa.keys()
            return qa
        except Exception as e:
            print("⚠️ GPT 调用失败，重试…", e)
            time.sleep(1 + random.random())
    raise RuntimeError("连续多次调用 GPT 失败")

def build_dataset(n_samples=100, min_hops=3, max_hops=6):
    dataset, used_paths = [], set()
    attempts = 0

    while len(dataset) < n_samples:
        attempts += 1

        # ---------- ① 采样 ----------
        path = sample_entity_path(min_hops, max_hops, retry=100_000)
        if path is None:                 # ←← 采样失败直接跳过
            continue

        triples_text = triples_to_str_entity(path)
        if triples_text in used_paths:   # 去重
            continue
        used_paths.add(triples_text)

        # ---------- ② GPT 生成 ----------
        try:
            qa = generate_qa(triples_text)
        except Exception as e:
            print("⚠️ GPT 生成失败，跳过:", e)
            continue                     # GPT 调用出错也跳过

        dataset.append({
            "triples": triples_text.split("\n"),
            "question": qa["question"],
            "answer":  qa["answer"]
        })
        print(f"✔ 已生成 {len(dataset):3d}/{n_samples} 条  (共尝试 {attempts} 次)")

    return dataset



In [5]:
NUM_SAMPLES = 100      # 需要多少条问答
data = build_dataset(NUM_SAMPLES)

out_path = "medical_qa_rule.json"     
with open(out_path, "w", encoding="utf-8") as f:
    json.dump(data, f, ensure_ascii=False, indent=2)

print(f"\n🎉 数据集已保存: {out_path}  (共 {len(data)} 条)")


✔ 已生成   1/100 条  (共尝试 1 次)
✔ 已生成   2/100 条  (共尝试 2 次)
✔ 已生成   3/100 条  (共尝试 3 次)
✔ 已生成   4/100 条  (共尝试 4 次)
✔ 已生成   5/100 条  (共尝试 5 次)
✔ 已生成   6/100 条  (共尝试 6 次)
✔ 已生成   7/100 条  (共尝试 7 次)
✔ 已生成   8/100 条  (共尝试 8 次)
✔ 已生成   9/100 条  (共尝试 9 次)
✔ 已生成  10/100 条  (共尝试 10 次)
✔ 已生成  11/100 条  (共尝试 11 次)
✔ 已生成  12/100 条  (共尝试 12 次)
✔ 已生成  13/100 条  (共尝试 13 次)
✔ 已生成  14/100 条  (共尝试 14 次)
✔ 已生成  15/100 条  (共尝试 15 次)
✔ 已生成  16/100 条  (共尝试 16 次)
✔ 已生成  17/100 条  (共尝试 17 次)
✔ 已生成  18/100 条  (共尝试 18 次)
✔ 已生成  19/100 条  (共尝试 19 次)
✔ 已生成  20/100 条  (共尝试 20 次)
✔ 已生成  21/100 条  (共尝试 21 次)
✔ 已生成  22/100 条  (共尝试 22 次)
✔ 已生成  23/100 条  (共尝试 23 次)
✔ 已生成  24/100 条  (共尝试 24 次)
✔ 已生成  25/100 条  (共尝试 25 次)
✔ 已生成  26/100 条  (共尝试 26 次)
✔ 已生成  27/100 条  (共尝试 27 次)
✔ 已生成  28/100 条  (共尝试 28 次)
✔ 已生成  29/100 条  (共尝试 29 次)
✔ 已生成  30/100 条  (共尝试 30 次)
✔ 已生成  31/100 条  (共尝试 31 次)
✔ 已生成  32/100 条  (共尝试 32 次)
✔ 已生成  33/100 条  (共尝试 33 次)
✔ 已生成  34/100 条  (共尝试 34 次)
✔ 已生成  35/100 条  (共尝试 35 次)
✔ 已生成  36/100 条  (共尝试 36 次)
✔

In [None]:
import pymysql
import time
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

def get_connection(retries = 5, delay = 3):
    attempt = 0
    while attempt < retries:
        try:
            connection = pymysql.connect(
                host="172.188.121.85",
                user="root",
                password="1qaz0plm",
                database="umls",
                port=3306,
                cursorclass=pymysql.cursors.DictCursor,
                connect_timeout=10
            )
            print("Connected to the database.")
            return connection
        except pymysql.MySQLError as e:
            attempt += 1
            logging.error(f"Connection attempt {attempt} failed: {e}")
            if attempt < retries:
                logging.info(f"Retrying in {delay} seconds...")
                time.sleep(delay)
            else:
                logging.error("All connection attempts failed. Please check the MySQL server and network settings.")
                raise e
            
def look_up_cui(term):
    """ Retrieve the Concept Unique Identifier (CUI) for a given term from the MRCONSO table in the UMLS database."""
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            sql = """
                SELECT DISTINCT 
                    CUI
                FROM 
                    MRCONSO
                WHERE 
                    (STR = %s OR STR LIKE %s)
                    AND TTY = 'PT'
                    AND LAT = 'ENG';
            """
            cursor.execute(sql, (term, f"{term}%"))
            result = cursor.fetchall()
            if result:
                return result
            if not result:
                return None
    finally:
        connection.close()
            
def get_term(cui):
    """ Retrieve the preferred term for a given CUI from the MRCONSO table in the UMLS database."""
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            sql = """
                SELECT DISTINCT 
                    STR
                FROM 
                    MRCONSO
                WHERE 
                    CUI = %s
                    AND TTY = 'PT'
                    AND LAT = 'ENG';
            """
            cursor.execute(sql, (cui,))
            result = cursor.fetchall()
            if result:
                return result
            if not result:
                return [{"STR": "Unknown Term"}]
    finally:
        connection.close()
        
def get_synonyms(cui):
    """ Retrieve synonyms for a given CUI from the MRCONSO table in the UMLS database. """
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT STR FROM MRCONSO WHERE CUI = %s AND TS = 'P' AND STT = 'PF'"
            cursor.execute(sql, (cui,))
            result = cursor.fetchall()
            if result:
                return result
            if not result:
                return None
    finally:
        connection.close()
            
def get_definition(cui):
    """ Retrieve the definition for a given CUI from the MRDEF table in the UMLS database."""
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            sql = "SELECT DEF FROM MRDEF WHERE CUI = %s"
            cursor.execute(sql, (cui,))
            result = cursor.fetchall()
            if result:
                return result
            if not result:
                return None
    finally:
        connection.close()
        
def get_semantic_type(cui):
    """ Retrieve the semantic type for a given CUI from the UMLS database. """
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            sql = """
                SELECT DISTINCT 
                    TUI, STY
                FROM 
                    MRSTY
                WHERE 
                    CUI = %s;
            """
            cursor.execute(sql, (cui,))
            result = cursor.fetchall()
            if result:
                return result
            if not result:
                return None
    finally:
        connection.close()
        
def get_relations(cui):
    """
    Retrieve all relationships for a given CUI from the UMLS database.

    Args:
        cui (str): The Concept Unique Identifier (CUI) for which to retrieve relationships.

    Returns:
        list[dict]: A list of dictionaries containing all relationships, 
                    or None if no relationships are found.
    """
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            # SQL query to find all relationships
            sql = """
                SELECT DISTINCT 
                    R.CUI1 AS SourceCUI,  
                    M1.STR AS SourceTerm,  
                    R.REL AS Relationship, 
                    R.RELA AS RelationshipType, 
                    R.CUI2 AS TargetCUI, 
                    M2.STR AS TargetTerm, 
                    R.SAB AS Source
                FROM 
                    MRREL R
                LEFT JOIN 
                    MRCONSO M1 ON R.CUI1 = M1.CUI  
                LEFT JOIN 
                    MRCONSO M2 ON R.CUI2 = M2.CUI  
                WHERE 
                    (R.CUI1 = %s OR R.CUI2 = %s) -- Match the input CUI as Source or Target
                    AND M1.TTY = 'PT'
                    AND M1.TS = 'P'
                    AND M2.TTY = 'PT'
                    AND M2.TS = 'P'
                    AND M1.LAT = 'ENG'
                    AND M2.LAT = 'ENG';
            """
            # Execute the query with the provided CUI
            cursor.execute(sql, (cui, cui))
            result = cursor.fetchall()
            if result:
                # Return in a clear and structured format
                return [
                    {
                        "SourceCUI": row["SourceCUI"],
                        "SourceTerm": row["SourceTerm"],
                        "Relationship": row["Relationship"],
                        "RelationshipType": row["RelationshipType"],
                        "TargetCUI": row["TargetCUI"],
                        "TargetTerm": row["TargetTerm"],
                        "Source": row["Source"],
                    }
                    for row in result
                ]
            else:
                logging.debug(f"No relationships found for CUI: {cui}")
                return None
    except Exception as e:
        logging.error(f"An error occurred while fetching all relationships: {e}")
        return None
    finally:
        connection.close()
        
def get_specific_relation(cui, relationship_type):
    """
    Retrieve specific relationships for a given CUI from the UMLS database.

    Args:
        cui (str): The Concept Unique Identifier (CUI) for which to retrieve relationships.
        relationship_type (str): The type of relationship to retrieve (e.g., 'children', 'parents', 'descendents').

    Returns:
        list[dict]: A list of dictionaries containing the specific relationships, 
                    or None if no relationships are found.
    """
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            # SQL query to find specific relationships
            sql = """
                SELECT DISTINCT 
                    R.CUI1 AS SourceCUI,  
                    M1.STR AS SourceTerm,  
                    R.REL AS Relationship, 
                    R.RELA AS RelationshipType, 
                    R.CUI2 AS TargetCUI, 
                    M2.STR AS TargetTerm, 
                    R.SAB AS Source
                FROM 
                    MRREL R
                LEFT JOIN 
                    MRCONSO M1 ON R.CUI1 = M1.CUI  
                LEFT JOIN 
                    MRCONSO M2 ON R.CUI2 = M2.CUI  
                WHERE 
                    (R.CUI1 = %s OR R.CUI2 = %s) -- Match the input CUI as Source or Target
                    AND R.RELA = %s
                    AND M1.TTY = 'PT'
                    AND M1.TS = 'P'
                    AND M2.TTY = 'PT'
                    AND M2.TS = 'P'
                    AND M1.LAT = 'ENG'
                    AND M2.LAT = 'ENG';
            """
            # Execute the query with the provided CUI and relationship type
            cursor.execute(sql, (cui, cui, relationship_type))
            result = cursor.fetchall()
            if result:
                # Return in a clear and structured format
                return [
                    {
                        "SourceCUI": row["SourceCUI"],
                        "SourceTerm": row["SourceTerm"],
                        "Relationship": row["Relationship"],
                        "RelationshipType": row["RelationshipType"],
                        "TargetCUI": row["TargetCUI"],
                        "TargetTerm": row["TargetTerm"],
                        "Source": row["Source"],
                    }
                    for row in result
                ]
            else:
                logging.debug(f"No '{relationship_type}' relationships found for CUI: {cui}")
                return None
    except Exception as e:
        logging.error(f"An error occurred while fetching specific relationships: {e}")
        return None
    finally:
        connection.close()

def get_ro_relations(cui):
    """
    Retrieve 'RO' (Related To) relationships for a given CUI from the UMLS database.

    Args:
        cui (str): The Concept Unique Identifier (CUI) for which to retrieve relationships.

    Returns:
        list[dict]: A list of dictionaries containing 'RO' relationships, or None if no relationships are found.
    """
    connection = get_connection()  # Assumes `get_connection` returns a valid database connection.
    try:
        with connection.cursor() as cursor:
            # SQL query to fetch RO relationships
            sql = """
                SELECT DISTINCT 
                    R.CUI1 AS SourceCUI, 
                    M1.STR AS SourceTerm,
                    R.CUI2 AS TargetCUI, 
                    M2.STR AS TargetTerm,
                    R.REL AS Relationship, 
                    R.RELA AS RelationshipType, 
                    R.SAB AS Source
                FROM 
                    MRREL R
                LEFT JOIN 
                    MRCONSO M1 ON R.CUI1 = M1.CUI
                LEFT JOIN 
                    MRCONSO M2 ON R.CUI2 = M2.CUI
                WHERE 
                    R.CUI1 = %s
                    AND R.REL = 'RO'
                    AND R.SAB != 'NCI'
                    AND M1.LAT = 'ENG'
                    AND M1.TTY = 'PT' 
                    AND M2.LAT = 'ENG'
                    AND M2.TTY = 'PT';
            """
            cursor.execute(sql, (cui,))
            result = cursor.fetchall()
            if result:
                return [
                    {
                        "SourceCUI": row["SourceCUI"],
                        "SourceTerm": row["SourceTerm"],
                        "TargetCUI": row["TargetCUI"],
                        "TargetTerm": row["TargetTerm"],
                        "Relationship": row["Relationship"],
                        "RelationshipType": row["RelationshipType"],
                        "Source": row["Source"]
                    }
                    for row in result
                ]
            else:
                logging.debug(f"No RO relationships found for CUI: {cui}")
                return None
    except Exception as e:
        logging.error(f"An error occurred while fetching RO relationships: {e}")
        return None
    finally:
        connection.close()
        
            
def get_parent_from_snomedct(cui):
    """
    Retrieve 'isa' parent relationships for a given CUI from the SNOMEDCT_US database.

    Args:
        cui (str): The Concept Unique Identifier (CUI) for which to retrieve parent relationships.

    Returns:
        list[dict]: A list of dictionaries containing the parent relationships, 
                    or None if no parents are found.
    """
    connection = get_connection()  # Assumes `get_connection` is a valid function returning a database connection.
    try:
        with connection.cursor() as cursor:
            # SQL query to find 'isa' parent relationships
            sql = """
                SELECT DISTINCT 
                    R.CUI1 AS ChildID,  
                    M1.STR AS ChildTerm,  
                    R.RELA AS RelationshipType, 
                    R.CUI2 AS ParentID, 
                    M2.STR AS ParentTerm
                FROM 
                    MRREL R
                JOIN 
                    MRCONSO M1 ON R.CUI1 = M1.CUI  
                JOIN 
                    MRCONSO M2 ON R.CUI2 = M2.CUI  
                WHERE 
                    R.CUI1 = %s
                    AND R.RELA = 'inverse_isa'
                    AND R.REL = 'PAR'
                    AND R.SAB = 'SNOMEDCT_US'
                    AND M1.SAB = 'SNOMEDCT_US'
                    AND M1.TTY = 'PT'   
                    AND M2.SAB = 'SNOMEDCT_US'
                    AND M2.TTY = 'PT' 
                    AND M2.TS = 'P'   
                    AND M1.LAT = 'ENG'
                    AND M2.LAT = 'ENG';
            """
            # Execute the query with the provided CUI
            cursor.execute(sql, (cui,))
            result = cursor.fetchall()
            if result:
                # Return in a clear and structured format
                return [
                    {
                        "ChildID": row["ChildID"],
                        "ChildTerm": row["ChildTerm"],
                        "RelationshipType": 'isa',
                        "ParentID": row["ParentID"],
                        "ParentTerm": row["ParentTerm"]
                    }
                    for row in result
                ]
            else:
                logging.debug(f"No parents found for CUI: {cui}")
                return None
    except Exception as e:
        logging.error(f"An error occurred while fetching isa parents: {e}")
        return None
    finally:
        connection.close()
        
def get_children_from_snomedct(cui):
    """
    Retrieve 'inverse_isa' child relationships for a given CUI from the SNOMEDCT_US database.

    Args:
        cui (str): The Concept Unique Identifier (CUI) for which to retrieve child relationships.

    Returns:
        list[dict]: A list of dictionaries containing child and parent relationships, 
                    or None if no children are found.
    """
    connection = get_connection()  # Assumes `get_connection` is a valid function returning a database connection.
    try:
        with connection.cursor() as cursor:
            # SQL query to find 'inverse_isa' child relationships
            sql = """
                SELECT DISTINCT 
                    R.CUI1 AS ChildID,  
                    M1.STR AS ChildTerm,  
                    R.RELA AS RelationshipType, 
                    R.CUI2 AS ParentID, 
                    M2.STR AS ParentTerm
                FROM 
                    MRREL R
                JOIN 
                    MRCONSO M1 ON R.CUI1 = M1.CUI  
                JOIN 
                    MRCONSO M2 ON R.CUI2 = M2.CUI  
                WHERE 
                    R.CUI2 = %s  -- Replace the placeholder with the provided CUI
                    AND R.RELA = 'inverse_isa'
                    AND R.REL = 'PAR'
                    AND R.SAB = 'SNOMEDCT_US'      
                    AND M1.SAB = 'SNOMEDCT_US'   
                    AND M1.TTY = 'PT'
                    AND M2.SAB = 'SNOMEDCT_US'    
                    AND M2.TTY = 'PT';
            """
            # Execute the query with the provided CUI
            cursor.execute(sql, (cui,))
            result = cursor.fetchall()
            if result:
                # Return in a clear and structured format
                return [
                    {
                        "ParentID": row["ParentID"],
                        "ParentTerm": row["ParentTerm"],
                        "RelationshipType": 'isa',
                        "ChildID": row["ChildID"],
                        "ChildTerm": row["ChildTerm"],
                    }
                    for row in result
                ]
            else:
                return None
    except Exception as e:
        logging.error(f"An error occurred while fetching inverse_isa children: {e}")
        return None
    finally:
        connection.close()
        
def get_treatments(cui):
    """
    Retrieve treatments for a given disease CUI from the UMLS database.

    Args:
        cui (str): The Concept Unique Identifier (CUI) for the disease.

    Returns:
        list[dict]: A list of dictionaries containing treatments for the disease, 
                    or None if no treatments are found.
    """
    connection = get_connection()  # Assumes `get_connection` is a valid function returning a database connection.
    try:
        with connection.cursor() as cursor:
            # SQL query to find treatments
            sql = """
                SELECT DISTINCT 
                    R.CUI1 AS DiseaseCUI,  
                    M1.STR AS DiseaseTerm,  
                    R.REL AS Relationship, 
                    R.RELA AS RelationshipType, 
                    R.CUI2 AS TreatmentCUI, 
                    M2.STR AS TreatmentTerm
                FROM 
                    MRREL R
                LEFT JOIN 
                    MRCONSO M1 ON R.CUI1 = M1.CUI  
                LEFT JOIN 
                    MRCONSO M2 ON R.CUI2 = M2.CUI  
                WHERE 
                    R.CUI1 = %s
                    AND R.RELA IN ('may_treat') -- Treatment relationships
                    AND M1.TTY = 'PT'
                    AND M2.TTY = 'PT'
                    AND M2.TS = 'P'
                    AND M1.LAT = 'ENG'
                    AND M2.LAT = 'ENG'
                GROUP BY 
                    R.CUI1, R.CUI2;
            """
            # Execute the query with the provided CUI
            cursor.execute(sql, (cui,))
            result = cursor.fetchall()
            if result:
                # Return in a clear and structured format
                return [
                    {
                        "DiseaseCUI": row["DiseaseCUI"],
                        "DiseaseTerm": row["DiseaseTerm"],
                        "Relationship": row["Relationship"],
                        "RelationshipType": row["RelationshipType"],
                        "TreatmentCUI": row["TreatmentCUI"],
                        "TreatmentTerm": row["TreatmentTerm"]
                    }
                    for row in result
                ]
            else:
                logging.debug(f"No treatments found for Disease CUI: {cui}")
                return None
    except Exception as e:
        logging.error(f"An error occurred while fetching treatments: {e}")
        return None
    finally:
        connection.close()
        
def has_manifestation(cui):
    """
    Check if a given CUI has a manifestation relationship in the UMLS database.

    Args:
        cui (str): The Concept Unique Identifier (CUI) to check for manifestation relationships.

    Returns:
        bool: True if the CUI has a manifestation relationship, False otherwise.
    """
    connection = get_connection()  # Assumes `get_connection` is a valid function returning a database connection.
    try:
         with connection.cursor() as cursor:
            # SQL query to find treatments
            sql = """
                SELECT DISTINCT 
                    R.CUI1 AS DiseaseCUI,  
                    M1.STR AS DiseaseTerm,  
                    R.REL AS Relationship, 
                    R.RELA AS RelationshipType, 
                    R.CUI2 AS TreatmentCUI, 
                    M2.STR AS TreatmentTerm
                FROM 
                    MRREL R
                LEFT JOIN 
                    MRCONSO M1 ON R.CUI1 = M1.CUI  
                LEFT JOIN 
                    MRCONSO M2 ON R.CUI2 = M2.CUI  
                WHERE 
                    R.CUI1 = %s
                    AND R.RELA IN ('has_manifestation', 'manifestation_of') -- Treatment relationships
                    AND M1.SAB = 'SNOMEDCT_US'
                    AND M1.TS = 'P'
                    AND M1.TTY = 'PT'
                    AND M2.SAB = 'SNOMEDCT_US'
                    AND M2.TS = 'P'
                    AND M2.TTY = 'PT'
                    AND M1.LAT = 'ENG'
                    AND M2.LAT = 'ENG';
            """
            # Execute the query with the provided CUI
            cursor.execute(sql, (cui,))
            result = cursor.fetchall()
            if result:
                # Return in a clear and structured format
                return [
                    {
                        "DiseaseCUI": row["DiseaseCUI"],
                        "DiseaseTerm": row["DiseaseTerm"],
                        "Relationship": row["Relationship"],
                        "RelationshipType": row["RelationshipType"],
                        "TreatmentCUI": row["TreatmentCUI"],
                        "TreatmentTerm": row["TreatmentTerm"]
                    }
                    for row in result
                ]
            else:
                return None
    except Exception as e:
        logging.error(f"An error occurred while checking for manifestation relationships: {e}")
        return None
    finally:
        connection.close()
        
def has_associated_finding(cui):
    """
    Retrieve 'has_associated_finding' relationships for a given CUI from the UMLS database.

    Args:
        cui (str): The Concept Unique Identifier (CUI) to query.

    Returns:
        list[dict]: A list of dictionaries containing associated findings, 
                    or None if no associated findings are found.
    """
    connection = get_connection()  # Assumes `get_connection` is a valid function returning a database connection.
    try:
        with connection.cursor() as cursor:
            # SQL query to find associated findings
            sql = """
                SELECT DISTINCT 
                    R.CUI1 AS SourceCUI,  
                    M1.STR AS SourceTerm,  
                    R.REL AS Relationship, 
                    R.RELA AS RelationshipType, 
                    R.CUI2 AS TargetCUI, 
                    M2.STR AS TargetTerm
                FROM 
                    MRREL R
                LEFT JOIN 
                    MRCONSO M1 ON R.CUI1 = M1.CUI  
                LEFT JOIN 
                    MRCONSO M2 ON R.CUI2 = M2.CUI  
                WHERE 
                    R.CUI1 = %s
                    AND R.RELA IN ('has_associated_finding', 'associated_finding_of', 'see_from', 'see', 'interprets', 'is_interpreted_by')
                    AND M1.SAB = 'SNOMEDCT_US'
                    AND M1.TS = 'P'
                    AND M1.TTY = 'PT'
                    AND M2.SAB = 'SNOMEDCT_US'
                    AND M2.TS = 'P'
                    AND M2.TTY = 'PT'
                    AND M1.LAT = 'ENG'
                    AND M2.LAT = 'ENG';
            """
            # Execute the query with the provided CUI
            cursor.execute(sql, (cui,))
            result = cursor.fetchall()
            if result:
                # Return in a clear and structured format
                return [
                    {
                        "SourceCUI": row["SourceCUI"],
                        "SourceTerm": row["SourceTerm"],
                        "Relationship": row["Relationship"],
                        "RelationshipType": row["RelationshipType"],
                        "TargetCUI": row["TargetCUI"],
                        "TargetTerm": row["TargetTerm"]
                    }
                    for row in result
                ]
            else:
                logging.debug(f"No associated findings found for CUI: {cui}")
                return None
    except Exception as e:
        logging.error(f"An error occurred while fetching associated findings: {e}")
        return None
    finally:
        connection.close()
        
        
def get_tradename(cui):
    """
    Retrieve tradenames for a given substance CUI from the UMLS database.

    Args:
        cui (str): The Concept Unique Identifier (CUI) for the substance.

    Returns:
        list[dict]: A list of dictionaries containing tradenames, or None if no tradenames are found.
    """
    connection = get_connection()
    try:
        with connection.cursor() as cursor:
            sql = """
                SELECT DISTINCT 
                    R.CUI1 AS SubstanceCUI, 
                    M1.STR AS SubstanceTerm, 
                    R.REL AS Relationship, 
                    R.RELA AS SpecificRelationship, 
                    R.CUI2 AS TradenameCUI, 
                    M2.STR AS Tradename
                FROM 
                    MRREL R
                LEFT JOIN 
                    MRCONSO M1 ON R.CUI1 = M1.CUI
                LEFT JOIN 
                    MRCONSO M2 ON R.CUI2 = M2.CUI
                WHERE 
                    R.CUI1 = %s
                    AND R.RELA = 'tradename_of'
                    AND M1.LAT = 'ENG'
                    AND M2.LAT = 'ENG'
                GROUP BY 
                    R.CUI1, R.CUI2;
            """
            cursor.execute(sql, (cui,))
            result = cursor.fetchall()
            if result:
                return [
                    {
                        "SubstanceCUI": row["SubstanceCUI"],
                        "SubstanceTerm": row["SubstanceTerm"],
                        "Relationship": row["Relationship"],
                        "SpecificRelationship": row["SpecificRelationship"],
                        "TradenameCUI": row["TradenameCUI"],
                        "Tradename": row["Tradename"]
                    }
                    for row in result
                ]
            else:
                logging.debug(f"No tradenames found for Substance CUI: {cui}")
                return None
    except Exception as e:
        logging.error(f"An error occurred while fetching tradenames: {e}")
        return None
    finally:
        connection.close()
        
        

In [24]:
get_tradename(cui)

Connected to the database.


In [23]:
has_associated_finding(cui)

Connected to the database.


[{'SourceCUI': 'C0000737',
  'SourceTerm': 'Abdominal pain',
  'Relationship': 'RO',
  'RelationshipType': 'has_associated_finding',
  'TargetCUI': 'C0423651',
  'TargetTerm': 'No abdominal pain'},
 {'SourceCUI': 'C0000737',
  'SourceTerm': 'Abdominal pain',
  'Relationship': 'RO',
  'RelationshipType': 'associated_finding_of',
  'TargetCUI': 'C1689949',
  'TargetTerm': 'Finding reported by subject or history provider'}]

In [None]:
# test
if __name__ == "__main__":
    # Example usage
    cui = "C0000737"  # Example CUI
    print("CUI:", cui)
    
    # Look up CUI
    print("Look up CUI:", look_up_cui("Aspirin"))
    print("Get treatments:", get_treatments(cui))
    """
    
    # Get term
    print("Get term:", get_term(cui))
    
    # Get synonyms
    print("Get synonyms:", get_synonyms(cui))
    
    # Get definition
    print("Get definition:", get_definition(cui))
    
    # Get semantic type
    print("Get semantic type:", get_semantic_type(cui))
    
    # Get relations
    print("Get relations:", get_relations(cui))
    
    # Get specific relation
    print("Get specific relation (children):", get_specific_relation(cui, 'children'))
    
    # Get RO relations
    print("Get RO relations:", get_ro_relations(cui))
    
    # Get parent from SNOMEDCT
    print("Get parent from SNOMEDCT:", get_parent_from_snomedct(cui))
    
    # Get children from SNOMEDCT
    print("Get children from SNOMEDCT:", get_children_from_snomedct(cui))
    
    # Get treatments
    print("Get treatments:", get_treatments(cui))
    
    # Check for manifestation
    print("Check for manifestation:", has_manifestation(cui))
    
    # Check for associated finding
    print("Check for associated finding:", has_associated_finding(cui))
    
    """

CUI: C0000737
Connected to the database.


In [28]:
import pymysql
import logging
import time

# ---------- 复用你已有的 get_connection ----------
def get_connection(retries=5, delay=3):
    attempt = 0
    while attempt < retries:
        try:
            connection = pymysql.connect(
                host="172.188.121.85",
                user="root",
                password="1qaz0plm",
                database="umls",
                port=3306,
                cursorclass=pymysql.cursors.DictCursor,
                connect_timeout=10,
            )
            print("Connected to the database.")
            return connection
        except pymysql.MySQLError as e:
            attempt += 1
            logging.error(f"Connection attempt {attempt} failed: {e}")
            if attempt < retries:
                logging.info(f"Retrying in {delay} seconds...")
                time.sleep(delay)
            else:
                logging.error(
                    "All connection attempts failed. "
                    "Please check the MySQL server and network settings."
                )
                raise

# ---------- 查询 MRREL 中的 REL / RELA ----------
def list_rel_and_rela():
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            # 1) 统计不同的 REL 及出现次数
            cur.execute("""
                SELECT REL               AS code,
                       COUNT(*)          AS freq
                FROM   MRREL
                GROUP  BY REL
                ORDER  BY freq DESC;
            """)
            rel_rows = cur.fetchall()

            # 2) 统计不同的 RELA（排除 NULL）
            cur.execute("""
                SELECT RELA              AS code,
                       COUNT(*)          AS freq
                FROM   MRREL
                WHERE  RELA IS NOT NULL
                GROUP  BY RELA
                ORDER  BY freq DESC;
            """)
            rela_rows = cur.fetchall()

        # ----------- 打印结果 ----------
        print("=== REL codes (粗粒度关系) ===")
        for row in rel_rows:
            print(f"{row['code']:<10} {row['freq']}")

        print("\n=== RELA codes (精细关系) ===")
        print(len(rela_rows))
        for row in rela_rows:
            print(f"{row['code']:<20} {row['freq']}")

    finally:
        conn.close()

# ---------- 主程序 ----------
if __name__ == "__main__":
    list_rel_and_rela()


Connected to the database.
=== REL codes (粗粒度关系) ===
RO         17040658
CHD        3160234
PAR        3160234
SY         2860442
RB         1473558
RN         1473558
AQ         630118
QB         630118
RQ         502714

=== RELA codes (精细关系) ===
886
inverse_isa          2034000
isa                  2034000
inactive_ingredient_of 1652256
has_inactive_ingredient 1652256
mapped_from          459880
mapped_to            459880
has_finding_site     402331
finding_site_of      402331
subset_includes_concept 376821
concept_in_subset    376821
has_method           376290
method_of            376290
expanded_form_of     370032
has_expanded_form    370032
has_component        347457
component_of         347457
active_ingredient_of 332240
has_active_ingredient 332240
has_associated_morphology 303711
associated_morphology_of 303711
class_of             273523
has_class            273523
active_moiety_of     264236
has_active_moiety    264236
has_ingredient       221394
ingredient_of        2213