Skip to content

Commit

Permalink
Merge pull request #369 from bento-platform/ingest-error-bad-schema
Browse files Browse the repository at this point in the history
Improve error handling when ingestion schema validation fails
  • Loading branch information
davidlougheed committed Dec 22, 2022
2 parents 9b4c6d6 + 00d5689 commit 6bf2861
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 37 deletions.
75 changes: 56 additions & 19 deletions chord_metadata_service/chord/ingest/experiments.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import json
import uuid

Expand All @@ -15,12 +17,18 @@
from .utils import get_output_or_raise, workflow_file_output_to_path

__all__ = [
"create_instrument",
"create_experiment_result",
"validate_experiment",
"ingest_experiment",
"ingest_experiments_workflow",
"ingest_maf_derived_from_vcf_workflow",
]

from .exceptions import IngestError


def create_instrument(instrument):
def create_instrument(instrument: dict) -> em.Instrument:
instrument_obj, _ = em.Instrument.objects.get_or_create(
identifier=instrument.get("identifier", str(uuid.uuid4()))
)
Expand All @@ -32,7 +40,7 @@ def create_instrument(instrument):
return instrument_obj


def create_experiment_result(er):
def create_experiment_result(er: dict) -> em.ExperimentResult:
er_obj = em.ExperimentResult(
identifier=er.get("identifier"),
description=er.get("description"),
Expand All @@ -49,13 +57,28 @@ def create_experiment_result(er):
return er_obj


def ingest_experiment(experiment_data, table_id):
"""Ingests a single experiment."""

# validate experiment data against experiments schema
def validate_experiment(experiment_data, idx: Optional[int] = None) -> None:
    """Check experiment_data against the experiment JSON schema; raise IngestError on failure."""
    if schema_validation(experiment_data, EXPERIMENT_SCHEMA):
        return  # Document conforms to the schema; nothing to report.
    # Include the list position in the error message when the caller supplied one.
    position = f" {idx}" if idx is not None else ""
    # TODO: Report more precise errors
    raise IngestError(
        f"Failed schema validation for experiment{position} "
        f"(check Katsu logs for more information)")


def ingest_experiment(
experiment_data: dict,
table_id: str,
validate: bool = True,
idx: Optional[int] = None,
) -> em.Experiment:
"""Ingests a single experiment."""

if validate:
# Validate experiment data against experiments schema prior to ingestion, if specified.
# `validate` may be false if the experiment has already been validated.
validate_experiment(experiment_data, idx)

new_experiment_id = experiment_data.get("id", str(uuid.uuid4()))
study_type = experiment_data.get("study_type")
Expand Down Expand Up @@ -118,41 +141,55 @@ def ingest_experiment(experiment_data, table_id):
return new_experiment


def ingest_experiments_workflow(workflow_outputs, table_id: str) -> list[em.Experiment]:
    """Load an experiments JSON document produced by a workflow and ingest its contents."""
    doc_output = get_output_or_raise(workflow_outputs, "json_document")
    with workflow_file_output_to_path(doc_output) as json_doc_path:
        logger.info(f"Attempting ingestion of experiments from path: {json_doc_path}")
        with open(json_doc_path, "r") as jf:
            json_data: dict = json.load(jf)

    # Attach any referenced resources to the dataset that owns this table.
    dataset = TableOwnership.objects.get(table_id=table_id).dataset
    for rs in json_data.get("resources", []):
        dataset.additional_resources.add(ingest_resource(rs))

    experiments = json_data.get("experiments", [])

    # First, validate all experiments with the schema before creating anything in the database.
    for i, experiment in enumerate(experiments):
        validate_experiment(experiment, i)

    # Then, if everything passes, ingest the experiments without re-doing the validation.
    return [ingest_experiment(experiment, table_id, validate=False) for experiment in experiments]

def ingest_derived_experiment_results(json_data):

def ingest_derived_experiment_results(json_data: list[dict]) -> list[em.ExperimentResult]:
""" Reads a JSON file containing a list of experiment results and adds them
to the database.
The linkage to experiments is inferred from the `derived_from` category
in `extra_properties`
"""

exp_res_list = []
# First, validate all experiment results with the schema before creating anything in the database.

for idx, exp_result in enumerate(json_data):
validation = schema_validation(exp_result, EXPERIMENT_RESULT_SCHEMA)
if not validation:
# TODO: Report more precise errors
raise IngestError(
f"Failed schema validation for experiment result {idx} "
f"(check Katsu logs for more information)")

# If everything passes, perform the actual ingestion next.

exp_res_list: list[em.ExperimentResult] = []
# Create a map of experiment results identifier to experiment id.
# Prefetch results due to the many-to-many relationship
exp = em.Experiment.objects.all().prefetch_related("experiment_results")
exp_result2exp = dict()
for row in exp.values("id", "experiment_results__identifier"):
exp_result2exp[row["experiment_results__identifier"]] = row["id"]

for exp_result in json_data:
validation = schema_validation(exp_result, EXPERIMENT_RESULT_SCHEMA)
if not validation:
logger.warning(f"Improper schema for experiment result: {json.dumps(exp_result)}")
continue

for idx, exp_result in enumerate(json_data):
derived_identifier = exp_result['extra_properties']['derived_from']
experiment_id = exp_result2exp.get(derived_identifier, None)
if experiment_id is None:
Expand All @@ -175,7 +212,7 @@ def ingest_derived_experiment_results(json_data):
# The table_id is required to fit the bento_ingest.schema.json in bento_lib,
# but it is unused. It can be set to any valid table_id or to one of the override
# values defined in view_ingest.py
def ingest_maf_derived_from_vcf_workflow(workflow_outputs, table_id):
def ingest_maf_derived_from_vcf_workflow(workflow_outputs, table_id: str) -> list[em.ExperimentResult]:
with workflow_file_output_to_path(get_output_or_raise(workflow_outputs, "json_document")) as json_doc_path:
logger.info(f"Attempting ingestion of MAF-derived-from-VCF JSON from path: {json_doc_path}")
with open(json_doc_path, "r") as fh:
Expand Down
38 changes: 28 additions & 10 deletions chord_metadata_service/chord/ingest/phenopackets.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import json
import uuid

Expand All @@ -9,15 +11,16 @@
from chord_metadata_service.phenopackets.schemas import PHENOPACKET_SCHEMA
from chord_metadata_service.restapi.utils import iso_duration_to_years

from .exceptions import IngestError
from .logger import logger
from .resources import ingest_resource
from .schema import schema_validation
from .utils import get_output_or_raise, map_if_list, query_and_check_nulls, workflow_file_output_to_path

from typing import Any, Dict, Optional
from typing import Any, Optional, Union


def get_or_create_phenotypic_feature(pf):
def get_or_create_phenotypic_feature(pf: dict) -> pm.PhenotypicFeature:
# Below is code for if we want to re-use phenotypic features in the future
# For now, the lack of a many-to-many relationship doesn't let us do that.
# - David Lougheed, Nov 11 2022
Expand Down Expand Up @@ -53,13 +56,24 @@ def get_or_create_phenotypic_feature(pf):
return pf_obj


def ingest_phenopacket(phenopacket_data: Dict[str, Any], table_id: str) -> Optional[pm.Phenopacket]:
"""Ingests a single phenopacket."""

# validate phenopackets data against phenopacket schema
def validate_phenopacket(phenopacket_data: dict[str, Any], idx: Optional[int] = None) -> None:
    """Check phenopacket_data against the phenopacket JSON schema; raise IngestError on failure."""
    if schema_validation(phenopacket_data, PHENOPACKET_SCHEMA):
        return  # Document conforms to the schema; nothing to report.
    # Include the list position in the error message when the caller supplied one.
    position = f" {idx}" if idx is not None else ""
    # TODO: Report more precise errors
    raise IngestError(
        f"Failed schema validation for phenopacket{position} "
        f"(check Katsu logs for more information)")


def ingest_phenopacket(phenopacket_data: dict[str, Any], table_id: str, validate: bool = True,
idx: Optional[int] = None) -> pm.Phenopacket:
"""Ingests a single phenopacket."""

if validate:
# Validate phenopacket data against phenopackets schema prior to ingestion, if specified.
# `validate` may be false if the phenopacket has already been validated.
validate_phenopacket(phenopacket_data, idx)

new_phenopacket_id = phenopacket_data.get("id", str(uuid.uuid4()))

Expand All @@ -73,7 +87,7 @@ def ingest_phenopacket(phenopacket_data: Dict[str, Any], table_id: str) -> Optio
meta_data = phenopacket_data["meta_data"]

if subject:
extra_properties: Dict[str, Any] = subject.get("extra_properties", {})
extra_properties: dict[str, Any] = subject.get("extra_properties", {})

# Pre-process subject data: ---------------------------------------------------------------------------------

Expand Down Expand Up @@ -229,10 +243,14 @@ def ingest_phenopacket(phenopacket_data: Dict[str, Any], table_id: str) -> Optio
return new_phenopacket


def ingest_phenopacket_workflow(workflow_outputs, table_id) -> Union[list[pm.Phenopacket], pm.Phenopacket]:
    """Load a phenopackets JSON document produced by a workflow and ingest its contents."""
    doc_output = get_output_or_raise(workflow_outputs, "json_document")
    with workflow_file_output_to_path(doc_output) as json_doc_path:
        logger.info(f"Attempting ingestion of phenopackets from path: {json_doc_path}")
        with open(json_doc_path, "r") as jf:
            json_data = json.load(jf)

    # Validate every phenopacket up front, so a bad document fails before any rows are created.
    map_if_list(validate_phenopacket, json_data)

    # All documents passed; ingest them, skipping the already-performed validation.
    return map_if_list(ingest_phenopacket, json_data, table_id, validate=False)
9 changes: 7 additions & 2 deletions chord_metadata_service/chord/ingest/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import contextlib
import os
import re
Expand Down Expand Up @@ -28,9 +30,12 @@
WINDOWS_DRIVE_SCHEME = re.compile(r"^[a-zA-Z]$")


def map_if_list(fn: Callable, data: Any, *args, **kwargs) -> Any:
    """Apply fn to each element of data if it is a list (passing the element's index as the
    idx keyword), or to data itself otherwise (without idx). Extra args/kwargs are forwarded."""
    # TODO: Any sequence?
    if isinstance(data, list):
        return [fn(item, *args, idx=i, **kwargs) for i, item in enumerate(data)]
    return fn(data, *args, **kwargs)


def get_output_or_raise(workflow_outputs, key):
Expand Down
29 changes: 25 additions & 4 deletions chord_metadata_service/chord/tests/test_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,17 @@
from chord_metadata_service.chord.data_types import DATA_TYPE_PHENOPACKET, DATA_TYPE_EXPERIMENT
from chord_metadata_service.chord.models import Project, Dataset, TableOwnership, Table
from chord_metadata_service.chord.ingest import WORKFLOW_INGEST_FUNCTION_MAP
from chord_metadata_service.chord.ingest.exceptions import IngestError
from chord_metadata_service.chord.ingest.experiments import (
validate_experiment,
ingest_experiment,
)
from chord_metadata_service.chord.ingest.schema import schema_validation
from chord_metadata_service.chord.ingest.phenopackets import get_or_create_phenotypic_feature
from chord_metadata_service.chord.ingest.phenopackets import (
get_or_create_phenotypic_feature,
validate_phenopacket,
ingest_phenopacket,
)
from chord_metadata_service.chord.workflows.metadata import (
WORKFLOW_EXPERIMENTS_JSON,
WORKFLOW_MAF_DERIVED_FROM_VCF_JSON,
Expand Down Expand Up @@ -165,13 +174,21 @@ def test_reingesting_updating_phenopackets_json(self):
for m1, m2 in zip(p.meta_data.resources.all().order_by("id"), p2.meta_data.resources.all().order_by("id")):
self.assertEqual(m1.id, m2.id)

def test_ingesting_invalid_phenopackets_json(self):
# check invalid phenopacket, must fail validation
def test_phenopackets_validation(self):
# check invalid phenopacket, must fail validation & validate_phenopacket must raise

validation = schema_validation(EXAMPLE_INGEST_INVALID_PHENOPACKET, PHENOPACKET_SCHEMA)
self.assertEqual(validation, False)
# valid phenopacket passes validation
with self.assertRaises(IngestError):
validate_phenopacket(EXAMPLE_INGEST_INVALID_PHENOPACKET)
with self.assertRaises(IngestError):
ingest_phenopacket(EXAMPLE_INGEST_INVALID_PHENOPACKET, "dummy", validate=True)

# valid phenopacket passes validation & doesn't raise
validation_2 = schema_validation(EXAMPLE_INGEST_PHENOPACKET, PHENOPACKET_SCHEMA)
self.assertEqual(validation_2, True)
validate_phenopacket(EXAMPLE_INGEST_PHENOPACKET)

# valid experiments pass validation
for exp in EXAMPLE_INGEST_EXPERIMENT["experiments"]:
validation_3 = schema_validation(exp, EXPERIMENT_SCHEMA)
Expand Down Expand Up @@ -214,6 +231,10 @@ def test_ingesting_invalid_experiment_json(self):
for exp in EXAMPLE_INGEST_INVALID_EXPERIMENT["experiments"]:
validation = schema_validation(exp, EXPERIMENT_SCHEMA)
self.assertEqual(validation, False)
with self.assertRaises(IngestError):
validate_experiment(exp)
with self.assertRaises(IngestError):
ingest_experiment(exp, "dummy", validate=True)

# check valid experiment, must pass validation
for exp in EXAMPLE_INGEST_EXPERIMENT["experiments"]:
Expand Down
2 changes: 2 additions & 0 deletions chord_metadata_service/chord/views_ingest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

import json
import logging
import os
Expand Down
6 changes: 4 additions & 2 deletions chord_metadata_service/mcode/mcode_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from . import models as m
from django.utils import timezone

from typing import Optional

logger = logging.getLogger("mcode_ingest")
logger.setLevel(logging.INFO)

Expand All @@ -15,7 +17,7 @@ def _logger_message(created, obj):
logger.info(f"Existing {obj.__class__.__name__} {obj.id} retrieved")


def ingest_mcodepacket(mcodepacket_data, table_id):
def ingest_mcodepacket(mcodepacket_data, table_id, idx: Optional[int] = None):
""" Ingests a single mcodepacket in mcode app and patients' metadata into patients app."""

new_mcodepacket = {"id": mcodepacket_data["id"]}
Expand Down Expand Up @@ -274,7 +276,7 @@ def ingest_mcodepacket(mcodepacket_data, table_id):
updated=timezone.now()
)
mcodepacket.save()
logger.info(f"New Mcodepacket {mcodepacket.id} created")
logger.info(f"New Mcodepacket {mcodepacket.id} created (idx={idx})")
if crprocedures:
mcodepacket.cancer_related_procedures.set(crprocedures)
if medication_statements:
Expand Down

0 comments on commit 6bf2861

Please sign in to comment.