<img src="https://raw.githubusercontent.com/Databricks-BR/health/main/image/head_notebook.png" width="1000px">

## Accelerating interoperability with Databricks

<img src="https://raw.githubusercontent.com/QuentinAmbard/databricks-demo/main/hls/resources/dbinterop/hls-dbiginte-flow-0.png" width="1000px">

##### Description and Objectives

This demo is a condensed version of the **dbignite** solution for **integrating data in FHIR format**. The goal is to unlock FHIR bundles for analytics and build a complete view of the patient. With the Databricks platform you can:

* Automate ingestion;
* Update data incrementally;
* Denormalize bundles for interactive queries;
* Prepare data for analytics and AI at scale.
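To make "denormalizing bundles" concrete: a FHIR bundle is a JSON document whose `entry` array mixes resources of many types. The plain-Python sketch below (no Spark or dbignite involved; the trimmed bundle and the helper `group_by_resource_type` are hypothetical) groups the entries by `resourceType`, which is conceptually similar to the one-array-per-resource-type layout that `bundle.entry()` exposes later in this notebook.

```python
import json
from collections import defaultdict
from typing import Dict, List

# Hypothetical, heavily trimmed FHIR bundle (real Synthea bundles are much larger)
BUNDLE_JSON = """
{
  "resourceType": "Bundle",
  "type": "transaction",
  "entry": [
    {"resource": {"resourceType": "Patient", "id": "p1", "birthDate": "1980-01-01"}},
    {"resource": {"resourceType": "Condition", "id": "c1",
                  "subject": {"reference": "urn:uuid:p1"},
                  "code": {"text": "Hypertension"}}},
    {"resource": {"resourceType": "Condition", "id": "c2",
                  "subject": {"reference": "urn:uuid:p1"},
                  "code": {"text": "Prediabetes"}}}
  ]
}
"""


def group_by_resource_type(bundle: dict) -> Dict[str, List[dict]]:
    """Return {resourceType: [resource, ...]} for every entry in the bundle."""
    grouped: Dict[str, List[dict]] = defaultdict(list)
    for entry in bundle.get("entry", []):
        resource = entry.get("resource", {})
        grouped[resource.get("resourceType", "Unknown")].append(resource)
    return dict(grouped)


sample_bundle = json.loads(BUNDLE_JSON)
row = group_by_resource_type(sample_bundle)
for resource_type, resources in row.items():
    print(resource_type, len(resources))
# Patient 1
# Condition 2
```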

To see the full solution, go [here](https://databricks-industry-solutions.github.io/interop/#interop_1.html).

##### References:

* [Databricks - FHIR Solution Accelerator Site](https://www.databricks.com/solutions/accelerators/fhir)
* [Databricks Industry Solutions - Interoperability (Interop)](https://databricks-industry-solutions.github.io/interop/#interop_1.html)
* [GitHub - Databricks Industry Solutions - Interop](https://github.com/databricks-industry-solutions/interop)

##### Cluster Description

This demo can run on Serverless compute or on any instance type you prefer.

In [0]:
%pip install git+https://github.com/databrickslabs/dbignite.git

## Reading FHIR data (JSON bundles)

In [0]:
from dbignite.fhir_mapping_model import FhirSchemaModel
from dbignite.readers import read_from_directory
from pyspark.sql.functions import *
from pyspark.sql.types import *

sample_data = "s3://hls-eng-data-public/data/synthea/fhir/fhir/*json"

# Read the bundles from a static directory and parse them with entry(), which exposes
# each FHIR resource type (Patient, Condition, ...) as an array column next to bundleUUID
bundle = read_from_directory(sample_data)
df = bundle.entry()

In [0]:
df.select(col("Patient")).printSchema()

## ETL using the DataFrame API
Working with patient data and writing the results to tables

Note: the synthetic data uses the SNOMED CT coding system. For claims and billing in US healthcare, the accepted code sets are typically ICD-10-PCS, ICD-10-CM, CPT-4 and HCPCS.
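The DataFrame code in this section takes the first coding of each `CodeableConcept` with `[0]`, whatever its code system. When a concept carries several codings (for example SNOMED CT plus ICD-10-CM), selecting by `system` is safer. A minimal plain-Python sketch; the helper `code_for_system` and the sample concept are hypothetical, while `http://snomed.info/sct` and `http://hl7.org/fhir/sid/icd-10-cm` are the FHIR system URIs for SNOMED CT and ICD-10-CM.

```python
from typing import Optional

SNOMED = "http://snomed.info/sct"
ICD10_CM = "http://hl7.org/fhir/sid/icd-10-cm"

# Hypothetical CodeableConcept carrying two codings for the same condition
concept = {
    "coding": [
        {"system": SNOMED, "code": "59621000", "display": "Essential hypertension"},
        {"system": ICD10_CM, "code": "I10", "display": "Essential (primary) hypertension"},
    ],
    "text": "Essential hypertension",
}


def code_for_system(codeable_concept: dict, system: str) -> Optional[str]:
    """Return the code of the first coding whose system matches, else None."""
    for coding in codeable_concept.get("coding", []):
        if coding.get("system") == system:
            return coding.get("code")
    return None


print(concept["coding"][0]["code"])        # what "[0]" picks: 59621000
print(code_for_system(concept, ICD10_CM))  # I10
print(code_for_system(concept, "http://www.ama-assn.org/go/cpt"))  # None
```

In PySpark, the same selection could likely be expressed with the SQL higher-order function `filter`, e.g. `expr("filter(Condition.code.coding, x -> x.system = 'http://snomed.info/sct')[0].code")`; treat it as a starting point rather than a tested query.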

### Conditions

In [0]:
(
    df.select(explode("Patient").alias("Patient"), col("bundleUUID"), col("Condition"))
    .select(col("Patient"), col("bundleUUID"), explode("Condition").alias("Condition"))
    .select(
        col("bundleUUID").alias("UNIQUE_FHIR_ID"),
        col("patient.id").alias("Patient"),
        col("patient.birthDate").alias("Birth_date"),
        col("Condition.clinicalStatus.coding.code")[0].alias("clinical_status"),
        col("Condition.code.coding.code")[0].alias("condition_code"),
        col("Condition.code.coding.system")[0].alias("condition_type_code"),
        col("Condition.code.text").alias("condition_description"),
        col("Condition.recordedDate").alias("condition_date"),
    )
    .filter(col("Patient").like("efee780e%") | col("Patient").like("1a5e6090%"))
    .display()
)
# Two patients are selected here. Each FHIR bundle is handled separately, so if the same patient
# appeared in two bundles, their rows would be repeated once per bundle (each with its own UNIQUE_FHIR_ID)

In [0]:
# First, create a catalog named 'demo_health' if it does not exist yet (or use another name
# of your choice and replace 'demo_health' in the table names below)
# The following command creates the schema if it does not exist yet
spark.sql("CREATE SCHEMA IF NOT EXISTS demo_health.fhir_integration")

In [0]:
(
    df.select(explode("Patient").alias("Patient"), col("bundleUUID"), col("Condition"))
    .select(col("Patient"), col("bundleUUID"), explode("Condition").alias("Condition"))
    .select(
        col("bundleUUID").alias("UNIQUE_FHIR_ID"),
        col("patient.id").alias("Patient"),
        col("patient.birthDate").alias("Birth_date"),
        col("Condition.clinicalStatus.coding.code")[0].alias("clinical_status"),
        col("Condition.code.coding.code")[0].alias("condition_code"),
        col("Condition.code.coding.system")[0].alias("condition_type_code"),
        col("Condition.code.text").alias("condition_description"),
        col("Condition.recordedDate").alias("condition_date"),
    )
    .write.mode("overwrite")
    .saveAsTable("demo_health.fhir_integration.patient_conditions")
)

### Claims

In [0]:

(
    df.select(explode("Patient").alias("Patient"), col("bundleUUID"), col("Claim"))
    .select(col("Patient"), col("bundleUUID"), explode("Claim").alias("Claim"))
    .select(
        col("bundleUUID").alias("UNIQUE_FHIR_ID"),
        col("patient.id").alias("Patient"),
        col("claim.patient").alias("claim_patient_id"),
        col("claim.id").alias("claim_id"),
        col("patient.birthDate").alias("Birth_date"),
        col("claim.type.coding.code")[0].alias("claim_type_cd"),
        col("claim.insurance.coverage")[0].alias("insurer"),
        col("claim.total.value").alias("claim_billed_amount"),
        col("claim.item.productOrService.coding.display").alias("prcdr_description"),
        col("claim.item.productOrService.coding.code").alias("prcdr_cd"),
        col("claim.item.productOrService.coding.system").alias("prcdr_coding_system"),
    )
    .filter(col("Patient").like("efee780e%") | col("Patient").like("1a5e6090%"))
    .display()
)

In [0]:

(
    df.select(explode("Patient").alias("Patient"), col("bundleUUID"), col("Claim"))
    .select(col("Patient"), col("bundleUUID"), explode("Claim").alias("Claim"))
    .select(
        col("bundleUUID").alias("UNIQUE_FHIR_ID"),
        col("patient.id").alias("Patient"),
        col("claim.patient").alias("claim_patient_id"),
        col("claim.id").alias("claim_id"),
        col("patient.birthDate").alias("Birth_date"),
        col("claim.type.coding.code")[0].alias("claim_type_cd"),
        col("claim.insurance.coverage")[0].alias("insurer"),
        col("claim.total.value").alias("claim_billed_amount"),
        col("claim.item.productOrService.coding.display").alias("prcdr_description"),
        col("claim.item.productOrService.coding.code").alias("prcdr_cd"),
        col("claim.item.productOrService.coding.system").alias("prcdr_coding_system"),
    )
    .write.mode("overwrite")
    .saveAsTable("demo_health.fhir_integration.patient_claims")
)

### Medications

Note: in the synthetic dataset, the medication is stored in a `medicationCodeableConcept` field that the default `MedicationRequest` schema does not include. The next cell extends the schema to support this structure.
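For context, in FHIR R4 `MedicationRequest.medication[x]` is a choice element that appears in JSON either as `medicationCodeableConcept` (coded inline) or as `medicationReference` (pointing to a separate `Medication` resource). A minimal plain-Python sketch that reads whichever variant is present; the helper `medication_code` and the two sample resources are hypothetical, and the RxNorm system URI is the one commonly used in FHIR.

```python
from typing import Optional

RXNORM = "http://www.nlm.nih.gov/research/umls/rxnorm"

# Hypothetical MedicationRequest resources, one per variant of medication[x]
inline_med = {
    "resourceType": "MedicationRequest",
    "status": "active",
    "intent": "order",
    "medicationCodeableConcept": {
        "coding": [{"system": RXNORM, "code": "314076",
                    "display": "lisinopril 10 MG Oral Tablet"}],
        "text": "lisinopril 10 MG Oral Tablet",
    },
}
referenced_med = {
    "resourceType": "MedicationRequest",
    "status": "active",
    "intent": "order",
    "medicationReference": {"reference": "urn:uuid:med-1"},
}


def medication_code(request: dict) -> Optional[str]:
    """Return the first coding's code when medication[x] is coded inline, else None."""
    concept = request.get("medicationCodeableConcept")
    if concept is None:
        # medicationReference points to a separate Medication resource,
        # which would have to be looked up (e.g. joined) to get its code
        return None
    codings = concept.get("coding", [])
    return codings[0].get("code") if codings else None


print(medication_code(inline_med))      # 314076
print(medication_code(referenced_med))  # None
```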

In [0]:

med_schema = df.select(explode("MedicationRequest").alias("MedicationRequest")).schema

# Schema for the medicationCodeableConcept field
medCodeableConcept = StructField("medicationCodeableConcept", StructType([
    StructField("text", StringType()),
    StructField("coding", ArrayType(
        StructType([
            StructField("code", StringType()),
            StructField("display", StringType()),
            StructField("system", StringType()),
        ])
    )),
]))

# Add the new StructField one level below MedicationRequest
# (StructType.add appends the field to the struct in place)
med_schema.fields[0].dataType.add(medCodeableConcept)

In [0]:
# Rebuild the schema map, replacing MedicationRequest with the updated schema
old_schemas = {k: v for (k, v) in FhirSchemaModel().fhir_resource_map.items() if k != 'MedicationRequest'}
new_schemas = {**old_schemas, 'MedicationRequest': med_schema.fields[0].dataType}

# Re-read the data with the extended schema
bundle = read_from_directory(sample_data)
df = bundle.entry(schemas=FhirSchemaModel(fhir_resource_map=new_schemas))

In [0]:
(
    df.select(explode("Patient").alias("Patient"), col("bundleUUID"), col("MedicationRequest"))
    .select(col("Patient"), col("bundleUUID"), explode(col("MedicationRequest")).alias("MedicationRequest"))
    .select(
        col("bundleUUID").alias("UNIQUE_FHIR_ID"),
        col("patient.id").alias("Patient"),
        col("MedicationRequest.status"),
        col("MedicationRequest.intent"),
        col("MedicationRequest.authoredOn"),
        col("MedicationRequest.medicationCodeableConcept.text").alias("rx_text"),
        col("MedicationRequest.medicationCodeableConcept.coding.code")[0].alias("rx_code"),
        col("MedicationRequest.medicationCodeableConcept.coding.system")[0].alias("code_type"),
    )
    .filter(col("Patient").like("efee780e%") | col("Patient").like("1a5e6090%"))
    .display()
)

In [0]:
(
    df.select(explode("Patient").alias("Patient"), col("bundleUUID"), col("MedicationRequest"))
    .select(col("Patient"), col("bundleUUID"), explode(col("MedicationRequest")).alias("MedicationRequest"))
    .select(
        col("bundleUUID").alias("UNIQUE_FHIR_ID"),
        col("patient.id").alias("Patient"),
        col("MedicationRequest.status"),
        col("MedicationRequest.intent"),
        col("MedicationRequest.authoredOn"),
        col("MedicationRequest.medicationCodeableConcept.text").alias("rx_text"),
        col("MedicationRequest.medicationCodeableConcept.coding.code")[0].alias("rx_code"),
        col("MedicationRequest.medicationCodeableConcept.coding.system")[0].alias("code_type"),
    )
    .write.mode("overwrite")
    .saveAsTable("demo_health.fhir_integration.medication_requests")
)

### Providers

In [0]:
# Note: providers can be any of the Practitioner, Organization or PractitionerRole resources
# This example shows practitioners only

(
    df.select(col("bundleUUID"), col("Practitioner"))
    .select(col("bundleUUID"), explode("Practitioner").alias("Practitioner"))
    .select(
        col("bundleUUID").alias("UNIQUE_FHIR_ID"),
        col("practitioner.active"),
        col("practitioner.gender"),
        col("practitioner.telecom.system")[0].alias("primary_contact_method"),
        col("practitioner.telecom.value")[0].alias("primary_contact_value"),
        col("practitioner.telecom.use")[0].alias("primary_use"),
    )
    .display()
)

In [0]:
(
    df.select(col("bundleUUID"), col("Practitioner"))
    .select(col("bundleUUID"), explode("Practitioner").alias("Practitioner"))
    .select(
        col("bundleUUID").alias("UNIQUE_FHIR_ID"),
        col("practitioner.active"),
        col("practitioner.gender"),
        col("practitioner.telecom.system")[0].alias("primary_contact_method"),
        col("practitioner.telecom.value")[0].alias("primary_contact_value"),
        col("practitioner.telecom.use")[0].alias("primary_use"),
    )
    .write.mode("overwrite")
    .saveAsTable("demo_health.fhir_integration.providers_practitioners")
)

## ETL using SQL
Write the FHIR resources to tables as-is and use SQL to transform them
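Each query below follows the same pattern: explode the resource array of each table into one row per resource, then inner-join on `bundleUUID` so that only resources delivered in the same bundle are combined. A minimal plain-Python sketch of that pattern; the in-memory rows are hypothetical and only imitate the layout of the `Patient` and `Condition` tables (a `bundleUUID` plus an array of resources per row), they are not read from them.

```python
# Hypothetical rows: one row per bundle, each holding an array of resources
patient_table = [
    {"bundleUUID": "b1", "Patient": [{"id": "p1", "birthDate": "1980-01-01"}]},
    {"bundleUUID": "b2", "Patient": [{"id": "p2", "birthDate": "1975-06-30"}]},
]
condition_table = [
    {"bundleUUID": "b1", "Condition": [{"code": {"text": "Hypertension"}},
                                       {"code": {"text": "Prediabetes"}}]},
    {"bundleUUID": "b3", "Condition": [{"code": {"text": "Sinusitis"}}]},
]


def explode(rows, column):
    """Yield (bundleUUID, element) for every element of the array column (empty arrays yield nothing)."""
    for row in rows:
        for element in row.get(column) or []:
            yield row["bundleUUID"], element


def join_on_bundle(patients, conditions):
    """Inner join exploded patients and conditions that share a bundleUUID."""
    conditions_by_bundle = {}
    for bundle_id, condition in explode(conditions, "Condition"):
        conditions_by_bundle.setdefault(bundle_id, []).append(condition)
    result = []
    for bundle_id, patient in explode(patients, "Patient"):
        for condition in conditions_by_bundle.get(bundle_id, []):
            result.append((bundle_id, patient["id"], condition["code"]["text"]))
    return result


for joined_row in join_on_bundle(patient_table, condition_table):
    print(joined_row)
# ('b1', 'p1', 'Hypertension')
# ('b1', 'p1', 'Prediabetes')
```

Bundle `b2` (no conditions) and bundle `b3` (no patient) produce no output rows, just as unmatched rows disappear from an inner join.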

In [0]:
spark.sql("DROP TABLE IF EXISTS demo_health.fhir_integration.Patient")
spark.sql("DROP TABLE IF EXISTS demo_health.fhir_integration.Condition")
spark.sql("DROP TABLE IF EXISTS demo_health.fhir_integration.Claim")
spark.sql("DROP TABLE IF EXISTS demo_health.fhir_integration.MedicationRequest")
spark.sql("DROP TABLE IF EXISTS demo_health.fhir_integration.Practitioner")

bundle.bulk_table_write(location="demo_health.fhir_integration" 
  ,write_mode="overwrite"
  ,columns=["Patient", "Condition", "Claim", "MedicationRequest", "Practitioner"])

### Conditions

In [0]:
%sql
select p.bundleUUID as UNIQUE_FHIR_ID, 
  p.Patient.id,
  p.patient.birthDate,
  c.Condition.clinicalStatus.coding.code[0] as clinical_status,
  c.Condition.code.coding.code[0] as condition_code, 
  c.Condition.code.coding.system[0] as condition_type_code, 
  c.Condition.code.text as condition_description,
  c.Condition.recordedDate as condition_date
from (select bundleUUID, explode(Patient) as patient from demo_health.fhir_integration.patient) p --all patient information
  inner join (select bundleUUID, explode(condition) as condition from demo_health.fhir_integration.condition) c --all conditions from that patient 
    on p.bundleUUID = c.bundleUUID --Only show records that were bundled together 


### Claims

In [0]:
%sql
select p.bundleUUID as UNIQUE_FHIR_ID, 
  p.Patient.id as patient_id,
  p.patient.birthDate,
  c.claim.patient as claim_patient_id, 
  c.claim.id as claim_id,
  c.claim.type.coding.code[0] as claim_type_cd, --837I = Institutional, 837P = Professional
  c.claim.insurance.coverage[0],
  c.claim.total.value as claim_billed_amount,
  c.claim.item.productOrService.coding.display as procedure_description,
  c.claim.item.productOrService.coding.code as procedure_code,
  c.claim.item.productOrService.coding.system as procedure_coding_system
from (select bundleUUID, explode(Patient) as patient from demo_health.fhir_integration.patient) p --all patient information
  inner join (select bundleUUID, explode(claim) as claim from demo_health.fhir_integration.claim) c --all claim lines from that patient 
    on p.bundleUUID = c.bundleUUID --Only show records that were bundled together 
  limit 10

### Medications

In [0]:
%sql
select p.bundleUUID as UNIQUE_FHIR_ID, 
  p.Patient.id as patient_id,
  p.patient.birthDate,
  m.medication.intent,
  m.medication.status,
  m.medication.authoredOn as date_requested,
  m.medication.requester as rx_requester,
  --There is no plain m.medication.medication field here: in FHIR R4, medication[x] is serialized as
  --medicationCodeableConcept or medicationReference, and this dataset uses medicationCodeableConcept
  m.medication.medicationCodeableConcept.coding.code[0] as rx_code,
  m.medication.medicationCodeableConcept.coding.system[0] as rx_code_type,
  m.medication.medicationCodeableConcept.coding.display[0] as rx_description
from (select bundleUUID, explode(Patient) as patient from demo_health.fhir_integration.patient) p --all patient information
  inner join (select bundleUUID, explode(MedicationRequest) as medication from demo_health.fhir_integration.MedicationRequest) m --all medication orders from that patient 
    on p.bundleUUID = m.bundleUUID --Only show records that were bundled together 
  limit 10

### Providers

In [0]:
%sql
select p.bundleUUID as UNIQUE_FHIR_ID,
  p.practitioner.id as provider_id,  --in this FHIR bundle, ID is the FK to other references in various resources (claim, careTeam, etc)
  p.practitioner.active,
  p.practitioner.gender,
  p.practitioner.telecom.system[0] as primary_contact_method,
  p.practitioner.telecom.value[0] as primary_contact_value,
  p.practitioner.telecom.use[0] as primary_use
from (select bundleUUID, explode(practitioner) as practitioner from demo_health.fhir_integration.Practitioner) as p
limit 10


In [0]:
%sql
select p.bundleUUID as UNIQUE_FHIR_ID,
  p.practitioner.id as provider_id,  --in this FHIR bundle, ID is the FK to other references in various resources (claim, careTeam, etc)
  p.practitioner.active,
  p.practitioner.gender,
  p.practitioner.telecom.system[0] as primary_contact_method,
  p.practitioner.telecom.value[0] as primary_contact_value,
  p.practitioner.telecom.use[0] as primary_use,
  c.*
from (select bundleUUID, explode(practitioner) as practitioner from demo_health.fhir_integration.Practitioner) as p
  inner join  (select claim.id as claim_id, 
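                  substring(claim.provider, 82, 36) as provider_id, --fixed offsets: assumes every provider reference has the same 81-character prefix before the 36-character id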
                    claim.type.coding.code[0] as claim_type_cd, --837I = Institutional, 837P = Professional
                    claim.insurance.coverage[0] as insurance,
                    claim.total.value as claim_billed_amount
                  from (select explode(claim) as claim from demo_health.fhir_integration.claim)) as c
  on c.provider_id = p.practitioner.id 
  limit 10;
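The join above extracts the provider id with `substring(claim.provider, 82, 36)`, which only works if every reference string has exactly the same prefix length. A more robust alternative is to parse the FHIR reference itself. The plain-Python sketch below does that for three common reference shapes; the helper `reference_id` and the sample strings are hypothetical, and the exact reference format used in this dataset is an assumption.

```python
from typing import Optional, Tuple

URN_UUID = "urn:uuid:"
IDENTIFIER_QUERY = "?identifier="


def reference_id(reference: str) -> Tuple[Optional[str], str]:
    """Split a FHIR reference string into (resource_type, id).

    Only three shapes are handled (an assumption about the data, not a full parser):
      "Practitioner/<id>"                         -> ("Practitioner", "<id>")
      "urn:uuid:<id>"                             -> (None, "<id>")
      "Practitioner?identifier=<system>|<value>"  -> ("Practitioner", "<value>")
    Anything else is returned unchanged as the id.
    """
    if reference.startswith(URN_UUID):
        return None, reference[len(URN_UUID):]
    if IDENTIFIER_QUERY in reference:
        resource_type, identifier = reference.split(IDENTIFIER_QUERY, 1)
        # A conditional reference carries "system|value"; keep only the value
        return resource_type, identifier.rsplit("|", 1)[-1]
    if "/" in reference:
        resource_type, resource_id = reference.split("/", 2)[:2]
        return resource_type, resource_id
    return None, reference


print(reference_id("Practitioner/abc-123"))  # ('Practitioner', 'abc-123')
print(reference_id("urn:uuid:abc-123"))      # (None, 'abc-123')
print(reference_id("Practitioner?identifier=http://hl7.org/fhir/sid/us-npi|9999999999"))
# ('Practitioner', '9999999999')
```

In Spark SQL, the same idea can be applied with string functions such as `split` or `regexp_extract` rather than fixed offsets.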


In [0]:
%sql
select claim.type.coding.code[0] as claim_type_cd, --837I = Institutional, 837P = Professional
  count(1)
from (select explode(claim) as claim from demo_health.fhir_integration.claim) as c
group by 1 
-- Only institutional and Rx claims present, no professional claims submitted
limit 10

## Deduplicating FHIR messages

In [0]:
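# Simulate a new batch of incoming messages by re-reading the sample data
# (it is the same data written above, so every record should already exist)
df = read_from_directory(sample_data).entry()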

In [0]:
# Stage the incoming patient and claim resources
df.select(col("bundleUUID"), col("Patient")).write.mode("overwrite").saveAsTable("demo_health.fhir_integration.staging_patient")
df.select(col("bundleUUID"), col("Claim")).write.mode("overwrite").saveAsTable("demo_health.fhir_integration.staging_claim")

### Patient lookup to deduplicate records
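The lookup that follows left-joins the incoming (staging) patients against the patients already stored and flags each staging record with `Y` if its id already exists, `N` otherwise. The same logic as a minimal plain-Python sketch; the function `existence_flags` and the ids are hypothetical.

```python
from typing import Iterable, List, Tuple


def existence_flags(staging_ids: Iterable[str], existing_ids: Iterable[str]) -> List[Tuple[str, str]]:
    """Flag each staging id with "Y" if it already exists in the target, else "N"."""
    existing = set(existing_ids)
    return [(record_id, "Y" if record_id in existing else "N") for record_id in staging_ids]


print(existence_flags(["p1", "p2", "p9"], ["p1", "p2", "p3"]))
# [('p1', 'Y'), ('p2', 'Y'), ('p9', 'N')]
```

One difference worth knowing: if an id appears more than once in the target table, the SQL `left outer join` returns one row per match, whereas this set-based lookup returns exactly one row per staging record.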

In [0]:
%sql
--Lookup by patient_id 
select stg.bundleUUID as fhir_bundle_id_staging
  ,p.bundleUUID as fhir_bundle_id_patient
  ,stg.patient.id as patient_id
  ,case when p.patient.id is not null then "Y" else "N" end as record_exists_flag
from (select bundleUUID, explode(Patient) as patient from demo_health.fhir_integration.staging_patient) stg
  left outer join (select bundleUUID, explode(Patient) as patient from demo_health.fhir_integration.patient) p 
    on stg.patient.id = p.patient.id 
limit 20;


### Claim lookup to deduplicate records

In [0]:
%sql
--Lookup by claim_id 
select stg.bundleUUID as fhir_bundle_id_staging
  ,c.bundleUUID as fhir_bundle_id_claim
  ,stg.claim.id as claim_id
  ,case when c.claim.id is not null then "Y" else "N" end as record_exists_flag
from  (select bundleUUID, explode(claim) as claim from demo_health.fhir_integration.staging_claim) stg
  left outer join (select bundleUUID, explode(claim) as claim from demo_health.fhir_integration.claim) c
    on stg.claim.id = c.claim.id 
limit 20;
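The flags above identify which claims are already stored. A natural follow-up, sketched here in plain Python under the assumption that only unseen claims should be loaded (the notebook itself stops at the lookup), is an anti-join that keeps claims whose id is not yet in the target and also drops ids repeated within the same batch. The function `new_records` and the sample ids are hypothetical; in PySpark the analogous operations would be a `left_anti` join followed by `dropDuplicates` on the claim id.

```python
from typing import Iterable, List


def new_records(staging: List[dict], existing_ids: Iterable[str], key: str = "id") -> List[dict]:
    """Keep staging records whose key is not stored yet, dropping repeats within the batch."""
    seen = set(existing_ids)
    fresh = []
    for record in staging:
        if record[key] not in seen:
            seen.add(record[key])  # later copies of the same id in this batch are skipped
            fresh.append(record)
    return fresh


staging_claims = [{"id": "c1"}, {"id": "c7"}, {"id": "c7"}, {"id": "c8"}]
print(new_records(staging_claims, ["c1", "c2"]))
# [{'id': 'c7'}, {'id': 'c8'}]
```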

#### To see the full solution, go [here](https://databricks-industry-solutions.github.io/interop/#interop_1.html).