Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ __pycache__
dist
.DS_Store
*egg-info*
.DS_Store
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = aws-data-mesh-utils
version = 1.0.15
version = 1.0.16
author = Ian Meyers
author_email = meyersi@amazon.co.uk
license = Apache 2.0
Expand Down
49 changes: 49 additions & 0 deletions src/data_mesh_util/DataMeshAdmin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys
import logging
import botocore.session
import json

from data_mesh_util.lib.constants import *
import data_mesh_util.lib.utils as utils
Expand Down Expand Up @@ -202,6 +203,51 @@ def _api_tuple(self, item_tuple: tuple):
"GroupArn": item_tuple[2]
}

def _allow_glue_ram_integration(self):
    '''
    Make sure the Glue catalog resource policy contains a statement that lets the ram.amazonaws.com service
    principal call glue:ShareResource. If no resource policy exists, a new one containing only that statement is
    written. If a policy exists and already has a matching statement, nothing is written.
    :return: None
    '''

    def _as_list(value):
        # policy JSON allows both a single value and a list of values for Statement, Action and
        # Principal entries, so normalise to a list before comparing
        if value is None:
            return []
        return value if isinstance(value, list) else [value]

    # render the statement which opts the catalog in to RAM sharing
    cf = {
        "region": self._region,
        "account_id": self._data_mesh_account_id
    }
    optin_fragment = json.loads(utils.generate_policy('glue_ram_optin.pystache', config=cf))

    current_policy, current_hash = self._automator.get_current_glue_policy()

    if current_policy is None:
        # no policy yet - write a new one. Statement is a list so later updates can append to it
        glue_policy = {
            "Version": "2012-10-17",
            "Statement": [optin_fragment]
        }
        self._automator.write_glue_catalog_resource_policy(
            policy=glue_policy
        )
        self._logger.info("Created new Glue/RAM optin policy for LF sharing")
        return

    # scan the existing glue policy to see if we've already added the clause
    statements = _as_list(current_policy.get('Statement'))
    for s in statements:
        # skip statements carrying a condition - these are tbac statements, not the optin
        if 'Condition' in s:
            continue

        # Principal may be absent or a plain string such as "*"; only a mapping can name a Service
        principal = s.get('Principal')
        services = _as_list(principal.get('Service')) if isinstance(principal, dict) else []

        if 'glue:ShareResource' in _as_list(s.get('Action')) and 'ram.amazonaws.com' in services:
            self._logger.info("Already found Glue/RAM optin policy for LF sharing")
            return

    # the optin statement is not there yet, so add it and write the policy back
    statements.append(optin_fragment)
    current_policy['Statement'] = statements

    self._automator.write_glue_catalog_resource_policy(
        policy=current_policy,
        current_hash=current_hash
    )
    self._logger.info("Created new Glue/RAM optin policy for LF sharing")

def initialize_mesh_account(self):
'''
Sets up an AWS Account to act as a Data Mesh central account. This method should be invoked by an Administrator
Expand All @@ -222,6 +268,9 @@ def initialize_mesh_account(self):
# create a new IAM role in the Data Mesh Account to be used for future grants
mgr_tuple = self._create_data_mesh_manager_role()

# setup the account to allow glue:ShareResource through RAM
self._allow_glue_ram_integration()

return {
"Manager": self._api_tuple(mgr_tuple),
"ReadOnly": self._api_tuple(ro_tuple),
Expand Down
2 changes: 1 addition & 1 deletion src/data_mesh_util/DataMeshProducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ def approve_access_request(self, request_id: str,
)

def _add_principal_to_glue_resource_policy(self, database_name: str, tables: list, add_principal: str):
self._mesh_automator.update_glue_catalog_resource_policy(
self._mesh_automator.add_tbac_glue_catalog_resource_policy(
region=self._current_region,
database_name=database_name,
tables=tables,
Expand Down
70 changes: 48 additions & 22 deletions src/data_mesh_util/lib/ApiAutomator.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,15 +520,52 @@ def _no_data():

return all_tables

def update_glue_catalog_resource_policy(self, region: str, producer_account_id: str, consumer_account_id: str,
database_name: str, tables: list):
def write_glue_catalog_resource_policy(self, policy: dict, current_hash: str = None):
    '''
    Write a new glue catalog policy document. This is a low level interface that just performs the mechanic of
    correctly writing the supplied policy, including where a hash must be supplied.
    :param policy: The complete resource policy document to write. It is serialised with json.dumps.
    :param current_hash: Optional policy hash to send as the PolicyHashCondition when a policy already exists.
    When None, the PolicyHash returned by get_resource_policy is used instead.
    :return: None
    '''
    glue_client = self._get_client('glue')

    try:
        current_resource_policy = glue_client.get_resource_policy()

        # if no external hash has been provided, then just use the current hash from the doc.
        if current_hash is None:
            current_hash = current_resource_policy.get('PolicyHash')

        # a policy is already present, so replace it conditionally on the hash
        glue_client.put_resource_policy(
            PolicyInJson=json.dumps(policy),
            PolicyHashCondition=current_hash,
            PolicyExistsCondition='MUST_EXIST',
            EnableHybrid='TRUE'
        )
    except glue_client.exceptions.EntityNotFoundException:
        # write the resource policy as new
        glue_client.put_resource_policy(
            PolicyInJson=json.dumps(policy),
            PolicyExistsCondition='NOT_EXIST',
            EnableHybrid='TRUE'
        )

def get_current_glue_policy(self):
    '''
    Fetch the glue catalog resource policy using the 'glue' client.
    :return: A tuple of (policy, hash), where policy is the parsed 'PolicyInJson' document and hash is the
    'PolicyHash' value from the response. Returns (None, None) when the lookup raises EntityNotFoundException.
    '''
    glue = self._get_client('glue')

    try:
        response = glue.get_resource_policy()
    except glue.exceptions.EntityNotFoundException:
        # nothing has been written yet
        return None, None

    return json.loads(response.get('PolicyInJson')), response.get('PolicyHash')

def add_tbac_glue_catalog_resource_policy(self, region: str, producer_account_id: str, consumer_account_id: str,
database_name: str, tables: list):
current_resource_policy, current_hash = self.get_current_glue_policy()

cf = {
'region': region,
Expand All @@ -542,44 +579,33 @@ def update_glue_catalog_resource_policy(self, region: str, producer_account_id:
cf['table_list'] = tables
policy = json.loads(utils.generate_policy('lf_cross_account_tbac.pystache', config=cf))

policy_condition = None
if current_resource_policy is None:
new_resource_policy = {
"Version": "2012-10-17",
"Statement": policy
}
glue_client.put_resource_policy(
PolicyInJson=json.dumps(new_resource_policy),
PolicyExistsCondition='NOT_EXIST',
EnableHybrid='TRUE'
self.write_glue_catalog_resource_policy(
policy=new_resource_policy
)
self._logger.info(
f"Created new Catalog Resource Policy on {producer_account_id} allowing Tag Based Access by {consumer_account_id}")
else:
new_resource_policy = json.loads(current_resource_policy.get('PolicyInJson'))
current_hash = current_resource_policy.get('PolicyHash')

update_statement, policy_index, did_modification = self._get_glue_resource_policy_statement_to_modify(
region=region,
policy=new_resource_policy, producer_account_id=producer_account_id,
policy=current_resource_policy, producer_account_id=producer_account_id,
consumer_account_id=consumer_account_id,
database_name=database_name, tables=tables
)

# add the new statement
if update_statement is None:
new_resource_policy['Statement'].append(policy)
current_resource_policy['Statement'].append(policy)
did_modification = True
elif update_statement is not None:
new_resource_policy['Statement'][policy_index] = update_statement
current_resource_policy['Statement'][policy_index] = update_statement

if did_modification is True:
glue_client.put_resource_policy(
PolicyInJson=json.dumps(new_resource_policy),
PolicyHashCondition=current_hash,
PolicyExistsCondition='MUST_EXIST',
EnableHybrid='TRUE'
)
self.write_glue_catalog_resource_policy(current_hash=current_hash, policy=current_resource_policy)
self._logger.info(
f"Updated Catalog Resource Policy on {producer_account_id} allowing Tag Based Access by {consumer_account_id}")

Expand Down
16 changes: 16 additions & 0 deletions src/data_mesh_util/resource/glue_ram_optin.pystache
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"Effect": "Allow",
"Action": [
"glue:ShareResource"
],
"Principal": {
"Service": [
"ram.amazonaws.com"
]
},
"Resource": [
"arn:aws:glue:{{region}}:{{account_id}}:table/*/*",
"arn:aws:glue:{{region}}:{{account_id}}:database/*",
"arn:aws:glue:{{region}}:{{account_id}}:catalog"
]
}