<a href="https://colab.research.google.com/github/cartiktq/AcademicAnalytics/blob/main/KnowledgeGraphGenerator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
S = """from langgraph.graph import StateGraph, END
from typing import TypedDict, List, Dict, Any
from agents import (
    generate_notes,             # Node 1
    anonymize_notes,           # Node 2
    evaluate_anonymization,    # Node 3
    extract_entities,          # Node 4
    map_to_umls_concepts,      # Node 5
    map_to_local_aui,          # Node 6
    build_patient_knowledge_graph,  # Node 7
    visualize_knowledge_graph       # Node 8
)

class WorkflowState(TypedDict):
    raw_notes_folder: str
    anonymized_folder: str
    comparison_metric: float
    extracted_csv: str
    umls_csv: str
    aui_csv: str
    graph_objects: List[Dict[str, Any]]
    visualizations: List[Any]

graph = StateGraph(WorkflowState)

# Register nodes
graph.add_node("generate_notes", generate_notes)
graph.add_node("anonymize_notes", anonymize_notes)
graph.add_node("evaluate_anonymization", evaluate_anonymization)
graph.add_node("extract_entities", extract_entities)
graph.add_node("map_to_umls_concepts", map_to_umls_concepts)
graph.add_node("map_to_local_aui", map_to_local_aui)
graph.add_node("build_knowledge_graph", build_patient_knowledge_graph)
graph.add_node("visualize_knowledge_graph", visualize_knowledge_graph)

# Edges (linear)
graph.set_entry_point("generate_notes")
graph.add_edge("generate_notes", "anonymize_notes")
graph.add_edge("anonymize_notes", "evaluate_anonymization")
graph.add_edge("evaluate_anonymization", "extract_entities")
graph.add_edge("extract_entities", "map_to_umls_concepts")
graph.add_edge("map_to_umls_concepts", "map_to_local_aui")
graph.add_edge("map_to_local_aui", "build_knowledge_graph")
graph.add_edge("build_knowledge_graph", "visualize_knowledge_graph")
graph.add_edge("visualize_knowledge_graph", END)

app = graph.compile()
"""

In [1]:
! pip install faker transformers torch sacremoses langgraph
! pip install presidio-analyzer presidio-anonymizer
! pip install spacy
! pip install https://s3-us-west-2.amazonaws.com/ai2-s2-scispacy/releases/v0.5.3/en_ner_bc5cdr_md-0.5.3.tar.gz

Collecting https://s3-us-west-2.amazonaws.com/ai2-s2-scispacy/releases/v0.5.3/en_ner_bc5cdr_md-0.5.3.tar.gz
  Using cached https://s3-us-west-2.amazonaws.com/ai2-s2-scispacy/releases/v0.5.3/en_ner_bc5cdr_md-0.5.3.tar.gz (119.8 MB)
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone


In [2]:
#def generate_notes(state: WorkflowState) -> WorkflowState:
def generate_notes():
    from faker import Faker
    import os
    import random
    from datetime import datetime, timedelta

    fake = Faker()
    output_dir = "raw_clinical_notes"
    os.makedirs(output_dir, exist_ok=True)

    physical_symptoms = [
    "Developmental Delay", "Speech and Language Impairment", "Hypotonia", "Seizures",
    "Gastrointestinal Issues", "Sleep Disturbances", "Dysmorphia", "Lymphedema",
    "Renal Abnormalities", "Thermoregulation Issues", "Abnormalities of Nails"
]
    behavioral_symptoms = [
    "Autism Spectrum Disorder (ASD) Features", "Intellectual Disability", "Anxiety",
    "Aggression/Self-Injurious Behaviors", "Hyperactivity/Impulsivity",
    "Sensory Processing Differences", "Compulsive Behaviors", "Mood Dysregulation"
]
    prescriptions = [
    "Levetiracetam", "Valproic Acid", "Lamotrigine", "Fluoxetine", "Sertraline",
    "Risperidone", "Aripiprazole", "Melatonin", "Polyethylene glycol", "Omeprazole",
    "Ranitidine", "Methylphenidate"
]
    lab_tests = [
    "Chromosomal Microarray Analysis (CMA)", "FISH", "Karyotype", "EEG",
    "Metabolic Screens", "Renal Ultrasound", "GI Motility Studies"
]
    therapies = [
    "Early Intervention Programs", "Speech and Language Therapy (SLT)",
    "Occupational Therapy (OT)", "Physical Therapy (PT)", "Applied Behavior Analysis (ABA) Therapy",
    "Feeding Therapy", "Behavioral Therapy/Parent Training", "Surgical Interventions"
]
    comorbidities = [
    "Autism Spectrum Disorder (ASD)", "Epilepsy/Seizure Disorder", "Gastrointestinal Disorders",
    "Sleep Disorders", "Anxiety Disorders", "ADHD", "Obesity", "Lymphedema",
    "Renal Anomalies", "Immunodeficiency"
]

    for patient_id in range(1, 101):
        num_visits = random.randint(8, 10)
        base_date = datetime.today()
        for visit in range(num_visits):
            visit_date = base_date - timedelta(days=random.randint(0, 365*2))
            note = f"""
            Patient: {fake.name()}
            DOB: {fake.date_of_birth()}
            Visit Date: {visit_date.strftime('%Y-%m-%d')}
            Diagnoses: Phelan-McDermid Syndrome
            Symptoms: {random.sample(physical_symptoms, 4)}
            Behavioral Symptoms: {random.sample(behavioral_symptoms, 2)}
            Labs: {random.sample(lab_tests, 2)}
            Prescriptions: {random.sample(prescriptions, 2)}
            Therapies: {random.sample(therapies, 2)}
            Comorbidities: {random.sample(comorbidities, 2)}
            """
            fname = f"clinical_note_{visit}_for_patient_{patient_id}_{visit_date.strftime('%Y-%m-%d')}.txt"
            with open(os.path.join(output_dir, fname), "w") as f:
                f.write(note)

#    state["raw_notes_folder"] = output_dir
#    return state


In [3]:
generate_notes()

In [5]:
import os
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

#def anonymize_notes(state: dict) -> dict:
def anonymize_notes():
    #raw_dir = state["raw_notes_folder"]
    raw_dir = os.path.join(os.getcwd(), "raw_clinical_notes")
    anon_dir = "anonymized_clinical_notes"
    os.makedirs(anon_dir, exist_ok=True)

    analyzer = AnalyzerEngine()
    anonymizer = AnonymizerEngine()

    for fname in os.listdir(raw_dir):
        if not fname.endswith(".txt"):
            continue
        with open(os.path.join(raw_dir, fname), "r") as f:
            text = f.read()

        # Run PHI detection
        results = analyzer.analyze(text=text, entities=None, language="en")

        # Anonymize detected PHI entities
        anonymized_text = anonymizer.anonymize(text=text, analyzer_results=results).text

        # Write to new anonymized file
        new_fname = f"anonymized_{fname}"
        with open(os.path.join(anon_dir, new_fname), "w") as out_f:
            out_f.write(anonymized_text)

#    state["anonymized_folder"] = anon_dir
#    return state


In [6]:
anonymize_notes()



[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_lg')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.




In [8]:
import os
import shutil
from collections import defaultdict
from typing import List
from presidio_analyzer import AnalyzerEngine, RecognizerResult

def extract_phi(file_path: str, analyzer: AnalyzerEngine) -> List[RecognizerResult]:
    with open(file_path, "r") as f:
        text = f.read()
    return analyzer.analyze(text=text, entities=None, language="en")

def quarantileFile(source_file, quarantine_dir):
  try:
    shutil.move(source_file, quarantine_dir)
    #os.rename(source_file, destination_file)
  except FileNotFoundError:
    print(f"Source file '{source_file}' not found.")
  except FileExistsError:
    print(f"Destination file '{destination_file}' already exists.")
  except OSError as e:
    print(f"Error moving file: {e}")


#def evaluate_anonymization(state: dict) -> dict:
def evaluate_anonymization():
#    raw_dir = state["raw_notes_folder"]
#    anon_dir = state["anonymized_folder"]
    raw_dir = os.path.join(os.getcwd(), "raw_clinical_notes")
    anon_dir = os.path.join(os.getcwd(), "anonymized_clinical_notes")
    analyzer = AnalyzerEngine()

    phi_counts_raw = 0
    phi_counts_anon = 0

    false_negatives = defaultdict(list)  # {file: [(entity_type, matched_text)]}
    fileCount = len(os.listdir(raw_dir))

    print(f"Evaluating the effectiveness of the anonymizer in removing PHI across {fileCount} files")

    for fname in os.listdir(raw_dir):
        if not fname.endswith(".txt"):
            continue

        raw_path = os.path.join(raw_dir, fname)
        anon_path = os.path.join(anon_dir, f"anonymized_{fname}")

        raw_entities = extract_phi(raw_path, analyzer)
        anon_entities = extract_phi(anon_path, analyzer)

        phi_counts_raw += len(raw_entities)
        phi_counts_anon += len(anon_entities)

        for ent in anon_entities:
            # Changed ent.text to ent.original_text to access the matched text
            false_negatives[fname].append((ent.entity_type, ent.contains, ent.contained_in))


    # Compute effectiveness metric
    if phi_counts_raw == 0:
        effectiveness = 100.0 if phi_counts_anon == 0 else 0.0
    else:
        effectiveness = (1 - (phi_counts_anon / phi_counts_raw)) * 100

    # Print metric summary
    print(f"\n🔍 Anonymization Effectiveness: {effectiveness:.2f}%")
    print(f"Total PHI elements (raw): {phi_counts_raw}")
    print(f"Remaining PHI elements (anonymized): {phi_counts_anon}\n")

    # Print false negatives summary
    if false_negatives:
        quarantine_dir = os.path.join(os.getcwd(), "quarantined_clinical_notes")
        os.makedirs(quarantine_dir, exist_ok=True)
        print("🚨 False Negatives (Missed PHI Elements):")
        for fname, entities in false_negatives.items():
            print(f"  File: {fname}")
            for matched_text in entities:
                anon_fname = f"anonymized_{fname}"
                source_file = os.path.join(anon_dir, "" + anon_fname)
                destination_file = os.path.join(quarantine_dir, anon_fname)
                print(f"    -  {matched_text[1]}")
                print(f"Removing file: {anon_fname} from anonymized directory as it contains PHI elements")
                quarantileFile(source_file, quarantine_dir)
        print("All files containing PHI elements have been moved to quarantine")
        print(f"There are now {len(os.listdir(anon_dir))} files in the anonymized folder")

    else:
        print("✅ No false negatives detected. All PHI was successfully removed.")

#    state["comparison_metric"] = effectiveness
#    return state


In [9]:
evaluate_anonymization()



Evaluating the effectiveness of the anonymizer in removing PHI across 907 files

🔍 Anonymization Effectiveness: 99.78%
Total PHI elements (raw): 5953
Remaining PHI elements (anonymized): 13

🚨 False Negatives (Missed PHI Elements):
  File: clinical_note_4_for_patient_92_2024-12-31.txt
    -  <bound method RecognizerResult.contains of type: NRP, start: 22, end: 31, score: 0.85>
Removing file: anonymized_clinical_note_4_for_patient_92_2024-12-31.txt from anonymized directory as it contains PHI elements
  File: clinical_note_4_for_patient_73_2025-03-04.txt
    -  <bound method RecognizerResult.contains of type: LOCATION, start: 31, end: 49, score: 0.85>
Removing file: anonymized_clinical_note_4_for_patient_73_2025-03-04.txt from anonymized directory as it contains PHI elements
  File: clinical_note_4_for_patient_70_2024-11-26.txt
    -  <bound method RecognizerResult.contains of type: PERSON, start: 380, end: 391, score: 0.85>
Removing file: anonymized_clinical_note_4_for_patient_70_2024-

In [3]:
import os
import csv
import uuid
import spacy

#def extract_entities(state: dict) -> dict:
def extract_entities():
#    anonymized_dir = state["anonymized_folder"]
    anonymized_dir = "anonymized_clinical_notes"
    output_csv = "extracted_entity_triples.csv"

    # Load scispaCy model
    nlp = spacy.load("en_ner_bc5cdr_md")

    # Define mapping from keyword match to relation
    verb_mapping = {
        "lab": ["EEG", "CMA", "Karyotype", "FISH", "Metabolic", "Ultrasound", "GI"],
        "diagnosis": ["syndrome", "diagnosis", "disorder"],
        "comorbidity": ["ASD", "anxiety", "obesity", "sleep", "ADHD", "epilepsy", "renal"],
        "procedure": ["surgery", "microarray", "biopsy"],
        "therapy": ["therapy", "intervention", "training"],
        "symptom": ["delay", "impairment", "hypotonia", "seizure", "disturbance", "abnormality"],
        "behavior": ["aggression", "mood", "anxiety", "compulsive", "impulsivity", "sensory"],
        "prescription": ["ine", "acid", "ole", "zol", "phenidate", "melatonin"]
    }

    def classify_relation(entity_text: str):
        text_lower = entity_text.lower()
        for relation, keywords in verb_mapping.items():
            for kw in keywords:
                if kw.lower() in text_lower:
                    return relation
        return "misc"

    rows = []

    for fname in os.listdir(anonymized_dir):
        if not fname.endswith(".txt"):
            continue

        patient_id = str(uuid.uuid4())  # Generate unique patient ID
        file_path = os.path.join(os.getcwd(), anonymized_dir, fname)
        with open(file_path, "r") as f:
            text = f.read()

        # Extract visit date from file name
        try:
            visit_date = fname.split("_")[-1].replace(".txt", "")
        except:
            visit_date = "unknown"

        # Run NER
        doc = nlp(text)
        for ent in doc.ents:
            verb = classify_relation(ent.text)
            if verb == "lab":
                rows.append([patient_id, verb, ent.text, visit_date])
                rows.append([patient_id, "result", f"{ent.text} result pending", visit_date])
            else:
                rows.append([patient_id, verb, ent.text, visit_date])

    # Write triples to CSV
    with open(output_csv, "w", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["patient_id", "relation", "entity", "date"])
        writer.writerows(rows)

#    state["extracted_csv"] = output_csv
#    return state


In [4]:
extract_entities()

  deserializers["tokenizer"] = lambda p: self.tokenizer.from_disk(  # type: ignore[union-attr]
