diff --git a/backend/dataall/base/db/exceptions.py b/backend/dataall/base/db/exceptions.py index b7b4bf583..95c2c2a73 100644 --- a/backend/dataall/base/db/exceptions.py +++ b/backend/dataall/base/db/exceptions.py @@ -169,3 +169,15 @@ def __init__(self, action, message): def __str__(self): return f'{self.message}' + + +class ResourceLockTimeout(Exception): + def __init__(self, action, message): + self.action = action + self.message = f""" + An error occurred (ResourceLockTimeout) when calling {self.action} operation: + {message} + """ + + def __str__(self): + return f'{self.message}' diff --git a/backend/dataall/core/__init__.py b/backend/dataall/core/__init__.py index 9f1bc3e38..f15c835aa 100644 --- a/backend/dataall/core/__init__.py +++ b/backend/dataall/core/__init__.py @@ -1,11 +1,3 @@ """The package contains the core functionality that is required by data.all to work correctly""" -from dataall.core import ( - permissions, - stacks, - groups, - environment, - organizations, - tasks, - vpc, -) +from dataall.core import permissions, stacks, groups, environment, organizations, tasks, vpc, resource_lock diff --git a/backend/dataall/core/resource_lock/__init__.py b/backend/dataall/core/resource_lock/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/dataall/core/resource_lock/db/__init__.py b/backend/dataall/core/resource_lock/db/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/dataall/core/resource_lock/db/resource_lock_models.py b/backend/dataall/core/resource_lock/db/resource_lock_models.py new file mode 100644 index 000000000..7e478758e --- /dev/null +++ b/backend/dataall/core/resource_lock/db/resource_lock_models.py @@ -0,0 +1,25 @@ +from typing import Optional +from sqlalchemy import Column, String, Boolean + +from dataall.base.db import Base + + +class ResourceLock(Base): + __tablename__ = 'resource_lock' + + resourceUri = Column(String, nullable=False, primary_key=True) + resourceType = 
Column(String, nullable=False, primary_key=True) + acquiredByUri = Column(String, nullable=True) + acquiredByType = Column(String, nullable=True) + + def __init__( + self, + resourceUri: str, + resourceType: str, + acquiredByUri: Optional[str] = None, + acquiredByType: Optional[str] = None, + ): + self.resourceUri = resourceUri + self.resourceType = resourceType + self.acquiredByUri = acquiredByUri + self.acquiredByType = acquiredByType diff --git a/backend/dataall/core/resource_lock/db/resource_lock_repositories.py b/backend/dataall/core/resource_lock/db/resource_lock_repositories.py new file mode 100644 index 000000000..25d4e24c8 --- /dev/null +++ b/backend/dataall/core/resource_lock/db/resource_lock_repositories.py @@ -0,0 +1,137 @@ +import logging + +from dataall.core.resource_lock.db.resource_lock_models import ResourceLock +from sqlalchemy import and_, or_ +from sqlalchemy.orm import Session +from time import sleep +from typing import List, Tuple +from contextlib import contextmanager +from dataall.base.db.exceptions import ResourceLockTimeout + +log = logging.getLogger(__name__) + +MAX_RETRIES = 10 +RETRY_INTERVAL = 60 + + +class ResourceLockRepository: + @staticmethod + def _acquire_locks(resources, session, acquired_by_uri, acquired_by_type): + """ + Attempts to acquire/create one or more locks on the resources identified by resourceUri and resourceType. + + Args: + resources: List of resource tuples (resourceUri, resourceType) to acquire locks for. + session (sqlalchemy.orm.Session): The SQLAlchemy session object used for interacting with the database. + acquired_by_uri: The ID of the resource that is attempting to acquire the lock. + acquired_by_type: The resource type that is attempting to acquire the lock. + + Returns: + bool: True if the lock is successfully acquired, False otherwise. 
+ """ + try: + # Execute the query to get the ResourceLock object + filter_conditions = [ + and_( + ResourceLock.resourceUri == resource[0], + ResourceLock.resourceType == resource[1], + ) + for resource in resources + ] + + if not session.query(ResourceLock).filter(or_(*filter_conditions)).first(): + records = [] + for resource in resources: + records.append( + ResourceLock( + resourceUri=resource[0], + resourceType=resource[1], + acquiredByUri=acquired_by_uri, + acquiredByType=acquired_by_type, + ) + ) + session.add_all(records) + session.commit() + return True + else: + log.info( + 'Not all ResourceLocks were found. One or more ResourceLocks may be acquired by another resource...' + ) + return False + except Exception as e: + session.expunge_all() + session.rollback() + log.error('Error occurred while acquiring lock:', e) + return False + + @staticmethod + def _release_lock(session, resource_uri, resource_type, share_uri): + """ + Releases/delete the lock on the resource identified by resource_uri, resource_type. + + Args: + session (sqlalchemy.orm.Session): The SQLAlchemy session object used for interacting with the database. + resource_uri: The ID of the resource that owns the lock. + resource_type: The type of the resource that owns the lock. + share_uri: The ID of the share that is attempting to release the lock. + + Returns: + bool: True if the lock is successfully released, False otherwise. 
+ """ + try: + log.info(f'Releasing lock for resource: {resource_uri=}, {resource_type=}') + + resource_lock = ( + session.query(ResourceLock) + .filter( + and_( + ResourceLock.resourceUri == resource_uri, + ResourceLock.resourceType == resource_type, + ResourceLock.acquiredByUri == share_uri, + ) + ) + .with_for_update() + .first() + ) + + if resource_lock: + session.delete(resource_lock) + session.commit() + return True + else: + log.info(f'ResourceLock not found for resource: {resource_uri=}, {resource_type=}') + return False + + except Exception as e: + session.expunge_all() + session.rollback() + log.error('Error occurred while releasing lock:', e) + return False + + @staticmethod + @contextmanager + def acquire_lock_with_retry( + resources: List[Tuple[str, str]], session: Session, acquired_by_uri: str, acquired_by_type: str + ): + retries_remaining = MAX_RETRIES + log.info(f'Attempting to acquire lock for resources {resources} by share {acquired_by_uri}...') + while not ( + lock_acquired := ResourceLockRepository._acquire_locks( + resources, session, acquired_by_uri, acquired_by_type + ) + ): + log.info( + f'Lock for one or more resources {resources} already acquired. Retrying in {RETRY_INTERVAL} seconds...' 
+ ) + sleep(RETRY_INTERVAL) + retries_remaining -= 1 + if retries_remaining <= 0: + raise ResourceLockTimeout( + 'process shares', + f'Failed to acquire lock for one or more of {resources=}', + ) + try: + yield lock_acquired + finally: + for resource in resources: + ResourceLockRepository._release_lock(session, resource[0], resource[1], acquired_by_uri) diff --git a/backend/dataall/modules/datasets_base/db/dataset_models.py b/backend/dataall/modules/datasets_base/db/dataset_models.py index 363baccc6..0aeaf68a4 100644 --- a/backend/dataall/modules/datasets_base/db/dataset_models.py +++ b/backend/dataall/modules/datasets_base/db/dataset_models.py @@ -40,15 +40,3 @@ def uri(cls): DatasetBase.__name__ = 'Dataset' - - -class DatasetLock(Base): - __tablename__ = 'dataset_lock' - datasetUri = Column(String, ForeignKey('dataset.datasetUri'), nullable=False, primary_key=True) - isLocked = Column(Boolean, default=False) - acquiredBy = Column(String, nullable=True) - - def __init__(self, datasetUri, isLocked=False, acquiredBy=None): - self.datasetUri = datasetUri - self.isLocked = isLocked - self.acquiredBy = acquiredBy diff --git a/backend/dataall/modules/datasets_base/db/dataset_repositories.py b/backend/dataall/modules/datasets_base/db/dataset_repositories.py index f9d1b33a6..2ff9e396f 100644 --- a/backend/dataall/modules/datasets_base/db/dataset_repositories.py +++ b/backend/dataall/modules/datasets_base/db/dataset_repositories.py @@ -5,7 +5,7 @@ from dataall.base.db import paginate from dataall.base.db.exceptions import ObjectNotFound from dataall.core.activity.db.activity_models import Activity -from dataall.modules.datasets_base.db.dataset_models import DatasetBase, DatasetLock +from dataall.modules.datasets_base.db.dataset_models import DatasetBase logger = logging.getLogger(__name__) @@ -13,18 +13,6 @@ class DatasetBaseRepository: """DAO layer for GENERIC Datasets""" - @staticmethod - def create_dataset_lock(session, dataset: DatasetBase): - dataset_lock = 
DatasetLock(datasetUri=dataset.datasetUri, isLocked=False, acquiredBy='') - session.add(dataset_lock) - session.commit() - - @staticmethod - def delete_dataset_lock(session, dataset: DatasetBase): - dataset_lock = session.query(DatasetLock).filter(DatasetLock.datasetUri == dataset.datasetUri).first() - session.delete(dataset_lock) - session.commit() - @staticmethod def update_dataset_activity(session, dataset: DatasetBase, username): activity = Activity( diff --git a/backend/dataall/modules/s3_datasets/services/dataset_service.py b/backend/dataall/modules/s3_datasets/services/dataset_service.py index 0ea96b2dc..58561112a 100644 --- a/backend/dataall/modules/s3_datasets/services/dataset_service.py +++ b/backend/dataall/modules/s3_datasets/services/dataset_service.py @@ -2,6 +2,7 @@ import json import logging from typing import List +from dataall.core.resource_lock.db.resource_lock_repositories import ResourceLockRepository from dataall.base.aws.quicksight import QuicksightClient from dataall.base.db import exceptions from dataall.base.utils.naming_convention import NamingConventionPattern @@ -164,8 +165,6 @@ def create_dataset(uri, admin_group, data: dict): DatasetService.check_imported_resources(dataset) dataset = DatasetRepository.create_dataset(session=session, env=environment, dataset=dataset, data=data) - DatasetBaseRepository.create_dataset_lock(session=session, dataset=dataset) - DatasetBucketRepository.create_dataset_bucket(session, dataset, data) ResourcePolicyService.attach_resource_policy( @@ -411,7 +410,6 @@ def delete_dataset(uri: str, delete_from_aws: bool = False): ResourcePolicyService.delete_resource_policy(session=session, resource_uri=uri, group=env.SamlGroupName) if dataset.stewards: ResourcePolicyService.delete_resource_policy(session=session, resource_uri=uri, group=dataset.stewards) - DatasetBaseRepository.delete_dataset_lock(session=session, dataset=dataset) DatasetRepository.delete_dataset(session, dataset) if delete_from_aws: diff --git 
a/backend/dataall/modules/shares_base/services/share_exceptions.py b/backend/dataall/modules/shares_base/services/share_exceptions.py index bfe051776..ea9091f8a 100644 --- a/backend/dataall/modules/shares_base/services/share_exceptions.py +++ b/backend/dataall/modules/shares_base/services/share_exceptions.py @@ -19,8 +19,3 @@ def __init__(self, action, message): class PrincipalRoleNotFound(BaseShareException): def __init__(self, action, message): super().__init__('PrincipalRoleNotFound', action, message) - - -class DatasetLockTimeout(BaseShareException): - def __init__(self, action, message): - super().__init__('DatasetLockTimeout', action, message) diff --git a/backend/dataall/modules/shares_base/services/sharing_service.py b/backend/dataall/modules/shares_base/services/sharing_service.py index 5a52dd80c..ce35623e7 100644 --- a/backend/dataall/modules/shares_base/services/sharing_service.py +++ b/backend/dataall/modules/shares_base/services/sharing_service.py @@ -1,10 +1,10 @@ import logging -from typing import Any from dataclasses import dataclass -from sqlalchemy import and_ -from time import sleep + +from typing import Any +from dataall.core.resource_lock.db.resource_lock_repositories import ResourceLockRepository from dataall.base.db import Engine -from dataall.core.environment.db.environment_models import Environment, EnvironmentGroup +from dataall.core.environment.db.environment_models import ConsumptionRole, Environment, EnvironmentGroup from dataall.modules.shares_base.db.share_object_state_machines import ( ShareObjectSM, ShareItemSM, @@ -14,6 +14,7 @@ ShareObjectActions, ShareItemActions, ShareItemStatus, + PrincipalType, ) from dataall.modules.shares_base.db.share_object_models import ShareObject from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository @@ -22,17 +23,11 @@ from dataall.modules.shares_base.services.share_object_service import ( ShareObjectService, ) -from 
dataall.modules.shares_base.services.share_exceptions import ( - PrincipalRoleNotFound, - DatasetLockTimeout, -) -from dataall.modules.datasets_base.db.dataset_models import DatasetLock +from dataall.modules.shares_base.services.share_exceptions import PrincipalRoleNotFound +from dataall.base.db.exceptions import ResourceLockTimeout log = logging.getLogger(__name__) -MAX_RETRIES = 10 -RETRY_INTERVAL = 60 - @dataclass class ShareData: @@ -75,6 +70,17 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool: log.info(f'Starting share {share_data.share.shareUri}') new_share_state = share_object_sm.run_transition(ShareObjectActions.Start.value) share_object_sm.update_state(session, share_data.share, new_share_state) + + resources = [(share_data.dataset.datasetUri, share_data.dataset.__tablename__)] + resources.append( + (share_data.share.principalId, ConsumptionRole.__tablename__) + if share_data.share.principalType == PrincipalType.ConsumptionRole.value + else ( + f'{share_data.share.principalId}-{share_data.share.environmentUri}', + EnvironmentGroup.__tablename__, + ) + ) + share_successful = True try: if not ShareObjectService.verify_principal_role(session, share_data.share): @@ -82,42 +88,46 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool: 'process approved shares', f'Principal role {share_data.share.principalIAMRoleName} is not found.', ) - if not cls.acquire_lock_with_retry(share_data.dataset.datasetUri, session, share_data.share.shareUri): - raise DatasetLockTimeout( - 'process approved shares', - f'Failed to acquire lock for dataset {share_data.dataset.datasetUri}', - ) - for type, processor in ShareProcessorManager.SHARING_PROCESSORS.items(): - try: - log.info(f'Granting permissions of {type.value}') - shareable_items = ShareObjectRepository.get_share_data_items_by_type( - session, - share_data.share, - processor.shareable_type, - processor.shareable_uri, - status=ShareItemStatus.Share_Approved.value, - ) - success = 
processor.Processor(session, share_data, shareable_items).process_approved_shares() - log.info(f'Sharing {type.value} succeeded = {success}') - if not success: + + with ResourceLockRepository.acquire_lock_with_retry( + resources=resources, + session=session, + acquired_by_uri=share_data.share.shareUri, + acquired_by_type=share_data.share.__tablename__, + ): + for type, processor in ShareProcessorManager.SHARING_PROCESSORS.items(): + try: + log.info(f'Granting permissions of {type.value}') + shareable_items = ShareObjectRepository.get_share_data_items_by_type( + session, + share_data.share, + processor.shareable_type, + processor.shareable_uri, + status=ShareItemStatus.Share_Approved.value, + ) + success = processor.Processor( + session, share_data, shareable_items + ).process_approved_shares() + log.info(f'Sharing {type.value} succeeded = {success}') + if not success: + share_successful = False + except Exception as e: + log.error(f'Error occurred during sharing of {type.value}: {e}') + ShareStatusRepository.update_share_item_status_batch( + session, + share_uri, + old_status=ShareItemStatus.Share_Approved.value, + new_status=ShareItemStatus.Share_Failed.value, + share_item_type=processor.type.value, + ) + ShareStatusRepository.update_share_item_status_batch( + session, + share_uri, + old_status=ShareItemStatus.Share_In_Progress.value, + new_status=ShareItemStatus.Share_Failed.value, + share_item_type=processor.type.value, + ) share_successful = False - except Exception as e: - log.error(f'Error occurred during sharing of {type.value}: {e}') - ShareStatusRepository.update_share_item_status_batch( - session, - share_uri, - old_status=ShareItemStatus.Share_Approved.value, - new_status=ShareItemStatus.Share_Failed.value, - share_item_type=processor.type.value, - ) - ShareStatusRepository.update_share_item_status_batch( - session, - share_uri, - old_status=ShareItemStatus.Share_In_Progress.value, - new_status=ShareItemStatus.Share_Failed.value, - 
share_item_type=processor.type.value, - ) - share_successful = False return share_successful except Exception as e: @@ -129,9 +139,6 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool: finally: new_share_state = share_object_sm.run_transition(ShareObjectActions.Finish.value) share_object_sm.update_state(session, share_data.share, new_share_state) - lock_released = cls.release_lock(share_data.dataset.datasetUri, session, share_data.share.shareUri) - if not lock_released: - log.error(f'Failed to release lock for dataset {share_data.dataset.datasetUri}.') @classmethod def revoke_share(cls, engine: Engine, share_uri: str) -> bool: @@ -163,6 +170,17 @@ def revoke_share(cls, engine: Engine, share_uri: str) -> bool: log.info(f'Starting revoke {share_data.share.shareUri}') new_share_state = share_sm.run_transition(ShareObjectActions.Start.value) share_sm.update_state(session, share_data.share, new_share_state) + + resources = [(share_data.dataset.datasetUri, share_data.dataset.__tablename__)] + resources.append( + (share_data.share.principalId, ConsumptionRole.__tablename__) + if share_data.share.principalType == PrincipalType.ConsumptionRole.value + else ( + f'{share_data.share.principalId}-{share_data.share.environmentUri}', + EnvironmentGroup.__tablename__, + ) + ) + revoke_successful = True try: if not ShareObjectService.verify_principal_role(session, share_data.share): @@ -170,43 +188,44 @@ def revoke_share(cls, engine: Engine, share_uri: str) -> bool: 'process revoked shares', f'Principal role {share_data.share.principalIAMRoleName} is not found.', ) - if not cls.acquire_lock_with_retry(share_data.dataset.datasetUri, session, share_data.share.shareUri): - raise DatasetLockTimeout( - 'process revoked shares', - f'Failed to acquire lock for dataset {share_data.dataset.datasetUri}', - ) - for type, processor in ShareProcessorManager.SHARING_PROCESSORS.items(): - try: - shareable_items = ShareObjectRepository.get_share_data_items_by_type( - session, - 
share_data.share, - processor.shareable_type, - processor.shareable_uri, - status=ShareItemStatus.Revoke_Approved.value, - ) - log.info(f'Revoking permissions with {type.value}') - success = processor.Processor(session, share_data, shareable_items).process_revoked_shares() - log.info(f'Revoking {type.value} succeeded = {success}') - if not success: + with ResourceLockRepository.acquire_lock_with_retry( + resources=resources, + session=session, + acquired_by_uri=share_data.share.shareUri, + acquired_by_type=share_data.share.__tablename__, + ): + for type, processor in ShareProcessorManager.SHARING_PROCESSORS.items(): + try: + shareable_items = ShareObjectRepository.get_share_data_items_by_type( + session, + share_data.share, + processor.shareable_type, + processor.shareable_uri, + status=ShareItemStatus.Revoke_Approved.value, + ) + log.info(f'Revoking permissions with {type.value}') + success = processor.Processor(session, share_data, shareable_items).process_revoked_shares() + log.info(f'Revoking {type.value} succeeded = {success}') + if not success: + revoke_successful = False + except Exception as e: + log.error(f'Error occurred during share revoking of {type.value}: {e}') + ShareStatusRepository.update_share_item_status_batch( + session, + share_uri, + old_status=ShareItemStatus.Revoke_Approved.value, + new_status=ShareItemStatus.Revoke_Failed.value, + share_item_type=processor.type.value, + ) + ShareStatusRepository.update_share_item_status_batch( + session, + share_uri, + old_status=ShareItemStatus.Revoke_In_Progress.value, + new_status=ShareItemStatus.Revoke_Failed.value, + share_item_type=processor.type.value, + ) revoke_successful = False - except Exception as e: - log.error(f'Error occurred during share revoking of {type.value}: {e}') - ShareStatusRepository.update_share_item_status_batch( - session, - share_uri, - old_status=ShareItemStatus.Revoke_Approved.value, - new_status=ShareItemStatus.Revoke_Failed.value, - share_item_type=processor.type.value, - ) 
- ShareStatusRepository.update_share_item_status_batch( - session, - share_uri, - old_status=ShareItemStatus.Revoke_In_Progress.value, - new_status=ShareItemStatus.Revoke_Failed.value, - share_item_type=processor.type.value, - ) - revoke_successful = False return revoke_successful except Exception as e: @@ -223,10 +242,6 @@ def revoke_share(cls, engine: Engine, share_uri: str) -> bool: new_share_state = share_sm.run_transition(ShareObjectActions.Finish.value) share_sm.update_state(session, share_data.share, new_share_state) - lock_released = cls.release_lock(share_data.dataset.datasetUri, session, share_data.share.shareUri) - if not lock_released: - log.error(f'Failed to release lock for dataset {share_data.dataset.datasetUri}.') - @classmethod def verify_share( cls, @@ -303,6 +318,16 @@ def reapply_share(cls, engine: Engine, share_uri: str) -> bool: share_data, share_items = cls._get_share_data_and_items( session, share_uri, None, ShareItemHealthStatus.PendingReApply.value ) + resources = [(share_data.dataset.datasetUri, share_data.dataset.__tablename__)] + resources.append( + (share_data.share.principalId, ConsumptionRole.__tablename__) + if share_data.share.principalType == PrincipalType.ConsumptionRole.value + else ( + f'{share_data.share.principalId}-{share_data.share.environmentUri}', + EnvironmentGroup.__tablename__, + ) + ) + try: log.info(f'Verifying principal IAM Role {share_data.share.principalIAMRoleName}') reapply_successful = ShareObjectService.verify_principal_role(session, share_data.share) @@ -310,52 +335,47 @@ def reapply_share(cls, engine: Engine, share_uri: str) -> bool: log.error(f'Failed to get Principal IAM Role {share_data.share.principalIAMRoleName}, exiting...') return False else: - lock_acquired = cls.acquire_lock_with_retry( - share_data.dataset.datasetUri, session, share_data.share.shareUri - ) - if not lock_acquired: - log.error(f'Failed to acquire lock for dataset {share_data.dataset.datasetUri}, exiting...') - error_message = 
f'SHARING PROCESS TIMEOUT: Failed to acquire lock for dataset {share_data.dataset.datasetUri}' - ShareStatusRepository.update_share_item_health_status_batch( - session, - share_uri, - old_status=ShareItemHealthStatus.PendingReApply.value, - new_status=ShareItemHealthStatus.Unhealthy.value, - message=error_message, - ) - return False - - for type, processor in ShareProcessorManager.SHARING_PROCESSORS.items(): - try: - log.info(f'Reapplying permissions to {type.value}') - shareable_items = ShareObjectRepository.get_share_data_items_by_type( - session, - share_data.share, - processor.shareable_type, - processor.shareable_uri, - None, - ShareItemHealthStatus.PendingReApply.value, - ) - success = processor.Processor( - session, share_data, shareable_items - ).process_approved_shares() - log.info(f'Reapplying {type.value} succeeded = {success}') - if not success: - reapply_successful = False - except Exception as e: - log.error(f'Error occurred during share reapplying of {type.value}: {e}') + with ResourceLockRepository.acquire_lock_with_retry( + resources=resources, + session=session, + acquired_by_uri=share_data.share.shareUri, + acquired_by_type=share_data.share.__tablename__, + ): + for type, processor in ShareProcessorManager.SHARING_PROCESSORS.items(): + try: + log.info(f'Reapplying permissions to {type.value}') + shareable_items = ShareObjectRepository.get_share_data_items_by_type( + session, + share_data.share, + processor.shareable_type, + processor.shareable_uri, + None, + ShareItemHealthStatus.PendingReApply.value, + ) + success = processor.Processor( + session, share_data, shareable_items + ).process_approved_shares() + log.info(f'Reapplying {type.value} succeeded = {success}') + if not success: + reapply_successful = False + except Exception as e: + log.error(f'Error occurred during share reapplying of {type.value}: {e}') return reapply_successful + + except ResourceLockTimeout as e: + ShareStatusRepository.update_share_item_health_status_batch( + session, + 
share_uri, + old_status=ShareItemHealthStatus.PendingReApply.value, + new_status=ShareItemHealthStatus.Unhealthy.value, + message=str(e), + ) + except Exception as e: log.error(f'Error occurred during share approval: {e}') return False - finally: - with engine.scoped_session() as session: - lock_released = cls.release_lock(share_data.dataset.datasetUri, session, share_data.share.shareUri) - if not lock_released: - log.error(f'Failed to release lock for dataset {share_data.dataset.datasetUri}.') - @staticmethod def _get_share_data_and_items(session, share_uri, status, healthStatus=None): data = ShareObjectRepository.get_share_data(session, share_uri) @@ -371,105 +391,3 @@ def _get_share_data_and_items(session, share_uri, status, healthStatus=None): session=session, share_uri=share_uri, status=[status], healthStatus=[healthStatus] ) return share_data, share_items - - @staticmethod - def acquire_lock(dataset_uri, session, share_uri): - """ - Attempts to acquire a lock on the dataset identified by dataset_id. - - Args: - dataset_uri: The ID of the dataset for which the lock is being acquired. - session (sqlalchemy.orm.Session): The SQLAlchemy session object used for interacting with the database. - share_uri: The ID of the share that is attempting to acquire the lock. - - Returns: - bool: True if the lock is successfully acquired, False otherwise. 
- """ - try: - # Execute the query to get the DatasetLock object - dataset_lock = ( - session.query(DatasetLock) - .filter(and_(DatasetLock.datasetUri == dataset_uri, ~DatasetLock.isLocked)) - .with_for_update() - .first() - ) - - # Check if dataset_lock is not None before attempting to update - if dataset_lock: - # Update the attributes of the DatasetLock object - dataset_lock.isLocked = True - dataset_lock.acquiredBy = share_uri - - session.commit() - return True - else: - log.info('DatasetLock not found for the given criteria.') - return False - except Exception as e: - session.expunge_all() - session.rollback() - log.error('Error occurred while acquiring lock:', e) - return False - - @staticmethod - def acquire_lock_with_retry(dataset_uri, session, share_uri): - for attempt in range(MAX_RETRIES): - try: - log.info(f'Attempting to acquire lock for dataset {dataset_uri} by share {share_uri}...') - lock_acquired = SharingService.acquire_lock(dataset_uri, session, share_uri) - if lock_acquired: - return True - - log.info(f'Lock for dataset {dataset_uri} already acquired. Retrying in {RETRY_INTERVAL} seconds...') - sleep(RETRY_INTERVAL) - - except Exception as e: - log.error('Error occurred while retrying acquiring lock:', e) - return False - - log.info(f'Max retries reached. Failed to acquire lock for dataset {dataset_uri}') - return False - - @staticmethod - def release_lock(dataset_uri, session, share_uri): - """ - Releases the lock on the dataset identified by dataset_uri. - - Args: - dataset_uri: The ID of the dataset for which the lock is being released. - session (sqlalchemy.orm.Session): The SQLAlchemy session object used for interacting with the database. - share_uri: The ID of the share that is attempting to release the lock. - - Returns: - bool: True if the lock is successfully released, False otherwise. 
- """ - try: - log.info(f'Releasing lock for dataset: {dataset_uri} last acquired by share: {share_uri}') - dataset_lock = ( - session.query(DatasetLock) - .filter( - and_( - DatasetLock.datasetUri == dataset_uri, - DatasetLock.isLocked == True, - DatasetLock.acquiredBy == share_uri, - ) - ) - .with_for_update() - .first() - ) - - if dataset_lock: - dataset_lock.isLocked = False - dataset_lock.acquiredBy = '' - - session.commit() - return True - else: - log.info('DatasetLock not found for the given criteria.') - return False - - except Exception as e: - session.expunge_all() - session.rollback() - log.error('Error occurred while releasing lock:', e) - return False diff --git a/backend/migrations/versions/797dd1012be1_resource_lock_table.py b/backend/migrations/versions/797dd1012be1_resource_lock_table.py new file mode 100644 index 000000000..de5f27351 --- /dev/null +++ b/backend/migrations/versions/797dd1012be1_resource_lock_table.py @@ -0,0 +1,117 @@ +"""resource_lock_table + +Revision ID: 797dd1012be1 +Revises: 18da23d3ba44 +Create Date: 2024-06-17 19:06:51.569471 + +""" + +from alembic import op +from sqlalchemy import orm, Column, String, Boolean, ForeignKey +import sqlalchemy as sa +from typing import Optional, List +from sqlalchemy.ext.declarative import declarative_base +from dataall.base.db import utils + +# revision identifiers, used by Alembic. 
+revision = '797dd1012be1' +down_revision = '18da23d3ba44' +branch_labels = None +depends_on = None + +Base = declarative_base() + + +class ResourceLock(Base): + __tablename__ = 'resource_lock' + + resourceUri = Column(String, nullable=False, primary_key=True) + resourceType = Column(String, nullable=False, primary_key=True) + acquiredByUri = Column(String, nullable=True) + acquiredByType = Column(String, nullable=True) + + def __init__( + self, + resourceUri: str, + resourceType: str, + acquiredByUri: Optional[str] = None, + acquiredByType: Optional[str] = None, + ): + self.resourceUri = resourceUri + self.resourceType = resourceType + self.acquiredByUri = acquiredByUri + self.acquiredByType = acquiredByType + + +class DatasetBase(Base): + __tablename__ = 'dataset' + environmentUri = Column(String, ForeignKey('environment.environmentUri'), nullable=False) + organizationUri = Column(String, nullable=False) + datasetUri = Column(String, primary_key=True, default=utils.uuid('dataset')) + + +class S3Dataset(DatasetBase): + __tablename__ = 's3_dataset' + datasetUri = Column(String, ForeignKey('dataset.datasetUri'), primary_key=True) + + +class DatasetLock(Base): + __tablename__ = 'dataset_lock' + datasetUri = Column(String, nullable=False, primary_key=True) + isLocked = Column(Boolean, default=False, nullable=False) + acquiredBy = Column(String, nullable=True) + + @classmethod + def uri(cls): + return cls.datasetUri + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + + ## drop dataset_lock table + op.drop_table('dataset_lock') + + ## create resource_lock table + op.create_table( + 'resource_lock', + sa.Column('resourceUri', sa.String(), nullable=False, primary_key=True), + sa.Column('resourceType', sa.String(), nullable=False, primary_key=True), + sa.Column('acquiredByUri', sa.String(), nullable=True), + sa.Column('acquiredByType', sa.String(), nullable=True), + ) + # ### end Alembic commands ### + + +def downgrade(): + # Drop resource_lock table + op.drop_table('resource_lock') + + bind = op.get_bind() + session = orm.Session(bind=bind) + datasets: List[S3Dataset] = session.query(S3Dataset).all() + + print('Creating dataset_lock table') + + op.create_table( + 'dataset_lock', + sa.Column('datasetUri', sa.String(), primary_key=True), + sa.Column('isLocked', sa.Boolean(), nullable=False), + sa.Column('acquiredBy', sa.String(), nullable=True), + ) + + op.create_foreign_key( + 'fk_dataset_lock_datasetUri', # Constraint name + 'dataset_lock', + 'dataset', + ['datasetUri'], + ['datasetUri'], + ) + + print('Creating a new row for each existing dataset in dataset_lock table') + for dataset in datasets: + dataset_lock = DatasetLock(datasetUri=dataset.datasetUri, isLocked=False, acquiredBy='') + session.add(dataset_lock) + session.flush() # flush to get the datasetUri + session.commit() + # ### end Alembic commands ### diff --git a/tests/modules/s3_datasets/test_dataset.py b/tests/modules/s3_datasets/test_dataset.py index ca943abfe..ee6508a8e 100644 --- a/tests/modules/s3_datasets/test_dataset.py +++ b/tests/modules/s3_datasets/test_dataset.py @@ -8,7 +8,8 @@ from dataall.core.organizations.db.organization_models import Organization from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository from dataall.modules.s3_datasets.db.dataset_models import DatasetStorageLocation, DatasetTable, S3Dataset -from dataall.modules.datasets_base.db.dataset_models import DatasetLock, DatasetBase +from 
dataall.modules.datasets_base.db.dataset_models import DatasetBase +from dataall.core.resource_lock.db.resource_lock_models import ResourceLock from tests.core.stacks.test_stack import update_stack_query from dataall.modules.datasets_base.services.datasets_enums import ConfidentialityClassification @@ -350,7 +351,7 @@ def test_dataset_in_environment(client, env_fixture, dataset1, group): def test_delete_dataset(client, dataset, env_fixture, org_fixture, db, module_mocker, group, user): # Delete any Dataset before effectuating the test with db.scoped_session() as session: - session.query(DatasetLock).delete() + session.query(ResourceLock).delete() session.query(S3Dataset).delete() session.query(DatasetBase).delete() session.commit() diff --git a/tests/modules/s3_datasets/test_dataset_resource_found.py b/tests/modules/s3_datasets/test_dataset_resource_found.py index 32c6fd372..45c2267d4 100644 --- a/tests/modules/s3_datasets/test_dataset_resource_found.py +++ b/tests/modules/s3_datasets/test_dataset_resource_found.py @@ -1,5 +1,4 @@ from dataall.modules.s3_datasets.db.dataset_models import S3Dataset -from dataall.modules.datasets_base.db.dataset_models import DatasetLock from dataall.modules.s3_datasets.services.dataset_permissions import CREATE_DATASET @@ -124,8 +123,6 @@ def test_dataset_resource_found(db, client, env_fixture, org_fixture, group2, us assert 'EnvironmentResourcesFound' in response.errors[0].message with db.scoped_session() as session: - dataset_lock = session.query(DatasetLock).filter(DatasetLock.datasetUri == dataset.datasetUri).first() - session.delete(dataset_lock) dataset = session.query(S3Dataset).get(dataset.datasetUri) session.delete(dataset) session.commit()