# Encounters: parse FHIR bundles into DuckDB (working version)

In [2]:
import logging
import multiprocessing
import os
from pathlib import Path

import duckdb
import orjson
import pandas as pd
from tqdm import tqdm

# --- 1. SETUP ---
# Input: FHIR JSON bundles are read (recursively) from ../data/fhir.
# All paths are relative to the current working directory.
DATA_DIR = Path("../data")
FHIR_DIR = DATA_DIR.joinpath("fhir")

# Output: the DuckDB database file lives in ../output, created if missing.
OUTPUT_DIR = Path("../output")
DB_FILE = OUTPUT_DIR.joinpath("hospital_readmission.duckdb")
OUTPUT_DIR.mkdir(exist_ok=True)

# --- Helper Functions ---
def get_clean_id(ref_dict: dict) -> str | None:
    """Extracts and robustly cleans the ID from a raw FHIR reference dictionary."""
    if not (ref_dict and isinstance(ref_dict, dict) and 'reference' in ref_dict):
        return None
    ref_string = ref_dict['reference']
    last_delim_pos = max(ref_string.rfind('/'), ref_string.rfind(':'), ref_string.rfind('|'))
    if last_delim_pos != -1:
        return ref_string[last_delim_pos + 1:]
    return ref_string

def get_extension_value_from_dict(resource_dict: dict, url: str) -> float | None:
    """Finds a specific extension by URL from a raw resource dictionary."""
    extensions = resource_dict.get('extension', [])
    if not extensions:
        return None
    for ext in extensions:
        if ext.get('url') == url:
            value_money = ext.get('valueMoney')
            if value_money and 'value' in value_money:
                return value_money['value']
    return None

# --- WORKER FUNCTION FOR MULTIPROCESSING ---
_SYNTHEA_FINANCIAL_EXT = "http://synthetichealth.github.io/synthea/financial-information#"


def _first_or_empty(items) -> dict:
    """Return ``items[0]`` when *items* is a non-empty sequence, else ``{}``."""
    return items[0] if items else {}


def _encounter_to_record(resource: dict) -> dict:
    """Flatten one FHIR Encounter resource into a row for the encounters table.

    Keys are emitted in the same order as the columns of the ``encounters``
    table, so a positional insert of the resulting rows still lines up.
    Missing or ``null`` sub-objects yield ``None`` values rather than errors.
    """
    # `or {}` also covers keys that are present but null, which `.get(k, {})` does not.
    period = resource.get('period') or {}
    encounter_class = resource.get('class') or {}
    provider_ref = _first_or_empty(resource.get('participant')).get('individual')
    primary_coding = _first_or_empty(_first_or_empty(resource.get('type')).get('coding'))
    primary_reason_coding = _first_or_empty(
        _first_or_empty(resource.get('reasonCode')).get('coding')
    )
    # `account` may be absent *or* an empty list; indexing `[]` would raise IndexError.
    # NOTE(review): Encounter.account points at an Account, not necessarily the
    # payer; the sample output shows Payer empty, so confirm this mapping.
    account_ref = _first_or_empty(resource.get('account'))

    return {
        "Id": resource.get('id'),
        "Start": period.get('start'),
        "Stop": period.get('end'),
        "Patient": get_clean_id(resource.get('subject')),
        "Organization": get_clean_id(resource.get('serviceProvider')),
        "Provider": get_clean_id(provider_ref),
        "Payer": get_clean_id(account_ref),
        "EncounterClass": encounter_class.get('code'),
        "Code": primary_coding.get('code'),
        "Description": primary_coding.get('display'),
        # NOTE(review): the sample output shows these cost columns empty;
        # confirm the financial extensions are actually present on Encounter.
        "Base_Encounter_Cost": get_extension_value_from_dict(
            resource, f"{_SYNTHEA_FINANCIAL_EXT}base-encounter-cost"
        ),
        "Total_Claim_Cost": get_extension_value_from_dict(
            resource, f"{_SYNTHEA_FINANCIAL_EXT}total-claim-cost"
        ),
        "Payer_Coverage": get_extension_value_from_dict(
            resource, f"{_SYNTHEA_FINANCIAL_EXT}payer-coverage"
        ),
        "ReasonCode": primary_reason_coding.get('code'),
        "ReasonDescription": primary_reason_coding.get('display'),
    }


def process_file(file_path: Path) -> list:
    """Parse one FHIR bundle file and return its Encounter rows.

    This is the multiprocessing worker. Bad input is logged at WARNING
    level and skipped rather than raised:

    * an unreadable or non-JSON file yields ``[]``;
    * a malformed entry is skipped, and the bundle's other encounters are
      still returned.

    Args:
        file_path: Path to a FHIR bundle JSON file.

    Returns:
        One dict per Encounter resource. Keys are in the column order of
        the ``encounters`` table.
    """
    log = logging.getLogger(__name__)
    try:
        with open(file_path, "rb") as f:
            bundle_dict = orjson.loads(f.read())
    except (OSError, orjson.JSONDecodeError) as exc:
        log.warning("Skipping unreadable FHIR bundle %s: %s", file_path, exc)
        return []
    if not isinstance(bundle_dict, dict):
        log.warning("Skipping %s: top-level JSON is not an object", file_path)
        return []

    local_encounter_list = []
    for index, entry in enumerate(bundle_dict.get('entry') or []):
        try:
            resource = entry.get('resource') or {}
            if resource.get('resourceType') != 'Encounter':
                continue
            local_encounter_list.append(_encounter_to_record(resource))
        except (AttributeError, TypeError, IndexError, KeyError, ValueError) as exc:
            # Isolate the failure to this entry so one malformed resource
            # doesn't discard every later encounter in the bundle.
            log.warning("Skipping malformed entry %d in %s: %r", index, file_path, exc)
    return local_encounter_list

# --- SCRIPT EXECUTION ---
if __name__ == '__main__':
    # --- 2. INITIALIZE DATABASE ---
    print(f"Connecting to DuckDB database: {DB_FILE}")
    # The context manager closes the connection even if a later step raises
    # (e.g. a PRIMARY KEY violation on a duplicate encounter Id).
    with duckdb.connect(str(DB_FILE), read_only=False) as con:
        con.execute("DROP TABLE IF EXISTS encounters;")
        print("Creating 'encounters' table...")

        # ID columns are VARCHAR rather than UUID: cleaned references are not
        # always UUIDs (e.g. the numeric Provider values in the sample output).
        con.execute("""
        CREATE TABLE encounters (
            Id VARCHAR PRIMARY KEY,
            Start TIMESTAMP,
            Stop TIMESTAMP,
            Patient VARCHAR,
            Organization VARCHAR,
            Provider VARCHAR,
            Payer VARCHAR,
            EncounterClass VARCHAR,
            Code VARCHAR,
            Description VARCHAR,
            Base_Encounter_Cost FLOAT,
            Total_Claim_Cost FLOAT,
            Payer_Coverage FLOAT,
            ReasonCode VARCHAR,
            ReasonDescription VARCHAR
        );
        """)

        # --- 3. PROCESS FHIR BUNDLES IN PARALLEL ---
        # Cap on files for a quick test run; set to None to process every file.
        MAX_FILES_TO_PROCESS = 1000
        # rglob order depends on the filesystem; sort so the same subset is
        # selected on every run and machine.
        fhir_files = sorted(FHIR_DIR.rglob("*.json"))[:MAX_FILES_TO_PROCESS]
        print(f"Found {len(fhir_files)} FHIR JSON bundles to process (limited for testing).")

        num_processes = os.cpu_count()
        print(f"Starting parallel processing with {num_processes} workers...")

        with multiprocessing.Pool(processes=num_processes) as pool:
            results = list(tqdm(pool.imap_unordered(process_file, fhir_files), total=len(fhir_files)))

        all_encounters_data = [item for sublist in results for item in sublist]

        # --- 4. INSERT DATA INTO DUCKDB ---
        if all_encounters_data:
            print(f"\nSuccessfully parsed {len(all_encounters_data)} encounters.")
            print("Converting to Pandas DataFrame and inserting into DuckDB...")
            df_encounters = pd.DataFrame(all_encounters_data)

            # Name the target columns explicitly so rows are matched by column
            # name rather than by the DataFrame's column position.
            con.register("df_encounters", df_encounters)
            column_list = ", ".join(df_encounters.columns)
            con.execute(
                f"INSERT INTO encounters ({column_list}) "
                f"SELECT {column_list} FROM df_encounters"
            )
            print("✅ Data insertion complete.")
        else:
            print("\nNo valid encounter data could be parsed from the files.")

        # --- 5. VERIFY AND CLOSE ---
        print("\nVerifying the data...")
        result_count = con.execute("SELECT COUNT(*) FROM encounters;").fetchone()[0]
        print(f"Total rows in 'encounters' table: {result_count}")
        print("\nSample of 5 records from the table:")
        sample_df = con.execute("SELECT * FROM encounters LIMIT 5;").fetchdf()
        try:
            display(sample_df)  # provided by IPython/Jupyter
        except NameError:
            # Running as a plain script: `display` does not exist, fall back to stdout.
            print(sample_df.to_string())
    print("\nDatabase connection closed.")

Connecting to DuckDB database: ../output/hospital_readmission.duckdb
Creating 'encounters' table...
Found 1000 FHIR JSON bundles to process (limited for testing).
Starting parallel processing with 32 workers...


100%|██████████| 1000/1000 [00:07<00:00, 129.02it/s]



Successfully parsed 52926 encounters.
Converting to Pandas DataFrame and inserting into DuckDB...
✅ Data insertion complete.

Verifying the data...
Total rows in 'encounters' table: 52926

Sample of 5 records from the table:


Unnamed: 0,Id,Start,Stop,Patient,Organization,Provider,Payer,EncounterClass,Code,Description,Base_Encounter_Cost,Total_Claim_Cost,Payer_Coverage,ReasonCode,ReasonDescription
0,bb0765f1-acb7-5022-d9ea-dd097eedca75,2025-07-19 07:09:37,2025-07-19 07:24:37,295e0a8b-359e-9096-9a39-265d03df2cac,6be86010-1af1-30d3-bf1d-8317e71604e6,9999748590,,AMB,410620009,Well child visit (procedure),,,,,
1,d47805b8-9644-4d9b-4cde-043d3f3878d7,2025-08-23 07:09:37,2025-08-23 07:24:37,295e0a8b-359e-9096-9a39-265d03df2cac,6be86010-1af1-30d3-bf1d-8317e71604e6,9999748590,,AMB,410620009,Well child visit (procedure),,,,,
2,b6c5d5f0-f187-e4a6-3723-1334ed5fc34d,1999-02-10 02:02:23,1999-02-10 02:17:23,7fc520c4-cb59-fa5d-c639-8accb3a4acb7,ea38c4bb-e483-31f9-96fc-53429f19e339,9999961193,,AMB,185347001,Encounter for problem (procedure),,,,3718001.0,Cow's milk (substance)
3,2b85dcbb-ce54-60e2-3864-64fe3e121236,1999-02-25 09:02:23,1999-02-25 09:37:14,7fc520c4-cb59-fa5d-c639-8accb3a4acb7,ea38c4bb-e483-31f9-96fc-53429f19e339,9999961193,,AMB,185347001,Encounter for problem (procedure),,,,609328004.0,Allergic disposition (finding)
4,a632ba07-251f-9905-5967-9ff262a8d41c,2012-05-25 10:02:23,2012-05-25 10:17:23,7fc520c4-cb59-fa5d-c639-8accb3a4acb7,ea38c4bb-e483-31f9-96fc-53429f19e339,9999961193,,AMB,185347001,Encounter for problem (procedure),,,,192127007.0,Child attention deficit disorder (disorder)



Database connection closed.
