In [1]:
import casbin

In [2]:
# seems to error opening with my environment
# import casbin_sqlobject_adapter
# adapter = casbin_sqlobject_adapter.Adapter('sqlite:///../casbin.db')

In [2]:
%%bash
touch test_policies.csv

In [3]:
with open("../tests/restful_casbin.conf", "w") as conf_file:
    conf_file.write("""
[request_definition]
r = sub, obj, act

[policy_definition]
p = sub, obj, act


[role_definition]
g = _, _
g2 = _, _

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = match_or_group(r.sub, p.sub, g(r.sub, p.sub)) && keyMatch(r.obj, p.obj) && regexMatch(r.act, p.act) || g(r.sub,"admin")
""")

In [4]:
enforcer = casbin.Enforcer("../tests/restful_casbin.conf", "./test_policies.csv")
enforcer.load_policy()

def match_or_group(r_sub, p_sub, group_bool):
    if group_bool == True:
        return True
    elif r_sub == p_sub:
        return True
    else:
        return False

enforcer.add_function(name="match_or_group", func=match_or_group )

In [5]:
user_one_email = "max@uva.edu"
user_one_id = "ark:99999/max-uva"
user_two_email = "sadnan@uva.edu"
user_two_id = "ark:99999/max-uva"

group_id = "ark:99999/clarklab"
organization_id = "ark:99999/UVA"
project_id = "ark:99999/UVA/B2AI"
computation_id = "ark:99999/UVA/B2AI/pipeline-run"
software_id = "ark:99999/UVA/B2AI/b2ai-pipeline"
dataset_id = "ark:99999/UVA/B2AI/image-dataset"

In [60]:
def createUser(casbinEnforcer: casbin.Enforcer, userEmail: str, userId: str):
    
    casbinEnforcer.add_policies([
        (userEmail, f"/user/{userId}", "(GET)|(PUT)|(DELETE)"), 
        (userEmail, "/group", "(POST)"),
        (userEmail, "/project", "(POST)"),
        (userEmail, "/organization", "(POST)"),
        (userEmail, "/computation", "(POST)"),
        (userEmail, "/software", "(POST)"),
        (userEmail, "/dataset", "(POST)")
    ])
    
    return None

In [61]:
# test enforcement of several api requests
def test_user_access():
    # create a user
    createUser(enforcer, user_one_email, user_one_id)

    # test that user can get, put, delete their own record
    enforcement_decision = enforcer.enforce_ex(user_one_email, f"/user/{user_one_id}", "GET")
    assert enforcement_decision[0] == True
    assert enforcement_decision[1] == (user_one_email, f'/user/{user_one_id}', '(GET)|(PUT)|(DELETE)')

    enforcement_decision = enforcer.enforce_ex(user_one_email, f"/user/{user_one_id}", "PUT")
    assert enforcement_decision[0] == True
    assert enforcement_decision[1] == (user_one_email, f'/user/{user_one_id}', '(GET)|(PUT)|(DELETE)')

    enforcement_decision = enforcer.enforce_ex(user_one_email, f"/user/{user_one_id}", "DELETE")
    assert enforcement_decision[0] == True
    assert enforcement_decision[1] == (user_one_email, f'/user/{user_one_id}', '(GET)|(PUT)|(DELETE)')

    # test that user can create groups
    enforcement_decision = enforcer.enforce_ex(user_one_email, "/group", "POST")
    assert enforcement_decision[0] == True
    assert enforcement_decision[1] == (user_one_email, "/group", "(POST)")

    # test that user can create projects
    enforcement_decision = enforcer.enforce_ex(user_one_email, "/project", "POST")
    assert enforcement_decision[0] == True
    assert enforcement_decision[1] == (user_one_email, "/project", "(POST)")

    # test that user can create organizations
    enforcement_decision = enforcer.enforce_ex(user_one_email, "/organization", "POST")
    assert enforcement_decision[0] == True
    assert enforcement_decision[1] == (user_one_email, "/organization", "(POST)")

    # test that user can create computations
    enforcement_decision = enforcer.enforce_ex(user_one_email, "/computation", "POST")
    assert enforcement_decision[0] == True
    assert enforcement_decision[1] == (user_one_email, "/computation", "(POST)")

    # test that user can create software
    enforcement_decision = enforcer.enforce_ex(user_one_email, "/software", "POST")
    assert enforcement_decision[0] == True
    assert enforcement_decision[1] == (user_one_email, "/software", "(POST)")

    # test that user can create datasets
    enforcement_decision = enforcer.enforce_ex(user_one_email, "/dataset", "POST")
    assert enforcement_decision[0] == True
    assert enforcement_decision[1] == (user_one_email, "/dataset", "(POST)")

    # test that the user cannot delete other users
    enforcement_result = enforcer.enforce_ex(user_one_email, "/user/ark:99999/not-my-user", "DELETE")
    assert enforcement_result[0] == False

#enforcer.enforce_ex(())

In [62]:
test_user_access()

Request: max@uva.edu, /user/ark:99999/not-my-user, DELETE ---> False


In [65]:
enforcer.get_all_actions()
enforcer.get_all_objects()

['/organization/ark:99999/UVA',
 '/user/ark:99999/max-uva',
 '/group',
 '/project',
 '/organization',
 '/computation',
 '/software',
 '/dataset']

In [7]:
enforcer.enforce("*", "ark:99999/clarklab", "GET")

Request: *, ark:99999/clarklab, GET ---> False


False

In [42]:
def createSuperUser(enforcer: casbin.Enforcer, userEmail):
    enforcer.add_grouping_policy(userEmail, groupId)
    return None

In [9]:
def createGroup(enforcer, userEmail, groupId)-> None:

    enforcer.add_policies([
        (userEmail, f"/group/{groupId}", "(GET)|(PUT)|(DELETE)"),
    ])

    # add the user to the policy
    enforcer.add_grouping_policy(userEmail, groupId)

    enforcer.add_policies([
        (groupId, f"/group/{groupId}", "(GET)")
    ])
    
    return None

def addUserToGroup(enforcer, userEmail, groupId):
    # add the user to the policy
    enforcer.add_grouping_policy(userEmail, groupId)
    
    return None


def removeUserFromGroup(enforcer, userEmail, groupId):
    # TODO check if this is the owner
    
    # if the only member of the group remains raise exception
    if len(enforcer.get_filtered_grouping_policy(groupId))==1:
        raise Exception("Can't Remove Last Memeber of Group")
    
    enforcer.remove_grouping_policy(userEmail, groupId)
    
    return None

def deleteGroup(enforcer, groupId)-> None:
    enforcer.delete_role(groupId)
    
    return None

In [10]:
def test_group_creation():

    createGroup(enforcer, user_one_email, group_id)
    assert group_id in enforcer.get_all_named_roles("g")
    assert group_id in enforcer.get_roles_for_user(user_one_email)
    
    createUser(enforcer, user_two_email, user_two_id)

    # add second user to the group
    addUserToGroup(enforcer,  user_two_email, group_id)
    assert group_id in enforcer.get_roles_for_user( user_two_email)
    
    # remove user two from the group
    removeUserFromGroup(enforcer,  user_two_email, group_id)
    assert group_id not in enforcer.get_roles_for_user( user_two_email)
    
    
    # try to remove last user from group
    try:
        removeUserFromGroup(enforcer, user_one_email, group_id)
    except Exception as e:
        pass

    # assert that the user is still in the group
    assert group_id in enforcer.get_roles_for_user(user_one_email)
        
    # delete group totally
    deleteGroup(enforcer, group_id)
    assert group_id not in enforcer.get_all_named_roles("g")

    # check that user 1 is not a member of the group
    assert group_id not in enforcer.get_roles_for_user(user_one_email)

In [11]:
test_group_creation()

In [43]:
help(enforcer.get_filtered_policy)

Help on method get_filtered_policy in module casbin.management_enforcer:

get_filtered_policy(field_index, *field_values) method of casbin.enforcer.Enforcer instance
    gets all the authorization rules in the policy, field filters can be specified.



In [49]:
createOrganization(enforcer, user_one_email, organization_id)

In [57]:
enforcer.get_filtered_policy(2, organization_id)

[]

In [66]:
enforcer.get_policy()

[('max@uva.edu', '/organization/ark:99999/UVA', '(GET)|(PUT)|(DELETE)'),
 ('max@uva.edu', '/user/ark:99999/max-uva', '(GET)|(PUT)|(DELETE)'),
 ('max@uva.edu', '/group', '(POST)'),
 ('max@uva.edu', '/project', '(POST)'),
 ('max@uva.edu', '/organization', '(POST)'),
 ('max@uva.edu', '/computation', '(POST)'),
 ('max@uva.edu', '/software', '(POST)'),
 ('max@uva.edu', '/dataset', '(POST)')]

In [67]:
enforcer.get_filtered_policy(1, "/dataset")

[('max@uva.edu', '/dataset', '(POST)')]

In [68]:
enforcer.remove_filtered_policy(1, "/dataset")

True

In [38]:
def createOrganization(casbinEnforcer: casbin.Enforcer, userEmail: str, organizationID: str) -> None:
    casbinEnforcer.add_policies([
        (userEmail, f"/organization/{organizationID}", "(GET)|(PUT)|(DELETE)"),
    ])
    
    return None

def deleteOrganization(casbinEnforcer: casbin.Enforcer, organizationID: str) -> None:
    deleted_policy = casbinEnforcer.remove_filtered_policy(1, f"/organization/{organizationID}")
    if not deleted_policy:
        raise PolicyNotFoundException()

def addUserToOrganization(casbinEnforcer: casbin.Enforcer, userEmail: str, organizationID: str) -> None:
    casbinEnforcer.add_policies([
        (userEmail, f"/organization/{organizationID}", "(GET)|(PUT)"),
    ])
    
    return None


def removeUserFromOrganization(casbinEnforcer: casbin.Enforcer, userEmail: str, organizationID: str) -> None:
    
    # if it is the owner raise exception
    is_user_admin = casbinEnforcer.enforce(
        userEmail, 
        f"/organization/{organizationID}", 
        "DELETE"
    )

    # can't remove admins from their own group
    if is_user_admin:
       raise Exception(f"User: {userEmail} is an admin of organization {organizationID}")
    
    # otherwise remove member from organization
    casbinEnforcer.remove_filtered_policy(
        0, userEmail, f"/organization/{organizationID}", "(GET)|(PUT)"
    )
    return None

def addGroupToOrganization(casbinEnforcer: casbin.Enforcer, groupID: str, organizationID: str) -> None:
    casbinEnforcer.add_policies([
        (groupID, f"/organization/{organizationID}", "(GET)|(PUT)"),
    ])
    
    return None


def removeGroupFromOrganization(casbinEnforcer: casbin.Enforcer, groupID: str, organizationID: str) -> None:
    casbinEnforcer.remove_filtered_policy(
        0, groupID, f"/organization/{organizationID}", "(GET)|(PUT)"
    )
    return None

In [34]:
enforcer.enforce(
        user_one_email, 
        f"/organization/{organization_id}", 
        "DELETE"
    )

True

In [37]:
enforcer.get_filtered_policy(0, f"/organization/{organization_id}", "(GET)|(PUT)|(DELETE)")

[]

In [23]:
enforcer.delete_role_for_user(user_two_email, organization_id)
enforcer.save_policy()

In [18]:
enforcer.load_policy()

In [39]:
def test_organization():
    
    createOrganization(enforcer, user_one_email, organization_id)
    enforcer.save_policy()
    
    assert enforcer.enforce(user_one_email, f"/organization/{organization_id}", "GET")
    assert enforcer.enforce(user_one_email, f"/organization/{organization_id}", "PUT")
    assert enforcer.enforce(user_one_email, f"/organization/{organization_id}", "DELETE")
    
    # add a user to the group
    addUserToOrganization(enforcer, user_two_email, organization_id)
    enforcer.save_policy()
    
    # check that user can get the organization information
    assert enforcer.enforce(user_two_email,  f"/organization/{organization_id}", "GET")
    # check that added user can update organization
    assert enforcer.enforce(user_two_email, f"/organization/{organization_id}", "PUT")
    # check that added user cannot delete the organization
    assert not enforcer.enforce(user_two_email, f"/organization/{organization_id}", "DELETE")
    
    # remove the second user from the organization
    removeUserFromOrganization(enforcer, user_two_email, organization_id)
    enforcer.save_policy()
    
    assert not enforcer.enforce(user_two_email,  f"/organization/{organization_id}", "GET")
    assert not enforcer.enforce(user_two_email, f"/organization/{organization_id}", "PUT")
    
    # add the second user to a group
    addUserToGroup(enforcer, user_two_email, group_id)
    # add the group to the organization
    addGroupToOrganization(enforcer, group_id, organization_id)
    
    # check that the second user can get and put but not delete the organization
    assert enforcer.enforce(user_two_email, f"/organization/{organization_id}", "GET")
    assert enforcer.enforce(user_two_email, f"/organization/{organization_id}", "PUT")
    assert not enforcer.enforce(user_two_email, f"/organization/{organization_id}", "DELETE")
    
    # remove the group from the organization
    removeGroupFromOrganization(enforcer, group_id, organization_id)
    
    assert not enforcer.enforce(user_two_email, f"/organization/{organization_id}", "GET")
    assert not enforcer.enforce(user_two_email, f"/organization/{organization_id}", "PUT")
    
    # try to remove the owner from the organization
    try:
        removeUserFromOrganization(enforcer, user_one_email, organization_id)
    except:
        pass


In [40]:
enforcer.remove_filtered_policy(0, group_id, f"/organization/{organization_id}", "(GET)|(PUT)"
)

False

In [41]:
class PolicyNotFoundException(Exception):

    def __init__(
        self, 
        message="No Casbin Policy Found", 
        subject: str = "", 
        object: str = "", 
        action: str = ""
    ):
        self.message = message
        super().__init__(self.message)

In [59]:

enforcer.get_all_objects()


['/organization/ark:99999/UVA']

In [58]:
class ObjectPermission():

    def __init__(objectId: str)-> None:
        self.objectId = objectId

    
    def create(
        casbinEnforcer: casbin.Enforcer, 
        userEmail:str, 
        action: str = "(GET)|(PUT)|(DELETE)"
    )-> None:
        casbinEnforcer.add_policy(userEmail, self.objectId, action)
        return None

    def delete(
        casbinEnforcer: casbin.Enforcer,
        objectURL: str
    )-> None:
        deleted_policy = casbinEnforcer.remove_filtered_policy(1, objectURL)
        if not deleted_policy:
            raise PolicyNotFoundException()
        return None
    
    def addPermission(
        casbinEnforcer: casbin.Enforcer, 
        userEmail: str, 
        action: str = "(GET)|(PUT)"
    )-> None:
        casbinEnforcer.add_policy(userEmail, self.objectId, action)
        return None

    
    def removePermission(
        casbinEnforcer: casbin.Enforcer, 
        subjectId: str, 
        action: str = "(GET)|(PUT)"
    )-> None:
         # otherwise remove member from organization
        removed_policy = casbinEnforcer.remove_filtered_policy(0, subjectId, self.objectId, action)
        if not removed_policy:
            raise PolicyNotFoundException()
        return None

In [69]:
def createProject(
    casbinEnforcer: casbin.Enforcer, 
    userEmail: str, 
    projectId:str
    )->None:
    ObjectPermission(objectId=projectId).create(casbinEnforcer, userEmail, action="(GET)|(PUT)|(DELETE)")
    return None

def addUserToProject(
    casbinEnforcer: casbin.Enforcer, 
    userEmail: str, 
    projectId: str
)->None:
    ObjectPermission(objectId=projectId).addPermission(casbinEnforcer, userEmail)

def removeUserFromProject(
    casbinEnforcer: casbin.Enforcer, 
    userEmail: str, 
    projectId: str
)->None:
    ObjectPermission(objectId=projectId).removePermission(casbinEnforcer, userEmail)


def addGroupToProject(    
    casbinEnforcer: casbin.Enforcer, 
    groupId: str, 
    projectId: str
)->None:
    ObjectPermission(objectId=projectId).addPermission(casbinEnforcer, groupId)

    
def removeGroupFromProject(
    casbinEnforcer: casbin.Enforcer, 
    groupId: str, 
    projectId: str
)->None:
    ObjectPermission(objectId=projectId).removePermission(casbinEnforcer, groupId)
    return None

In [None]:
def createROCrate():
    pass

def deleteROCrate():
    pass

def addUserToROCrate():
    pass

def removeUserFromROCrate():
    pass

def addGroupToROCrate():
    pass

def removeGroupFromROCrate():
    pass

In [None]:
def createDataset():
    pass

def addUserToDataset():
    pass

def removeUserFromDataset():
    pass

def addGroupToDataset():
    pass

def removeGroupFromDataset():
    pass

In [None]:
def createSoftware():
    pass


def addUserToSoftware():
    pass

def removeUserFromSoftware():
    pass

def addGroupToSoftware():
    pass

def removeGroupFromSoftware():
    pass

In [None]:
def createComputation():
    pass

def addUserToComputation():
    pass

def removeUserFromComputation():
    pass

def addGroupToComputation():
    pass

def removeGroupFromComputation():
    pass

In [97]:
enforcer.get_all_roles()

['ark:99999/clarklab']

In [99]:
enforcer.get_filtered_grouping_policy("ark:99999/clarklab")

[['max@uva.edu', 'ark:99999/clarklab']]

In [None]:
# check the grouping policies
enforcer.get_grouping_policy()

enforcer.get_all_named_subjects("p")

enforcer.get_all_named_objects("p")

In [32]:
enforcer.get_all_named_roles(ptype="p")

[]

In [35]:
enforcer.get_all_named_subjects("p")

['max@uva.edu']

In [33]:
enforcer.get_all_named_objects("p")

['/user/ark:99999/max-uva',
 '/group',
 '/project',
 '/organization',
 '/computation',
 '/software',
 '/dataset']

In [26]:
help(enforcer.get_all_named_objects)

Help on method get_all_named_objects in module casbin.management_enforcer:

get_all_named_objects(ptype) method of casbin.enforcer.Enforcer instance
    gets the list of objects that show up in the current named policy.



In [None]:
enforcer.enforce_ex(user_two_email,  f"/organization/{organization_id}", "GET")

enforcer.remove_named_policy("p", 'sadnan@uva.edu', '/organization/ark:99999/UVA', '(GET)|(PUT)')

enforcer.remove_filtered_policy(0, user_two_email, f"/organization/{organization_id}", "(GET)|(PUT)")

enforcer.get_filtered_policy(0, )

## Flow to Test
1. our test user is signs up
2. that user is granted access to an organization
3. that user creates a new project within that organization
4. one of each object is created within that new project
5. that user creates a new group of other users
6. the test user gives access to the project for the group they created

### Permissions

#### Organization
- can create project
- can edit organization metadata
- can delete projects

#### Project
- can add a digital object
- can delete the entire project
- can edit project metadata
- can delete any digital object within the project
- can change project permissions

In [8]:
user = "ark:99999/test-user"
org = "ark:99999/CAMA"
proj = "ark:99999/CAMA-HCTSA"

dataset = "ark:99999/CAMA-HCTSA-test-dataset"
dataset_object = "ark:99999/CAMA-HCTSA-test-dataset-download"

software = "ark:99999/CAMA-HCTSA-test-software"
software_object = "ark:99999/CAMA-HCTSA-test-software-download"

computation = "ark:99999/CAMA-HCTSA-computation"

result = "ark:99999/CAMA-HCTSA-computation-result"
result_object = "ark:99999/CAMA-HCTSA-computation-result-download"

In [24]:
# step 1 a user is granted access to an organization
# insert a policy
enforcer.add_policy(user, "canCreateProject", org)

# persisting the policies to storage
enforcer.save_policy()

In [9]:
# test that the user can actually create the project
enforcer.enforce(user, "canCreateProject", org)

True

In [28]:
# a user creates a project
enforcer.add_policies([
    (user, "delete", proj), 
    (user, "update", proj), 
    (user, "managePermissions", proj),
    (user, "createDigitalObject", proj)])

enforcer.save_policy()