Test Enabling S3 bucket share - COPY #846

Closed
wants to merge 23 commits into from
1 change: 1 addition & 0 deletions backend/dataall/modules/dataset_sharing/api/enums.py
@@ -5,6 +5,7 @@ class ShareableType(GraphQLEnumMapper):
Table = 'DatasetTable'
StorageLocation = 'DatasetStorageLocation'
View = 'View'
S3Bucket = 'S3Bucket'


class ShareObjectPermission(GraphQLEnumMapper):
1 change: 1 addition & 0 deletions backend/dataall/modules/dataset_sharing/api/resolvers.py
@@ -191,6 +191,7 @@ def resolve_consumption_data(context: Context, source: ShareObject, **kwargs):
return {
's3AccessPointName': S3AccessPointName,
'sharedGlueDatabase': (ds.GlueDatabaseName + '_shared_' + source.shareUri)[:254] if ds else 'Not created',
's3bucketName': ds.S3BucketName,
}


1 change: 1 addition & 0 deletions backend/dataall/modules/dataset_sharing/api/types.py
@@ -107,6 +107,7 @@
fields=[
gql.Field(name='s3AccessPointName', type=gql.String),
gql.Field(name='sharedGlueDatabase', type=gql.String),
gql.Field(name='s3bucketName', type=gql.String),
],
)

19 changes: 19 additions & 0 deletions backend/dataall/modules/dataset_sharing/aws/kms_client.py
@@ -69,3 +69,22 @@ def check_key_exists(self, key_alias: str):
return None
else:
return key_exist

def add_tags_to_key(self, key_id: str, tags: list):
"""
Add tags to an existing AWS KMS key.
:param key_id: The ID of the KMS key to add tags to.
:param tags: A list of dictionaries containing the tags to be added. For example:
[{'TagKey': 'Purpose', 'TagValue': 'Test'}]
:return: None
"""
try:
self._client.tag_resource(
KeyId=key_id,
Tags=tags,
)
except Exception as e:
log.error(
f'Failed to add tags to kms key {key_id} : {e}'
)
raise e
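
Note: add_tags_to_key is a thin wrapper around the KMS TagResource API. A minimal standalone sketch of the same call via boto3, assuming a hypothetical key alias, region and tag values:

import logging

import boto3

log = logging.getLogger(__name__)

kms = boto3.client('kms', region_name='eu-west-1')

# Resolve a key id from an alias; the alias name here is hypothetical.
key_id = kms.describe_key(KeyId='alias/dataall-example-key')['KeyMetadata']['KeyId']

try:
    kms.tag_resource(
        KeyId=key_id,
        Tags=[{'TagKey': 'dataall', 'TagValue': 'true'}],  # KMS tags use TagKey/TagValue, not Key/Value
    )
except Exception as e:
    log.error(f'Failed to add tags to KMS key {key_id}: {e}')
    raise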
44 changes: 44 additions & 0 deletions backend/dataall/modules/dataset_sharing/aws/s3_client.py
@@ -121,6 +121,50 @@ def generate_access_point_policy_template(
}
return policy

@staticmethod
def generate_default_bucket_policy(
s3_bucket_name: str,
owner_roleId: list
):
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowAllToAdmin",
"Effect": "Allow",
"Principal": "*",
"Action": "s3:*",
"Resource": [
f"arn:aws:s3:::{s3_bucket_name}",
f"arn:aws:s3:::{s3_bucket_name}/*"
],
"Condition": {
"StringLike": {
"aws:userId": owner_roleId
}
}
},
{
"Effect": "Deny",
"Principal": {
"AWS": "*"
},
"Sid": "RequiredSecureTransport",
"Action": "s3:*",
"Resource": [
f"arn:aws:s3:::{s3_bucket_name}",
f"arn:aws:s3:::{s3_bucket_name}/*"
],
"Condition": {
"Bool": {
"aws:SecureTransport": "false"
}
}
}
]
}
return policy


class S3Client:
def __init__(self, account_id, region):
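
Note: the generated default bucket policy limits s3:* on the bucket to principals whose aws:userId matches the owner role ids and denies all non-TLS access. A hedged sketch of applying such a policy with put_bucket_policy; the S3ControlClient class name, bucket name and role-id wildcard values below are assumptions for illustration only:

import json

import boto3

# Assumed import path, taken from the file touched by this diff
# (backend/dataall/modules/dataset_sharing/aws/s3_client.py); the owning class
# name S3ControlClient is an assumption and may differ.
from dataall.modules.dataset_sharing.aws.s3_client import S3ControlClient

bucket_name = 'example-dataset-bucket'        # hypothetical bucket
owner_role_ids = ['AROAEXAMPLEROLEID:*']      # hypothetical aws:userId values for the dataset admin role

policy = S3ControlClient.generate_default_bucket_policy(bucket_name, owner_role_ids)
boto3.client('s3').put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy))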
1 change: 1 addition & 0 deletions backend/dataall/modules/dataset_sharing/db/enums.py
@@ -57,6 +57,7 @@ class ShareableType(Enum):
Table = 'DatasetTable'
StorageLocation = 'DatasetStorageLocation'
View = 'View'
S3Bucket = 'S3Bucket'


class PrincipalType(Enum):
@@ -12,7 +12,7 @@
ShareItemStatus, ShareableType, PrincipalType
from dataall.modules.dataset_sharing.db.share_object_models import ShareObjectItem, ShareObject
from dataall.modules.datasets_base.db.dataset_repositories import DatasetRepository
from dataall.modules.datasets_base.db.dataset_models import DatasetStorageLocation, DatasetTable, Dataset
from dataall.modules.datasets_base.db.dataset_models import DatasetStorageLocation, DatasetTable, Dataset, DatasetBucket

logger = logging.getLogger(__name__)

@@ -356,6 +356,8 @@ def get_share_item(session, item_type, item_uri):
return session.query(DatasetTable).get(item_uri)
if item_type == ShareableType.StorageLocation.value:
return session.query(DatasetStorageLocation).get(item_uri)
if item_type == ShareableType.S3Bucket.value:
return session.query(DatasetBucket).get(item_uri)

@staticmethod
def get_share_by_uri(session, uri):
@@ -525,7 +527,33 @@ def list_shareable_items(session, share, states, data):
if states:
locations = locations.filter(ShareObjectItem.status.in_(states))

shareable_objects = tables.union(locations).subquery('shareable_objects')
s3_buckets = (
session.query(
DatasetBucket.bucketUri.label('itemUri'),
func.coalesce('S3Bucket').label('itemType'),
DatasetBucket.S3BucketName.label('itemName'),
DatasetBucket.description.label('description'),
ShareObjectItem.shareItemUri.label('shareItemUri'),
ShareObjectItem.status.label('status'),
case(
[(ShareObjectItem.shareItemUri.isnot(None), True)],
else_=False,
).label('isShared'),
)
.outerjoin(
ShareObjectItem,
and_(
ShareObjectItem.shareUri == share.shareUri,
DatasetBucket.bucketUri
== ShareObjectItem.itemUri,
),
)
.filter(DatasetBucket.datasetUri == share.datasetUri)
)
if states:
s3_buckets = s3_buckets.filter(ShareObjectItem.status.in_(states))

shareable_objects = tables.union(locations, s3_buckets).subquery('shareable_objects')
query = session.query(shareable_objects)

if data:
@@ -732,9 +760,14 @@ def get_share_data_items(session, share_uri, status):
session, share, status, DatasetStorageLocation, DatasetStorageLocation.locationUri
)

s3_buckets = ShareObjectRepository._find_all_share_item(
session, share, status, DatasetBucket, DatasetBucket.bucketUri
)

return (
tables,
folders,
s3_buckets,
)

@staticmethod
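
Note: the new s3_buckets sub-query mirrors the existing table and folder queries: every sub-query labels its columns identically so union() can merge them into one shareable_objects set, and isShared is derived from whether a matching ShareObjectItem row exists. A toy, self-contained sketch of that label-then-union pattern, using hypothetical stand-in models and literal() where the diff uses func.coalesce('S3Bucket') for the constant itemType column:

from sqlalchemy import Column, String, create_engine, literal
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Table(Base):                      # hypothetical stand-in for DatasetTable
    __tablename__ = 'tables'
    uri = Column(String, primary_key=True)
    name = Column(String)

class Bucket(Base):                     # hypothetical stand-in for DatasetBucket
    __tablename__ = 'buckets'
    uri = Column(String, primary_key=True)
    name = Column(String)

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# Each sub-query exposes the same labelled columns so union() can combine them.
tables = session.query(
    Table.uri.label('itemUri'),
    literal('DatasetTable').label('itemType'),
    Table.name.label('itemName'),
)
buckets = session.query(
    Bucket.uri.label('itemUri'),
    literal('S3Bucket').label('itemType'),
    Bucket.name.label('itemName'),
)
shareable_objects = tables.union(buckets).subquery('shareable_objects')
print(session.query(shareable_objects).all())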
@@ -1,12 +1,17 @@
import logging

from dataall.modules.dataset_sharing.services.share_processors.lf_process_cross_account_share import ProcessLFCrossAccountShare
from dataall.modules.dataset_sharing.services.share_processors.lf_process_same_account_share import ProcessLFSameAccountShare
from dataall.modules.dataset_sharing.services.share_processors.s3_process_share import ProcessS3Share
from dataall.modules.dataset_sharing.services.share_processors.lf_process_cross_account_share import \
ProcessLFCrossAccountShare
from dataall.modules.dataset_sharing.services.share_processors.lf_process_same_account_share import \
ProcessLFSameAccountShare
from dataall.modules.dataset_sharing.services.share_processors.s3_access_point_process_share import \
ProcessS3AccessPointShare
from dataall.modules.dataset_sharing.services.share_processors.s3_bucket_process_share import ProcessS3BucketShare

from dataall.base.db import Engine
from dataall.modules.dataset_sharing.db.enums import ShareObjectActions, ShareItemStatus, ShareableType
from dataall.modules.dataset_sharing.db.share_object_repositories import ShareObjectSM, ShareObjectRepository, ShareItemSM
from dataall.modules.dataset_sharing.db.share_object_repositories import ShareObjectSM, ShareObjectRepository, \
ShareItemSM

log = logging.getLogger(__name__)

@@ -21,8 +26,9 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool:
1) Updates share object State Machine with the Action: Start
2) Retrieves share data and items in Share_Approved state
3) Calls sharing folders processor to grant share
4) Calls sharing tables processor for same or cross account sharing to grant share
5) Updates share object State Machine with the Action: Finish
4) Calls sharing buckets processor to grant share
5) Calls sharing tables processor for same or cross account sharing to grant share
6) Updates share object State Machine with the Action: Finish

Parameters
----------
@@ -50,12 +56,13 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool:

(
shared_tables,
shared_folders
shared_folders,
shared_buckets
) = ShareObjectRepository.get_share_data_items(session, share_uri, ShareItemStatus.Share_Approved.value)

log.info(f'Granting permissions to folders: {shared_folders}')

approved_folders_succeed = ProcessS3Share.process_approved_shares(
approved_folders_succeed = ProcessS3AccessPointShare.process_approved_shares(
session,
dataset,
share,
@@ -67,6 +74,20 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool:
)
log.info(f'sharing folders succeeded = {approved_folders_succeed}')

log.info('Granting permissions to S3 buckets')

approved_s3_buckets_succeed = ProcessS3BucketShare.process_approved_shares(
session,
dataset,
share,
shared_buckets,
source_environment,
target_environment,
source_env_group,
env_group
)
log.info(f'sharing s3 buckets succeeded = {approved_s3_buckets_succeed}')

if source_environment.AwsAccountId != target_environment.AwsAccountId:
processor = ProcessLFCrossAccountShare(
session,
@@ -97,7 +118,7 @@ def approve_share(cls, engine: Engine, share_uri: str) -> bool:
new_share_state = share_sm.run_transition(ShareObjectActions.Finish.value)
share_sm.update_state(session, share, new_share_state)

return approved_tables_succeed if approved_folders_succeed else False
return approved_folders_succeed and approved_s3_buckets_succeed and approved_tables_succeed

@classmethod
def revoke_share(cls, engine: Engine, share_uri: str):
@@ -108,7 +129,8 @@ def revoke_share(cls, engine: Engine, share_uri):
4) Checks if remaining folders are shared and effectuates clean up with folders processor
5) Calls sharing tables processor for same or cross account sharing to revoke share
6) Checks if remaining tables are shared and effectuates clean up with tables processor
7) Updates share object State Machine with the Action: Finish
7) Calls sharing buckets processor to revoke share
8) Updates share object State Machine with the Action: Finish

Parameters
----------
@@ -139,15 +161,16 @@ def revoke_share(cls, engine: Engine, share_uri):

(
revoked_tables,
revoked_folders
revoked_folders,
revoked_buckets
) = ShareObjectRepository.get_share_data_items(session, share_uri, ShareItemStatus.Revoke_Approved.value)

new_state = revoked_item_sm.run_transition(ShareObjectActions.Start.value)
revoked_item_sm.update_state(session, share_uri, new_state)

log.info(f'Revoking permissions to folders: {revoked_folders}')

revoked_folders_succeed = ProcessS3Share.process_revoked_shares(
revoked_folders_succeed = ProcessS3AccessPointShare.process_revoked_shares(
session,
dataset,
share,
@@ -166,13 +189,27 @@ def revoke_share(cls, engine: Engine, share_uri):
log.info(f'Still remaining S3 resources shared = {existing_shared_items}')
if not existing_shared_items and revoked_folders:
log.info("Clean up S3 access points...")
clean_up_folders = ProcessS3Share.clean_up_share(
clean_up_folders = ProcessS3AccessPointShare.clean_up_share(
dataset=dataset,
share=share,
target_environment=target_environment
)
log.info(f"Clean up S3 successful = {clean_up_folders}")

log.info('Revoking permissions to S3 buckets')

revoked_s3_buckets_succeed = ProcessS3BucketShare.process_revoked_shares(
session,
dataset,
share,
revoked_buckets,
source_environment,
target_environment,
source_env_group,
env_group,
)
log.info(f'revoking s3 buckets succeeded = {revoked_s3_buckets_succeed}')

if source_environment.AwsAccountId != target_environment.AwsAccountId:
processor = ProcessLFCrossAccountShare(
session,
@@ -217,4 +254,4 @@ def revoke_share(cls, engine: Engine, share_uri):
new_share_state = share_sm.run_transition(ShareObjectActions.Finish.value)
share_sm.update_state(session, share, new_share_state)

return revoked_tables_succeed and revoked_folders_succeed
return revoked_folders_succeed and revoked_s3_buckets_succeed and revoked_tables_succeed
@@ -3,7 +3,7 @@

from dataall.core.environment.db.environment_models import Environment
from dataall.modules.dataset_sharing.db.share_object_models import ShareObject
from dataall.modules.datasets_base.db.dataset_models import DatasetTable, Dataset, DatasetStorageLocation
from dataall.modules.datasets_base.db.dataset_models import DatasetTable, Dataset, DatasetStorageLocation, DatasetBucket
from dataall.base.utils.alarm_service import AlarmService

log = logging.getLogger(__name__)
@@ -147,5 +147,48 @@ def trigger_revoke_folder_sharing_failure_alarm(
Share Target
- AWS Account: {target_environment.AwsAccountId}
- Region: {target_environment.region}
"""
return self.publish_message_to_alarms_topic(subject, message)

def trigger_s3_bucket_sharing_failure_alarm(
self,
bucket: DatasetBucket,
share: ShareObject,
target_environment: Environment,
):
alarm_type = "Share"
return self.handle_bucket_sharing_failure(bucket, share, target_environment, alarm_type)

def trigger_revoke_s3_bucket_sharing_failure_alarm(
self,
bucket: DatasetBucket,
share: ShareObject,
target_environment: Environment,
):
alarm_type = "Sharing Revoke"
return self.handle_bucket_sharing_failure(bucket, share, target_environment, alarm_type)

def handle_bucket_sharing_failure(self, bucket: DatasetBucket,
share: ShareObject,
target_environment: Environment,
alarm_type: str):
log.info(f'Triggering {alarm_type} failure alarm...')
subject = (
f'ALARM: DATAALL S3 Bucket {bucket.S3BucketName} {alarm_type} Failure Notification'
)
message = f"""
You are receiving this email because your DATAALL {self.envname} environment in the {self.region} region has entered the ALARM state, because it failed to {alarm_type} the S3 Bucket {bucket.S3BucketName}.
Alarm Details:
- State Change: OK -> ALARM
- Reason for State Change: S3 Bucket {alarm_type} failure
- Timestamp: {datetime.now()}
Share Source
- Dataset URI: {share.datasetUri}
- AWS Account: {bucket.AwsAccountId}
- Region: {bucket.region}
- S3 Bucket: {bucket.S3BucketName}
Share Target
- AWS Account: {target_environment.AwsAccountId}
- Region: {target_environment.region}
"""
return self.publish_message_to_alarms_topic(subject, message)
@@ -1,2 +1,3 @@
from .s3_share_manager import S3ShareManager
from .s3_access_point_share_manager import S3AccessPointShareManager
from .lf_share_manager import LFShareManager
from .s3_bucket_share_manager import S3BucketShareManager