Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert Dataset Lock Mechanism to Generic Resource Lock #1338

Merged
merged 20 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
3170dae
Split APIs and types between S3-shares and shares-base
dlpzx Jun 7, 2024
05fd497
Merge remote-tracking branch 'refs/remotes/origin/main' into feat/gen…
dlpzx Jun 14, 2024
17070cf
Fix issues with tables select worksheet and with unused items in sear…
dlpzx Jun 14, 2024
274cbee
Make resource lock generic pt 1
noah-paige Jun 17, 2024
e61bebe
Create and delete resource locks on env group or CR create or delete
noah-paige Jun 17, 2024
d1ced2a
Add migration script and debug
noah-paige Jun 18, 2024
0c5c0cb
Merge latest from os
noah-paige Jun 18, 2024
07a2e3d
add back resources
noah-paige Jun 18, 2024
d1ce3ba
Fix revision IDs
noah-paige Jun 18, 2024
670e12a
Merge branch 'os-main' into generic-resource-locking
noah-paige Jun 19, 2024
3fe45e0
merge latest from main and update alembic revision ID
noah-paige Jun 19, 2024
53ae0a1
Merge latest from main + resolve conflicts + update db migration revi…
noah-paige Jun 24, 2024
3ff6734
Merge latest from main + resolve conflicts + update db migration revi…
noah-paige Jun 24, 2024
74555b1
Merge branch 'os-main' into generic-resource-locking
noah-paige Jun 26, 2024
2b4909a
Merge branch 'os-main' into generic-resource-locking
noah-paige Jun 26, 2024
8867991
Add context manager to acquire locks with retry and move function to …
noah-paige Jun 26, 2024
ccde783
Remove isLocked and create/delete + fix migration script
noah-paige Jun 26, 2024
d41e8e4
Fix tests
noah-paige Jun 26, 2024
59752d7
make methods private acquire and release
noah-paige Jun 27, 2024
9e2e569
make methods private acquire and release
noah-paige Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions backend/dataall/base/db/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,15 @@ def __init__(self, action, message):

def __str__(self):
return f'{self.message}'


class ResourceLockTimeout(Exception):
def __init__(self, action, message):
self.action = action
self.message = f"""
An error occurred (ResourceLockTimeout) when calling {self.action} operation:
{message}
"""

def __str__(self):
return f'{self.message}'
10 changes: 1 addition & 9 deletions backend/dataall/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
"""The package contains the core functionality that is required by data.all to work correctly"""

from dataall.core import (
permissions,
stacks,
groups,
environment,
organizations,
tasks,
vpc,
)
from dataall.core import permissions, stacks, groups, environment, organizations, tasks, vpc, resource_lock
28 changes: 28 additions & 0 deletions backend/dataall/core/environment/services/environment_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
CREDENTIALS_ENVIRONMENT,
)
from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
from dataall.core.resource_lock.db.resource_lock_repositories import ResourceLockRepository
from dataall.core.permissions.services.tenant_policy_service import TenantPolicyService
from dataall.core.activity.db.activity_models import Activity
from dataall.core.environment.db.environment_models import EnvironmentParameter, ConsumptionRole
Expand Down Expand Up @@ -295,6 +296,12 @@ def create_environment(uri, data=None):
)
session.commit()

ResourceLockRepository.create_resource_lock(
session=session,
resource_uri=f'{env_group.groupUri}-{env_group.environmentUri}',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we assume that groupUris are unique? why do you need to append the envUri?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same group can be a memeber of many different environments

In practice primary key for environment_group_permission (aka EnvironmentGroup) is groupUri + environmentUri

We similarly need a way to distinguish each Env + Group Pair for resource lock since the underlying resource we are "locking" is the consumer IAM role for the group in that particular env

resource_type=env_group.__tablename__,
)

activity = Activity(
action='ENVIRONMENT:CREATE',
label='ENVIRONMENT:CREATE',
Expand Down Expand Up @@ -413,6 +420,13 @@ def invite_group(uri, data=None) -> (Environment, EnvironmentGroup):
)
session.add(environment_group)
session.commit()

ResourceLockRepository.create_resource_lock(
session=session,
resource_uri=f'{environment_group.groupUri}-{environment_group.environmentUri}',
resource_type=environment_group.__tablename__,
)

ResourcePolicyService.attach_resource_policy(
session=session,
group=group,
Expand Down Expand Up @@ -466,6 +480,9 @@ def remove_group(uri, group):
).delete_all_policies()

if group_membership:
ResourceLockRepository.delete_resource_lock(
session=session, resource_uri=f'{group}-{environment.environmentUri}'
)
session.delete(group_membership)
session.commit()

Expand Down Expand Up @@ -592,6 +609,12 @@ def add_consumption_role(uri, data=None) -> (Environment, EnvironmentGroup):
session.add(consumption_role)
session.commit()

ResourceLockRepository.create_resource_lock(
session=session,
resource_uri=consumption_role.consumptionRoleUri,
resource_type=consumption_role.__tablename__,
)

ResourcePolicyService.attach_resource_policy(
session=session,
group=group,
Expand Down Expand Up @@ -631,6 +654,7 @@ def remove_consumption_role(uri, env_uri):
resource_uri=consumption_role.consumptionRoleUri,
resource_type=ConsumptionRole.__name__,
)
ResourceLockRepository.delete_resource_lock(session=session, resource_uri=uri)

session.delete(consumption_role)
session.commit()
Expand Down Expand Up @@ -881,6 +905,9 @@ def delete_environment(uri):
EnvironmentParameterRepository(session).delete_params(environment.environmentUri)

for group in env_groups:
ResourceLockRepository.delete_resource_lock(
session=session, resource_uri=f'{group.groupUri}-{environment.environmentUri}'
)
session.delete(group)

ResourcePolicyService.delete_resource_policy(
Expand All @@ -890,6 +917,7 @@ def delete_environment(uri):
)

for role in env_roles:
ResourceLockRepository.delete_resource_lock(session=session, resource_uri=role.consumptionRoleUri)
session.delete(role)

return session.delete(environment), environment
Expand Down
Empty file.
Empty file.
28 changes: 28 additions & 0 deletions backend/dataall/core/resource_lock/db/resource_lock_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from typing import Optional
from sqlalchemy import Column, String, Boolean

from dataall.base.db import Base


class ResourceLock(Base):
__tablename__ = 'resource_lock'

resourceUri = Column(String, nullable=False, primary_key=True)
resourceType = Column(String, nullable=False, primary_key=True)
isLocked = Column(Boolean, default=False)
acquiredByUri = Column(String, nullable=True)
acquiredByType = Column(String, nullable=True)

def __init__(
self,
resourceUri: str,
resourceType: str,
isLocked: bool = False,
acquiredByUri: Optional[str] = None,
acquiredByType: Optional[str] = None,
):
self.resourceUri = resourceUri
self.resourceType = resourceType
self.isLocked = isLocked
self.acquiredByUri = acquiredByUri
self.acquiredByType = acquiredByType
122 changes: 122 additions & 0 deletions backend/dataall/core/resource_lock/db/resource_lock_repositories.py
petrkalos marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import logging

from dataall.core.resource_lock.db.resource_lock_models import ResourceLock
from sqlalchemy import and_, or_

log = logging.getLogger(__name__)


class ResourceLockRepository:
@staticmethod
def create_resource_lock(
session, resource_uri, resource_type, is_locked=False, acquired_by_uri=None, acquired_by_type=None
):
resource_lock = ResourceLock(
resourceUri=resource_uri,
resourceType=resource_type,
isLocked=is_locked,
acquiredByUri=acquired_by_uri,
acquiredByType=acquired_by_type,
)
session.add(resource_lock)
session.commit()

@staticmethod
def delete_resource_lock(session, resource_uri):
resource_lock = session.query(ResourceLock).filter(ResourceLock.resourceUri == resource_uri).first()
session.delete(resource_lock)
session.commit()

@staticmethod
def acquire_locks(resources, session, acquired_by_uri, acquired_by_type):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be a ContextManager?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed logic to make acquire_lock_with_retry(...) as a context manager which will acquire on enter and release on exit

Let me know if this is what you were thinking as well

Copy link
Contributor

@petrkalos petrkalos Jun 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think acquire_locks and release_lock should be private methods to discourage devs from using them raw. As such we must be prefix them with _ and optionally moved them to the bottom of the file.
If we want to expose acquire/release without a retry we should write another contextmanager.

"""
Attempts to acquire one or more locks on the resources identified by resourceUri and resourceType.

Args:
resources: List of resource tuples (resourceUri, resourceType) to acquire locks for.
session (sqlalchemy.orm.Session): The SQLAlchemy session object used for interacting with the database.
acquired_by_uri: The ID of the resource that is attempting to acquire the lock.
acquired_by_type: The resource type that is attempting to acquire the lock.qu

Returns:
bool: True if the lock is successfully acquired, False otherwise.
"""
try:
# Execute the query to get the ResourceLock object
filter_conditions = [
and_(
ResourceLock.resourceUri == resource[0],
ResourceLock.resourceType == resource[1],
~ResourceLock.isLocked,
)
for resource in resources
]
resource_locks = session.query(ResourceLock).filter(or_(*filter_conditions)).with_for_update().all()

# Ensure lock record found for each resource
if len(resource_locks) == len(resources):
# Update the attributes of the ResourceLock object
for resource_lock in resource_locks:
resource_lock.isLocked = True
resource_lock.acquiredByUri = acquired_by_uri
resource_lock.acquiredByType = acquired_by_type
session.commit()
return True
else:
log.info(
'Not all ResourceLocks were found. One or more ResourceLocks may be acquired by another resource...'
)
return False
except Exception as e:
session.expunge_all()
session.rollback()
log.error('Error occurred while acquiring lock:', e)
return False

@staticmethod
def release_lock(session, resource_uri, resource_type, share_uri):
"""
Releases the lock on the resource identified by resource_uri, resource_type.

Args:
session (sqlalchemy.orm.Session): The SQLAlchemy session object used for interacting with the database.
resource_uri: The ID of the resource that owns the lock.
resource_type: The type of the resource that owns the lock.
share_uri: The ID of the share that is attempting to release the lock.

Returns:
bool: True if the lock is successfully released, False otherwise.
"""
try:
log.info(f'Releasing lock for resource: {resource_uri=}, {resource_type=}')

resource_lock = (
session.query(ResourceLock)
.filter(
and_(
ResourceLock.resourceUri == resource_uri,
ResourceLock.resourceType == resource_type,
ResourceLock.isLocked,
ResourceLock.acquiredByUri == share_uri,
)
)
.with_for_update()
.first()
)

if resource_lock:
resource_lock.isLocked = False
resource_lock.acquiredByUri = ''
resource_lock.acquiredByType = ''

session.commit()
return True
else:
log.info(f'ResourceLock not found for resource: {resource_uri=}, {resource_type=}')
return False

except Exception as e:
session.expunge_all()
session.rollback()
log.error('Error occurred while releasing lock:', e)
return False
12 changes: 0 additions & 12 deletions backend/dataall/modules/datasets_base/db/dataset_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,3 @@ def uri(cls):


DatasetBase.__name__ = 'Dataset'


class DatasetLock(Base):
__tablename__ = 'dataset_lock'
datasetUri = Column(String, ForeignKey('dataset.datasetUri'), nullable=False, primary_key=True)
isLocked = Column(Boolean, default=False)
acquiredBy = Column(String, nullable=True)

def __init__(self, datasetUri, isLocked=False, acquiredBy=None):
self.datasetUri = datasetUri
self.isLocked = isLocked
self.acquiredBy = acquiredBy
14 changes: 1 addition & 13 deletions backend/dataall/modules/datasets_base/db/dataset_repositories.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,14 @@
from dataall.base.db import paginate
from dataall.base.db.exceptions import ObjectNotFound
from dataall.core.activity.db.activity_models import Activity
from dataall.modules.datasets_base.db.dataset_models import DatasetBase, DatasetLock
from dataall.modules.datasets_base.db.dataset_models import DatasetBase

logger = logging.getLogger(__name__)


class DatasetBaseRepository:
"""DAO layer for GENERIC Datasets"""

@staticmethod
def create_dataset_lock(session, dataset: DatasetBase):
dataset_lock = DatasetLock(datasetUri=dataset.datasetUri, isLocked=False, acquiredBy='')
session.add(dataset_lock)
session.commit()

@staticmethod
def delete_dataset_lock(session, dataset: DatasetBase):
dataset_lock = session.query(DatasetLock).filter(DatasetLock.datasetUri == dataset.datasetUri).first()
session.delete(dataset_lock)
session.commit()

@staticmethod
def update_dataset_activity(session, dataset: DatasetBase, username):
activity = Activity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import logging
from typing import List
from dataall.core.resource_lock.db.resource_lock_repositories import ResourceLockRepository
from dataall.base.aws.quicksight import QuicksightClient
from dataall.base.db import exceptions
from dataall.base.utils.naming_convention import NamingConventionPattern
Expand Down Expand Up @@ -164,8 +165,9 @@ def create_dataset(uri, admin_group, data: dict):
DatasetService.check_imported_resources(dataset)

dataset = DatasetRepository.create_dataset(session=session, env=environment, dataset=dataset, data=data)
DatasetBaseRepository.create_dataset_lock(session=session, dataset=dataset)

ResourceLockRepository.create_resource_lock(
session=session, resource_uri=dataset.datasetUri, resource_type=dataset.__tablename__
)
DatasetBucketRepository.create_dataset_bucket(session, dataset, data)

ResourcePolicyService.attach_resource_policy(
Expand Down Expand Up @@ -411,7 +413,7 @@ def delete_dataset(uri: str, delete_from_aws: bool = False):
ResourcePolicyService.delete_resource_policy(session=session, resource_uri=uri, group=env.SamlGroupName)
if dataset.stewards:
ResourcePolicyService.delete_resource_policy(session=session, resource_uri=uri, group=dataset.stewards)
DatasetBaseRepository.delete_dataset_lock(session=session, dataset=dataset)
ResourceLockRepository.delete_resource_lock(session=session, resource_uri=dataset.datasetUri)
DatasetRepository.delete_dataset(session, dataset)

if delete_from_aws:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,3 @@ def __init__(self, action, message):
class PrincipalRoleNotFound(BaseShareException):
def __init__(self, action, message):
super().__init__('PrincipalRoleNotFound', action, message)


class DatasetLockTimeout(BaseShareException):
def __init__(self, action, message):
super().__init__('DatasetLockTimeout', action, message)
Loading
Loading