In [None]:
import random
import fhirpathpy
import pyarrow as pa
import pyarrow.parquet as pq
import pathlib
import json
from pathlib import Path
from datetime import datetime
from azure.identity import DefaultAzureCredential
from phdi.fhir import fhir_server_get
from phdi.schemas import load_schema
from phdi.azure import AzureFhirServerCredentialManager

In [None]:
# Notebook configuration. The path and URL values below are placeholders and
# must be replaced with real values before the remaining cells are run.
schema_path = Path("/path/to/schema.yaml")  # schema file handed to load_schema
base_output_path = Path("/path/to/output/directory")  # root output directory; one subdirectory per table is created beneath it
output_format = "parquet"  # used as the output file extension and passed to write_table
fhir_url = "<FHIR URL>"  # base URL that resource-type queries are appended to
# Credential manager asked for a fresh access token before every FHIR request.
cred_manager = AzureFhirServerCredentialManager(fhir_url)

In [None]:
# Load the table schema from schema_path. Later cells index the result as
# schema[table_name][resource_type] -> mapping whose values are per-field
# parameter dicts (keys used in this notebook: "fhir_path", "new_name",
# "selection_criteria", "dtype").
schema = load_schema(schema_path)

In [None]:
def extract_data_from_fhir_server(
    fhir_url,
    table_schema,
    cred_manager
):
    """Download every resource of each type named in ``table_schema``.

    For each resource type a search is issued against
    ``{fhir_url}/{resource_type}?_count=1000`` and the paged results are
    followed through the bundle's ``next`` link until no such link remains.

    :param fhir_url: Base URL of the FHIR server.
    :param table_schema: Mapping (or other iterable) whose keys are the FHIR
        resource types to extract.
    :param cred_manager: Object exposing ``get_access_token()`` whose result
        has a ``token`` attribute; a token is requested before every call.
    :return: Dict mapping each resource type to the list of resources
        collected for it. If a request returns a non-200 status, a message is
        printed and the list holds only the resources gathered so far.
    """
    all_data = {}
    for resource_type in table_schema:
        url = f"{fhir_url}/{resource_type}?_count=1000"
        data = []

        while url is not None:
            # Request a token on every page so long extractions do not run
            # with an expired credential.
            access_token = cred_manager.get_access_token().token
            response = fhir_server_get(url, access_token)
            if response.status_code != 200:
                # Best effort: keep what was already collected, but make the
                # truncation visible instead of stopping silently.
                print(
                    f"Request to {url} failed with status "
                    f"{response.status_code}; stopping extraction of "
                    f"{resource_type} resources."
                )
                break

            query_result = response.json()

            # A result page may carry no "entry" key at all (e.g. an empty
            # search), so treat a missing/None value as an empty page.
            entries = query_result.get("entry") or []
            data.extend(
                entry["resource"] for entry in entries if entry.get("resource")
            )

            # Follow the first "next" link; with no such link (or no links at
            # all) the URL becomes None and paging ends. Always reassigning
            # here is what guarantees the loop terminates.
            links = query_result.get("link") or []
            url = next(
                (
                    link.get("url")
                    for link in links
                    if link.get("relation") == "next"
                ),
                None,
            )

        all_data[resource_type] = data

    return all_data

def make_table(data, table_schema):
    """Build a column-oriented pyarrow table from extracted FHIR resources.

    For every field configured under a resource type, the field's FHIRPath
    expression is compiled once and evaluated once per resource; the first
    match (or ``None`` when there is no match) becomes that resource's value
    in the column named by the field's ``"new_name"``.

    :param data: Dict mapping resource type to a list of resources.
    :param table_schema: Dict mapping resource type to a mapping whose values
        are field parameter dicts with ``"fhir_path"`` and ``"new_name"``.
    :return: The result of ``pa.Table.from_pydict`` on the collected columns.

    NOTE(review): columns from different resource types end up in the same
    table, so this presumably only works when every resource type contributes
    the same number of resources - confirm against pyarrow's requirements.
    """
    output = {}
    for resource_type, resources in data.items():
        for parameters in table_schema[resource_type].values():
            parser = fhirpathpy.compile(parameters.get("fhir_path"))
            values = []
            for resource in resources:
                # Evaluate the expression once and reuse the result for both
                # the emptiness check and the first-element lookup.
                matches = parser(resource)
                values.append(matches[0] if matches else None)
            output[parameters["new_name"]] = values

    return pa.Table.from_pydict(output)

def make_table2(data, table_schema):
    """Build a row-oriented pyarrow table from extracted FHIR resources.

    Parsers are generated once per resource type, each resource is turned
    into a dict of values, and empty dicts are left out of the table.

    :param data: Dict mapping resource type to a list of resources.
    :param table_schema: Dict mapping resource type to a mapping whose values
        are the field parameter dicts for that resource type.
    :return: The result of ``pa.Table.from_pylist`` on the collected rows.
    """
    rows = []
    for resource_type, resources in data.items():
        field_parameters = table_schema[resource_type].values()
        parsers = __generate_parsers(field_parameters)
        extracted = (
            extract_and_filter_resource_values(resource, field_parameters, parsers)
            for resource in resources
        )
        # Keep only resources that produced at least one column.
        rows.extend(row for row in extracted if row)

    return pa.Table.from_pylist(rows)

def __generate_parsers(field_parameters):
    """Compile one FHIRPath parser per field.

    :param field_parameters: Iterable of field parameter dicts, each with a
        ``"new_name"`` and a ``"fhir_path"`` entry.
    :return: Dict mapping each field's ``"new_name"`` to the parser compiled
        from its ``"fhir_path"``.
    """
    return {
        field["new_name"]: fhirpathpy.compile(field["fhir_path"])
        for field in field_parameters
    }

def extract_and_filter_resource_values(resource, field_parameters, parsers):
    """Extract one value per configured field from a single resource.

    Each field's parser is run on ``resource`` and the list of matches is
    reduced according to the field's ``"selection_criteria"``:

    * no matches -> ``None``
    * ``"first"`` -> the first match
    * ``"last"`` -> the last match
    * anything else -> all matches are kept

    The selected value is then flattened so that only scalar values reach the
    table: a dict is serialized with ``json.dumps`` and a list is joined into
    a comma-separated string.

    :param resource: The resource handed to each parser.
    :param field_parameters: Iterable of field parameter dicts with
        ``"new_name"`` and ``"selection_criteria"`` entries.
    :param parsers: Dict mapping ``"new_name"`` to a callable that takes the
        resource and returns a list of matches.
    :return: Dict mapping each field's ``"new_name"`` to its extracted value.
    """
    values = {}
    for parameters in field_parameters:
        name = parameters["new_name"]
        matches = parsers[name](resource)

        if len(matches) == 0:
            selected = None
        elif parameters["selection_criteria"] == "first":
            selected = matches[0]
        elif parameters["selection_criteria"] == "last":
            selected = matches[-1]
        else:
            # Unrecognized criteria: keep every match (joined below).
            selected = matches

        # Flatten the *selected* value, not the raw match list; testing the
        # raw list would always take the list branch and discard the
        # selection made above.
        if isinstance(selected, dict):
            selected = json.dumps(selected)
        elif isinstance(selected, list):
            selected = ",".join(map(str, selected))

        values[name] = selected

    return values
        
def write_table(
    data,
    output_file_name,
    file_format
):
    """Write a table to ``output_file_name`` as a Parquet file.

    :param data: Table to write; its ``schema`` attribute is used to open the
        writer.
    :param output_file_name: Destination passed to ``pq.ParquetWriter``.
    :param file_format: Currently unused - the output is always Parquet,
        whatever value is supplied.
    """
    # The context manager closes the writer even if writing raises, so the
    # underlying file handle is never leaked.
    with pq.ParquetWriter(output_file_name, data.schema) as writer:
        writer.write_table(table=data)

In [None]:
# Generate tables from the schema: extract the resources from the FHIR server,
# tabularize them, write the result to disk, and print how long each stage took.
# Tables that are built are also kept in memory in all_data, keyed by table name.
all_data = {}

for table_name in schema.keys():
    # Only the "patient" table is processed; every other table in the schema
    # is skipped.
    # NOTE(review): this looks like a leftover development filter - confirm
    # whether it should remain.
    if (table_name != "patient"):
        continue
    print(f"\nCreating the {table_name} table...")
    
    # Output goes to <base_output_path>/<table_name>/<table_name>.<output_format>.
    output_path = base_output_path / table_name
    output_path.mkdir(parents=True, exist_ok=True)
    output_file_name = output_path / f"{table_name}.{output_format}"
    
    # Stage 1: download every resource needed for this table.
    start = datetime.now()
    data = extract_data_from_fhir_server(fhir_url, schema[table_name], cred_manager)
    extract_end = datetime.now()
    # The reported count is the total number of resources across all resource types.
    print(f"Time to extract {sum(map(len, data.values()))} resources: {extract_end - start}")
    
    # Stage 2: convert the resources into a pyarrow table (row-oriented builder).
    table = make_table2(data, schema[table_name])
    print(f"Time to tabularize the data: {datetime.now() - extract_end}")
    
    # Stage 3: keep the table in memory and write it to disk.
    all_data[table_name] = table
    write_table(table, output_file_name, output_format)
    print(f"Total time spent generating the {table_name} table: {(datetime.now() - start)}")

In [None]:
# NOTE(review): this cell is unfinished and cannot run as written - the
# assignment `data, url = ` below has no right-hand side (a syntax error).
# From the visible code it appears to be a sketch of writing each table in
# record batches through a pyarrow IPC file writer; the review notes below
# flag the problems visible in the code as it stands.
for table_name in schema.keys():
    output_path = base_output_path / table_name
    # NOTE(review): Path.mkdir takes `exist_ok` (as used in the previous
    # cell); `exists_ok` is not a valid keyword and would raise TypeError.
    output_path.mkdir(parents=True, exists_ok=True)
    output_file_name = output_path / f"{table_name}.{output_format}"
    
    # Maps the schema's "dtype" strings to pyarrow data types.
    # NOTE(review): pa.timestamp appears to require a unit argument
    # (e.g. "s", "ms", "us", "ns") - confirm against the pyarrow docs.
    dtype_map = {
        "string": pa.string(),
        "timestamp": pa.timestamp(),
        "float64": pa.float64()
    }
    
    # Builds (column name, pyarrow type) pairs from the field parameters.
    # NOTE(review): `values` is reassigned on every iteration, so only the
    # fields of the last resource type are still present when the schema is
    # created below.
    for resource, fields in schema[table_name].items():
        parameters = fields.values()
        values = [(param["new_name"], dtype_map[param["dtype"]]) for param in parameters]
            
    table_schema = pa.schema(values)
    
    start = datetime.now()
    with pa.OSFile(output_file_name, 'wb') as sink:
        # NOTE(review): `schema` here is the object loaded by load_schema, not
        # the pyarrow schema built above - `table_schema` was presumably intended.
        with pa.ipc.new_file(sink, schema) as writer:
            # NOTE(review): `url` and `BATCH_SIZE` are not defined in any cell
            # visible here.
            while url is not None:
                # FIXME: incomplete statement - the right-hand side is missing.
                data, url = 
                # NOTE(review): the batch holds a single placeholder int32
                # column and does not use `data`.
                batch = pa.record_batch([pa.array(range(BATCH_SIZE), type=pa.int32())], table_schema)
                writer.write(batch)