Skip to content

Commit

Permalink
Merge pull request #126 from c3g/fhir-to-phenopackets
Browse files Browse the repository at this point in the history
FHIR to Phenopackets ingestion
  • Loading branch information
zxenia committed Jun 10, 2020
2 parents c1fa080 + ce87b06 commit 1e263eb
Show file tree
Hide file tree
Showing 16 changed files with 1,460 additions and 313 deletions.
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
include chord_metadata_service/chord/workflows/phenopackets_json.wdl
include chord_metadata_service/chord/tests/example_phenopacket.json
include chord_metadata_service/dats/*
include chord_metadata_service/package.cfg
135 changes: 126 additions & 9 deletions chord_metadata_service/chord/ingest.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,41 @@
import json
import os
import uuid

from dateutil.parser import isoparse
from typing import Callable

from chord_metadata_service.chord.data_types import DATA_TYPE_EXPERIMENT, DATA_TYPE_PHENOPACKET
from chord_metadata_service.chord.models import Table
from chord_metadata_service.chord.models import Table, TableOwnership
from chord_metadata_service.experiments import models as em
from chord_metadata_service.phenopackets import models as pm
from chord_metadata_service.resources import models as rm, utils as ru
from chord_metadata_service.restapi.fhir_ingest import (
ingest_patients,
ingest_observations,
ingest_conditions,
ingest_specimens
)


__all__ = [
"METADATA_WORKFLOWS",
"WORKFLOWS_PATH",
"ingest_resource",
"DATA_TYPE_INGEST_FUNCTION_MAP",
"WORKFLOW_INGEST_FUNCTION_MAP",
]

WORKFLOW_PHENOPACKETS_JSON = "phenopackets_json"
WORKFLOW_EXPERIMENTS_JSON = "experiments_json"
WORKFLOW_FHIR_JSON = "fhir_json"

METADATA_WORKFLOWS = {
"ingestion": {
"phenopackets_json": {
WORKFLOW_PHENOPACKETS_JSON: {
"name": "Bento Phenopackets-Compatible JSON",
"description": "This ingestion workflow will validate and import a Phenopackets schema-compatible "
"JSON document.",
"data_type": "phenopacket",
"data_type": DATA_TYPE_PHENOPACKET,
"file": "phenopackets_json.wdl",
"inputs": [
{
Expand All @@ -42,11 +52,11 @@
}
]
},
"experiments_json": {
WORKFLOW_EXPERIMENTS_JSON: {
"name": "Bento Experiments JSON",
"description": "This ingestion workflow will validate and import a Bento Experiments schema-compatible "
"JSON document.",
"data_type": "experiment",
"data_type": DATA_TYPE_EXPERIMENT,
"file": "experiments_json.wdl",
"inputs": [
{
Expand All @@ -62,6 +72,68 @@
"value": "{json_document}"
}
]
},
WORKFLOW_FHIR_JSON: {
"name": "FHIR Resources JSON",
"description": "This ingestion workflow will validate and import a FHIR schema-compatible "
"JSON document, and convert it to the Bento metadata service's internal Phenopackets-based "
"data model.",
"data_type": DATA_TYPE_PHENOPACKET,
"file": "fhir_json.wdl",
"inputs": [
{
"id": "patients",
"type": "file",
"extensions": [".json"]
},
{
"id": "observations",
"type": "file",
"extensions": [".json"]
},
{
"id": "conditions",
"type": "file",
"extensions": [".json"]
},
{
"id": "specimens",
"type": "file",
"extensions": [".json"]
},
{
"id": "created_by",
"type": "string"
},

],
"outputs": [
{
"id": "patients",
"type": "file",
"value": "{json_document}"
},
{
"id": "observations",
"type": "file",
"value": "{json_document}"
},
{
"id": "conditions",
"type": "file",
"value": "{json_document}"
},
{
"id": "specimens",
"type": "file",
"value": "{json_document}"
},
{
"id": "created_by",
"type": "string"
},

]
}
},
"analysis": {}
Expand Down Expand Up @@ -244,7 +316,52 @@ def ingest_phenopacket(phenopacket_data, table_id) -> pm.Phenopacket:
return new_phenopacket


DATA_TYPE_INGEST_FUNCTION_MAP = {
DATA_TYPE_EXPERIMENT: ingest_experiment,
DATA_TYPE_PHENOPACKET: ingest_phenopacket,
def _map_if_list(fn, data, *args):
# TODO: Any sequence?
return [fn(d, *args) for d in data] if isinstance(data, list) else fn(data, *args)


def ingest_experiments_workflow(workflow_outputs, table_id):
    """Ingest the JSON document produced by the experiments workflow.

    Reads the file at ``workflow_outputs["json_document"]``, then:

    * ingests every entry under the document's ``"resources"`` key and adds
      it to the ``additional_resources`` of the dataset found through the
      ``TableOwnership`` record for ``table_id``;
    * ingests every entry under the ``"experiments"`` key into ``table_id``.

    Both keys are optional; a missing key is treated as an empty list.

    Returns a list with the result of ``ingest_experiment`` for each
    experiment, in document order.
    """
    # JSON interchange text is UTF-8; pin the encoding rather than relying on
    # the platform/locale default used by open().
    with open(workflow_outputs["json_document"], "r", encoding="utf-8") as jf:
        json_data = json.load(jf)

    dataset = TableOwnership.objects.get(table_id=table_id).dataset

    for rs in json_data.get("resources", []):
        dataset.additional_resources.add(ingest_resource(rs))

    return [ingest_experiment(exp, table_id) for exp in json_data.get("experiments", [])]


def ingest_phenopacket_workflow(workflow_outputs, table_id):
    """Ingest the JSON document produced by the phenopackets workflow.

    Reads the file at ``workflow_outputs["json_document"]``. If the document
    is a JSON array, each element is passed to ``ingest_phenopacket`` and a
    list of results is returned; otherwise the whole document is ingested as
    one phenopacket and that single result is returned.
    """
    # JSON interchange text is UTF-8; pin the encoding rather than relying on
    # the platform/locale default used by open().
    with open(workflow_outputs["json_document"], "r", encoding="utf-8") as jf:
        json_data = json.load(jf)

    return _map_if_list(ingest_phenopacket, json_data, table_id)


def ingest_fhir_workflow(workflow_outputs, table_id):
    """Ingest the FHIR JSON files produced by the FHIR workflow.

    ``workflow_outputs["patients"]`` is required: its JSON content is passed
    to ``ingest_patients`` together with ``table_id`` and the
    ``"created_by"`` output (falling back to ``"Imported from file."`` when
    that output is missing or empty).

    The ``"observations"``, ``"conditions"`` and ``"specimens"`` outputs are
    optional; each one present in ``workflow_outputs`` is loaded and handed
    to its matching ingest function, in that order.

    Returns ``None``.
    """

    def _read_json(path):
        # JSON interchange text is UTF-8; pin the encoding rather than relying
        # on the platform/locale default used by open().
        with open(path, "r", encoding="utf-8") as fh:
            return json.load(fh)

    patients_data = _read_json(workflow_outputs["patients"])
    ingest_patients(patients_data, table_id, workflow_outputs.get("created_by") or "Imported from file.")

    # Optional resource files, processed in this fixed order (after patients).
    optional_steps = (
        ("observations", ingest_observations),
        ("conditions", ingest_conditions),
        ("specimens", ingest_specimens),
    )
    for output_id, ingest_fn in optional_steps:
        if output_id in workflow_outputs:
            ingest_fn(_read_json(workflow_outputs[output_id]))


# Dispatch table: maps each ingestion workflow ID constant to the function
# that ingests that workflow's outputs. Every function registered here is
# called as fn(workflow_outputs, table_id).
WORKFLOW_INGEST_FUNCTION_MAP = {
WORKFLOW_EXPERIMENTS_JSON: ingest_experiments_workflow,
WORKFLOW_PHENOPACKETS_JSON: ingest_phenopacket_workflow,
WORKFLOW_FHIR_JSON: ingest_fhir_workflow,
}

0 comments on commit 1e263eb

Please sign in to comment.