diff --git a/recidiviz/big_query/big_query_utils.py b/recidiviz/big_query/big_query_utils.py index c747e23d78..b36bca93f2 100644 --- a/recidiviz/big_query/big_query_utils.py +++ b/recidiviz/big_query/big_query_utils.py @@ -92,8 +92,6 @@ def _bq_schema_column_type_for_type(field_type: Type) -> bigquery.enums.SqlTypeN return bigquery.enums.SqlTypeNames.DATETIME if field_type is bool: return bigquery.enums.SqlTypeNames.BOOLEAN - if field_type is datetime.timedelta: - return bigquery.enums.SqlTypeNames.INTERVAL # TODO(#7285): Add support for ARRAY types when we turn on the regular # CloudSQL to BQ refresh for the JUSTICE_COUNTS schema raise ValueError(f"Unhandled field type for field_type: {field_type}") diff --git a/recidiviz/ingest/direct/metadata/direct_ingest_raw_data_resource_lock_manager.py b/recidiviz/ingest/direct/metadata/direct_ingest_raw_data_resource_lock_manager.py index 039badaad2..1d8c4397cb 100644 --- a/recidiviz/ingest/direct/metadata/direct_ingest_raw_data_resource_lock_manager.py +++ b/recidiviz/ingest/direct/metadata/direct_ingest_raw_data_resource_lock_manager.py @@ -75,11 +75,15 @@ def _get_unreleased_locks_for_resoures( def _is_lock_unreleased_but_expired( lock: schema.DirectIngestRawDataResourceLock, ) -> bool: + return ( not lock.released - and lock.lock_ttl is not None - and datetime.datetime.now(tz=datetime.UTC) - lock.lock_acquisition_time - > lock.lock_ttl + and lock.lock_ttl_seconds is not None + and ( + time_delta := datetime.datetime.now(tz=datetime.UTC) + - lock.lock_acquisition_time + ) + and time_delta.seconds > lock.lock_ttl_seconds ) def _update_unreleased_but_expired_locks_for_resoures( @@ -104,7 +108,7 @@ def _register_new_locks( resources: List[DirectIngestRawDataResourceLockResource], actor: DirectIngestRawDataLockActor, description: str, - ttl: Optional[datetime.timedelta], + ttl_seconds: Optional[int], session: Session, ) -> List[schema.DirectIngestRawDataResourceLock]: """Creates and returns new locks with the provided information.""" @@ -117,7 +121,7 @@ def _register_new_locks( raw_data_source_instance=self.raw_data_source_instance.value, released=False, lock_acquisition_time=datetime.datetime.now(tz=datetime.UTC), - lock_ttl=ttl, + lock_ttl_seconds=ttl_seconds, lock_description=description, ) @@ -160,7 +164,7 @@ def get_most_recent_locks_for_resources( raw_data_source_instance, released, lock_acquisition_time, - lock_ttl, + lock_ttl_seconds, lock_description FROM ( SELECT @@ -227,7 +231,7 @@ def acquire_lock_for_resources( resources: List[DirectIngestRawDataResourceLockResource], actor: DirectIngestRawDataLockActor, description: str, - ttl: Optional[datetime.timedelta] = None, + ttl_seconds: Optional[int] = None, ) -> List[entities.DirectIngestRawDataResourceLock]: """Attempts to acquire the resource locks for the provided |resources|. If successful, returns the newly created locks; otherwise raises @@ -249,7 +253,7 @@ def acquire_lock_for_resources( ) new_locks = self._register_new_locks( - resources, actor, description, ttl, session + resources, actor, description, ttl_seconds, session ) # commit here so that we can get the lock_id populated on the object try: diff --git a/recidiviz/persistence/database/migrations/operations/versions/2024_04_29_1942_2a6d882eaf1c_update_lock_ttl_to_use_seconds.py b/recidiviz/persistence/database/migrations/operations/versions/2024_04_29_1942_2a6d882eaf1c_update_lock_ttl_to_use_seconds.py new file mode 100644 index 0000000000..0614afe92c --- /dev/null +++ b/recidiviz/persistence/database/migrations/operations/versions/2024_04_29_1942_2a6d882eaf1c_update_lock_ttl_to_use_seconds.py @@ -0,0 +1,59 @@ +# pylint: skip-file +"""update_lock_ttl_to_use_seconds + +Revision ID: 2a6d882eaf1c +Revises: a92c4af755cc +Create Date: 2024-04-29 19:42:55.756028 + +""" +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "2a6d882eaf1c" +down_revision = "a92c4af755cc" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint( + "all_process_actors_must_specify_ttl", + "direct_ingest_raw_data_resource_lock", + type_="check", + ) + op.drop_column("direct_ingest_raw_data_resource_lock", "lock_ttl") + op.add_column( + "direct_ingest_raw_data_resource_lock", + sa.Column("lock_ttl_seconds", sa.Integer(), nullable=True), + ) + op.create_check_constraint( + "all_process_actors_must_specify_ttl_seconds", + "direct_ingest_raw_data_resource_lock", + "lock_actor = 'ADHOC' OR (lock_actor = 'PROCESS' and lock_ttl_seconds IS NOT NULL)", + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint( + "all_process_actors_must_specify_ttl_seconds", + "direct_ingest_raw_data_resource_lock", + type_="check", + ) + op.add_column( + "direct_ingest_raw_data_resource_lock", + sa.Column( + "lock_ttl", postgresql.INTERVAL(), autoincrement=False, nullable=True + ), + ) + op.drop_column("direct_ingest_raw_data_resource_lock", "lock_ttl_seconds") + op.create_check_constraint( + "all_process_actors_must_specify_ttl", + "direct_ingest_raw_data_resource_lock", + "lock_actor = 'ADHOC' OR (lock_actor = 'PROCESS' and lock_ttl IS NOT NULL)", + ) + # ### end Alembic commands ### diff --git a/recidiviz/persistence/database/schema/operations/schema.py b/recidiviz/persistence/database/schema/operations/schema.py index a3c9d85044..98830303a3 100644 --- a/recidiviz/persistence/database/schema/operations/schema.py +++ b/recidiviz/persistence/database/schema/operations/schema.py @@ -28,7 +28,6 @@ ForeignKey, Index, Integer, - Interval, PrimaryKeyConstraint, String, UniqueConstraint, @@ -326,16 +325,18 @@ class DirectIngestRawDataResourceLock(OperationsBase): # The time this lock was acquired lock_acquisition_time = Column(DateTime(timezone=True)) - # The TTL for this lock. pg Interval is like a datetime.timedelta() object. - lock_ttl = Column(Interval) + # The TTL for this lock in seconds. consider switching this to pg Interval which + # sqlalchemy converst to datetime.timedelta object if bq federated queries support + # the pg interval type in the future + lock_ttl_seconds = Column(Integer) # Descirption for why the lock was acquired lock_description = Column(String(255), nullable=False) __table_args__ = ( CheckConstraint( - "lock_actor = 'ADHOC' OR (lock_actor = 'PROCESS' and lock_ttl IS NOT NULL)", - name="all_process_actors_must_specify_ttl", + "lock_actor = 'ADHOC' OR (lock_actor = 'PROCESS' and lock_ttl_seconds IS NOT NULL)", + name="all_process_actors_must_specify_ttl_seconds", ), Index( "at_most_one_active_lock_per_resource_region_and_instance", diff --git a/recidiviz/persistence/entity/operations/entities.py b/recidiviz/persistence/entity/operations/entities.py index ce6a6f17a3..1b997488d9 100644 --- a/recidiviz/persistence/entity/operations/entities.py +++ b/recidiviz/persistence/entity/operations/entities.py @@ -181,7 +181,7 @@ class DirectIngestRawDataResourceLock(Entity, BuildableAttr, DefaultableAttr): released: bool = attr.ib() # The time this lock was acquired lock_acquisition_time: datetime.datetime = attr.ib() - # The TTL for this lock. pg Interval is like a datetime.timedelta() object. - lock_ttl: datetime.timedelta = attr.ib() + # The TTL for this lock in seconds + lock_ttl_seconds: Optional[int] = attr.ib() # Descirption for why the lock was acquired lock_description: str = attr.ib() diff --git a/recidiviz/tests/ingest/direct/metadata/direct_ingest_raw_data_resource_lock_manager_test.py b/recidiviz/tests/ingest/direct/metadata/direct_ingest_raw_data_resource_lock_manager_test.py index a6b6eb93dc..5a40860d7b 100644 --- a/recidiviz/tests/ingest/direct/metadata/direct_ingest_raw_data_resource_lock_manager_test.py +++ b/recidiviz/tests/ingest/direct/metadata/direct_ingest_raw_data_resource_lock_manager_test.py @@ -16,7 +16,6 @@ # ============================================================================= """Implements tests for the DirectIngestRawDataResourceLockManager.""" import datetime -from datetime import timedelta from itertools import chain from typing import Any, Optional from unittest import TestCase @@ -154,7 +153,7 @@ def test_acquire_lock_existing_unreleased_unexpired_with_ttl(self) -> None: self.raw_data, DirectIngestRawDataLockActor.PROCESS, "testing-testing-123", - ttl=timedelta(days=10), + ttl_seconds=10 * 24 * 60 * 60, # 10 days ) assert len(active_lock) == 1 @@ -190,7 +189,7 @@ def test_acquire_lock_existing_expired(self) -> None: self.raw_data, DirectIngestRawDataLockActor.PROCESS, "testing-testing-123", - ttl=timedelta(hours=1), + ttl_seconds=60 * 60, ) active_lock = self.us_xx_manager.acquire_lock_for_resources( @@ -205,7 +204,7 @@ def test_lock_process_must_supply_ttl(self) -> None: with self.assertRaisesRegex( IntegrityError, r"\(.*CheckViolation\) new row for relation \"direct_ingest_raw_data_resource_lock\" " - r"violates check constraint \"all_process_actors_must_specify_ttl\".*", + r"violates check constraint \"all_process_actors_must_specify_ttl_seconds\".*", ): _ = self.us_xx_manager.acquire_lock_for_resources( self.raw_data, @@ -247,7 +246,7 @@ def test_get_lock_by_id_reflects_update(self) -> None: self.raw_data, DirectIngestRawDataLockActor.PROCESS, "testing-testing-123", - ttl=timedelta(hours=1), + ttl_seconds=60 * 60, ) assert len(expired_locks_in_jan) == 1 @@ -266,7 +265,7 @@ def test_get_most_recent_lock_for_resources_reflects_update(self) -> None: self.all_resources, DirectIngestRawDataLockActor.PROCESS, "testing-testing-123", - ttl=timedelta(hours=1), + ttl_seconds=60 * 60, ) current_locks = self.us_xx_manager.get_most_recent_locks_for_resources( @@ -337,7 +336,7 @@ def test_lock_acquire_race_condition_with_active_expired_after_update_commit( self.raw_data, DirectIngestRawDataLockActor.PROCESS, "testing-testing-123", - ttl=timedelta(hours=1), + ttl_seconds=60 * 60, ) persisted_lock = [] @@ -394,7 +393,7 @@ def test_lock_acquire_race_condition_with_active_expired_before_update_commit( self.raw_data, DirectIngestRawDataLockActor.PROCESS, "testing-testing-123", - ttl=timedelta(hours=1), + ttl_seconds=60 * 60, ) persisted_lock = [] diff --git a/recidiviz/tools/admin_panel/fixtures/operations_db/direct_ingest_raw_data_resource_lock.csv b/recidiviz/tools/admin_panel/fixtures/operations_db/direct_ingest_raw_data_resource_lock.csv index eac6e5b41c..979ddb05d6 100644 --- a/recidiviz/tools/admin_panel/fixtures/operations_db/direct_ingest_raw_data_resource_lock.csv +++ b/recidiviz/tools/admin_panel/fixtures/operations_db/direct_ingest_raw_data_resource_lock.csv @@ -1,3 +1,3 @@ -lock_id,lock_actor,lock_resource,region_code,raw_data_source_instance,released,lock_acquisition_time,lock_ttl,lock_description +lock_id,lock_actor,lock_resource,region_code,raw_data_source_instance,released,lock_acquisition_time,lock_ttl_seconds,lock_description 1,ADHOC,BUCKET,US_CA,PRIMARY,true,2024-02-09T19:06:00.000000+00,,we held this lock just bc we could 2,ADHOC,BUCKET,US_CA,PRIMARY,false,2024-02-10T19:06:00.000000+00,,we holding this lock just bc we can