Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## v0.7.4 (2026-04-17)

### Fix

- fix failures not being properly audited in foundry pipeline

## v0.7.3 (2026-04-16)

### Fix
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Issues = "https://github.com/NHSDigital/data-validation-engine/issues"
Changelog = "https://github.com/NHSDigital/data-validation-engine/blob/main/CHANGELOG.md"

[tool.poetry]
version = "0.7.3"
version = "0.7.4"
packages = [
{ include = "dve", from = "src" },
]
Expand Down
3 changes: 3 additions & 0 deletions src/dve/pipeline/foundry_ddb_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ def run_pipeline(
)
if sub_stats:
self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
else:
self._audit_tables.mark_failed(submissions=[sub_id])

except Exception as err: # pylint: disable=W0718
self._logger.exception(
f"During processing of submission_id: {sub_id}, this exception was raised:"
Expand Down
16 changes: 12 additions & 4 deletions src/dve/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ def data_contract_step(

return processed_files, failed_processing

def apply_business_rules(
def apply_business_rules( # pylint: disable=R0914
self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None
) -> tuple[SubmissionInfo, SubmissionStatus]:
"""Apply the business rules to a given submission, the submission may have failed at the
Expand Down Expand Up @@ -581,15 +581,23 @@ def apply_business_rules(

key_fields = {model: conf.reporting_fields for model, conf in model_config.items()}

self.step_implementations.apply_rules(working_directory, entity_manager, rules, key_fields) # type: ignore
_errors_uri, rules_success = self.step_implementations.apply_rules( # type: ignore
working_directory,
entity_manager,
rules,
key_fields
)

rule_messages = load_feedback_messages(
get_feedback_errors_uri(working_directory, "business_rules")
)
submission_status.validation_failed = (
if (
any(not rule_message.is_informational for rule_message in rule_messages)
or submission_status.validation_failed
)
):
submission_status.validation_failed = True
elif not rules_success:
submission_status.processing_failed = True

for entity_name, entity in entity_manager.entities.items():
projected = self._step_implementations.write_parquet( # type: ignore
Expand Down
36 changes: 36 additions & 0 deletions tests/test_pipeline/test_foundry_ddb_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,39 @@ def test_foundry_runner_with_submitted_files_path(movies_test_files, temp_ddb_co
assert fh.get_resource_exists(report_uri)
assert len(list(fh.iter_prefix(output_loc))) == 2
assert len(list(fh.iter_prefix(audit_files))) == 3


def test_foundry_runner_error_at_bi_rules(movies_test_files, temp_ddb_conn):
    # Business-rules stage with missing refdata: the pipeline should record a
    # handled processing failure rather than raising, and the audit tables
    # should reflect "processing_failed" for the submission.
    audit_db, connection = temp_ddb_conn
    work_dir = Path(tempfile.mkdtemp()).as_posix()
    submissions_dir = Path(movies_test_files).as_posix()

    submission_id = uuid4().hex
    submission = SubmissionInfo(
        submission_id=submission_id,
        dataset_id="movies",
        file_name="good_movies",
        file_extension="json",
        submitting_org="TEST",
        datetime_received=datetime(2025, 11, 5),
    )

    # Point the class-level refdata loader at the shared test connection,
    # with no dataset config so the business rules cannot resolve refdata.
    DuckDBRefDataLoader.connection = connection
    DuckDBRefDataLoader.dataset_config_uri = None

    with DDBAuditingManager(audit_db.as_uri(), None, connection) as audit_manager:
        pipeline = FoundryDDBPipeline(
            processed_files_path=work_dir,
            audit_tables=audit_manager,
            connection=connection,
            rules_path=get_test_file_path("movies/movies_ddb.dischema.json").as_posix(),
            submitted_files_path=submissions_dir,
            reference_data_loader=DuckDBRefDataLoader,
        )
        outputs, _report, audits = pipeline.run_pipeline(submission)

        # The submitted file is staged into the processing area even on failure.
        assert Path(work_dir, submission_id, submission.file_name_with_ext).exists()
        # No validated output is produced; only the pre-failure audit files exist.
        assert outputs is None
        assert len(list(fh.iter_prefix(audits))) == 2
        # Both the status record and the latest processing record mark the failure.
        assert audit_manager.get_submission_status(submission_id).processing_failed
        latest = audit_manager.get_latest_processing_records().select("submission_result")
        assert latest.pl().to_dicts()[0]["submission_result"] == "processing_failed"
Loading