# DATASCI 290 — Mini SNOMED Knowledge Graph (Public Snowstorm → Neo4j AuraDB)

This notebook is a scaffold to help you go from **SNOMED CT concepts** to a small **property-graph KG** in **Neo4j AuraDB**.

You will:
- fetch concepts + relationships from a public Snowstorm training API
- build a bounded mini-KG (small on purpose)
- load it into Neo4j AuraDB
- run verification queries

You do not need to install Snowstorm; you only call its public API.


## 0) Install dependencies (run once)

In [None]:
# If needed:
# !pip install requests pandas neo4j

import os
import time
import random
from typing import Dict, List, Tuple, Optional, Set

import requests
import pandas as pd
from neo4j import GraphDatabase


## 1) Configuration

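A) Public Snowstorm endpoints (training server; no token required).  
B) Neo4j AuraDB credentials (student-specific). They are read from the `NEO4J_URI`, `NEO4J_USER`, and `NEO4J_PASSWORD` environment variables when set; otherwise replace the placeholders in the cell below.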


In [None]:
# Snowstorm (PUBLIC TRAINING)
SNOWSTORM_FHIR_BASE = "https://snowstorm-training.snomedtools.org/fhir"
SNOWSTORM_REST_BASE = "https://snowstorm-training.snomedtools.org/snowstorm/snomed-ct"
BRANCH = "MAIN"
SNOMED_SYSTEM = "http://snomed.info/sct"
# Leave blank for the public training server. Read from the environment (like
# the Neo4j settings below) so a real token is never pasted into the notebook.
SNOWSTORM_BEARER_TOKEN = os.environ.get("SNOWSTORM_BEARER_TOKEN", "")

# Be polite to shared servers
MIN_SLEEP_SEC = 0.05   # pause after every successful HTTP call
MAX_RETRIES_429 = 8    # attempts per request before giving up on HTTP 429

# Neo4j AuraDB (fill these, or set the environment variables)
NEO4J_URI = os.environ.get("NEO4J_URI", "neo4j+s://YOUR_AURA_HOST.databases.neo4j.io")
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD", "YOUR_PASSWORD")

# Scope controls (keep KG small)
MAX_TOTAL_CONCEPTS = 800  # hard cap on the number of concept nodes
MAX_ATTR_NEIGHBORS = 3    # attribute relationships kept per expanded concept
MAX_HOPS_ATTR = 1         # attribute hops away from the seeds that are still expanded


## 2) Robust HTTP helpers (handles 429 rate limiting)

In [None]:
def _build_headers(accept: str) -> Dict[str, str]:
    """Return request headers with the given Accept type (plus auth if a token is set)."""
    h = {"Accept": accept, "User-Agent": "DATASCI290-Snowstorm-Client/1.0"}
    if SNOWSTORM_BEARER_TOKEN:
        h["Authorization"] = f"Bearer {SNOWSTORM_BEARER_TOKEN}"
    return h

def _headers_json() -> Dict[str, str]:
    """Headers for the Snowstorm REST API (plain JSON)."""
    return _build_headers("application/json")

def _headers_fhir() -> Dict[str, str]:
    """Headers for the Snowstorm FHIR API (FHIR JSON media type)."""
    return _build_headers("application/fhir+json")

def _get(url: str, headers: Dict[str, str], params: Optional[Dict] = None, max_retries: int = MAX_RETRIES_429) -> Dict:
    """GET ``url`` and return the decoded JSON body, retrying on HTTP 429.

    A rate-limited response is retried up to ``max_retries`` attempts in total.
    The wait honours a numeric ``Retry-After`` header when it is a finite,
    non-negative number; otherwise it uses exponential backoff (1s, 2s, 4s, ...
    capped at 30s). A small random jitter is added either way. After every
    successful call the function sleeps ``MIN_SLEEP_SEC`` to be polite to the
    shared server.

    Raises:
        requests.HTTPError: for any error status other than 429.
        RuntimeError: when every attempt was rate limited.
    """
    import math  # only needed to validate Retry-After values

    params = params or {}
    backoff = 1.0
    for attempt in range(1, max_retries + 1):
        r = requests.get(url, headers=headers, params=params, timeout=60)
        if r.status_code != 429:
            r.raise_for_status()
            time.sleep(MIN_SLEEP_SEC)
            return r.json()
        if attempt == max_retries:
            break  # no attempts left: raise now instead of sleeping for nothing
        sleep_s = backoff
        retry_after = r.headers.get("Retry-After")
        if retry_after is not None:
            try:
                parsed: Optional[float] = float(retry_after)
            except ValueError:
                parsed = None  # e.g. an HTTP-date; keep the backoff value
            # time.sleep() rejects negative/NaN values and overflows on inf.
            if parsed is not None and math.isfinite(parsed) and parsed >= 0:
                sleep_s = parsed
        sleep_s += random.uniform(0, 0.25)
        print(f"[429] Rate limited. Sleeping {sleep_s:.2f}s (attempt {attempt}/{max_retries})")
        time.sleep(sleep_s)
        backoff = min(backoff * 2, 30.0)
    raise RuntimeError(f"Exceeded retry budget due to repeated 429 responses ({max_retries} attempts): {url}")


## 3) Snowstorm FHIR: $lookup (SCTID → preferred label)

In [None]:
def fhir_get(path: str, params: Optional[Dict] = None) -> Dict:
    """GET a resource from the Snowstorm FHIR API and return its JSON body.

    ``path`` is joined to ``SNOWSTORM_FHIR_BASE`` with exactly one slash,
    whatever leading/trailing slashes either side carries.
    """
    base = SNOWSTORM_FHIR_BASE.rstrip("/")
    relative = path.lstrip("/")
    return _get(f"{base}/{relative}", headers=_headers_fhir(), params=params)

def snomed_lookup_display(sctid: str) -> str:
    """Return the display label for ``sctid`` using FHIR ``CodeSystem/$lookup``.

    Accepts either a flat response with a top-level ``display`` key or a FHIR
    ``Parameters`` resource whose ``parameter`` list holds a ``display`` entry.
    Returns "" when neither form yields a display.
    """
    response = fhir_get("CodeSystem/$lookup", params={"system": SNOMED_SYSTEM, "code": sctid})
    if "display" in response:
        return response["display"]
    display_params = (p for p in response.get("parameter", []) if p.get("name") == "display")
    first = next(display_params, None)
    return "" if first is None else first.get("valueString", "")

# Smoke test: read the server's metadata, then look up one known concept.
cap = fhir_get("metadata")
software = cap.get("software", {})
print("FHIR server OK. Software:", software.get("name"), software.get("version"))
print("Example lookup 29857009:", snomed_lookup_display("29857009"))


## 4) Snowstorm REST: search concepts by text (find seed SCTIDs)

In [None]:
def rest_get(path: str, params: Optional[Dict] = None) -> Dict:
    """GET an endpoint of the Snowstorm REST API and return its JSON body.

    ``path`` is joined to ``SNOWSTORM_REST_BASE`` with exactly one slash.
    """
    base = SNOWSTORM_REST_BASE.rstrip("/")
    relative = path.lstrip("/")
    return _get(f"{base}/{relative}", headers=_headers_json(), params=params)

def search_concepts(term: str, limit: int = 10, active: bool = True) -> List[Dict]:
    """Search concepts on ``BRANCH`` by description text.

    Requests the first page (offset 0) sized by ``limit`` and returns its
    ``items`` array, or an empty list when the response has none.
    """
    query = {
        "term": term,
        "limit": limit,
        "offset": 0,
        "activeFilter": str(active).lower(),
    }
    page = rest_get(f"{BRANCH}/concepts", params=query)
    return page.get("items", [])

# Print "SCTID | preferred term | FSN" for a few hits to help pick seeds.
hits = search_concepts("chest pain", limit=5)
for hit in hits:
    pt_term = (hit.get("pt") or {}).get("term")
    fsn_term = (hit.get("fsn") or {}).get("term")
    print(hit.get("conceptId"), "|", pt_term, "|", fsn_term)


## 5) Choose your scope + seeds (10–20 SCTIDs)

In [None]:
SEED_CONCEPTS = [
    # TODO: paste 10–20 SCTIDs here
    # Example:
    # "29857009",  # Chest pain
]

# Normalise: downstream code compares SCTIDs as strings (parent and relationship
# target IDs are str), so a pasted int would otherwise become a duplicate node.
# Blank entries and duplicates are dropped; the original order is kept.
_cleaned_seeds = (str(s).strip() for s in SEED_CONCEPTS)
SEED_CONCEPTS = list(dict.fromkeys(s for s in _cleaned_seeds if s))

# Explicit check rather than `assert`, which is skipped under `python -O`.
if not SEED_CONCEPTS:
    raise ValueError("Fill SEED_CONCEPTS with 10–20 SCTIDs.")
if not 10 <= len(SEED_CONCEPTS) <= 20:
    print(f"Note: {len(SEED_CONCEPTS)} seeds given; 10–20 are suggested.")
print("Seed count:", len(SEED_CONCEPTS))


## 6) Fetch concept JSON (parents + relationships)

In [None]:
def get_concept_browser(concept_id: str) -> Dict:
    """Fetch the browser-endpoint JSON for one concept on ``BRANCH``."""
    endpoint = f"browser/{BRANCH}/concepts/{concept_id}"
    return rest_get(endpoint)

def parse_parents(concept_json: Dict) -> List[str]:
    """Return the parent SCTIDs of a concept as strings, without duplicates.

    Uses the top-level ``parents`` list when it yields any IDs (entries may be
    dicts carrying ``conceptId`` or bare ID strings). Otherwise it falls back to
    the concept's active IS_A (type 116680003) entries in ``relationships``.

    NOTE(review): the Snowstorm browser concept view does not appear to include
    a ``parents`` field. Without the fallback, no IS_A edges would be found —
    confirm against the actual API response.
    """
    parents: List[str] = []
    for p in concept_json.get("parents") or []:
        if isinstance(p, dict) and "conceptId" in p:
            parents.append(str(p["conceptId"]))
        elif isinstance(p, str):
            parents.append(p)

    if not parents:
        for r in concept_json.get("relationships") or []:
            if not isinstance(r, dict) or r.get("active") is False:
                continue
            type_id = str((r.get("type") or {}).get("conceptId") or r.get("typeId") or "")
            if type_id != "116680003":  # IS_A
                continue
            dest_id = str((r.get("target") or {}).get("conceptId") or r.get("destinationId") or "")
            if dest_id:
                parents.append(dest_id)

    # Order-preserving de-duplication.
    return list(dict.fromkeys(parents))

def parse_attribute_relationships(concept_json: Dict) -> List[Tuple[str, str, str]]:
    """Return a concept's active non-IS_A relationships as (typeId, typeTerm, destId).

    IDs are read from the nested ``type``/``target`` objects, falling back to
    the flat ``typeId``/``destinationId`` fields. ``typeTerm`` is the type's
    preferred term when present, else "". Relationships without a concept
    target (e.g. concrete values) and inactive ones are skipped. Repeated
    (typeId, destId) pairs are kept once, in first-seen order, so a caller
    taking only the first N entries does not waste slots on repeats.
    """
    rels: List[Tuple[str, str, str]] = []
    seen_pairs: Set[Tuple[str, str]] = set()
    for r in concept_json.get("relationships") or []:
        if not isinstance(r, dict) or r.get("active") is False:
            continue
        type_obj = r.get("type") or {}
        # `or ""` avoids turning a missing/None conceptId into the string "None".
        type_id = str(type_obj.get("conceptId") or r.get("typeId") or "")
        if not type_id or type_id == "116680003":  # IS_A is handled by parse_parents
            continue
        pt = type_obj.get("pt")
        type_term = pt.get("term", "") if isinstance(pt, dict) else ""
        dest_id = str((r.get("target") or {}).get("conceptId") or r.get("destinationId") or "")
        if not dest_id or (type_id, dest_id) in seen_pairs:
            continue
        seen_pairs.add((type_id, dest_id))
        rels.append((type_id, type_term, dest_id))
    return rels

# Inspect the first seed to check the parsers against a real response.
test_id = SEED_CONCEPTS[0]
cj = get_concept_browser(test_id)
preferred = cj.get("pt") or {}
print("PT:", preferred.get("term"))
print("Parents:", parse_parents(cj)[:5])
print("Attr rels sample:", parse_attribute_relationships(cj)[:3])


## 7) Build the bounded mini-KG

In [None]:
def build_mini_kg(seeds: List[str], max_total: int, max_attr_neighbors: int, max_hops_attr: int):
    """Crawl outward from ``seeds`` and return a small, size-bounded SNOMED graph.

    Phase 1 (breadth-first): each dequeued concept contributes its IS_A parents
    (always followed, at the same depth) and up to ``max_attr_neighbors``
    attribute relationships. Attribute targets are themselves expanded only
    while the current depth is below ``max_hops_attr``.
    Phase 2 (ancestor closure): every collected concept has its parents
    recorded too, pulling in missing ancestors while there is room.

    At most ``max_total`` concepts are collected. An edge is kept only when
    both endpoints are in the returned concept set, so the loader's MATCH
    never silently drops edges.

    Returns:
        (concepts, isa_edges, rel_edges): a set of SCTIDs, a set of
        (child, parent) pairs and a set of (src, typeId, typeTerm, dst) tuples.
    """
    from collections import deque

    concepts: Set[str] = set()
    isa_edges: Set[Tuple[str, str]] = set()
    rel_edges: Set[Tuple[str, str, str, str]] = set()
    cache: Dict[str, Dict] = {}

    def get_cached(cid: str) -> Dict:
        if cid not in cache:
            cache[cid] = get_concept_browser(cid)
        return cache[cid]

    def try_add(cid: str) -> bool:
        """Add ``cid`` if there is room; return whether it is now in the graph."""
        if cid not in concepts and len(concepts) < max_total:
            concepts.add(cid)
        return cid in concepts

    # ---- Phase 1: bounded BFS from the seeds (deque: O(1) pops from the left) ----
    frontier = deque((s, 0) for s in seeds)
    seen: Set[str] = set(seeds)

    while frontier and len(concepts) < max_total:
        cid, depth = frontier.popleft()
        concepts.add(cid)
        cj = get_cached(cid)

        # IS_A parents
        for p in parse_parents(cj):
            if try_add(p):
                isa_edges.add((cid, p))
            if p not in seen and len(concepts) < max_total:
                frontier.append((p, depth))
                seen.add(p)

        # Attribute edges (bounded)
        for type_id, type_term, dst in parse_attribute_relationships(cj)[:max_attr_neighbors]:
            if try_add(dst):
                rel_edges.add((cid, type_id, type_term, dst))
            if depth < max_hops_attr and dst not in seen and len(concepts) < max_total:
                frontier.append((dst, depth + 1))
                seen.add(dst)

    # ---- Phase 2: ancestor closure ----
    # A worklist examines each concept once (instead of repeated full passes);
    # newly added ancestors are queued in turn. Sorting makes the choice of
    # ancestors deterministic when the size cap is reached. This also runs when
    # the cap is already full, so IS_A edges among kept concepts are complete.
    pending = deque(sorted(concepts))
    examined: Set[str] = set()
    while pending:
        cid = pending.popleft()
        if cid in examined:
            continue
        examined.add(cid)
        try:
            cj = get_cached(cid)
        except (OSError, RuntimeError, ValueError) as exc:
            # Best effort. requests' exceptions derive from OSError, the HTTP
            # helper raises RuntimeError after repeated 429s, and a malformed
            # JSON body raises ValueError. Skip the concept, but say so.
            print(f"[warn] ancestor closure: could not fetch {cid}: {exc}")
            continue
        for p in parse_parents(cj):
            was_present = p in concepts
            if try_add(p):
                isa_edges.add((cid, p))
                if not was_present:
                    pending.append(p)

    return concepts, isa_edges, rel_edges

# Build the graph with the scope limits from the configuration cell.
kg_params = dict(
    seeds=SEED_CONCEPTS,
    max_total=MAX_TOTAL_CONCEPTS,
    max_attr_neighbors=MAX_ATTR_NEIGHBORS,
    max_hops_attr=MAX_HOPS_ATTR,
)
concepts, isa_edges, rel_edges = build_mini_kg(**kg_params)

for label, collection in (("Concepts:", concepts), ("IS_A edges:", isa_edges), ("REL edges:", rel_edges)):
    print(label, len(collection))


## 8) Create node/edge tables (inspect before loading)

In [None]:
# SCTID -> display label; each concept is looked up over FHIR at most once.
display_cache: Dict[str, str] = {}

def display_for(cid: str) -> str:
    """Return the (memoised) FHIR display label for ``cid``."""
    label = display_cache.get(cid)
    if label is None:
        label = snomed_lookup_display(cid)
        display_cache[cid] = label
    return label

# One row per concept, sorted by SCTID. Explicit columns keep the schema intact
# even when the concept set is empty.
df_concepts = pd.DataFrame(
    [{"sctid": c, "pt": display_for(c)} for c in sorted(concepts)],
    columns=["sctid", "pt"],
)
df_isa = pd.DataFrame(sorted(isa_edges), columns=["child", "parent"])
df_rel = pd.DataFrame(sorted(rel_edges), columns=["src", "typeId", "typeTerm", "dst"])

# The loaders MATCH both endpoints, so an edge whose endpoint is not a node row
# would be skipped silently. Surface that here instead.
node_ids = set(df_concepts["sctid"])
dangling = sum(1 for child, parent in isa_edges if child not in node_ids or parent not in node_ids)
dangling += sum(1 for src, _, _, dst in rel_edges if src not in node_ids or dst not in node_ids)
if dangling:
    print(f"[warn] {dangling} edge(s) reference concepts missing from df_concepts; they will not be loaded.")

df_concepts.head(), df_isa.head(), df_rel.head()


## 9) Load into Neo4j AuraDB (MERGE upserts)

In [None]:
# Fail fast with a clear message if the placeholder credentials were never
# replaced, then confirm the server is reachable before any query runs.
if "YOUR_AURA_HOST" in NEO4J_URI or NEO4J_PASSWORD == "YOUR_PASSWORD":
    raise ValueError(
        "Neo4j credentials are still placeholders: set NEO4J_URI / NEO4J_PASSWORD "
        "(environment variables or the configuration cell) before connecting."
    )
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
driver.verify_connectivity()

def run_cypher(q: str, params: Optional[Dict] = None):
    """Run one Cypher statement in a fresh session and return every record as a dict."""
    bound = params or {}
    with driver.session() as session:
        records = session.run(q, bound).data()
    return records

# Uniqueness constraint on Concept.sctid, so repeated MERGEs cannot create duplicate nodes.
CONSTRAINT_QUERY = "CREATE CONSTRAINT concept_sctid IF NOT EXISTS FOR (c:Concept) REQUIRE c.sctid IS UNIQUE"
run_cypher(CONSTRAINT_QUERY)

def _run_in_batches(query: str, rows: List[Dict], batch_size: int) -> int:
    """Run ``query`` once per slice of ``rows`` (bound as ``$rows``); return the summed ``n``."""
    # A non-positive step would make range() raise (0) or silently load nothing (<0).
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    total = 0
    for start in range(0, len(rows), batch_size):
        result = run_cypher(query, {"rows": rows[start:start + batch_size]})
        total += result[0]["n"] if result else 0
    return total

def upsert_concepts(rows: List[Dict], batch_size: int = 500) -> int:
    """MERGE one :Concept node per row (keys ``sctid``, ``pt``); return rows processed."""
    q = """
    UNWIND $rows AS row
    MERGE (c:Concept {sctid: row.sctid})
    SET c.pt = row.pt
    RETURN count(*) AS n
    """
    return _run_in_batches(q, rows, batch_size)

def upsert_isa(rows: List[Dict], batch_size: int = 1000) -> int:
    """MERGE (child)-[:IS_A]->(parent) per row (keys ``child``, ``parent``).

    Rows whose endpoints are not existing :Concept nodes are skipped by MATCH.
    The return value counts only rows where both endpoints matched, so compare
    it with ``len(rows)`` to detect skips.
    """
    q = """
    UNWIND $rows AS row
    MATCH (child:Concept {sctid: row.child})
    MATCH (parent:Concept {sctid: row.parent})
    MERGE (child)-[:IS_A]->(parent)
    RETURN count(*) AS n
    """
    return _run_in_batches(q, rows, batch_size)

def upsert_rel(rows: List[Dict], batch_size: int = 1000) -> int:
    """MERGE (src)-[:REL {typeId}]->(dst) per row and set ``typeTerm``.

    Expects keys ``src``, ``typeId``, ``typeTerm``, ``dst``. Rows whose
    endpoints are not existing :Concept nodes are skipped by MATCH. The return
    value counts only matched rows.
    """
    q = """
    UNWIND $rows AS row
    MATCH (src:Concept {sctid: row.src})
    MATCH (dst:Concept {sctid: row.dst})
    MERGE (src)-[r:REL {typeId: row.typeId}]->(dst)
    SET r.typeTerm = row.typeTerm
    RETURN count(*) AS n
    """
    return _run_in_batches(q, rows, batch_size)

# Nodes first: the edge loaders MATCH both endpoints, which must already exist.
for loader, frame in (
    (upsert_concepts, df_concepts),
    (upsert_isa, df_isa),
    (upsert_rel, df_rel),
):
    loader(frame.to_dict("records"))

print("Loaded into AuraDB.")


## 10) Verification queries

In [None]:
# Count nodes and both edge types. OPTIONAL MATCH keeps the single result row
# alive when a relationship type has no instances. With a plain MATCH, zero
# IS_A (or REL) edges would return no rows at all, and hence no counts.
run_cypher("""
MATCH (c:Concept) WITH count(c) AS concept_nodes
OPTIONAL MATCH ()-[r:IS_A]->() WITH concept_nodes, count(r) AS isa_edges
OPTIONAL MATCH ()-[r2:REL]->() RETURN concept_nodes, isa_edges, count(r2) AS rel_edges
""")


In [None]:
# Show up to 10 IS_A paths (1–3 hops) upward from the first seed.
seed = SEED_CONCEPTS[0]
ancestor_query = """
MATCH p = (c:Concept {sctid: $seed})-[:IS_A*1..3]->(a:Concept)
RETURN c.pt AS seed_term, [n IN nodes(p)[1..] | n.pt] AS ancestors
LIMIT 10
"""
run_cypher(ancestor_query, params={"seed": seed})


In [None]:
# List up to 25 attribute (REL) edges leaving the first seed.
seed = SEED_CONCEPTS[0]
rel_query = """
MATCH (c:Concept {sctid: $seed})-[r:REL]->(d:Concept)
RETURN c.pt AS src, r.typeId AS rel_typeId, r.typeTerm AS rel_type, d.pt AS dst
LIMIT 25
"""
run_cypher(rel_query, params={"seed": seed})


In [None]:
# Close the Neo4j driver created in section 9 once all queries have run.
driver.close()
print("Done.")
