diff --git a/requiam/grouper_admin.py b/requiam/grouper_admin.py index a3309f13..da62eb8d 100644 --- a/requiam/grouper_admin.py +++ b/requiam/grouper_admin.py @@ -119,3 +119,62 @@ def add_group(self, group, group_type, description): except requests.exceptions.HTTPError: raise requests.exceptions.HTTPError + def add_privilege(self, access_group, target_group, target_group_type, privileges): + """ + Purpose: + Add privilege(s) for a Grouper group to access target + + :param access_group: name of group to give access to, ex: arizona.edu:Dept:LBRY:figshare:GrouperSuperAdmins + :param target_group: name of group to add privilege on, ex: "apitest" + :param target_group_type: name of stem associated with the group to add privilege on, + ex: use 'test' for arizona.edu:Dept:LBRY:figtest:test + :param privileges: single string, or list of strings of allowed values: + 'read', 'view', 'update', 'admin', 'optin', 'optout' + + :return: True on success, otherwise raises GrouperAPIException + """ + + # This is a hack. The endpoint needs to change so "groups" is not hardcoded. + endpoint = join(dirname(self.endpoint), 'grouperPrivileges') + + # Check privileges + if isinstance(privileges, str): + privileges = [privileges] + for privilege in privileges: + if privilege not in ['read', 'view', 'update', 'admin', 'optin', 'optout']: + raise ValueError(f"Invalid privilege name: {privilege}") + + target_groupname = figshare_group(target_group, target_group_type, + production=self.grouper_production) + + try: + group_exists = self.check_group_exists(target_group, target_group_type) + except KeyError: + raise KeyError("ERROR: Stem is empty") + + if group_exists: + args = self.get_group_details(access_group) + if len(args): + access_group_detail = args.pop() + else: + raise Exception(f"Could NOT find access_group: {access_group}") + + # initialize + params = { + 'WsRestAssignGrouperPrivilegesLiteRequest': { + 'allowed': 'T', + 'subjectId': access_group_detail['uuid'], + 'privilegeName': '', + 'groupName': target_groupname, + 'privilegeType': 'access' + } + } + + for privilege in privileges: + params['WsRestAssignGrouperPrivilegesLiteRequest']['privilegeName'] = privilege + result = requests.post(endpoint, json=params, headers=self.headers, + auth=(self.grouper_user, self.grouper_password)) + metadata = result.json()['WsAssignGrouperPrivilegesLiteResult']['resultMetadata'] + + if metadata['resultCode'] not in ['SUCCESS_ALLOWED', 'SUCCESS_ALLOWED_ALREADY_EXISTED']: + raise ValueError(f"Unexpected result received: {metadata['resultCode']}")