[raw data resource locks] update lock_ttl to lock_ttl_seconds (Recidiviz/recidiviz-data#29452)

## Description of the change

I apparently didn't look that hard: the pg `Interval` type is indeed
[not supported](https://cloud.google.com/bigquery/docs/reference/standard-sql/federated_query_functions#postgresql_mapping)
by federated BQ queries to pg.
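
For context, a rough sketch of the kind of federated read that motivated this change; the connection ID and query below are placeholders rather than anything from this PR, but the point is that an `Integer` column maps cleanly into BigQuery while an `Interval` column has no supported mapping:

```python
from google.cloud import bigquery

client = bigquery.Client()

# EXTERNAL_QUERY pushes the inner statement down to the Cloud SQL (Postgres)
# instance; the connection ID below is a placeholder, not the real one.
sql = """
SELECT lock_id, lock_ttl_seconds
FROM EXTERNAL_QUERY(
    'my-project.us.my-postgres-connection',
    'SELECT lock_id, lock_ttl_seconds FROM direct_ingest_raw_data_resource_lock'
)
"""
rows = client.query(sql).result()
```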

This PR updates the `lock_ttl` column of type `Interval` to instead be
`lock_ttl_seconds` of type `Integer`, and updates the associated lock manager
and tests to reflect the change.
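
As a rough sketch of how an integer TTL is meant to be consumed (the helper name here is hypothetical, not something added in this PR; note that `total_seconds()`, unlike `timedelta.seconds`, covers TTLs longer than a day):

```python
import datetime


def is_expired(acquired_at: datetime.datetime, ttl_seconds: int) -> bool:
    # Elapsed wall-clock time since acquisition, compared against the integer
    # TTL; total_seconds() (rather than .seconds) handles TTLs over one day.
    elapsed = datetime.datetime.now(tz=datetime.UTC) - acquired_at
    return elapsed.total_seconds() > ttl_seconds
```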

I haven't written a ton of Alembic migrations, but I took my best swing at
removing and re-adding the associated `CheckConstraint`; I'd love some
feedback on whether that looks right!
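
The migration in this commit drops `lock_ttl` outright rather than converting any existing values, which seems fine for short-lived locks; purely for reference, a hedged sketch of what a backfill could look like if preserving old TTLs ever mattered (the `EXTRACT(EPOCH ...)` step is hypothetical and not part of this PR):

```python
import sqlalchemy as sa
from alembic import op


def upgrade() -> None:
    op.add_column(
        "direct_ingest_raw_data_resource_lock",
        sa.Column("lock_ttl_seconds", sa.Integer(), nullable=True),
    )
    # Hypothetical backfill: convert the old Interval values to whole seconds
    # before the old column (and its check constraint) are dropped.
    op.execute(
        "UPDATE direct_ingest_raw_data_resource_lock "
        "SET lock_ttl_seconds = CAST(EXTRACT(EPOCH FROM lock_ttl) AS INTEGER)"
    )
```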

## Type of change

> All pull requests must have at least one of the following labels applied (otherwise the PR will fail):

| Label | Description |
|-------|-------------|
| Type: Bug | non-breaking change that fixes an issue |
| Type: Feature | non-breaking change that adds functionality |
| Type: Breaking Change | fix or feature that would cause existing functionality to not work as expected |
| Type: Non-breaking refactor | change addresses some tech debt item or prepares for a later change, but does not change functionality |
| Type: Configuration Change | adjusts configuration to achieve some end related to functionality, development, performance, or security |
| Type: Dependency Upgrade | upgrades a project dependency - these changes are not included in release notes |

## Related issues

Closes Recidiviz/recidiviz-data#29451

## Checklists

### Development

**This box MUST be checked by the submitter prior to merging**:
- [x] **Double- and triple-checked that there is no Personally
Identifiable Information (PII) being mistakenly added in this pull
request**

These boxes should be checked by the submitter prior to merging:
- [x] Tests have been written to cover the code changed/added as part of
this pull request

### Code review

These boxes should be checked by reviewers prior to merging:

- [ ] This pull request has a descriptive title and information useful
to a reviewer
- [ ] Potential security implications or infrastructural changes have
been considered, if relevant

GitOrigin-RevId: 86ea4499f7711babb75348d18ce3b5fc94c5e7ad
ethan-oro authored and Helper Bot committed May 15, 2024
1 parent 86ed1ff commit 19f7318
Showing 7 changed files with 87 additions and 26 deletions.
2 changes: 0 additions & 2 deletions recidiviz/big_query/big_query_utils.py
@@ -92,8 +92,6 @@ def _bq_schema_column_type_for_type(field_type: Type) -> bigquery.enums.SqlTypeNames:
return bigquery.enums.SqlTypeNames.DATETIME
if field_type is bool:
return bigquery.enums.SqlTypeNames.BOOLEAN
if field_type is datetime.timedelta:
return bigquery.enums.SqlTypeNames.INTERVAL
# TODO(#7285): Add support for ARRAY types when we turn on the regular
# CloudSQL to BQ refresh for the JUSTICE_COUNTS schema
raise ValueError(f"Unhandled field type for field_type: {field_type}")
@@ -75,11 +75,15 @@ def _get_unreleased_locks_for_resoures(
def _is_lock_unreleased_but_expired(
lock: schema.DirectIngestRawDataResourceLock,
) -> bool:

return (
not lock.released
and lock.lock_ttl is not None
and datetime.datetime.now(tz=datetime.UTC) - lock.lock_acquisition_time
> lock.lock_ttl
and lock.lock_ttl_seconds is not None
and (
time_delta := datetime.datetime.now(tz=datetime.UTC)
- lock.lock_acquisition_time
)
and time_delta.total_seconds() > lock.lock_ttl_seconds
)

def _update_unreleased_but_expired_locks_for_resoures(
@@ -104,7 +108,7 @@ def _register_new_locks(
resources: List[DirectIngestRawDataResourceLockResource],
actor: DirectIngestRawDataLockActor,
description: str,
ttl: Optional[datetime.timedelta],
ttl_seconds: Optional[int],
session: Session,
) -> List[schema.DirectIngestRawDataResourceLock]:
"""Creates and returns new locks with the provided information."""
@@ -117,7 +121,7 @@
raw_data_source_instance=self.raw_data_source_instance.value,
released=False,
lock_acquisition_time=datetime.datetime.now(tz=datetime.UTC),
lock_ttl=ttl,
lock_ttl_seconds=ttl_seconds,
lock_description=description,
)

@@ -160,7 +164,7 @@ def get_most_recent_locks_for_resources(
raw_data_source_instance,
released,
lock_acquisition_time,
lock_ttl,
lock_ttl_seconds,
lock_description
FROM (
SELECT
@@ -227,7 +231,7 @@ def acquire_lock_for_resources(
resources: List[DirectIngestRawDataResourceLockResource],
actor: DirectIngestRawDataLockActor,
description: str,
ttl: Optional[datetime.timedelta] = None,
ttl_seconds: Optional[int] = None,
) -> List[entities.DirectIngestRawDataResourceLock]:
"""Attempts to acquire the resource locks for the provided |resources|.
If successful, returns the newly created locks; otherwise raises
@@ -249,7 +253,7 @@
)

new_locks = self._register_new_locks(
resources, actor, description, ttl, session
resources, actor, description, ttl_seconds, session
)
# commit here so that we can get the lock_id populated on the object
try:
@@ -0,0 +1,59 @@
# pylint: skip-file
"""update_lock_ttl_to_use_seconds
Revision ID: 2a6d882eaf1c
Revises: a92c4af755cc
Create Date: 2024-04-29 19:42:55.756028
"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "2a6d882eaf1c"
down_revision = "a92c4af755cc"
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
"all_process_actors_must_specify_ttl",
"direct_ingest_raw_data_resource_lock",
type_="check",
)
op.drop_column("direct_ingest_raw_data_resource_lock", "lock_ttl")
op.add_column(
"direct_ingest_raw_data_resource_lock",
sa.Column("lock_ttl_seconds", sa.Integer(), nullable=True),
)
op.create_check_constraint(
"all_process_actors_must_specify_ttl_seconds",
"direct_ingest_raw_data_resource_lock",
"lock_actor = 'ADHOC' OR (lock_actor = 'PROCESS' and lock_ttl_seconds IS NOT NULL)",
)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
"all_process_actors_must_specify_ttl_seconds",
"direct_ingest_raw_data_resource_lock",
type_="check",
)
op.add_column(
"direct_ingest_raw_data_resource_lock",
sa.Column(
"lock_ttl", postgresql.INTERVAL(), autoincrement=False, nullable=True
),
)
op.drop_column("direct_ingest_raw_data_resource_lock", "lock_ttl_seconds")
op.create_check_constraint(
"all_process_actors_must_specify_ttl",
"direct_ingest_raw_data_resource_lock",
"lock_actor = 'ADHOC' OR (lock_actor = 'PROCESS' and lock_ttl IS NOT NULL)",
)
# ### end Alembic commands ###
11 changes: 6 additions & 5 deletions recidiviz/persistence/database/schema/operations/schema.py
@@ -28,7 +28,6 @@
ForeignKey,
Index,
Integer,
Interval,
PrimaryKeyConstraint,
String,
UniqueConstraint,
@@ -326,16 +325,18 @@ class DirectIngestRawDataResourceLock(OperationsBase):
# The time this lock was acquired
lock_acquisition_time = Column(DateTime(timezone=True))

# The TTL for this lock. pg Interval is like a datetime.timedelta() object.
lock_ttl = Column(Interval)
# The TTL for this lock in seconds. Consider switching this back to pg Interval,
# which sqlalchemy converts to a datetime.timedelta object, if bq federated queries
# support the pg interval type in the future.
lock_ttl_seconds = Column(Integer)

# Description for why the lock was acquired
lock_description = Column(String(255), nullable=False)

__table_args__ = (
CheckConstraint(
"lock_actor = 'ADHOC' OR (lock_actor = 'PROCESS' and lock_ttl IS NOT NULL)",
name="all_process_actors_must_specify_ttl",
"lock_actor = 'ADHOC' OR (lock_actor = 'PROCESS' and lock_ttl_seconds IS NOT NULL)",
name="all_process_actors_must_specify_ttl_seconds",
),
Index(
"at_most_one_active_lock_per_resource_region_and_instance",
4 changes: 2 additions & 2 deletions recidiviz/persistence/entity/operations/entities.py
@@ -181,7 +181,7 @@ class DirectIngestRawDataResourceLock(Entity, BuildableAttr, DefaultableAttr):
released: bool = attr.ib()
# The time this lock was acquired
lock_acquisition_time: datetime.datetime = attr.ib()
# The TTL for this lock. pg Interval is like a datetime.timedelta() object.
lock_ttl: datetime.timedelta = attr.ib()
# The TTL for this lock in seconds
lock_ttl_seconds: Optional[int] = attr.ib()
# Description for why the lock was acquired
lock_description: str = attr.ib()
@@ -16,7 +16,6 @@
# =============================================================================
"""Implements tests for the DirectIngestRawDataResourceLockManager."""
import datetime
from datetime import timedelta
from itertools import chain
from typing import Any, Optional
from unittest import TestCase
@@ -154,7 +153,7 @@ def test_acquire_lock_existing_unreleased_unexpired_with_ttl(self) -> None:
self.raw_data,
DirectIngestRawDataLockActor.PROCESS,
"testing-testing-123",
ttl=timedelta(days=10),
ttl_seconds=10 * 24 * 60 * 60, # 10 days
)

assert len(active_lock) == 1
@@ -190,7 +189,7 @@ def test_acquire_lock_existing_expired(self) -> None:
self.raw_data,
DirectIngestRawDataLockActor.PROCESS,
"testing-testing-123",
ttl=timedelta(hours=1),
ttl_seconds=60 * 60,
)

active_lock = self.us_xx_manager.acquire_lock_for_resources(
@@ -205,7 +204,7 @@ def test_lock_process_must_supply_ttl(self) -> None:
with self.assertRaisesRegex(
IntegrityError,
r"\(.*CheckViolation\) new row for relation \"direct_ingest_raw_data_resource_lock\" "
r"violates check constraint \"all_process_actors_must_specify_ttl\".*",
r"violates check constraint \"all_process_actors_must_specify_ttl_seconds\".*",
):
_ = self.us_xx_manager.acquire_lock_for_resources(
self.raw_data,
@@ -247,7 +246,7 @@ def test_get_lock_by_id_reflects_update(self) -> None:
self.raw_data,
DirectIngestRawDataLockActor.PROCESS,
"testing-testing-123",
ttl=timedelta(hours=1),
ttl_seconds=60 * 60,
)

assert len(expired_locks_in_jan) == 1
@@ -266,7 +265,7 @@ def test_get_most_recent_lock_for_resources_reflects_update(self) -> None:
self.all_resources,
DirectIngestRawDataLockActor.PROCESS,
"testing-testing-123",
ttl=timedelta(hours=1),
ttl_seconds=60 * 60,
)

current_locks = self.us_xx_manager.get_most_recent_locks_for_resources(
@@ -337,7 +336,7 @@ def test_lock_acquire_race_condition_with_active_expired_after_update_commit(
self.raw_data,
DirectIngestRawDataLockActor.PROCESS,
"testing-testing-123",
ttl=timedelta(hours=1),
ttl_seconds=60 * 60,
)

persisted_lock = []
@@ -394,7 +393,7 @@ def test_lock_acquire_race_condition_with_active_expired_before_update_commit(
self.raw_data,
DirectIngestRawDataLockActor.PROCESS,
"testing-testing-123",
ttl=timedelta(hours=1),
ttl_seconds=60 * 60,
)

persisted_lock = []
@@ -1,3 +1,3 @@
lock_id,lock_actor,lock_resource,region_code,raw_data_source_instance,released,lock_acquisition_time,lock_ttl,lock_description
lock_id,lock_actor,lock_resource,region_code,raw_data_source_instance,released,lock_acquisition_time,lock_ttl_seconds,lock_description
1,ADHOC,BUCKET,US_CA,PRIMARY,true,2024-02-09T19:06:00.000000+00,,we held this lock just bc we could
2,ADHOC,BUCKET,US_CA,PRIMARY,false,2024-02-10T19:06:00.000000+00,,we holding this lock just bc we can
