# Project: Predicting Diabetes Onset from FHIR EHR Data
### Notebook: 01_Data_Pull.ipynb
### Purpose: Pull patients, observations, encounters, and conditions from Synthea-generated FHIR resources

In [None]:
# imports
import pandas as pd
import numpy as np
import json
from pathlib import Path
from typing import List,Dict,Tuple,Optional
import os

In [None]:
# Locate the Synthea FHIR JSON files and confirm how many were found
fhir_path = Path("/Users/sanasiddiqui/synthea/output/fhir")
# glob() yields entries in arbitrary OS order; sort so that files[0] and the
# two-file preview below are the same on every run
files = sorted(fhir_path.glob("*.json"))

len(files), files[:2]

In [None]:
# Inspect one patient's file: count its resources by type to ensure full data
# extraction per patient
if not files:
    # Fail with an actionable message instead of a bare IndexError on files[0]
    raise FileNotFoundError(f"No FHIR JSON files found in {fhir_path}")

f = files[0]
# FHIR JSON is UTF-8 encoded; be explicit rather than rely on the platform default
with open(f, encoding="utf-8") as fh:
    data = json.load(fh)

# Tally of resourceType -> number of entries of that type in this bundle
types = {}
for e in data.get("entry", []):
    rt = e["resource"]["resourceType"]
    types[rt] = types.get(rt, 0) + 1
types

In [None]:
# Function to parse FHIR bundles
def _first_coding(codeable_concept: Optional[dict]) -> dict:
    """Return the first entry of a CodeableConcept's ``coding`` list, or ``{}``."""
    # ``or`` also covers a present-but-empty list, which would break ``[0]``
    codings = (codeable_concept or {}).get("coding") or [{}]
    return codings[0] or {}


def _patient_id_from_subject(resource: dict) -> str:
    """Return the bare patient id taken from a resource's ``subject.reference``."""
    reference = (resource.get("subject") or {}).get("reference") or ""
    # Bundles may reference the patient as "urn:uuid:<id>" instead of
    # "Patient/<id>"; strip either form so the id matches Patient.id
    if reference.startswith("urn:uuid:"):
        reference = reference[len("urn:uuid:"):]
    return reference.replace("Patient/", "")


def parse_fhir_bundles(
    fhir_dir: str,
    max_files: Optional[int] = None
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Parse FHIR JSON bundles from a directory into DataFrames for patients,
    observations, encounters, and conditions.

    Every ``*.json`` file directly inside ``fhir_dir`` is read as one bundle.
    Files are processed in sorted name order so results (and the subset chosen
    by ``max_files``) are reproducible. Bundle entries without a ``resource``
    and resources of any other type are skipped. Missing fields become ``None``.

    Parameters:
        fhir_dir (str): Path to FHIR JSON files (a ``pathlib.Path`` also works).
        max_files (int, optional): Maximum number of files to parse.
            ``None`` (default) parses all files; ``0`` parses none.

    Returns:
        df_patients: patient_id, gender, birth_date
        df_observations: patient_id, code, name, value, unit, time
        df_encounters: patient_id, encounter_id, start, end, class
        df_conditions: patient_id, condition_id, code, name, onset,
            recorded_date, clinical_status

        The columns are present even when no rows were parsed.
    """
    files = sorted(Path(fhir_dir).glob("*.json"))

    # Compare against None so that max_files=0 really means "no files"
    if max_files is not None:
        files = files[:max_files]

    patients = []
    observations = []
    encounters = []
    conditions = []

    for f in files:
        with open(f, encoding="utf-8") as fh:
            bundle = json.load(fh)

        for entry in bundle.get("entry", []):
            res = entry.get("resource")
            if not res:
                # Entry carries no resource payload; nothing to extract
                continue
            rt = res.get("resourceType")

            if rt == "Patient":
                patients.append({
                    "patient_id": res.get("id"),
                    "gender": res.get("gender"),
                    "birth_date": res.get("birthDate")
                })

            elif rt == "Observation":
                coding = _first_coding(res.get("code"))
                # Only quantity-valued observations carry value/unit here
                valq = res.get("valueQuantity") or {}

                observations.append({
                    "patient_id": _patient_id_from_subject(res),
                    "code": coding.get("code"),
                    "name": coding.get("display"),
                    "value": valq.get("value"),
                    "unit": valq.get("unit"),
                    "time": res.get("effectiveDateTime")
                })

            elif rt == "Encounter":
                period = res.get("period") or {}

                encounters.append({
                    "patient_id": _patient_id_from_subject(res),
                    "encounter_id": res.get("id"),
                    "start": period.get("start"),
                    "end": period.get("end"),
                    "class": (res.get("class") or {}).get("code")
                })

            elif rt == "Condition":
                coding = _first_coding(res.get("code"))

                conditions.append({
                    "patient_id": _patient_id_from_subject(res),
                    "condition_id": res.get("id"),
                    "code": coding.get("code"),
                    "name": coding.get("display"),
                    "onset": res.get("onsetDateTime"),
                    "recorded_date": res.get("recordedDate"),
                    "clinical_status": _first_coding(
                        res.get("clinicalStatus")
                    ).get("code")
                })

    # Explicit columns keep the schema stable when a list is empty
    df_patients = pd.DataFrame(
        patients, columns=["patient_id", "gender", "birth_date"]
    )
    df_observations = pd.DataFrame(
        observations,
        columns=["patient_id", "code", "name", "value", "unit", "time"]
    )
    df_encounters = pd.DataFrame(
        encounters,
        columns=["patient_id", "encounter_id", "start", "end", "class"]
    )
    df_conditions = pd.DataFrame(
        conditions,
        columns=["patient_id", "condition_id", "code", "name", "onset",
                 "recorded_date", "clinical_status"]
    )

    return df_patients, df_observations, df_encounters, df_conditions

In [None]:
# Parse every bundle found under fhir_path. The result is a 4-tuple of
# DataFrames ordered (patients, observations, encounters, conditions).

dfs = parse_fhir_bundles(fhir_dir=fhir_path, max_files=None)

In [None]:
# Function to save raw data extracted from FHIR bundles

def save_raw_data(
    path: str,
    dfs: Tuple[pd.DataFrame, ...],
    file_names: List[str]
) -> None:
    """
    Save raw data parsed into tuple object with multiple data frames from FHIR

    Each dataframe is written to ``<path>/<file_name>_Raw.csv`` without the
    index column. The output directory is created if it does not exist.

    Parameters:
        path: directory to save data to
        dfs: tuple object containing multiple dataframes
        file_names: name of each individual file to save,
            must be in same order as its matching dataframe in dfs object
            for correct naming

    Raises:
        ValueError: if file_names and dfs do not have the same length
    """
    # Raise instead of returning a message: a returned string is silently
    # discarded by most callers and nothing would be written
    if len(file_names) != len(dfs):
        raise ValueError(
            "number of file_names and number of files are not equal "
            f"({len(file_names)} file_names vs {len(dfs)} dataframes)"
        )

    out_dir = Path(path)
    out_dir.mkdir(parents=True, exist_ok=True)

    for df, name in zip(dfs, file_names):
        df.to_csv(out_dir / f"{name}_Raw.csv", index=False)

In [None]:
# Write each raw DataFrame in dfs to "<name>_Raw.csv" in the Raw data folder.
# file_names must list the names in the same order as the DataFrames in dfs.

file_names = ["Patients", "Observations", "Encounters", "Conditions"]
save_raw_data(
    path='/Users/sanasiddiqui/Desktop/Current Desktop/Data Portfolio Projects/Diabetes_Onset_Prediction_FHIR/Data_Diabetes_Prediction/Raw',
    dfs=dfs,
    file_names=file_names,
)