From 853af295d91cd0abde62b6c70787a036de150ed8 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Fri, 21 Jun 2024 09:29:34 -0400 Subject: [PATCH] Various RBAC fixes related to managed RoleDefinitions (#15287) * Add migration testing for certain managed roles * Fix managed role bugs * Add more tests * Fix another bug with org workflow admin role reference * Add test because another issue is fixed * Mark reason for test * Remove internal markers * Reword failure message Co-authored-by: Seth Foster --------- Co-authored-by: Seth Foster --- awx/main/migrations/_dab_rbac.py | 9 +++--- awx/main/models/rbac.py | 7 ++++- .../functional/dab_rbac/test_access_list.py | 14 +++++++++ .../functional/dab_rbac/test_dab_rbac_api.py | 30 ++++++++++++++++++ .../functional/dab_rbac/test_managed_roles.py | 31 +++++++++++++++++++ .../dab_rbac/test_translation_layer.py | 11 ++++++- awx/main/tests/functional/test_migrations.py | 7 +++++ 7 files changed, 103 insertions(+), 6 deletions(-) create mode 100644 awx/main/tests/functional/dab_rbac/test_managed_roles.py diff --git a/awx/main/migrations/_dab_rbac.py b/awx/main/migrations/_dab_rbac.py index b9d5a5645c34..4ffe46890fb8 100644 --- a/awx/main/migrations/_dab_rbac.py +++ b/awx/main/migrations/_dab_rbac.py @@ -292,12 +292,13 @@ def setup_managed_role_definitions(apps, schema_editor): org_perms = set() for cls in permission_registry.all_registered_models: ct = ContentType.objects.get_for_model(cls) + cls_name = cls._meta.model_name object_perms = set(Permission.objects.filter(content_type=ct)) # Special case for InstanceGroup which has an organiation field, but is not an organization child object - if cls._meta.model_name != 'instancegroup': + if cls_name != 'instancegroup': org_perms.update(object_perms) - if 'object_admin' in to_create and cls != Organization: + if 'object_admin' in to_create and cls_name != 'organization': indiv_perms = object_perms.copy() add_perms = [perm for perm in indiv_perms if perm.codename.startswith('add_')] if add_perms: @@ -310,7 +311,7 @@ def setup_managed_role_definitions(apps, schema_editor): ) ) - if 'org_children' in to_create and cls != Organization: + if 'org_children' in to_create and (cls_name not in ('organization', 'instancegroup', 'team')): org_child_perms = object_perms.copy() org_child_perms.add(Permission.objects.get(codename='view_organization')) @@ -327,7 +328,7 @@ def setup_managed_role_definitions(apps, schema_editor): if 'special' in to_create: special_perms = [] for perm in object_perms: - if perm.codename.split('_')[0] not in ('add', 'change', 'update', 'delete', 'view'): + if perm.codename.split('_')[0] not in ('add', 'change', 'delete', 'view'): special_perms.append(perm) for perm in special_perms: action = perm.codename.split('_')[0] diff --git a/awx/main/models/rbac.py b/awx/main/models/rbac.py index 3dfb583b42da..ffed4f68247c 100644 --- a/awx/main/models/rbac.py +++ b/awx/main/models/rbac.py @@ -591,8 +591,13 @@ def get_role_from_object_role(object_role): role_name = role_name.lower() model_cls = apps.get_model('main', target_model_name) target_model_name = get_type_for_model(model_cls) + + # exception cases completely specific to one model naming convention if target_model_name == 'notification_template': - target_model_name = 'notification' # total exception + target_model_name = 'notification' + elif target_model_name == 'workflow_job_template': + target_model_name = 'workflow' + role_name = f'{target_model_name}_admin_role' elif rd.name.endswith(' Admin'): # cases like "project-admin" diff --git a/awx/main/tests/functional/dab_rbac/test_access_list.py b/awx/main/tests/functional/dab_rbac/test_access_list.py index 2a88b5b18f4c..0a409efa9c8a 100644 --- a/awx/main/tests/functional/dab_rbac/test_access_list.py +++ b/awx/main/tests/functional/dab_rbac/test_access_list.py @@ -109,3 +109,17 @@ def test_team_indirect_access(get, team, admin_user, inventory): assert len(by_username['u1']['summary_fields']['indirect_access']) == 0 access_entry = by_username['u1']['summary_fields']['direct_access'][0] assert sorted(access_entry['descendant_roles']) == sorted(['adhoc_role', 'use_role', 'update_role', 'read_role', 'admin_role']) + + +@pytest.mark.django_db +def test_workflow_access_list(workflow_job_template, alice, bob, setup_managed_roles, get, admin_user): + """Basic verification that WFJT access_list is functional""" + workflow_job_template.admin_role.members.add(alice) + workflow_job_template.organization.workflow_admin_role.members.add(bob) + + url = reverse('api:workflow_job_template_access_list', kwargs={'pk': workflow_job_template.pk}) + for u in (alice, bob, admin_user): + response = get(url, user=u, expect=200) + user_ids = [item['id'] for item in response.data['results']] + assert alice.pk in user_ids + assert bob.pk in user_ids diff --git a/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py b/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py index d9e3453ef254..66df98b07a17 100644 --- a/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py +++ b/awx/main/tests/functional/dab_rbac/test_dab_rbac_api.py @@ -5,6 +5,7 @@ from awx.api.versioning import reverse from awx.main.models import JobTemplate, Inventory, Organization +from awx.main.access import JobTemplateAccess, WorkflowJobTemplateAccess from ansible_base.rbac.models import RoleDefinition @@ -88,3 +89,32 @@ def test_assign_custom_add_role(admin_user, rando, organization, post, setup_man inv_id = r.data['id'] inventory = Inventory.objects.get(id=inv_id) assert rando.has_obj_perm(inventory, 'change') + + +@pytest.mark.django_db +def test_jt_creation_permissions(setup_managed_roles, inventory, project, rando): + """This tests that if you assign someone required permissions in the new API + using the managed roles, then that works to give permissions to create a job template""" + inv_rd = RoleDefinition.objects.get(name='Inventory Admin') + proj_rd = RoleDefinition.objects.get(name='Project Admin') + # establish prior state + access = JobTemplateAccess(rando) + assert not access.can_add({'inventory': inventory.pk, 'project': project.pk, 'name': 'foo-jt'}) + + inv_rd.give_permission(rando, inventory) + proj_rd.give_permission(rando, project) + + assert access.can_add({'inventory': inventory.pk, 'project': project.pk, 'name': 'foo-jt'}) + + +@pytest.mark.django_db +def test_workflow_creation_permissions(setup_managed_roles, organization, workflow_job_template, rando): + """Similar to JT, assigning new roles gives creator permissions""" + org_wf_rd = RoleDefinition.objects.get(name='Organization WorkflowJobTemplate Admin') + assert workflow_job_template.organization == organization # sanity + # establish prior state + access = WorkflowJobTemplateAccess(rando) + assert not access.can_add({'name': 'foo-flow', 'organization': organization.pk}) + org_wf_rd.give_permission(rando, organization) + + assert access.can_add({'name': 'foo-flow', 'organization': organization.pk}) diff --git a/awx/main/tests/functional/dab_rbac/test_managed_roles.py b/awx/main/tests/functional/dab_rbac/test_managed_roles.py new file mode 100644 index 000000000000..ec7763d618d6 --- /dev/null +++ b/awx/main/tests/functional/dab_rbac/test_managed_roles.py @@ -0,0 +1,31 @@ +import pytest + +from ansible_base.rbac.models import RoleDefinition, DABPermission + + +@pytest.mark.django_db +def test_roles_to_not_create(setup_managed_roles): + assert RoleDefinition.objects.filter(name='Organization Admin').count() == 1 + + SHOULD_NOT_EXIST = ('Organization Organization Admin', 'Organization Team Admin', 'Organization InstanceGroup Admin') + + bad_rds = RoleDefinition.objects.filter(name__in=SHOULD_NOT_EXIST) + if bad_rds.exists(): + bad_names = list(bad_rds.values_list('name', flat=True)) + raise Exception(f'Found RoleDefinitions that should not exist: {bad_names}') + + +@pytest.mark.django_db +def test_project_update_role(setup_managed_roles): + """Role to allow updating a project on the object-level should exist""" + assert RoleDefinition.objects.filter(name='Project Update').count() == 1 + + +@pytest.mark.django_db +def test_org_child_add_permission(setup_managed_roles): + for model_name in ('Project', 'NotificationTemplate', 'WorkflowJobTemplate', 'Inventory'): + rd = RoleDefinition.objects.get(name=f'Organization {model_name} Admin') + assert 'add_' in str(rd.permissions.values_list('codename', flat=True)), f'The {rd.name} role definition expected to contain add_ permissions' + + # special case for JobTemplate, anyone can create one with use permission to project/inventory + assert not DABPermission.objects.filter(codename='add_jobtemplate').exists() diff --git a/awx/main/tests/functional/dab_rbac/test_translation_layer.py b/awx/main/tests/functional/dab_rbac/test_translation_layer.py index c8e529c902cd..c39cd9f95d11 100644 --- a/awx/main/tests/functional/dab_rbac/test_translation_layer.py +++ b/awx/main/tests/functional/dab_rbac/test_translation_layer.py @@ -16,7 +16,16 @@ @pytest.mark.django_db @pytest.mark.parametrize( 'role_name', - ['execution_environment_admin_role', 'project_admin_role', 'admin_role', 'auditor_role', 'read_role', 'execute_role', 'notification_admin_role'], + [ + 'execution_environment_admin_role', + 'workflow_admin_role', + 'project_admin_role', + 'admin_role', + 'auditor_role', + 'read_role', + 'execute_role', + 'notification_admin_role', + ], ) def test_round_trip_roles(organization, rando, role_name, setup_managed_roles): """ diff --git a/awx/main/tests/functional/test_migrations.py b/awx/main/tests/functional/test_migrations.py index 89697f2cc1d5..41eab9dd3852 100644 --- a/awx/main/tests/functional/test_migrations.py +++ b/awx/main/tests/functional/test_migrations.py @@ -85,3 +85,10 @@ def test_migrate_DAB_RBAC(self, migrator): RoleUserAssignment = new_state.apps.get_model('dab_rbac', 'RoleUserAssignment') assert RoleUserAssignment.objects.filter(user=user.id, object_id=org.id).exists() + + # Regression testing for bug that comes from current vs past models mismatch + RoleDefinition = new_state.apps.get_model('dab_rbac', 'RoleDefinition') + assert not RoleDefinition.objects.filter(name='Organization Organization Admin').exists() + # Test special cases in managed role creation + assert not RoleDefinition.objects.filter(name='Organization Team Admin').exists() + assert not RoleDefinition.objects.filter(name='Organization InstanceGroup Admin').exists()