Skip to content

Commit

Permalink
[Ingest] Ingest Validations for merged LedgerEntity (Recidiviz/recidi…
Browse files Browse the repository at this point in the history
…viz-data#29150)

## Description of the change

Validates LedgerEntity entities:
- sequence_num must be all None or all not-None
- partition_key must be unique if sequence_num are all None
- sequence_num must be unique

GitOrigin-RevId: 33cfcc1eefefe2e52c5ce165b2ef9efffd058e0e
  • Loading branch information
recidinick authored and Helper Bot committed May 14, 2024
1 parent 69a8eb3 commit 67b2fef
Show file tree
Hide file tree
Showing 6 changed files with 684 additions and 276 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ input_columns:
- transition_release_eligibility_date
- actual_or_expected_release_date
- update_datetime_external
- sequence_num
- PERSON_ID
- DOC_ID
unused_columns: []
Expand All @@ -21,7 +20,6 @@ output:
update_datetime: update_datetime_external
task_type: $literal_enum(StateTaskType.DISCHARGE_FROM_INCARCERATION)
task_subtype: $literal("Standard Transition Release")
sequence_num: sequence_num
task_metadata:
$json_dict:
external_id: DOC_ID
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,7 @@
WHERE update_datetime_external IS NOT NULL)
SELECT
* EXCEPT(rn),
ROW_NUMBER() OVER (
PARTITION BY PERSON_ID, DOC_ID
ORDER BY update_datetime_external, transition_release_eligibility_date,
actual_or_expected_release_date) AS sequence_num
* EXCEPT(rn)
FROM final_dedup
WHERE rn = 1
"""
Expand Down
78 changes: 68 additions & 10 deletions recidiviz/pipelines/ingest/state/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"""Utility classes for validating state entities and entity trees."""

from collections import defaultdict
from typing import Dict, Iterable, List, Type
from typing import Dict, Iterable, List, Optional, Sequence, Type

from recidiviz.common.constants.state.state_sentence import (
StateSentenceStatus,
Expand All @@ -29,6 +29,7 @@
get_all_entities_from_tree,
)
from recidiviz.persistence.entity.state import entities as state_entities
from recidiviz.persistence.entity.state.state_entity_mixins import LedgerEntityMixin
from recidiviz.persistence.persistence_utils import RootEntityT
from recidiviz.pipelines.ingest.state.constants import EntityKey, Error
from recidiviz.utils.types import assert_type
Expand Down Expand Up @@ -90,19 +91,13 @@ def _external_id_checks(root_entity: RootEntityT) -> Iterable[Error]:


def _unique_constraint_check(
root_entity: RootEntityT, field_index: CoreEntityFieldIndex
entities_by_cls: Dict[Type[Entity], List[Entity]]
) -> Iterable[Error]:
"""Checks that all child entities match entity_tree_unique_constraints.
If not, this function yields an error message for each child entity and constraint
that fails. The message shows a pii-limited view of the first three entities that
fail the checks.
"""
child_entities = get_all_entities_from_tree(root_entity, field_index=field_index)

entities_by_cls: Dict[Type[Entity], List[Entity]] = defaultdict(list)
for child in child_entities:
entities_by_cls[type(child)].append(child)

for entity_cls, entity_objects in entities_by_cls.items():
for constraint in entity_cls.entity_tree_unique_constraints():
grouped_entities: Dict[str, List[Entity]] = defaultdict(list)
Expand Down Expand Up @@ -166,6 +161,55 @@ def _sentencing_entities_checks(
f"Found person {state_person.limited_pii_repr()} with REVOKED status on {sentence.sentence_type} sentence."
" REVOKED statuses are only allowed on PROBATION and PAROLE type sentences."
)
if sentence.sentence_lengths:
if err := ledger_entity_checks(
state_person,
state_entities.StateSentenceLength,
sentence.sentence_lengths,
):
yield err
if sentence.sentence_status_snapshots:
if err := ledger_entity_checks(
state_person,
state_entities.StateSentenceStatusSnapshot,
sentence.sentence_status_snapshots,
):
yield err


def ledger_entity_checks(
root_entiy: RootEntityT,
entity_cls: Type[LedgerEntityMixin],
ledger_objects: Sequence[LedgerEntityMixin],
) -> Optional[Error]:
"""Yields error messages related to LedgerEntity checks:
- sequence_num must be all None or all not-None
- partition_key must be unique if sequence_num are all None
- sequence_num must be unique
- sequence_num ledger datetime ordering must be consistent
"""
preamble = (
f"Found {root_entiy.limited_pii_repr()} having {entity_cls.__name__} with "
)
# If sequence_num are all None, ensure the partition key is unique.
# Unique partition keys are usually from unique ledger datetimes.
if all(eo.sequence_num is None for eo in ledger_objects):
if len(ledger_objects) != len({eo.partition_key for eo in ledger_objects}):
return (
preamble + "invalid datetime/sequence_num hydration."
" If sequence_num is None, then the ledger's partition_key must be unique across hydrated entities."
)
return None
# If there are any sequence_num that are not None, then they ALL must be not None.
if any(eo.sequence_num is None for eo in ledger_objects):
return (
preamble + " inconsistent sequence_num hydration."
" sequence_num should be None for ALL hydrated entities or NO hydrated entities of the same type."
)
# If ALL sequence_num are not None, then they must be unique.
if len({eo.sequence_num for eo in ledger_objects}) != len(ledger_objects):
return preamble + "DUPLICATE sequence_num hydration."
return None


def validate_root_entity(
Expand All @@ -181,13 +225,27 @@ def validate_root_entity(
# Yields errors if incorrect number of external IDs
error_messages.extend(_external_id_checks(root_entity))

entities_by_cls: Dict[Type[Entity], List[Entity]] = defaultdict(list)
for child in get_all_entities_from_tree(root_entity, field_index=field_index):
entities_by_cls[type(child)].append(child)

# Yields errors if global_unique_constraints fail
error_messages.extend(_unique_constraint_check(root_entity, field_index))
error_messages.extend(_unique_constraint_check(entities_by_cls))

# TODO(#27113) Check sequence_num on LedgerEntity objects
if isinstance(root_entity, state_entities.StatePerson):
error_messages.extend(_sentencing_entities_checks(root_entity))

# Ensure StateTaskDeadline passes ledger checks
if root_entity.task_deadlines:
if err := ledger_entity_checks(
root_entity,
state_entities.StateTaskDeadline,
root_entity.task_deadlines,
):
error_messages.append(err)

# TODO(#29316) Validate StateSentenceGroup as well

return error_messages


Expand Down
Loading

0 comments on commit 67b2fef

Please sign in to comment.