
Generic shares_base module and specific s3_datasets_shares module - part 9 (share db repositories) #1351

Merged: 6 commits into main on Jun 21, 2024

Conversation

@dlpzx dlpzx commented Jun 20, 2024

Feature or Bugfix

  • Refactoring

Detail

As explained in the design for #1123 and #1283, we are trying to implement generic datasets_base and shares_base modules that can be used by any type of dataset and any type of shareable object in a generic way.

This PR includes:

  • Split the ShareObjectRepository from s3_datasets_shares into:
    - ShareObjectRepository (shares_base): generic db operations on share objects, with no references to S3 or Glue.
    - ShareStatusRepository (shares_base): db operations related to the sharing state machine states; a way to split the db operations into smaller chunks.
    - S3ShareObjectRepository (s3_datasets_shares): db operations on S3 share objects, used only in s3_datasets_shares. These may reference DatasetTables, S3Datasets, etc., and are used in clean-up activities and to count resources in an environment.

  • Adapt S3ShareObjectRepository to S3 objects. For some queries it was necessary to add filters on the type of share items retrieved, so that the code still makes sense if anyone adds a new share type in the future. To make the intent clearer, some functions were renamed to point out that they are S3-specific or what they do.

  • Make ShareObjectRepository completely generic. The following queries needed extra work:

    • ShareObjectRepository.get_share_item - renamed to get_share_item_details
    • list_shareable_items - split into two parts: list_shareable_items_of_type + paginated_list_shareable_items. The first function is invoked recursively over the list of share processors; instead of querying DatasetTable, DatasetStorageLocation and DatasetBucket, we query the processor's shareable_type. The challenge is to get all the fields from the db Resource object that all of these models are built upon; in particular, itemName does not match S3BucketName (in buckets) or S3Prefix (in folders). For this reason I added a migration script to backfill DatasetBucket.name with DatasetBucket.S3BucketName and DatasetStorageLocation.name with DatasetStorageLocation.S3Prefix. paginated_list_shareable_items joins the list of subqueries, filters and paginates.
    • In verify_dataset_share_objects, I replaced list_shareable_items with ShareObjectRepository.get_all_share_items_in_share, which does not need tables or storage locations, avoiding the whole S3 logic and unnecessary queries.
  • Remove S3 references from shares_base.api.resolvers; use DatasetBase and DatasetBaseRepository instead. Remove the unused ShareableObject.

  • I had some problems with circular dependencies, so I created the ShareProcessorManager in shares_base for the registration of processors. The SharingService uses the manager to get all processors.
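A minimal sketch of the registration pattern described above (names are modeled on the PR description but hypothetical; the real ShareProcessorManager may differ):

```python
from collections import namedtuple

# Hypothetical definition shape: (type, Processor, shareable_type, shareable_uri)
ShareProcessorDefinition = namedtuple(
    'ShareProcessorDefinition', ['type', 'Processor', 'shareable_type', 'shareable_uri']
)


class ShareProcessorManager:
    """Registry kept outside SharingService to avoid circular imports."""

    SHARING_PROCESSORS = []

    @classmethod
    def register_processor(cls, definition):
        cls.SHARING_PROCESSORS.append(definition)

    @classmethod
    def get_processor_by_item_type(cls, item_type):
        for definition in cls.SHARING_PROCESSORS:
            if definition.type == item_type:
                return definition
        raise ValueError(f'No processor registered for item type: {item_type}')


# A module __init__ would register its processors at load time, e.g.:
ShareProcessorManager.register_processor(
    ShareProcessorDefinition('Table', object, object, None)
)
print(ShareProcessorManager.get_processor_by_item_type('Table').type)  # Table
```

Because registration happens in each module's init rather than inside SharingService, importing the manager pulls in no processor implementations.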

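The name backfill mentioned above (DatasetBucket.name from S3BucketName, DatasetStorageLocation.name from S3Prefix) can be sketched with stdlib sqlite3; the table and column names here are simplified assumptions, not the actual Alembic migration script.

```python
import sqlite3

# Illustrative backfill only -- the real change ships as an Alembic migration.
conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE dataset_bucket (bucketUri TEXT PRIMARY KEY, name TEXT, S3BucketName TEXT)'
)
conn.execute("INSERT INTO dataset_bucket VALUES ('bucket-1', 'old-name', 'my-s3-bucket')")

# Backfill name with S3BucketName so itemName is consistent across item types
conn.execute('UPDATE dataset_bucket SET name = S3BucketName')

print(conn.execute('SELECT name FROM dataset_bucket').fetchone()[0])  # my-s3-bucket
```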
Missing items for Part10:

  • Lake Formation cross-region table references in shares_base/services/share_item_service.py:add_shared_item
  • Remove table references in shares_base/services/share_item_service.py:remove_shared_item
  • Remove s3_prefix references in shares_base/services/share_notification_service:notify_new_data_available_from_owners
  • RENAMING! Right now the names are a bit misleading.

Relates

Security

Please answer the questions below briefly where applicable, or write N/A. Based on the
OWASP Top 10.

  • Does this PR introduce or modify any input fields or queries - this includes
    fetching data from storage outside the application (e.g. a database, an S3 bucket)?
    • Is the input sanitized?
    • What precautions are you taking before deserializing the data you consume?
    • Is injection prevented by parametrizing queries?
    • Have you ensured no eval or similar functions are used?
  • Does this PR introduce any functionality or component that requires authorization?
    • How have you ensured it respects the existing AuthN/AuthZ mechanisms?
    • Are you logging failed auth attempts?
  • Are you using or adding any cryptographic features?
    • Do you use standard, proven implementations?
    • Are the used keys controlled by the customer? Where are they stored?
  • Are you introducing any new policies/roles/users?
    • Have you used the least-privilege principle? How?

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

@dlpzx dlpzx force-pushed the feat/generic-dataset-sharing-9 branch from ac10947 to 3176054 Compare June 20, 2024 17:11
@@ -20,6 +20,7 @@ def create_dataset_bucket(session, dataset: S3Dataset, data: dict = None) -> Dat
KmsAlias=dataset.KmsAlias,
imported=dataset.imported,
importedKmsKey=dataset.importedKmsKey,
name=dataset.S3BucketName,
Added name=dataset.S3BucketName so that list_shareable_items can return the bucket name

)
)
.first()
.count()
)

get_share_item was used in a resolver and referenced in api/types, but it was not used in the frontend, so I removed the whole logic

self._state = new_state
return True

get_share_item_shared_states and get_... were moved to ShareStatusRepository to avoid circular dependencies

@@ -142,15 +145,6 @@ def add_shared_item(uri: str, data: dict = None):

share_item: ShareObjectItem = ShareObjectRepository.find_sharable_item(session, uri, item_uri)

Removed dead code

@@ -127,12 +128,14 @@ def add_shared_item(uri: str, data: dict = None):
share_sm = ShareObjectSM(share.status)
new_share_state = share_sm.run_transition(ShareItemActions.AddItem.value)
share_sm.update_state(session, share, new_share_state)
get_share_item queried different db tables depending on the itemType:

    @staticmethod
    def get_share_item(session, item_type, item_uri):
        if item_type == ShareableType.Table.value:
            return session.query(DatasetTable).get(item_uri)
        if item_type == ShareableType.StorageLocation.value:
            return session.query(DatasetStorageLocation).get(item_uri)
        if item_type == ShareableType.S3Bucket.value:
            return session.query(DatasetBucket).get(item_uri)

Instead, we now get the processor that corresponds to the itemType and query its shareable_type
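The dispatch above can be sketched in a few lines (illustrative names; the session and manager below are stand-ins, not the real data.all objects):

```python
# Generic replacement for the itemType if/elif chain: resolve the processor
# definition for the item type, then query its SQLAlchemy model.
def get_share_item_details(session, processor_manager, item_type, item_uri):
    processor = processor_manager.get_processor_by_item_type(item_type)
    return session.query(processor.shareable_type).get(item_uri)


# Tiny fakes to show the call shape without a database:
class _FakeQuery:
    def __init__(self, model):
        self.model = model

    def get(self, uri):
        return (self.model, uri)


class _FakeSession:
    def query(self, model):
        return _FakeQuery(model)


class _FakeProcessor:
    shareable_type = 'DatasetTable'


class _FakeManager:
    def get_processor_by_item_type(self, item_type):
        return _FakeProcessor()


print(get_share_item_details(_FakeSession(), _FakeManager(), 'Table', 'uri-1'))
# ('DatasetTable', 'uri-1')
```

Adding a new shareable type then means registering a new processor definition, not editing this function.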


We need to use the ShareProcessorManager in the s3_datasets_shares init to register the share processors. When registration was part of the SharingService, a lot of things were imported, causing all sorts of trouble in the loading of the modules. That's why I kept it separate.

@@ -126,11 +126,11 @@ def verify_dataset_share_objects(uri: str, share_uris: list):
with get_context().db_engine.scoped_session() as session:
for share_uri in share_uris:
share = ShareObjectRepository.get_share_by_uri(session, share_uri)
list_shareable_items is an ugly, complex query (it returns all table, folder and bucket details) that is not needed here. Instead, I replaced it with get_all_share_items_in_share, which returns only the ShareObjectItems
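The intent can be shown with a plain-Python stand-in (dict rows instead of ShareObjectItem; field names are assumed): only the share's own items are returned, optionally restricted to a list of states, with no joins to tables, folders or buckets.

```python
def get_all_share_items_in_share(items, share_uri, status=None):
    """Return the share's items, optionally restricted to a list of statuses."""
    result = [item for item in items if item['shareUri'] == share_uri]
    if status:
        result = [item for item in result if item['status'] in status]
    return result


items = [
    {'shareUri': 's1', 'status': 'Share_Succeeded'},
    {'shareUri': 's1', 'status': 'Revoke_Succeeded'},
    {'shareUri': 's2', 'status': 'Share_Succeeded'},
]
print(len(get_all_share_items_in_share(items, 's1', status=['Share_Succeeded'])))  # 1
```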

q = q.filter(ShareObjectItem.itemName.ilike('%' + term + '%'))

return paginate(query=q, page=data.get('page', 1), page_size=data.get('pageSize', 10)).to_dict()


The next two queries were heavily modified in this PR. The original was:


    @staticmethod
    def list_shareable_items(session, share, states, data): #TODO
        # All tables from dataset with a column isShared
        # marking the table as part of the shareObject
        tables = (
            session.query(
                DatasetTable.tableUri.label('itemUri'),
                func.coalesce('DatasetTable').label('itemType'),
                DatasetTable.GlueTableName.label('itemName'),
                DatasetTable.description.label('description'),
                ShareObjectItem.shareItemUri.label('shareItemUri'),
                ShareObjectItem.status.label('status'),
                ShareObjectItem.healthStatus.label('healthStatus'),
                ShareObjectItem.healthMessage.label('healthMessage'),
                ShareObjectItem.lastVerificationTime.label('lastVerificationTime'),
                case(
                    [(ShareObjectItem.shareItemUri.isnot(None), True)],
                    else_=False,
                ).label('isShared'),
            )
            .outerjoin(
                ShareObjectItem,
                and_(
                    ShareObjectItem.shareUri == share.shareUri,
                    DatasetTable.tableUri == ShareObjectItem.itemUri,
                ),
            )
            .filter(DatasetTable.datasetUri == share.datasetUri)
        )
        if states:
            tables = tables.filter(ShareObjectItem.status.in_(states))

        # All folders from the dataset with a column isShared
        # marking the folder as part of the shareObject
        locations = (
            session.query(
                DatasetStorageLocation.locationUri.label('itemUri'),
                func.coalesce('DatasetStorageLocation').label('itemType'),
                DatasetStorageLocation.S3Prefix.label('itemName'),
                DatasetStorageLocation.description.label('description'),
                ShareObjectItem.shareItemUri.label('shareItemUri'),
                ShareObjectItem.status.label('status'),
                ShareObjectItem.healthStatus.label('healthStatus'),
                ShareObjectItem.healthMessage.label('healthMessage'),
                ShareObjectItem.lastVerificationTime.label('lastVerificationTime'),
                case(
                    [(ShareObjectItem.shareItemUri.isnot(None), True)],
                    else_=False,
                ).label('isShared'),
            )
            .outerjoin(
                ShareObjectItem,
                and_(
                    ShareObjectItem.shareUri == share.shareUri,
                    DatasetStorageLocation.locationUri == ShareObjectItem.itemUri,
                ),
            )
            .filter(DatasetStorageLocation.datasetUri == share.datasetUri)
        )
        if states:
            locations = locations.filter(ShareObjectItem.status.in_(states))

        s3_buckets = (
            session.query(
                DatasetBucket.bucketUri.label('itemUri'),
                func.coalesce('S3Bucket').label('itemType'),
                DatasetBucket.S3BucketName.label('itemName'),
                DatasetBucket.description.label('description'),
                ShareObjectItem.shareItemUri.label('shareItemUri'),
                ShareObjectItem.status.label('status'),
                ShareObjectItem.healthStatus.label('healthStatus'),
                ShareObjectItem.healthMessage.label('healthMessage'),
                ShareObjectItem.lastVerificationTime.label('lastVerificationTime'),
                case(
                    [(ShareObjectItem.shareItemUri.isnot(None), True)],
                    else_=False,
                ).label('isShared'),
            )
            .outerjoin(
                ShareObjectItem,
                and_(
                    ShareObjectItem.shareUri == share.shareUri,
                    DatasetBucket.bucketUri == ShareObjectItem.itemUri,
                ),
            )
            .filter(DatasetBucket.datasetUri == share.datasetUri)
        )
        if states:
            s3_buckets = s3_buckets.filter(ShareObjectItem.status.in_(states))

        shareable_objects = tables.union(locations, s3_buckets).subquery('shareable_objects')
        query = session.query(shareable_objects)

        if data:
            if data.get('term'):
                term = data.get('term')
                query = query.filter(
                    or_(
                        shareable_objects.c.itemName.ilike(term + '%'),
                        shareable_objects.c.description.ilike(term + '%'),
                    )
                )
            if 'isShared' in data:
                is_shared = data.get('isShared')
                query = query.filter(shareable_objects.c.isShared == is_shared)

            if 'isHealthy' in data:
                # healthy_status = ShareItemHealthStatus.Healthy.value
                query = (
                    query.filter(shareable_objects.c.healthStatus == ShareItemHealthStatus.Healthy.value)
                    if data.get('isHealthy')
                    else query.filter(shareable_objects.c.healthStatus != ShareItemHealthStatus.Healthy.value)
                )

        return paginate(
            query.order_by(shareable_objects.c.itemName).distinct(), data.get('page', 1), data.get('pageSize', 10)
        ).to_dict()


share_type_model.datasetUri.label('datasetUri'),
func.coalesce(type.value).label('itemType'),
share_type_model.description.label('description'),
share_type_model.name.label('itemName'),

share_type_model.name.label('itemName') replaces DatasetBucket.S3BucketName.label('itemName'); that's the reason behind the migration script
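A rough pure-Python sketch of the generic shape (hypothetical structures, not the real SQLAlchemy code): one labelled sub-result per registered processor definition, concatenated, then sorted and paginated, with itemName always coming from the shared name field.

```python
def paginated_list_shareable_items(definitions, fetch_rows, page=1, page_size=10):
    """Collect rows per registered type, then sort and paginate the union."""
    rows = []
    for definition in definitions:
        for row in fetch_rows(definition):
            # itemName comes from the generic Resource.name field that every
            # shareable model carries, instead of S3BucketName / S3Prefix.
            rows.append({'itemType': definition['type'], 'itemName': row['name']})
    rows.sort(key=lambda r: r['itemName'])
    start = (page - 1) * page_size
    return rows[start:start + page_size]


definitions = [
    {'type': 'Table', 'rows': [{'name': 'b_table'}]},
    {'type': 'S3Bucket', 'rows': [{'name': 'a_bucket'}]},
]
result = paginated_list_shareable_items(definitions, lambda d: d['rows'])
print([r['itemName'] for r in result])  # ['a_bucket', 'b_table']
```

In the real repository the per-type results are SQLAlchemy subqueries joined with a union, and pagination happens in the database rather than in Python.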

@@ -195,56 +171,330 @@ def get_all_share_items_in_share(session, share_uri, status=None, healthStatus=N
)
)

Small change: status is now a list, because we use this function in other places in the code instead of list_shareable_items

@dlpzx dlpzx force-pushed the feat/generic-dataset-sharing-9 branch from 87e1b57 to ee4bff3 Compare June 20, 2024 17:51
@dlpzx dlpzx requested a review from petrkalos June 20, 2024 18:06
@dlpzx dlpzx marked this pull request as ready for review June 20, 2024 18:06
dlpzx commented Jun 21, 2024

Testing

  • list shares in inbox and outbox
  • get share object (open a share)
  • list S3 consumption details
  • ❗ list shared items (list-shareable-items query with shared=True) --> DATASET WITH MULTIPLE SHARES
  • ❗ click edit, and it also lists the not-yet-shared items (list-shareable-items query with shared=False)
  • add items and approve share - share successful (check all processors)
  • revoke items - successful
  • verify items from share request - verify successful
  • ❗ verify items from dataset view (verify_dataset_share_objects)
  • Trying to delete a dataset with shared items throws an error message
  • Deleting a dataset with empty share requests deletes the items and the share requests
  • Tested the migration script on a database with existing datasets - bucket names updated
  • Created a new dataset - DatasetBucket.name = bucket name

Comment on lines +30 to +32
Processor: Any
shareable_type: Any
shareable_uri: Any
Contributor

should those be strings?

Contributor Author
They are not strings; they are actual SQLAlchemy model objects. For example, for tables the shareable_type is the DatasetTable model from dataall.modules.s3_datasets.db.dataset_models:

ShareProcessorDefinition(ShareableType.Table, ProcessLakeFormationShare, DatasetTable, DatasetTable.tableUri)
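One way to make those fields more self-documenting than bare Any is to type them as classes and column references (a sketch under standard typing assumptions; the actual definition in the PR may differ):

```python
from dataclasses import dataclass
from typing import Any, Type


@dataclass(frozen=True)
class ShareProcessorDefinition:
    type: Any             # a ShareableType enum member, e.g. ShareableType.Table
    Processor: Type       # processor class, e.g. ProcessLakeFormationShare
    shareable_type: Type  # SQLAlchemy model class, e.g. DatasetTable
    shareable_uri: Any    # model column used as the item URI, e.g. DatasetTable.tableUri


# Stand-in values just to show construction:
definition = ShareProcessorDefinition('Table', object, dict, 'tableUri')
print(definition.shareable_type is dict)  # True
```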

@dlpzx dlpzx merged commit 342798c into main Jun 21, 2024
9 checks passed
@dlpzx dlpzx deleted the feat/generic-dataset-sharing-9 branch June 26, 2024 09:26