GoogleCloudPlatform/healthcare-data-harmonization-dataflow

HL7v2 to FHIR Pipeline

This directory contains a reference Cloud Dataflow pipeline to convert HL7v2 messages to FHIR resources. Please note that additional configuration and hardening are required before processing PHI with this pipeline.

Prerequisites

Permissions

Make sure you have enough permissions to run Cloud Dataflow jobs.

The Cloud Dataflow Controller Service Account needs the following permissions.

  • roles/pubsub.subscriber.
    • To listen for PubSub notifications for new messages. The service account only needs the role on the specific PubSub subscription.
  • roles/healthcare.hl7V2Consumer.
    • To access messages in your HL7v2 store. The service account only needs the role on the source HL7v2 Store.
  • roles/healthcare.fhirResourceEditor.
    • To write transformed resources to your FHIR store. The service account only needs this role on the target FHIR Store.
  • roles/storage.objectAdmin.
    • To access mapping and harmonization configurations on GCS. The service account needs this role on all GCS buckets that the mappings reside in.
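
The role bindings listed above could be granted with gcloud along the following lines. This is a minimal sketch rather than part of the original instructions: every resource name and the service account email are placeholder values, and the script only prints the commands unless DRY_RUN=0 is set.

```shell
#!/bin/sh
# Placeholder values (assumptions); replace them with your own resources.
PROJECT="${PROJECT:-my-project}"
LOCATION="${LOCATION:-us-central1}"
DATASET="${DATASET:-my-dataset}"
HL7V2_STORE="${HL7V2_STORE:-my-hl7v2-store}"
FHIRSTORE="${FHIRSTORE:-my-fhir-store}"
SUBSCRIPTION="${SUBSCRIPTION:-my-subscription}"
MAPPING_BUCKET="${MAPPING_BUCKET:-my-mapping-bucket}"
SA_EMAIL="${SA_EMAIL:-my-controller-sa@${PROJECT}.iam.gserviceaccount.com}"

# Print each command by default; set DRY_RUN=0 to actually execute it.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then echo "$*"; else "$@"; fi
}

grant_hl7v2_pipeline_roles() {
  member="serviceAccount:${SA_EMAIL}"
  # Subscriber on the specific PubSub subscription only.
  run gcloud pubsub subscriptions add-iam-policy-binding "${SUBSCRIPTION}" \
    --project="${PROJECT}" --member="${member}" \
    --role="roles/pubsub.subscriber"
  # Consumer on the source HL7v2 store only.
  run gcloud healthcare hl7v2-stores add-iam-policy-binding "${HL7V2_STORE}" \
    --project="${PROJECT}" --location="${LOCATION}" --dataset="${DATASET}" \
    --member="${member}" --role="roles/healthcare.hl7V2Consumer"
  # Editor on the target FHIR store only.
  run gcloud healthcare fhir-stores add-iam-policy-binding "${FHIRSTORE}" \
    --project="${PROJECT}" --location="${LOCATION}" --dataset="${DATASET}" \
    --member="${member}" --role="roles/healthcare.fhirResourceEditor"
  # Object admin on each bucket that holds mappings (repeat per bucket).
  run gcloud storage buckets add-iam-policy-binding "gs://${MAPPING_BUCKET}" \
    --member="${member}" --role="roles/storage.objectAdmin"
}

grant_hl7v2_pipeline_roles
```

Scoping each binding to a single resource, as the list recommends, keeps the service account from gaining access to other subscriptions, stores, or buckets in the project.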

How to Run

Build a fat JAR of the pipeline by running the following from the project directory.

  • Please make sure gradle is added to PATH before running the following commands.
# Generate wrapper classes.
gradle wrapper --gradle-version 7.6
./build_deps.sh && ./gradlew shadowJar

A JAR file should be generated in the build/libs folder.

Now run the pipeline with the following command:

# Please set the environment variables in the following command.

java -jar build/libs/converter-0.1.0-all.jar --pubSubSubscription="projects/${PROJECT?}/subscriptions/${SUBSCRIPTION?}" \
                                             --readErrorPath="gs://${ERROR_BUCKET?}/read/" \
                                             --writeErrorPath="gs://${ERROR_BUCKET?}/write/" \
                                             --mappingErrorPath="gs://${ERROR_BUCKET?}/mapping/" \
                                             --mappingPath="gs://${MAPPING_BUCKET?}/hl7v2_fhir.wstl" \
                                             --fhirStore="projects/${PROJECT?}/locations/${LOCATION?}/datasets/${DATASET?}/fhirStores/${FHIRSTORE?}" \
                                             --importRoot="${MAPPING_ROOT_FOLDER}" \
                                             --runner=DataflowRunner \
                                             --region=${REGION?} \
                                             --project=${PROJECT?}
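
The `${VAR?}` expansions in the command above make the shell abort with an error when a variable is unset (an empty value still passes). A minimal sketch of a pre-flight check that reports all missing variables at once is shown below. The helper name check_required_vars is hypothetical, and unlike `${VAR?}` it also treats empty values as missing.

```shell
#!/bin/sh
# Hypothetical helper: reports every named variable that is unset or empty.
# Returns 0 when all are set and 1 otherwise.
check_required_vars() {
  missing=0
  for name in "$@"; do
    # Indirectly read the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Usage before launching the pipeline:
#   check_required_vars PROJECT SUBSCRIPTION ERROR_BUCKET MAPPING_BUCKET \
#     LOCATION DATASET FHIRSTORE REGION || exit 1
```

Only pass trusted variable names to the helper, since it uses eval for the indirect lookup.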

A few notes:

  • By default, streaming pipelines do not have autoscaling enabled. To enable it, use either --enableStreamingEngine (recommended) or a combination of --autoscalingAlgorithm=THROUGHPUT_BASED and --maxNumWorkers=N. See this page for more details.
  • For production use, we recommend enabling agent metrics by appending --experiments=enable_stackdriver_agent_metrics as an option; you will also need to grant roles/monitoring.metricWriter to the Dataflow controller service account. See this page for more details. Additionally, we highly recommend limiting the number of threads on each worker, e.g. --numberOfWorkerHarnessThreads=10. You can tune the limit based on your workload.
  • To generate a template instead of running the pipeline, add --stagingLocation=gs://${STAGING_LOCATION} --templateLocation=gs://${TEMPLATE_LOCATION} to the above command. See here.
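
As an illustration of the notes above, the optional flags could be collected in one variable and appended to the launch command. This is only a sketch: EXTRA_ARGS is a hypothetical variable name, and the thread limit of 10 is the example value from the text, not a tuned recommendation.

```shell
#!/bin/sh
# Hypothetical variable holding the optional flags described in the notes.
EXTRA_ARGS="--enableStreamingEngine"
EXTRA_ARGS="${EXTRA_ARGS} --experiments=enable_stackdriver_agent_metrics"
EXTRA_ARGS="${EXTRA_ARGS} --numberOfWorkerHarnessThreads=10"

# Append it unquoted so the shell splits it into separate arguments:
#   java -jar build/libs/converter-0.1.0-all.jar <other flags> ${EXTRA_ARGS}
echo "${EXTRA_ARGS}"
```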

Please take a look at the PipelineRunner class to see the concrete meaning of each argument.

You should be able to verify that a Dataflow pipeline is running from the Cloud Console UI. Data should start flowing through the pipeline and arrive at the FHIR store. Use the SearchResources API to verify that FHIR resources are written correctly.
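
A search request against the target FHIR store might look like the sketch below. The resource names are placeholders, the helper fhir_search_url is hypothetical, and Patient is only an assumed example of a resource type that the mappings produce.

```shell
#!/bin/sh
# Placeholder values (assumptions); replace them with your own resources.
PROJECT="${PROJECT:-my-project}"
LOCATION="${LOCATION:-us-central1}"
DATASET="${DATASET:-my-dataset}"
FHIRSTORE="${FHIRSTORE:-my-fhir-store}"

# Hypothetical helper: builds the search URL for one FHIR resource type.
fhir_search_url() {
  echo "https://healthcare.googleapis.com/v1/projects/${PROJECT}/locations/${LOCATION}/datasets/${DATASET}/fhirStores/${FHIRSTORE}/fhir/$1"
}

# Example request (needs gcloud credentials with read access to the store):
#   curl -s -H "Authorization: Bearer $(gcloud auth print-access-token)" \
#     "$(fhir_search_url Patient)"
fhir_search_url Patient
```

The response is a FHIR Bundle, so an empty entry list suggests that no resources of that type have been written yet.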

DICOM to FHIR Pipeline

This directory contains a reference Cloud Dataflow pipeline to convert a DICOM Study to a FHIR ImagingStudy resource.

Prerequisites

Permissions

Make sure you have enough permissions to run Cloud Dataflow jobs.

The Cloud Dataflow Controller Service Account needs the following permissions.

  • roles/pubsub.subscriber.
    • To listen for PubSub notifications for new messages. The service account only needs the role on the specific PubSub subscription.
  • roles/healthcare.dicomEditor.
    • To access metadata of DICOM stores.
  • roles/healthcare.fhirResourceEditor.
    • To write transformed resources to your FHIR store. The service account only needs this role on the target FHIR Store.
  • roles/storage.objectAdmin.
    • To access mapping and harmonization configurations on GCS. The service account needs this role on all GCS buckets that the mappings reside in.

How to Run

Build a fat JAR of the pipeline by running the following from the project directory.

  • Please make sure gradle is added to PATH before running the following commands.
# Generate wrapper classes.
gradle wrapper
./build_deps.sh && ./gradlew shadowJar -PmainClass=com.google.cloud.healthcare.etl.runner.dicomtofhir.DicomToFhirStreamingRunner

A JAR file should be generated in the build/libs folder.

Now run the pipeline with the following command:

# Please set the environment variables in the following command.

java -jar build/libs/converter-0.1.0-all.jar --pubSubSubscription="projects/${PROJECT?}/subscriptions/${SUBSCRIPTION?}" \
                                             --readErrorPath="gs://${ERROR_BUCKET?}/read/" \
                                             --writeErrorPath="gs://${ERROR_BUCKET?}/write/" \
                                             --mappingErrorPath="gs://${ERROR_BUCKET?}/mapping/" \
                                             --mappingPath="gs://${MAPPING_BUCKET?}/main.textproto" \
                                             --fhirStore="projects/${PROJECT?}/locations/${LOCATION?}/datasets/${DATASET?}/fhirStores/${FHIRSTORE?}" \
                                             --runner=DataflowRunner \
                                             --region=${REGION?} \
                                             --project=${PROJECT?}

A few notes:

  • By default, streaming pipelines do not have autoscaling enabled. To enable it, use either --enableStreamingEngine (recommended) or a combination of --autoscalingAlgorithm=THROUGHPUT_BASED and --maxNumWorkers=N. See this page for more details.
  • For production use, we recommend enabling agent metrics by appending --experiments=enable_stackdriver_agent_metrics as an option; you will also need to grant roles/monitoring.metricWriter to the Dataflow controller service account. See this page for more details. Additionally, we highly recommend limiting the number of threads on each worker, e.g. --numberOfWorkerHarnessThreads=10. You can tune the limit based on your workload.
  • To generate a template instead of running the pipeline, add --stagingLocation=gs://${STAGING_LOCATION} --templateLocation=gs://${TEMPLATE_LOCATION} to the above command. See here.
  • The mappingPath file (main.textproto) configures the mapping library. Make sure that the paths inside the file exist; they refer to the following repository: https://github.com/GoogleCloudPlatform/healthcare-data-harmonization/. The required binaries should be installed by the JAR build command. A sample main.textproto is available at src/main/java/com/google/cloud/healthcare/etl/runner/dicomtofhir/main.textproto. When specifying GCS (non-local) paths, use gcs_location: instead of local_path:.
  • Because the mappings do not assign an ID to the mapped FHIR resource, each input creates a new output in the FHIR store. TODO: evaluate maintaining an ID for DICOM instances.
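
To check the paths inside main.textproto, it can help to list every path entry first. The sketch below assumes only the two field names mentioned above (local_path and gcs_location); the helper name list_mapping_paths is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: prints each line (with its line number) of a mapping
# config that sets a local_path or gcs_location field, so every referenced
# path can be checked by hand.
list_mapping_paths() {
  grep -nE '(local_path|gcs_location):' "$1"
}

# Usage:
#   list_mapping_paths src/main/java/com/google/cloud/healthcare/etl/runner/dicomtofhir/main.textproto
```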

Please take a look at the PipelineRunner class to see the concrete meaning of each argument.

You should be able to verify that a Dataflow pipeline is running from the Cloud Console UI. Data should start flowing through the pipeline and arrive at the FHIR store. Use the SearchResources API to verify that FHIR resources are written correctly.

Custom to FHIR Pipeline

This directory contains a reference Cloud Dataflow pipeline to convert custom/non-standard messages to FHIR resources. Please note that additional configuration and hardening are required before processing PHI with this pipeline.

Prerequisites

Permissions

Make sure you have enough permissions to run Cloud Dataflow jobs.

The Cloud Dataflow Controller Service Account needs the following permissions.

  • roles/pubsub.subscriber.
    • To listen for PubSub notifications for new messages. The service account only needs the role on the specific PubSub subscription.
  • roles/healthcare.fhirResourceEditor.
    • To write transformed resources to your FHIR store. The service account only needs this role on the target FHIR Store.
  • roles/storage.objectAdmin.
    • To access mapping and harmonization configurations on GCS. The service account needs this role on all GCS buckets that the mappings reside in.
  • roles/pubsub.viewer.
    • To access the PubSub subscription.
  • roles/dataflow.worker.
    • To execute the Dataflow job.
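
The two additional roles in this list could be bound at the project level, as sketched below. The project-level scope is an assumption (the list does not state one, and roles/pubsub.viewer could instead be granted on the subscription alone). The project ID, service account email, and helper name are placeholders, and the script only prints the commands.

```shell
#!/bin/sh
# Placeholder values (assumptions); replace them with your own.
PROJECT="${PROJECT:-my-project}"
SA_EMAIL="${SA_EMAIL:-my-controller-sa@${PROJECT}.iam.gserviceaccount.com}"

# Hypothetical helper: prints (does not run) the project-level binding
# command for the given role.
project_binding_cmd() {
  echo "gcloud projects add-iam-policy-binding ${PROJECT}" \
    "--member=serviceAccount:${SA_EMAIL} --role=$1"
}

project_binding_cmd roles/dataflow.worker
project_binding_cmd roles/pubsub.viewer
```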

How to Run

Build a fat JAR of the pipeline by running the following from the project directory.

  • Please make sure gradle is added to PATH before running the following commands.
# Generate wrapper classes.
./build_deps.sh && gradle wrapper --gradle-version 7.6
./gradlew shadowJar

A JAR file should be generated in the build/libs folder.

Note that the build requires the following changes to build.gradle. If you built the JAR without them, apply them and rebuild.

  • Edit build.gradle so that mainClassName is set as follows:

shadowJar {
    mainClassName = project.findProperty('mainClass') ?: 'com.google.cloud.healthcare.etl.runner.customtofhir.CustomToFhirStreamingRunner'
    dependsOn('buildDeps')
}

  • (Optional) Depending on your Java environment, you might also need the following change in build.gradle for the code to build:

// sourceCompatibility = 11
sourceCompatibility = 1.8

Now run the pipeline with the following command:

# Please set the environment variables in the following command.

java -jar build/libs/converter-0.1.0-all.jar --pubSubSubscription="projects/${PROJECT?}/subscriptions/${SUBSCRIPTION?}" \
                                             --readErrorPath="gs://${ERROR_BUCKET?}/read/" \
                                             --writeErrorPath="gs://${ERROR_BUCKET?}/write/" \
                                             --mappingErrorPath="gs://${ERROR_BUCKET?}/mapping/" \
                                             --mappingPath="gs://${MAPPING_BUCKET?}/mapping.textproto" \
                                             --fhirStore="projects/${PROJECT?}/locations/${LOCATION?}/datasets/${DATASET?}/fhirStores/${FHIRSTORE?}" \
                                             --runner=DataflowRunner \
                                             --region=${REGION?} \
                                             --project=${PROJECT?} \
                                             --serviceAccount="${SERVICE_ACCOUNT?}"

A few notes:

  • By default, streaming pipelines do not have autoscaling enabled. To enable it, use either --enableStreamingEngine (recommended) or a combination of --autoscalingAlgorithm=THROUGHPUT_BASED and --maxNumWorkers=N. See this page for more details.
  • For production use, we recommend enabling agent metrics by appending --experiments=enable_stackdriver_agent_metrics as an option; you will also need to grant roles/monitoring.metricWriter to the Dataflow controller service account. See this page for more details. Additionally, we highly recommend limiting the number of threads on each worker, e.g. --numberOfWorkerHarnessThreads=10. You can tune the limit based on your workload.
  • To generate a template instead of running the pipeline, add --stagingLocation=gs://${STAGING_LOCATION} --templateLocation=gs://${TEMPLATE_LOCATION} to the above command. See here.

Please take a look at the PipelineRunner class to see the concrete meaning of each argument.

You should be able to verify that a Dataflow pipeline is running from the Cloud Console UI. Data should start flowing through the pipeline and arrive at the FHIR store. Use the SearchResources API to verify that FHIR resources are written correctly.

Support

Please file GitHub issues if you encounter any problems.
