From dc1d5deb191531322a4ba1bad81abb2e0832b282 Mon Sep 17 00:00:00 2001 From: dlpzx <71252798+dlpzx@users.noreply.github.com> Date: Tue, 25 Jun 2024 08:42:13 +0200 Subject: [PATCH] Generic shares_base module and specific s3_datasets_shares module - part 10 (other s3 references in shares_base) (#1357) ### Feature or Bugfix - Refactoring ### Detail As explained in the design for #1123 and #1283 we are trying to implement generic `datasets_base` and `shares_base` modules that can be used by any type of datasets and by any type of shareable object in a generic way. This PR: - Remove the delete_resource_policy conditional for Tables in `backend/dataall/modules/shares_base/services/share_item_service.py` --> Permissions to the Table in data.all are granted once the share has succeeded, the conditional that checks for share_failed tables should not exist. - Remove unnecessary check in share_item_service: in add_share_item we check if it is a table whether it is a cross-region share. This check is completely unnecessary because when we create a share request object we are already checking if it is cross-region - Use `get_share_item_details` in add_share_item - we want to check if the table, folder, bucket exist so we need to query those tables. - Move s3_prefix notifications to subscription task - Fix error in query in `backend/dataall/modules/shares_base/db/share_state_machines_repositories.py` ### Relates - #1283 - #1123 - #955 ### Security Please answer the questions below briefly where applicable, or write `N/A`. Based on [OWASP 10](https://owasp.org/Top10/en/). - Does this PR introduce or modify any input fields or queries - this includes fetching data from storage outside the application (e.g. a database, an S3 bucket)? - Is the input sanitized? - What precautions are you taking before deserializing the data you consume? - Is injection prevented by parametrizing queries? - Have you ensured no `eval` or similar functions are used? - Does this PR introduce any functionality or component that requires authorization? - How have you ensured it respects the existing AuthN/AuthZ mechanisms? - Are you logging failed auth attempts? - Are you using or adding any cryptographic features? - Do you use a standard proven implementations? - Are the used keys controlled by the customer? Where are they stored? - Are you introducing any new policies/roles/users? - Have you used the least-privilege principle? How? By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. --- .../tasks/dataset_subscription_task.py | 19 ++++++++++--- .../db/share_object_repositories.py | 2 +- .../db/share_state_machines_repositories.py | 2 +- .../services/share_item_service.py | 27 ++----------------- .../services/share_notification_service.py | 16 +++-------- 5 files changed, 24 insertions(+), 42 deletions(-) diff --git a/backend/dataall/modules/s3_datasets_shares/tasks/dataset_subscription_task.py b/backend/dataall/modules/s3_datasets_shares/tasks/dataset_subscription_task.py index ac37d633d..e382b05ef 100644 --- a/backend/dataall/modules/s3_datasets_shares/tasks/dataset_subscription_task.py +++ b/backend/dataall/modules/s3_datasets_shares/tasks/dataset_subscription_task.py @@ -18,6 +18,9 @@ from dataall.modules.s3_datasets_shares.tasks.subscriptions import poll_queues from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository from dataall.modules.s3_datasets.db.dataset_models import DatasetStorageLocation, DatasetTable, S3Dataset +from dataall.modules.datasets_base.db.dataset_models import DatasetBase +from dataall.modules.shares_base.db.share_object_models import ShareObject +from dataall.modules.shares_base.services.share_notification_service import DataSharingNotificationType root = logging.getLogger() root.setLevel(logging.INFO) @@ -130,15 +133,25 @@ def publish_sns_message(self, session, message, dataset, share_items, prefix, ta response = sns_client.publish_dataset_message(message) log.info(f'SNS update publish response {response}') - notifications = ShareNotificationService( - session=session, dataset=dataset, share=share_object - ).notify_new_data_available_from_owners(s3_prefix=prefix) + notifications = self.notify_new_data_available_from_owners( + session=session, dataset=dataset, share=share_object, s3_prefix=prefix + ) log.info(f'Notifications for share owners {notifications}') except ClientError as e: log.error(f'Failed to deliver message {message} due to: {e}') + @staticmethod + def notify_new_data_available_from_owners(session, dataset: DatasetBase, share: ShareObject, s3_prefix: str): + msg = ( + f'New data (at {s3_prefix}) is available from dataset {dataset.datasetUri} shared by owner {dataset.owner}' + ) + notifications = ShareNotificationService(session=session, dataset=dataset, share=share).register_notifications( + notification_type=DataSharingNotificationType.DATASET_VERSION.value, msg=msg + ) + return notifications + if __name__ == '__main__': ENVNAME = os.environ.get('envname', 'local') diff --git a/backend/dataall/modules/shares_base/db/share_object_repositories.py b/backend/dataall/modules/shares_base/db/share_object_repositories.py index bea8d3876..0a9e2eafb 100644 --- a/backend/dataall/modules/shares_base/db/share_object_repositories.py +++ b/backend/dataall/modules/shares_base/db/share_object_repositories.py @@ -67,7 +67,7 @@ def get_share_item_by_uri(session, uri): return share_item @staticmethod - def get_share_item_details(session, share_type_model, item_uri): # TODO CHECK THAT IT WORKS + def get_share_item_details(session, share_type_model, item_uri): return session.query(share_type_model).get(item_uri) @staticmethod diff --git a/backend/dataall/modules/shares_base/db/share_state_machines_repositories.py b/backend/dataall/modules/shares_base/db/share_state_machines_repositories.py index ddb5d9ae9..2da805cef 100644 --- a/backend/dataall/modules/shares_base/db/share_state_machines_repositories.py +++ b/backend/dataall/modules/shares_base/db/share_state_machines_repositories.py @@ -78,7 +78,7 @@ def update_share_item_status_batch( and_(ShareObjectItem.shareUri == share_uri, ShareObjectItem.status == old_status) ) if share_item_type: - query = query.filter(ShareObjectItem.shareableType == share_item_type.value) + query = query.filter(ShareObjectItem.itemType == share_item_type.value) query.update( { diff --git a/backend/dataall/modules/shares_base/services/share_item_service.py b/backend/dataall/modules/shares_base/services/share_item_service.py index 426fa8b35..058820c23 100644 --- a/backend/dataall/modules/shares_base/services/share_item_service.py +++ b/backend/dataall/modules/shares_base/services/share_item_service.py @@ -123,26 +123,16 @@ def add_shared_item(uri: str, data: dict = None): item_type = data.get('itemType') item_uri = data.get('itemUri') share = ShareObjectRepository.get_share_by_uri(session, uri) - target_environment = EnvironmentService.get_environment_by_uri(session, share.environmentUri) share_sm = ShareObjectSM(share.status) new_share_state = share_sm.run_transition(ShareItemActions.AddItem.value) share_sm.update_state(session, share, new_share_state) + processor = ShareProcessorManager.get_processor_by_item_type(item_type) item = ShareObjectRepository.get_share_item_details(session, processor.shareable_type, item_uri) if not item: raise ObjectNotFound('ShareObjectItem', item_uri) - if ( - item_type == ShareableType.Table.value and item.region != target_environment.region - ): # TODO Part10: remove from here (we might be able to remove get_share_item_details entirely - raise UnauthorizedOperation( - action=ADD_ITEM, - message=f'Lake Formation cross region sharing is not supported. ' - f'Table {item.itemUri} is in {item.region} and target environment ' - f'{target_environment.name} is in {target_environment.region} ', - ) - share_item: ShareObjectItem = ShareObjectRepository.find_sharable_item(session, uri, item_uri) if not share_item: @@ -162,17 +152,6 @@ def add_shared_item(uri: str, data: dict = None): def remove_shared_item(uri: str): with get_context().db_engine.scoped_session() as session: share_item = ShareObjectRepository.get_share_item_by_uri(session, uri) - if ( - share_item.itemType == ShareableType.Table.value # TODO Part10 - REMOVE - and share_item.status == ShareItemStatus.Share_Failed.value - ): - share = ShareObjectRepository.get_share_by_uri(session, share_item.shareUri) - ResourcePolicyService.delete_resource_policy( - session=session, - group=share.groupUri, - resource_uri=share_item.itemUri, - ) - item_sm = ShareItemSM(share_item.status) item_sm.run_transition(ShareItemActions.RemoveItem.value) ShareObjectRepository.remove_share_object_item(session, share_item) @@ -183,9 +162,7 @@ def remove_shared_item(uri: str): def resolve_shared_item(uri, item: ShareObjectItem): with get_context().db_engine.scoped_session() as session: processor = ShareProcessorManager.get_processor_by_item_type(item.itemType) - return ShareObjectRepository.get_share_item_details( - session, processor.shareable_type, item.itemUri - ) # TODO - check it works + return ShareObjectRepository.get_share_item_details(session, processor.shareable_type, item.itemUri) @staticmethod def check_existing_shared_items(share): diff --git a/backend/dataall/modules/shares_base/services/share_notification_service.py b/backend/dataall/modules/shares_base/services/share_notification_service.py index 197b706f4..765138af9 100644 --- a/backend/dataall/modules/shares_base/services/share_notification_service.py +++ b/backend/dataall/modules/shares_base/services/share_notification_service.py @@ -49,7 +49,7 @@ def notify_share_object_submission(self, email_id: str): subject = f'Data.all | Share Request Submitted for {self.dataset.label}' email_notification_msg = msg + share_link_text - notifications = self._register_notifications( + notifications = self.register_notifications( notification_type=DataSharingNotificationType.SHARE_OBJECT_SUBMITTED.value, msg=msg ) @@ -64,7 +64,7 @@ def notify_share_object_approval(self, email_id: str): subject = f'Data.all | Share Request Approved for {self.dataset.label}' email_notification_msg = msg + share_link_text - notifications = self._register_notifications( + notifications = self.register_notifications( notification_type=DataSharingNotificationType.SHARE_OBJECT_APPROVED.value, msg=msg ) @@ -86,21 +86,13 @@ def notify_share_object_rejection(self, email_id: str): subject = f'Data.all | Share Request Rejected / Revoked for {self.dataset.label}' email_notification_msg = msg + share_link_text - notifications = self._register_notifications( + notifications = self.register_notifications( notification_type=DataSharingNotificationType.SHARE_OBJECT_REJECTED.value, msg=msg ) self._create_notification_task(subject=subject, msg=email_notification_msg) return notifications - def notify_new_data_available_from_owners(self, s3_prefix): # TODO part10: remove, this is specific for S3 - msg = f'New data (at {s3_prefix}) is available from dataset {self.dataset.datasetUri} shared by owner {self.dataset.owner}' - - notifications = self._register_notifications( - notification_type=DataSharingNotificationType.DATASET_VERSION.value, msg=msg - ) - return notifications - def _get_share_object_targeted_users(self): targeted_users = list() targeted_users.append(self.dataset.SamlAdminGroupName) @@ -109,7 +101,7 @@ def _get_share_object_targeted_users(self): targeted_users.append(self.share.groupUri) return targeted_users - def _register_notifications(self, notification_type, msg): + def register_notifications(self, notification_type, msg): """ Notifications sent to: - dataset.SamlAdminGroupName