diff --git a/CHANGELOG.md b/CHANGELOG.md index dbb56c0..1d7781a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## v0.7.4 (2026-04-17) + +### Fix + +- fix failures not being properly audited in foundry pipeline + ## v0.7.3 (2026-04-16) ### Fix diff --git a/pyproject.toml b/pyproject.toml index 26071f8..d76f647 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ Issues = "https://github.com/NHSDigital/data-validation-engine/issues" Changelog = "https://github.com/NHSDigital/data-validation-engine/blob/main/CHANGELOG.md" [tool.poetry] -version = "0.7.3" +version = "0.7.4" packages = [ { include = "dve", from = "src" }, ] diff --git a/src/dve/pipeline/foundry_ddb_pipeline.py b/src/dve/pipeline/foundry_ddb_pipeline.py index 21cac56..a5c2463 100644 --- a/src/dve/pipeline/foundry_ddb_pipeline.py +++ b/src/dve/pipeline/foundry_ddb_pipeline.py @@ -152,6 +152,9 @@ def run_pipeline( ) if sub_stats: self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats]) + else: + self._audit_tables.mark_failed(submissions=[sub_id]) + except Exception as err: # pylint: disable=W0718 self._logger.exception( f"During processing of submission_id: {sub_id}, this exception was raised:" diff --git a/src/dve/pipeline/pipeline.py b/src/dve/pipeline/pipeline.py index b14ada1..26b682e 100644 --- a/src/dve/pipeline/pipeline.py +++ b/src/dve/pipeline/pipeline.py @@ -527,7 +527,7 @@ def data_contract_step( return processed_files, failed_processing - def apply_business_rules( + def apply_business_rules( # pylint: disable=R0914 self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None ) -> tuple[SubmissionInfo, SubmissionStatus]: """Apply the business rules to a given submission, the submission may have failed at the @@ -581,15 +581,23 @@ def apply_business_rules( key_fields = {model: conf.reporting_fields for model, conf in model_config.items()} - self.step_implementations.apply_rules(working_directory, entity_manager, rules, key_fields) # type: ignore + _errors_uri, rules_success = self.step_implementations.apply_rules( # type: ignore + working_directory, + entity_manager, + rules, + key_fields + ) rule_messages = load_feedback_messages( get_feedback_errors_uri(working_directory, "business_rules") ) - submission_status.validation_failed = ( + if ( any(not rule_message.is_informational for rule_message in rule_messages) or submission_status.validation_failed - ) + ): + submission_status.validation_failed = True + elif not rules_success: + submission_status.processing_failed = True for entity_name, entity in entity_manager.entities.items(): projected = self._step_implementations.write_parquet( # type: ignore diff --git a/tests/test_pipeline/test_foundry_ddb_pipeline.py b/tests/test_pipeline/test_foundry_ddb_pipeline.py index 12a7fd1..350b990 100644 --- a/tests/test_pipeline/test_foundry_ddb_pipeline.py +++ b/tests/test_pipeline/test_foundry_ddb_pipeline.py @@ -192,3 +192,39 @@ def test_foundry_runner_with_submitted_files_path(movies_test_files, temp_ddb_co assert fh.get_resource_exists(report_uri) assert len(list(fh.iter_prefix(output_loc))) == 2 assert len(list(fh.iter_prefix(audit_files))) == 3 + + +def test_foundry_runner_error_at_bi_rules(movies_test_files, temp_ddb_conn): + # Missing refdata in the business rules should cause a handled failure + db_file, conn = temp_ddb_conn + processing_folder = Path(tempfile.mkdtemp()).as_posix() + submitted_files_path = Path(movies_test_files).as_posix() + sub_id = uuid4().hex + sub_info = SubmissionInfo( + submission_id=sub_id, + dataset_id="movies", + file_name="good_movies", + file_extension="json", + submitting_org="TEST", + datetime_received=datetime(2025,11,5) + ) + + DuckDBRefDataLoader.connection = conn + DuckDBRefDataLoader.dataset_config_uri = None + + with DDBAuditingManager(db_file.as_uri(), None, conn) as audit_manager: + dve_pipeline = FoundryDDBPipeline( + processed_files_path=processing_folder, + audit_tables=audit_manager, + connection=conn, + rules_path=get_test_file_path("movies/movies_ddb.dischema.json").as_posix(), + submitted_files_path=submitted_files_path, + reference_data_loader=DuckDBRefDataLoader, + ) + output_loc, report_uri, audit_files = dve_pipeline.run_pipeline(sub_info) + + assert Path(processing_folder, sub_id, sub_info.file_name_with_ext).exists() + assert output_loc is None + assert len(list(fh.iter_prefix(audit_files))) == 2 + assert audit_manager.get_submission_status(sub_id).processing_failed + assert audit_manager.get_latest_processing_records().select("submission_result").pl().to_dicts()[0]["submission_result"] == "processing_failed"