In [2]:
import pyspark.sql.functions as F
from credentials import MY_CREDENTIALS
from data_location import DELTA_LOCATION

from spark_bi.spark import FutPathlingContext

# Create the FutPathlingContext for this notebook; the credentials object is
# handed over in the form of a Hadoop configuration.
pc = FutPathlingContext.create(
    hadoop_config=MY_CREDENTIALS.to_hadoop_config(),
    app_name="example-spark-app",
)

# Reader over the Delta data at DELTA_LOCATION; the resource views in the
# cells below are all built from this handle.
delta_lake = pc.read.delta(DELTA_LOCATION)

:: loading settings :: url = jar:file:/Users/mabe/Git/spark-bi/.venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/mabe/.ivy2.5.2/cache
The jars for the packages stored in: /Users/mabe/.ivy2.5.2/jars
au.csiro.pathling#library-runtime added as a dependency
io.delta#delta-spark_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a930056c-9fc2-4ceb-a12a-be9180f4d2c7;1.0
	confs: [default]
	found au.csiro.pathling#library-runtime;9.1.0 in local-m2-cache
	found io.delta#delta-spark_2.13;4.0.0 in local-m2-cache
	found io.delta#delta-storage;4.0.0 in local-m2-cache
	found org.antlr#antlr4-runtime;4.13.1 in local-m2-cache
:: resolution report :: resolve 90ms :: artifacts dl 5ms
	:: modules in use:
	au.csiro.pathling#library-runtime;9.1.0 from local-m2-cache in [default]
	io.delta#delta-spark_2.13;4.0.0 from local-m2-cache in [default]
	io.delta#delta-storage;4.0.0 fr

For at finde patientens diagnoser/behandlingsområder skal vi lave koblingen:

`Patient <-> EpisodeOfCare <-> CarePlan.addresses <-> Condition.code`

Vi starter fra højre:

In [16]:
# Tabular view of Condition resources: the resource key plus the diagnosis code.
# NOTE(review): `code.coding.code` may yield several values when a Condition
# carries more than one coding — confirm how the view handles that case.
conditions = delta_lake.view(
    resource="Condition",
    select=[
        {
            "column": [
                {"name": column_name, "path": fhirpath}
                for column_name, fhirpath in (
                    ("condition_id", "getResourceKey()"),
                    ("diagnosis_code", "code.coding.code"),
                )
            ]
        }
    ],
)
# Show a handful of rows to sanity-check the projection.
conditions.head(5)

[Row(condition_id='Condition/2000000003', diagnosis_code='DJ44'),
 Row(condition_id='Condition/2000000008', diagnosis_code='DJ44'),
 Row(condition_id='Condition/2000000036', diagnosis_code='DJ44'),
 Row(condition_id='Condition/2000000050', diagnosis_code='DJ44'),
 Row(condition_id='Condition/2000000069', diagnosis_code='DJ44')]

In [17]:
# Tabular view of CarePlan resources. Each row carries the CarePlan key, the
# EpisodeOfCare referenced through the workflow-episodeOfCare extension, and
# the Condition referenced by `addresses`.
careplans = delta_lake.view(
    resource="CarePlan",
    select=[
        {
            "column": [
                {"name": column_name, "path": fhirpath}
                for column_name, fhirpath in (
                    ("cp_id", "getResourceKey()"),
                    (
                        "cp_eoc_id",
                        "extension.where(url='http://hl7.org/fhir/StructureDefinition/workflow-episodeOfCare').valueReference.getReferenceKey()",
                    ),
                    ("addresses_condition", "addresses.getReferenceKey()"),
                )
            ]
        }
    ],
)
# Show a handful of rows to sanity-check the projection.
careplans.head(5)

[Row(cp_id='CarePlan/2000000027', cp_eoc_id='EpisodeOfCare/2000000023', addresses_condition='Condition/2000000024'),
 Row(cp_id='CarePlan/2000000039', cp_eoc_id='EpisodeOfCare/2000000035', addresses_condition='Condition/2000000036'),
 Row(cp_id='CarePlan/2000000072', cp_eoc_id='EpisodeOfCare/2000000068', addresses_condition='Condition/2000000069'),
 Row(cp_id='CarePlan/2000000084', cp_eoc_id='EpisodeOfCare/2000000080', addresses_condition='Condition/2000000081'),
 Row(cp_id='CarePlan/2000000115', cp_eoc_id='EpisodeOfCare/2000000111', addresses_condition='Condition/2000000112')]

In [18]:
# Tabular view of EpisodeOfCare resources: the episode key and the key of the
# patient it references.
episodes_of_care = delta_lake.view(
    resource="EpisodeOfCare",
    select=[
        dict(
            column=[
                dict(name="eoc_id", path="getResourceKey()"),
                dict(name="eoc_patient_id", path="patient.getReferenceKey()"),
            ]
        )
    ],
)
# Show a handful of rows to sanity-check the projection.
episodes_of_care.head(5)

[Row(eoc_id='EpisodeOfCare/2000000029', eoc_patient_id='Patient/1000264558'),
 Row(eoc_id='EpisodeOfCare/2000000035', eoc_patient_id='Patient/1000264558'),
 Row(eoc_id='EpisodeOfCare/2000000042', eoc_patient_id='Patient/1000264558'),
 Row(eoc_id='EpisodeOfCare/2000000049', eoc_patient_id='Patient/1000264558'),
 Row(eoc_id='EpisodeOfCare/2000000068', eoc_patient_id='Patient/1000264558')]

Dernæst kan vi samle til én stor tabel:

In [22]:
# Link Condition -> CarePlan -> EpisodeOfCare and count the distinct patients
# behind each diagnosis code.
(
    conditions.join(
        careplans, conditions["condition_id"] == careplans["addresses_condition"], how="inner"
    )
    .join(episodes_of_care, careplans["cp_eoc_id"] == episodes_of_care["eoc_id"], how="inner")
    .groupby("diagnosis_code")
    .agg(F.count_distinct("eoc_patient_id").alias("n_citizens"))
    # A grouped result has no defined row order, so sort before truncating:
    # head(5) then returns the five most common diagnoses, reproducibly,
    # with the diagnosis code as tie-breaker.
    .orderBy(F.desc("n_citizens"), "diagnosis_code")
    .head(5)
)

[Row(diagnosis_code='DJ44', n_citizens=2)]

Vær opmærksom på, at disse tal er fra TRIFORKs testmiljø, og derfor ikke repræsentative.