Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement filter_permitted_menu_items in AWS auth manager #37627

Merged
merged 5 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/auth/managers/base_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def _is_permitted_dag_id(method: ResourceMethod, methods: Container[ResourceMeth
if _is_permitted_dag_id("GET", methods, dag_id) or _is_permitted_dag_id("PUT", methods, dag_id)
}

def get_permitted_menu_items(self, menu_items: list[MenuItem]) -> list[MenuItem]:
def filter_permitted_menu_items(self, menu_items: list[MenuItem]) -> list[MenuItem]:
potiuk marked this conversation as resolved.
Show resolved Hide resolved
"""
Filter menu items based on user permissions.

Expand Down
134 changes: 98 additions & 36 deletions airflow/providers/amazon/aws/auth_manager/avp/facade.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
from airflow.providers.amazon.aws.auth_manager.user import AwsAuthManagerUser


# Amazon Verified Permissions allows only up to 30 requests per batch_is_authorized call. See
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/verifiedpermissions/client/batch_is_authorized.html
NB_REQUESTS_PER_BATCH = 30


class IsAuthorizedRequest(TypedDict, total=False):
"""Represent the parameters of ``is_authorized`` method in AVP facade."""

Expand Down Expand Up @@ -125,62 +130,97 @@ def is_authorized(

return resp["decision"] == "ALLOW"

def batch_is_authorized(
def get_batch_is_authorized_results(
self,
*,
requests: Sequence[IsAuthorizedRequest],
user: AwsAuthManagerUser | None,
) -> bool:
user: AwsAuthManagerUser,
) -> list[dict]:
"""
Make a batch authorization decision against Amazon Verified Permissions.

Check whether the user has permissions to access given resources.
Return a list of results for each request.

:param requests: the list of requests containing the method, the entity_type and the entity ID
:param user: the user
"""
if user is None:
return False

entity_list = self._get_user_role_entities(user)

self.log.debug("Making batch authorization request for user=%s, requests=%s", user.get_id(), requests)

avp_requests = [
prune_dict(
{
"principal": {"entityType": get_entity_type(AvpEntities.USER), "entityId": user.get_id()},
"action": {
"actionType": get_entity_type(AvpEntities.ACTION),
"actionId": get_action_id(request["entity_type"], request["method"]),
},
"resource": {
"entityType": get_entity_type(request["entity_type"]),
"entityId": request.get("entity_id", "*"),
},
"context": self._build_context(request.get("context")),
}
)
for request in requests
avp_requests = [self._build_is_authorized_request_payload(request, user) for request in requests]
avp_requests_chunks = [
avp_requests[i : i + NB_REQUESTS_PER_BATCH]
for i in range(0, len(avp_requests), NB_REQUESTS_PER_BATCH)
]

resp = self.avp_client.batch_is_authorized(
policyStoreId=self.avp_policy_store_id,
requests=avp_requests,
entities={"entityList": entity_list},
)
results = []
for avp_requests in avp_requests_chunks:
resp = self.avp_client.batch_is_authorized(
policyStoreId=self.avp_policy_store_id,
requests=avp_requests,
entities={"entityList": entity_list},
)

self.log.debug("Authorization response: %s", resp)
self.log.debug("Authorization response: %s", resp)

has_errors = any(len(result.get("errors", [])) > 0 for result in resp["results"])
has_errors = any(len(result.get("errors", [])) > 0 for result in resp["results"])

if has_errors:
self.log.error(
"Error occurred while making a batch authorization decision. Result: %s", resp["results"]
)
raise AirflowException("Error occurred while making a batch authorization decision.")
if has_errors:
self.log.error(
"Error occurred while making a batch authorization decision. Result: %s", resp["results"]
)
raise AirflowException("Error occurred while making a batch authorization decision.")

results.extend(resp["results"])

return results

def batch_is_authorized(
self,
*,
requests: Sequence[IsAuthorizedRequest],
user: AwsAuthManagerUser | None,
) -> bool:
"""
Make a batch authorization decision against Amazon Verified Permissions.

Check whether the user has permissions to access all resources.

:param requests: the list of requests containing the method, the entity_type and the entity ID
:param user: the user
"""
if user is None:
return False
results = self.get_batch_is_authorized_results(requests=requests, user=user)
return all(result["decision"] == "ALLOW" for result in results)

def get_batch_is_authorized_single_result(
self,
*,
batch_is_authorized_results: list[dict],
request: IsAuthorizedRequest,
user: AwsAuthManagerUser,
) -> dict:
"""
Get a specific authorization result from the output of ``get_batch_is_authorized_results``.

return all(result["decision"] == "ALLOW" for result in resp["results"])
:param batch_is_authorized_results: the response from the ``batch_is_authorized`` API
:param request: the request information. Used to find the result in the response.
:param user: the user
"""
request_payload = self._build_is_authorized_request_payload(request, user)

for result in batch_is_authorized_results:
if result["request"] == request_payload:
return result

self.log.error(
"Could not find the authorization result for request %s in results %s.",
request_payload,
batch_is_authorized_results,
)
raise AirflowException("Could not find the authorization result.")

@staticmethod
def _get_user_role_entities(user: AwsAuthManagerUser) -> list[dict]:
Expand All @@ -205,3 +245,25 @@ def _build_context(context: dict | None) -> dict | None:
return {
"contextMap": context,
}

def _build_is_authorized_request_payload(self, request: IsAuthorizedRequest, user: AwsAuthManagerUser):
"""
Build a payload of an individual authorization request that could be sent through the ``batch_is_authorized`` API.

:param request: the request information
:param user: the user
"""
return prune_dict(
{
"principal": {"entityType": get_entity_type(AvpEntities.USER), "entityId": user.get_id()},
"action": {
"actionType": get_entity_type(AvpEntities.ACTION),
"actionId": get_action_id(request["entity_type"], request["method"]),
},
"resource": {
"entityType": get_entity_type(request["entity_type"]),
"entityId": request.get("entity_id", "*"),
},
"context": self._build_context(request.get("context")),
}
)