Skip to content

Commit

Permalink
Implement filter_permitted_menu_items in AWS auth manager (apache#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
vincbeck authored and utkarsharma2 committed Apr 22, 2024
1 parent 8afe07c commit 88be328
Show file tree
Hide file tree
Showing 9 changed files with 517 additions and 43 deletions.
2 changes: 1 addition & 1 deletion airflow/auth/managers/base_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def _is_permitted_dag_id(method: ResourceMethod, methods: Container[ResourceMeth
if _is_permitted_dag_id("GET", methods, dag_id) or _is_permitted_dag_id("PUT", methods, dag_id)
}

def get_permitted_menu_items(self, menu_items: list[MenuItem]) -> list[MenuItem]:
def filter_permitted_menu_items(self, menu_items: list[MenuItem]) -> list[MenuItem]:
"""
Filter menu items based on user permissions.
Expand Down
134 changes: 98 additions & 36 deletions airflow/providers/amazon/aws/auth_manager/avp/facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
from airflow.providers.amazon.aws.auth_manager.user import AwsAuthManagerUser


# Amazon Verified Permissions allows only up to 30 requests per batch_is_authorized call. See
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/verifiedpermissions/client/batch_is_authorized.html
NB_REQUESTS_PER_BATCH = 30


class IsAuthorizedRequest(TypedDict, total=False):
"""Represent the parameters of ``is_authorized`` method in AVP facade."""

Expand Down Expand Up @@ -125,62 +130,97 @@ def is_authorized(

return resp["decision"] == "ALLOW"

def batch_is_authorized(
def get_batch_is_authorized_results(
self,
*,
requests: Sequence[IsAuthorizedRequest],
user: AwsAuthManagerUser | None,
) -> bool:
user: AwsAuthManagerUser,
) -> list[dict]:
"""
Make a batch authorization decision against Amazon Verified Permissions.
Check whether the user has permissions to access given resources.
Return a list of results for each request.
:param requests: the list of requests containing the method, the entity_type and the entity ID
:param user: the user
"""
if user is None:
return False

entity_list = self._get_user_role_entities(user)

self.log.debug("Making batch authorization request for user=%s, requests=%s", user.get_id(), requests)

avp_requests = [
prune_dict(
{
"principal": {"entityType": get_entity_type(AvpEntities.USER), "entityId": user.get_id()},
"action": {
"actionType": get_entity_type(AvpEntities.ACTION),
"actionId": get_action_id(request["entity_type"], request["method"]),
},
"resource": {
"entityType": get_entity_type(request["entity_type"]),
"entityId": request.get("entity_id", "*"),
},
"context": self._build_context(request.get("context")),
}
)
for request in requests
avp_requests = [self._build_is_authorized_request_payload(request, user) for request in requests]
avp_requests_chunks = [
avp_requests[i : i + NB_REQUESTS_PER_BATCH]
for i in range(0, len(avp_requests), NB_REQUESTS_PER_BATCH)
]

resp = self.avp_client.batch_is_authorized(
policyStoreId=self.avp_policy_store_id,
requests=avp_requests,
entities={"entityList": entity_list},
)
results = []
for avp_requests in avp_requests_chunks:
resp = self.avp_client.batch_is_authorized(
policyStoreId=self.avp_policy_store_id,
requests=avp_requests,
entities={"entityList": entity_list},
)

self.log.debug("Authorization response: %s", resp)
self.log.debug("Authorization response: %s", resp)

has_errors = any(len(result.get("errors", [])) > 0 for result in resp["results"])
has_errors = any(len(result.get("errors", [])) > 0 for result in resp["results"])

if has_errors:
self.log.error(
"Error occurred while making a batch authorization decision. Result: %s", resp["results"]
)
raise AirflowException("Error occurred while making a batch authorization decision.")
if has_errors:
self.log.error(
"Error occurred while making a batch authorization decision. Result: %s", resp["results"]
)
raise AirflowException("Error occurred while making a batch authorization decision.")

results.extend(resp["results"])

return results

def batch_is_authorized(
self,
*,
requests: Sequence[IsAuthorizedRequest],
user: AwsAuthManagerUser | None,
) -> bool:
"""
Make a batch authorization decision against Amazon Verified Permissions.
Check whether the user has permissions to access all resources.
:param requests: the list of requests containing the method, the entity_type and the entity ID
:param user: the user
"""
if user is None:
return False
results = self.get_batch_is_authorized_results(requests=requests, user=user)
return all(result["decision"] == "ALLOW" for result in results)

def get_batch_is_authorized_single_result(
self,
*,
batch_is_authorized_results: list[dict],
request: IsAuthorizedRequest,
user: AwsAuthManagerUser,
) -> dict:
"""
Get a specific authorization result from the output of ``get_batch_is_authorized_results``.
return all(result["decision"] == "ALLOW" for result in resp["results"])
:param batch_is_authorized_results: the response from the ``batch_is_authorized`` API
:param request: the request information. Used to find the result in the response.
:param user: the user
"""
request_payload = self._build_is_authorized_request_payload(request, user)

for result in batch_is_authorized_results:
if result["request"] == request_payload:
return result

self.log.error(
"Could not find the authorization result for request %s in results %s.",
request_payload,
batch_is_authorized_results,
)
raise AirflowException("Could not find the authorization result.")

@staticmethod
def _get_user_role_entities(user: AwsAuthManagerUser) -> list[dict]:
Expand All @@ -205,3 +245,25 @@ def _build_context(context: dict | None) -> dict | None:
return {
"contextMap": context,
}

def _build_is_authorized_request_payload(self, request: IsAuthorizedRequest, user: AwsAuthManagerUser):
"""
Build a payload of an individual authorization request that could be sent through the ``batch_is_authorized`` API.
:param request: the request information
:param user: the user
"""
return prune_dict(
{
"principal": {"entityType": get_entity_type(AvpEntities.USER), "entityId": user.get_id()},
"action": {
"actionType": get_entity_type(AvpEntities.ACTION),
"actionId": get_action_id(request["entity_type"], request["method"]),
},
"resource": {
"entityType": get_entity_type(request["entity_type"]),
"entityId": request.get("entity_id", "*"),
},
"context": self._build_context(request.get("context")),
}
)

0 comments on commit 88be328

Please sign in to comment.