https://learn.microsoft.com/en-us/rest/api/authorization/privileged-role-eligibility-rest-sample


In [1]:
from functions.msgraphapi import GraphAPI
from pprint import pprint
from creds import (
    azure_tenant_id,
    azure_client_id,
    azure_client_secret,
    confluence_page_id,
    confluence_token,
    confluence_url,
    confluence_azure_resource_page_name,
    azure_root,
)
from azure.identity import ClientSecretCredential
from azure.mgmt.authorization import AuthorizationManagementClient
from azure.identity import AzureCliCredential
from functions.confluence import confluence_update_page, style_text
from atlassian import Confluence

from functions.functions import (
    get_assignments,
    build_user_array,
    get_documented_mappings,
    check_new_mappings,
    check_removed_mappings,
)

credential = ClientSecretCredential(
    azure_tenant_id, azure_client_id, azure_client_secret
)

graph_client = GraphAPI(
    azure_tenant_id=azure_tenant_id,
    azure_client_id=azure_client_id,
    azure_client_secret=azure_client_secret,
)
confluence = Confluence(url=confluence_url, token=confluence_token)



In [2]:
def get_role_assignments(subscription_id, credential):
    scope = f"/subscriptions/{subscription_id}"
    client = AuthorizationManagementClient(credential, subscription_id)
    assignments = client.role_eligibility_schedule_instances.list_for_scope(scope)

    results = []

    for assignment in assignments:
        expanded = assignment.expanded_properties
        end_date_time = assignment.end_date_time or "permanent"
        result = {
            "PrincipalName": expanded.principal.display_name,
            # "PrincipalEmail": expanded.principal.email,
            "PrincipalType": expanded.principal.type,
            "PrincipalId": expanded.principal.id,
            "RoleName": expanded.role_definition.display_name,
            "RoleType": expanded.role_definition.type,
            # "RoleId": expanded.role_definition.id,
            # "ScopeId": expanded.scope.id,
            "ScopeName": expanded.scope.display_name,
            "ScopeType": expanded.scope.type,
            # "Status": assignment.status,
            # "createdOn": assignment.created_on,
            # "startDateTime": assignment.start_date_time,
            # "endDateTime": end_date_time,
            # "updatedOn": assignment.updated_on,
            "memberType": assignment.member_type,
            # "id": assignment.id,
        }

        results.append(result)
    return results



async def build_assignments(
    role_assignments={}, assignment_dict={}, groups_evaluated=[]
):
    for role_assignment in role_assignments:
        if role_assignment["PrincipalType"] == "Group":
            if role_assignment["ScopeName"] not in assignment_dict:
                assignment_dict[role_assignment["ScopeName"]] = {}

            # Extract the elements for the new format
            username = None
            scope = role_assignment["ScopeName"]
            role = role_assignment["RoleName"]

            if role_assignment["PrincipalId"] not in groups_evaluated:
                group_members = await graph_client.get_group_members(
                    role_assignment["PrincipalId"]
                )
                groups_evaluated.append(role_assignment["PrincipalId"])
                if len(group_members) > 0:

                    # Add the extracted elements to the dictionary
                    if scope not in assignment_dict:
                        assignment_dict[scope] = {}

                    if role not in assignment_dict[scope]:
                        assignment_dict[scope][role] = []
                    for group_member in group_members:
                        user_display_name = group_member.display_name
                        if user_display_name not in assignment_dict[scope][role]:
                            assignment_dict[scope][role].append(user_display_name)
    return assignment_dict, groups_evaluated

In [3]:
def build_confluence_table(assignment_dict):
    ct = []
    for scope, roles in assignment_dict.items():
        for role in roles:
            for user in roles[role]:
                ct.append({"Benutzer": user, "Scope": scope, "Rolle": role})
    return ct

In [None]:
from azure.mgmt.subscription import SubscriptionClient

client = SubscriptionClient(credential)

response = client.subscriptions.list()
subscriptions = []
for item in response:
    subscriptions.append(item.display_name)

pprint(subscriptions)



In [None]:
assignment_dict = {}
groups_evaluated = []
assignment_dict, groups_evaluated = await build_assignments(
    role_assignments=role_assignments,
    assignment_dict=assignment_dict,
    groups_evaluated=groups_evaluated,
)
pprint(assignment_dict)

In [6]:
existing_role_mappings, headers = get_documented_mappings(
    confluence, confluence_page_id, confluence_azure_resource_page_name
)

In [7]:
def check_new_azure_resource_mappings(
    existing_role_mappings=[], new_role_mappings=[], headers=[]
):
    changes = []
    for new_role_mapping in new_role_mappings:
        mapped = False
        for existing_role_mapping in existing_role_mappings:
            if (
                new_role_mapping["Benutzer"] == existing_role_mapping["Benutzer"]
                and new_role_mapping["Rolle"] == existing_role_mapping["Rolle"]
                and new_role_mapping["Scope"] == existing_role_mapping["Scope"]
            ):
                mapped = True
                break
        if not mapped:
            for header in headers:
                if header not in new_role_mapping:
                    new_role_mapping[header] = ""
            print(f"New mapping: {new_role_mapping}")
            existing_role_mappings.append(new_role_mapping)
            changes.append(new_role_mapping)
    if len(changes) < 1:
        changes = False
    return existing_role_mappings, changes

def check_removed_azure_resource_mappings(existing_role_mappings=[], new_role_mappings=[]):
    changes = []
    for existing_role_mapping in existing_role_mappings:
        mapped = False
        for new_role_mapping in new_role_mappings:
            if (
                new_role_mapping["Benutzer"] == existing_role_mapping["Benutzer"]
                and new_role_mapping["Rolle"] == existing_role_mapping["Rolle"]
                and new_role_mapping["Scope"] == existing_role_mapping["Scope"]
            ):
                mapped = True
                break
        if not mapped:
            print(f"Removed mapping: {existing_role_mapping}")
            existing_role_mappings.remove(existing_role_mapping)
            changes.append(existing_role_mapping)
    if len(changes) < 1:
        changes = False
    return existing_role_mappings, changes

new_role_mappings = build_confluence_table(assignment_dict)
role_mappings, new_mappings = check_new_azure_resource_mappings(
    existing_role_mappings=existing_role_mappings,
    new_role_mappings=new_role_mappings,
    headers=headers,
)

role_mappings, removed_mappings = check_removed_azure_resource_mappings(existing_role_mappings=role_mappings, new_role_mappings=new_role_mappings)

In [None]:
role_mappings = sorted(role_mappings, key=lambda x: (x["Benutzer"], x["Scope"], x["Rolle"]), reverse=False)
if new_mappings or removed_mappings:
    confluence_update_page(
        confluence=confluence,
        title=confluence_azure_resource_page_name,
        parent_id=confluence_page_id,
        table=role_mappings,
        representation="storage",
        full_width=False,
        escape_table=True,
        body_header=style_text(
            "Achtung! Nur bestehende Einträge ergänzen, keine neue hinzufügen!<br/>Bei bedarf an neuen rechten bitte via Incident",
            color="red",
            bold=True,
        ),
    )
else:
    print("No changes detected")