Skip to content

Commit

Permalink
Fix permission issue for dag that has dot in name (#23510)
Browse files Browse the repository at this point in the history
The way airflow.security.permissions.resource_name_for_dag determines whether a DAG is a subdag is incorrect: it assumes that any dag_id containing a dot belongs to a subdag.
As a result, if a regular DAG's dag_id contains a dot, its permission is not recorded correctly.

The current solution makes a database query every time we check permissions for a DAG that has a dot in its name. I don't particularly like that, but I think it's better than the other options I considered, such as changing how subdags are named, which would be
bad for UX. Another option I considered was making a query while parsing; that is not good either, and it is avoided
by passing the root DAG's id to resource_name_for_dag.

Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
  • Loading branch information
3 people committed Jun 8, 2022
1 parent ae3e68a commit cc35fca
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 27 deletions.
8 changes: 5 additions & 3 deletions airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,8 @@ def _sync_perm_for_dag(self, dag, session: Session = None):
from airflow.security.permissions import DAG_ACTIONS, resource_name_for_dag
from airflow.www.fab_security.sqla.models import Action, Permission, Resource

root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id

def needs_perms(dag_id: str) -> bool:
dag_resource_name = resource_name_for_dag(dag_id)
for permission_name in DAG_ACTIONS:
Expand All @@ -654,9 +656,9 @@ def needs_perms(dag_id: str) -> bool:
return True
return False

if dag.access_control or needs_perms(dag.dag_id):
self.log.debug("Syncing DAG permissions: %s to the DB", dag.dag_id)
if dag.access_control or needs_perms(root_dag_id):
self.log.debug("Syncing DAG permissions: %s to the DB", root_dag_id)
from airflow.www.security import ApplessAirflowSecurityManager

security_manager = ApplessAirflowSecurityManager(session=session)
security_manager.sync_perm_for_dag(dag.dag_id, dag.access_control)
security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)
19 changes: 10 additions & 9 deletions airflow/security/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,15 @@
DAG_ACTIONS = {ACTION_CAN_READ, ACTION_CAN_EDIT, ACTION_CAN_DELETE}


def resource_name_for_dag(dag_id):
"""Returns the resource name for a DAG id."""
if dag_id == RESOURCE_DAG:
return dag_id
def resource_name_for_dag(root_dag_id: str) -> str:
"""Returns the resource name for a DAG id.
if dag_id.startswith(RESOURCE_DAG_PREFIX):
return dag_id

# To account for SubDags
root_dag_id = dag_id.split(".")[0]
Note that since a sub-DAG should follow the permission of its
parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
for a subdag. A normal dag should pass the ``DagModel.dag_id``.
"""
if root_dag_id == RESOURCE_DAG:
return root_dag_id
if root_dag_id.startswith(RESOURCE_DAG_PREFIX):
return root_dag_id
return f"{RESOURCE_DAG_PREFIX}{root_dag_id}"
29 changes: 23 additions & 6 deletions airflow/www/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ def __init__(self, appbuilder):
view.datamodel = CustomSQLAInterface(view.datamodel.obj)
self.perms = None

def _get_root_dag_id(self, dag_id):
    """Return the id of the DAG whose permissions apply to ``dag_id``.

    Ids without a dot are returned unchanged and cost no database access.
    For ids that contain a dot, the matching ``DagModel`` row is looked up
    and its ``root_dag_id`` is returned when set, otherwise its own
    ``dag_id``.

    :param dag_id: the DAG id to resolve.
    :return: the root DAG id, or ``dag_id`` itself when it has no root or
        when no ``DagModel`` row exists for it.
    """
    if '.' not in dag_id:
        return dag_id

    dm = (
        self.get_session.query(DagModel.dag_id, DagModel.root_dag_id)
        .filter(DagModel.dag_id == dag_id)
        .first()
    )
    if dm is None:
        # ``first()`` yields None when no row matches. Fall back to the id
        # as given instead of failing with AttributeError on ``None``.
        return dag_id
    return dm.root_dag_id or dm.dag_id

def init_role(self, role_name, perms):
"""
Initialize the role with actions and related resources.
Expand Down Expand Up @@ -340,7 +350,8 @@ def get_accessible_dag_ids(self, user, user_actions=None, session=None) -> Set[s
def can_access_some_dags(self, action: str, dag_id: Optional[str] = None) -> bool:
"""Checks if user has read or write access to some dags."""
if dag_id and dag_id != '~':
return self.has_access(action, permissions.resource_name_for_dag(dag_id))
root_dag_id = self._get_root_dag_id(dag_id)
return self.has_access(action, permissions.resource_name_for_dag(root_dag_id))

user = g.user
if action == permissions.ACTION_CAN_READ:
Expand All @@ -349,17 +360,20 @@ def can_access_some_dags(self, action: str, dag_id: Optional[str] = None) -> boo

def can_read_dag(self, dag_id, user=None) -> bool:
"""Determines whether a user has DAG read access."""
dag_resource_name = permissions.resource_name_for_dag(dag_id)
root_dag_id = self._get_root_dag_id(dag_id)
dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user)

def can_edit_dag(self, dag_id, user=None) -> bool:
"""Determines whether a user has DAG edit access."""
dag_resource_name = permissions.resource_name_for_dag(dag_id)
root_dag_id = self._get_root_dag_id(dag_id)
dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, user=user)

def can_delete_dag(self, dag_id, user=None) -> bool:
"""Determines whether a user has DAG delete access."""
dag_resource_name = permissions.resource_name_for_dag(dag_id)
root_dag_id = self._get_root_dag_id(dag_id)
dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
return self.has_access(permissions.ACTION_CAN_DELETE, dag_resource_name, user=user)

def prefixed_dag_id(self, dag_id):
Expand All @@ -370,7 +384,8 @@ def prefixed_dag_id(self, dag_id):
DeprecationWarning,
stacklevel=2,
)
return permissions.resource_name_for_dag(dag_id)
root_dag_id = self._get_root_dag_id(dag_id)
return permissions.resource_name_for_dag(root_dag_id)

def is_dag_resource(self, resource_name):
"""Determines if a resource belongs to a DAG or all DAGs."""
Expand Down Expand Up @@ -530,7 +545,8 @@ def create_dag_specific_permissions(self) -> None:
dags = dagbag.dags.values()

for dag in dags:
dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id
dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
for action_name in self.DAG_ACTIONS:
if (action_name, dag_resource_name) not in perms:
self._merge_perm(action_name, dag_resource_name)
Expand Down Expand Up @@ -615,6 +631,7 @@ def _sync_dag_view_permissions(self, dag_id, access_control):
:param access_control: a dict where each key is a rolename and
each value is a set() of action names (e.g. {'can_read'})
"""

dag_resource_name = permissions.resource_name_for_dag(dag_id)

def _get_or_create_dag_permission(action_name: str) -> Optional[Permission]:
Expand Down
66 changes: 57 additions & 9 deletions tests/www/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ def sample_dags(security_manager):
@pytest.fixture(scope="module")
def has_dag_perm(security_manager):
def _has_dag_perm(perm, dag_id, user):
return security_manager.has_access(perm, permissions.resource_name_for_dag(dag_id), user)
root_dag_id = security_manager._get_root_dag_id(dag_id)
return security_manager.has_access(perm, permissions.resource_name_for_dag(root_dag_id), user)

return _has_dag_perm

Expand Down Expand Up @@ -351,7 +352,7 @@ def test_verify_anon_user_with_admin_role_has_access_to_each_dag(
user.roles = security_manager.get_user_roles(user)
assert user.roles == {security_manager.get_public_role()}

test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"]
test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3", "test_dag_id_4.with_dot"]

for dag_id in test_dag_ids:
with _create_dag_model_context(dag_id, session, security_manager):
Expand Down Expand Up @@ -588,7 +589,8 @@ def test_access_control_with_invalid_permission(app, security_manager):
for action in invalid_actions:
with pytest.raises(AirflowException) as ctx:
security_manager._sync_dag_view_permissions(
'access_control_test', access_control={rolename: {action}}
'access_control_test',
access_control={rolename: {action}},
)
assert "invalid permissions" in str(ctx.value)

Expand Down Expand Up @@ -728,11 +730,13 @@ def test_create_dag_specific_permissions(session, security_manager, monkeypatch,
assert ('can_edit', dag_resource_name) in all_perms

security_manager._sync_dag_view_permissions.assert_called_once_with(
permissions.resource_name_for_dag('has_access_control'), access_control
permissions.resource_name_for_dag('has_access_control'),
access_control,
)

del dagbag_mock.dags["has_access_control"]
with assert_queries_count(1): # one query to get all perms; dagbag is mocked
with assert_queries_count(2): # two query to get all perms; dagbag is mocked
# The extra query happens at permission check
security_manager.create_dag_specific_permissions()


Expand Down Expand Up @@ -782,10 +786,12 @@ def test_prefixed_dag_id_is_deprecated(security_manager):
security_manager.prefixed_dag_id("hello")


def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms):
def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms, session):
username = 'dag_permission_user'
role_name = 'dag_permission_role'
parent_dag_name = "parent_dag"
subdag_name = parent_dag_name + ".subdag"
subsubdag_name = parent_dag_name + ".subdag.subsubdag"
with app.app_context():
mock_roles = [
{
Expand All @@ -801,15 +807,57 @@ def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_
username=username,
role_name=role_name,
) as user:
dag1 = DagModel(dag_id=parent_dag_name)
dag2 = DagModel(dag_id=subdag_name, is_subdag=True, root_dag_id=parent_dag_name)
dag3 = DagModel(dag_id=subsubdag_name, is_subdag=True, root_dag_id=parent_dag_name)
session.add_all([dag1, dag2, dag3])
session.commit()
security_manager.bulk_sync_roles(mock_roles)
security_manager._sync_dag_view_permissions(
parent_dag_name, access_control={role_name: READ_WRITE}
)
for dag in [dag1, dag2, dag3]:
security_manager._sync_dag_view_permissions(
parent_dag_name, access_control={role_name: READ_WRITE}
)

assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name, user=user)
assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name + ".subdag", user=user)
assert_user_has_dag_perms(
perms=READ_WRITE, dag_id=parent_dag_name + ".subdag.subsubdag", user=user
)
session.query(DagModel).delete()


def test_permissions_work_for_dags_with_dot_in_dagname(
    app, security_manager, assert_user_has_dag_perms, assert_user_does_not_have_dag_perms, session
):
    """Check permission handling for a DAG whose id contains a dot.

    Two ``DagModel`` rows are created, neither with a ``root_dag_id``:
    ``dag_id_1`` and ``dag_id_1.with_dot``. The role is granted read/edit
    on ``DAG:dag_id_1`` only, then permissions are synced for both DAGs
    and the user's access to each id is asserted.
    """
    username = 'dag_permission_user'
    role_name = 'dag_permission_role'
    dag_id = "dag_id_1"
    dag_id_2 = "dag_id_1.with_dot"
    with app.app_context():
        mock_roles = [
            {
                'role': role_name,
                'perms': [
                    (permissions.ACTION_CAN_READ, f"DAG:{dag_id}"),
                    (permissions.ACTION_CAN_EDIT, f"DAG:{dag_id}"),
                ],
            }
        ]
        with create_user_scope(
            app,
            username=username,
            role_name=role_name,
        ) as user:
            dag1 = DagModel(dag_id=dag_id)
            dag2 = DagModel(dag_id=dag_id_2)
            session.add_all([dag1, dag2])
            session.commit()
            try:
                security_manager.bulk_sync_roles(mock_roles)
                security_manager.sync_perm_for_dag(dag1.dag_id, access_control={role_name: READ_WRITE})
                security_manager.sync_perm_for_dag(dag2.dag_id, access_control={role_name: READ_WRITE})
                assert_user_has_dag_perms(perms=READ_WRITE, dag_id=dag_id, user=user)
                assert_user_does_not_have_dag_perms(perms=READ_WRITE, dag_id=dag_id_2, user=user)
            finally:
                # Remove the committed DagModel rows even when an assertion
                # fails, so they do not leak into later tests.
                session.query(DagModel).delete()


def test_fab_models_use_airflow_base_meta():
Expand Down

0 comments on commit cc35fca

Please sign in to comment.