Skip to content

Commit

Permalink
Merge pull request #134 from c3g/fix-fhir-workflow
Browse files Browse the repository at this point in the history
Fix FHIR workflow
  • Loading branch information
davidlougheed committed Jun 15, 2020
2 parents 1e263eb + 87796bd commit f68791d
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 48 deletions.
4 changes: 2 additions & 2 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
include chord_metadata_service/chord/workflows/phenopackets_json.wdl
include chord_metadata_service/chord/tests/example_phenopacket.json
include chord_metadata_service/chord/workflows/*.wdl
include chord_metadata_service/chord/tests/*.json
include chord_metadata_service/dats/*
include chord_metadata_service/package.cfg
30 changes: 21 additions & 9 deletions chord_metadata_service/chord/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
{
"id": "json_document",
"type": "file",
"required": True,
"extensions": [".json"]
}
],
Expand All @@ -62,6 +63,7 @@
{
"id": "json_document",
"type": "file",
"required": True,
"extensions": [".json"]
}
],
Expand All @@ -84,25 +86,30 @@
{
"id": "patients",
"type": "file",
"required": True,
"extensions": [".json"]
},
{
"id": "observations",
"type": "file",
"required": False,
"extensions": [".json"]
},
{
"id": "conditions",
"type": "file",
"required": False,
"extensions": [".json"]
},
{
"id": "specimens",
"type": "file",
"required": False,
"extensions": [".json"]
},
{
"id": "created_by",
"required": False,
"type": "string"
},

Expand All @@ -111,26 +118,27 @@
{
"id": "patients",
"type": "file",
"value": "{json_document}"
"value": "{patients}"
},
{
"id": "observations",
"type": "file",
"value": "{json_document}"
"value": "{observations}"
},
{
"id": "conditions",
"type": "file",
"value": "{json_document}"
"value": "{conditions}"
},
{
"id": "specimens",
"type": "file",
"value": "{json_document}"
"value": "{specimens}"
},
{
"id": "created_by",
"type": "string"
"type": "string",
"value": "{created_by}"
},

]
Expand Down Expand Up @@ -342,22 +350,26 @@ def ingest_phenopacket_workflow(workflow_outputs, table_id):
def ingest_fhir_workflow(workflow_outputs, table_id):
    """Ingest the outputs of the FHIR JSON workflow into a phenopacket table.

    :param workflow_outputs: mapping of workflow output IDs to values —
        "patients" (required) and optionally "observations", "conditions",
        "specimens" are paths to FHIR Bundle JSON files; "created_by" is an
        optional string recorded as the ingest author.
    :param table_id: ID of the target phenopacket table (ownership record).
    :raises KeyError: if the required "patients" output is missing.
    """
    with open(workflow_outputs["patients"], "r") as pf:
        patients_data = json.load(pf)

    # ingest_patients returns a mapping of individual ID -> phenopacket ID,
    # which the optional resource ingests below use to attach observations,
    # conditions, and specimens to the correct phenopacket.
    phenopacket_ids = ingest_patients(
        patients_data,
        table_id,
        workflow_outputs.get("created_by") or "Imported from file.",
    )

    if "observations" in workflow_outputs:
        with open(workflow_outputs["observations"], "r") as of:
            observations_data = json.load(of)
        ingest_observations(phenopacket_ids, observations_data)

    if "conditions" in workflow_outputs:
        with open(workflow_outputs["conditions"], "r") as cf:
            conditions_data = json.load(cf)
        ingest_conditions(phenopacket_ids, conditions_data)

    if "specimens" in workflow_outputs:
        with open(workflow_outputs["specimens"], "r") as sf:
            specimens_data = json.load(sf)
        ingest_specimens(phenopacket_ids, specimens_data)


WORKFLOW_INGEST_FUNCTION_MAP = {
Expand Down
49 changes: 31 additions & 18 deletions chord_metadata_service/chord/workflows/fhir_json.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,46 @@ workflow fhir_json {
File? observations
File? conditions
File? specimens
String? created_by

call identity_task {
input:
patients_in = patients,
observations_in = observations,
conditions_in = conditions,
specimens_in = specimens,
created_by_in = created_by
input: json_in = patients
}

call optional_fhir_json_task as ofjt1 {
input: json_in = observations, file_name = "observations.json"
}
call optional_fhir_json_task as ofjt2 {
input: json_in = conditions, file_name = "conditions.json"
}
call optional_fhir_json_task as ofjt3 {
input: json_in = specimens, file_name = "specimens.json"
}
}

# Pass-through task: WDL workflow outputs must originate from a task, so this
# task simply re-exposes the required patients JSON file unchanged. The
# optional inputs are handled separately by optional_fhir_json_task.
task identity_task {
    File json_in

    command {
        true
    }

    output {
        # Output path is the input path itself — no copy is made.
        File json_out = "${json_in}"
    }
}

# Normalizes an optional FHIR JSON input: if the optional file was provided,
# it is moved to the fixed file_name; otherwise an empty FHIR bundle is
# written there, so the task's output file always exists.
task optional_fhir_json_task {
File? json_in
String file_name

# When json_in is unset, ${json_in} interpolates to an empty string, the -f
# test fails, and the empty-bundle fallback branch runs.
# NOTE(review): the fallback uses resourceType "bundle" (lowercase); the FHIR
# spec capitalizes it as "Bundle" — confirm the ingest-side schema accepts
# the lowercase form.
command <<<
if [[ -f "${json_in}" ]]; then
mv "${json_in}" "${file_name}";
else
echo '{"resourceType": "bundle", "entry": []}' > "${file_name}";
fi
>>>
output {
File json_out = "${file_name}"
}
}
47 changes: 30 additions & 17 deletions chord_metadata_service/restapi/fhir_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import jsonschema
import logging

from typing import Dict

from .schemas import FHIR_BUNDLE_SCHEMA
from .fhir_utils import (
patient_to_individual,
Expand Down Expand Up @@ -29,16 +31,19 @@ def _check_schema(schema, obj, additional_info=None):
except jsonschema.exceptions.ValidationError:
v = jsonschema.Draft7Validator(schema)
errors = [e for e in v.iter_errors(obj)]
error_messages = []
for i, error in enumerate(errors, 1):
error_messages.append(f"{i} validation error {'.'.join(str(v) for v in error.path)}: {error.message}")
error_messages = [
f"{i} validation error {'.'.join(str(v) for v in error.path)}: {error.message}"
for i, error in enumerate(errors, 1)
]
raise ValidationError(f"{additional_info + ' ' if additional_info else None}errors: {error_messages}")


def ingest_patients(patients_data, table_id, created_by):
""" Takes FHIR Bundle containing Patient resources. """
# check if Patients data follows FHIR Bundle schema
_check_schema(FHIR_BUNDLE_SCHEMA, patients_data, 'patients data')

phenopacket_ids = {}
for item in patients_data["entry"]:
individual_data = patient_to_individual(item["resource"])
individual, _ = Individual.objects.get_or_create(**individual_data)
Expand All @@ -49,22 +54,26 @@ def ingest_patients(patients_data, table_id, created_by):
external_references=[]
)
# create new phenopacket for each individual
phenopacket_ids[individual.id] = str(uuid.uuid4())
phenopacket = Phenopacket.objects.create(
id=str(uuid.uuid4()),
id=phenopacket_ids[individual.id],
subject=individual,
meta_data=meta_data_obj,
table=Table.objects.get(ownership_record_id=table_id)
)
logger.info(f'Phenopacket {phenopacket.id} created')
return

return phenopacket_ids

def ingest_observations(observations_data):

def ingest_observations(phenopacket_ids: Dict[str, str], observations_data):
""" Takes FHIR Bundle containing Observation resources. """
# check if Observations data follows FHIR Bundle schema
_check_schema(FHIR_BUNDLE_SCHEMA, observations_data, 'observations data')

for item in observations_data["entry"]:
phenotypic_feature_data = observation_to_phenotypic_feature(item["resource"])

# Observation must have a subject
try:
item["resource"]["subject"]
Expand All @@ -73,44 +82,47 @@ def ingest_observations(observations_data):

subject = _parse_reference(item["resource"]["subject"]["reference"])
phenotypic_feature, _ = PhenotypicFeature.objects.get_or_create(
phenopacket=Phenopacket.objects.get(subject=Individual.objects.get(id=subject)),
phenopacket=Phenopacket.objects.get(id=phenopacket_ids[subject]),
**phenotypic_feature_data
)

logger.info(f'PhenotypicFeature {phenotypic_feature.id} created')
return


def ingest_conditions(conditions_data):
def ingest_conditions(phenopacket_ids: Dict[str, str], conditions_data):
    """Takes FHIR Bundle containing Condition resources.

    Each Condition is converted to a Disease and attached to the phenopacket
    of its subject, looked up via phenopacket_ids (a mapping of individual ID
    to phenopacket ID as returned by ingest_patients).

    :raises ValidationError: if conditions_data is not a valid FHIR Bundle.
    :raises KeyError: if a Condition has no subject, or its subject is not
        present in phenopacket_ids.
    """
    # check if Conditions data follows FHIR Bundle schema
    _check_schema(FHIR_BUNDLE_SCHEMA, conditions_data, 'conditions data')

    for item in conditions_data["entry"]:
        disease_data = condition_to_disease(item["resource"])
        disease = Disease.objects.create(**disease_data)

        # Condition must have a subject
        if "subject" not in item["resource"]:
            raise KeyError(f"Condition {item['resource']['id']} doesn't have a subject.")

        subject = _parse_reference(item["resource"]["subject"]["reference"])
        phenopacket = Phenopacket.objects.get(id=phenopacket_ids[subject])
        phenopacket.diseases.add(disease)

        logger.info(f'Disease {disease.id} created')
    return


def ingest_specimens(specimens_data):
def ingest_specimens(phenopacket_ids: Dict[str, str], specimens_data):
""" Takes FHIR Bundle containing Specimen resources. """
# check if Specimens data follows FHIR Bundle schema
_check_schema(FHIR_BUNDLE_SCHEMA, specimens_data, 'specimens data')

for item in specimens_data["entry"]:
biosample_data = specimen_to_biosample(item["resource"])
procedure, _ = Procedure.objects.get_or_create(**biosample_data["procedure"])

# Specimen must have a subject
try:
biosample_data["individual"]
except KeyError:
if not biosample_data.get("individual"):
raise KeyError(f"Specimen {item['resource']['id']} doesn't have a subject.")

individual_id = _parse_reference(biosample_data["individual"])
Expand All @@ -120,7 +132,8 @@ def ingest_specimens(specimens_data):
individual=Individual.objects.get(id=individual_id),
sampled_tissue=biosample_data["sampled_tissue"]
)
phenopacket = Phenopacket.objects.get(subject=Individual.objects.get(id=individual_id))

phenopacket = Phenopacket.objects.get(id=phenopacket_ids[individual_id])
phenopacket.biosamples.add(biosample)

logger.info(f'Biosample {biosample.id} created')
return
3 changes: 1 addition & 2 deletions chord_metadata_service/restapi/tests/test_fhir_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ def setUp(self) -> None:
dataset=self.d)
self.t = Table.objects.create(ownership_record=to, name="Table 1", data_type=DATA_TYPE_PHENOPACKET)


def test_fhir_bundle_schema(self):

with self.assertRaises(ValidationError):
Expand All @@ -32,6 +31,6 @@ def test_required_subject(self):

with self.assertRaises(KeyError):
try:
ingest_observations(INVALID_SUBJECT_NOT_PRESENT)
ingest_observations({}, INVALID_SUBJECT_NOT_PRESENT)
except KeyError as e:
raise e

0 comments on commit f68791d

Please sign in to comment.