Skip to content

Commit

Permalink
models: added get() wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
ergo committed Oct 23, 2016
1 parent b9d5aab commit d6c4f51
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 18 deletions.
4 changes: 4 additions & 0 deletions ziggurat_foundations/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ def _get_keys(cls):
""" returns column names for this model """
return sa.orm.class_mapper(cls).c.keys()

@classmethod
def get_primary_key(cls):
return sa.orm.class_mapper(cls).primary_key

def get_dict(self, exclude_keys=None, include_keys=None):
""" return dictionary of keys and values corresponding to this model's
data
Expand Down
7 changes: 7 additions & 0 deletions ziggurat_foundations/models/services/external_identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@


class ExternalIdentityService(BaseService):
@classmethod
def get(cls, external_id, local_user_id, provider_name, db_session=None):
db_session = get_db_session(db_session)
lvars = locals()
pkey = [lvars[c.name] for c in cls.model.get_primary_key()]
return db_session.query(cls.model).get(pkey)

@classmethod
def by_external_id_and_provider(cls, external_id, provider_name,
db_session=None):
Expand Down
9 changes: 7 additions & 2 deletions ziggurat_foundations/models/services/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@


class GroupService(BaseService):
@classmethod
def get(cls, group_id, db_session=None):
db_session = get_db_session(db_session)
return db_session.query(cls.model).get(group_id)

@classmethod
def by_group_name(cls, group_name, db_session=None):
""" fetch group by name"""
Expand Down Expand Up @@ -45,7 +50,7 @@ def resources_with_possible_perms(cls, instance, perm_names=None,
cls.models_proxy.GroupResourcePermission.perm_name,
cls.models_proxy.Group,
cls.models_proxy.Resource
)
)
query = query.filter(
cls.models_proxy.Resource.resource_id ==
cls.models_proxy.GroupResourcePermission.resource_id)
Expand All @@ -62,7 +67,7 @@ def resources_with_possible_perms(cls, instance, perm_names=None,
cls.models_proxy.Resource.resource_type.in_(resource_types))

if (perm_names not in (
[ANY_PERMISSION], ANY_PERMISSION) and perm_names):
[ANY_PERMISSION], ANY_PERMISSION) and perm_names):
query = query.filter(
cls.models_proxy.GroupResourcePermission.perm_name.in_(
perm_names))
Expand Down
9 changes: 9 additions & 0 deletions ziggurat_foundations/models/services/group_permission.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@


class GroupPermissionService(BaseService):
@classmethod
def get(cls, group_id, perm_name, db_session=None):
db_session = get_db_session(db_session)
# this is implemented this way because order of columns can change
# on runtime
lvars = locals()
pkey = [lvars[c.name] for c in cls.model.get_primary_key()]
return db_session.query(cls.model).get(pkey)

@classmethod
def by_group_and_perm(cls, group_id, perm_name, db_session=None):
"""" return by by_user_and_perm and permission name """
Expand Down
12 changes: 11 additions & 1 deletion ziggurat_foundations/models/services/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@


class ResourceService(BaseService):
@classmethod
def get(cls, resource_id, db_session=None):
db_session = get_db_session(db_session)
return db_session.query(cls.model).get(resource_id)

@classmethod
def perms_for_user(cls, instance, user, db_session=None):
""" returns all permissions that given user has for this resource
Expand Down Expand Up @@ -219,7 +224,8 @@ def subtree_deeper(cls, object_id, limit_depth=1000000, flat=True,
"""
db_session = get_db_session(db_session)
text_obj = sa.text(raw_q)
q = db_session.query(cls.model, 'depth', 'sorting', 'path').from_statement(
q = db_session.query(cls.model, 'depth', 'sorting',
'path').from_statement(
text_obj).params(
resource_id=object_id, depth=limit_depth)
return q
Expand Down Expand Up @@ -263,3 +269,7 @@ def path_upper(cls, object_id, limit_depth=1000000, flat=True,
q = db_session.query(cls.model).from_statement(sa.text(raw_q)).params(
resource_id=object_id, depth=limit_depth)
return q

@classmethod
def move_before_position(cls, object_id, parent_id, position):
pass
5 changes: 5 additions & 0 deletions ziggurat_foundations/models/services/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@


class UserService(BaseService):
@classmethod
def get(cls, user_id, db_session=None):
db_session = get_db_session(db_session)
return db_session.query(cls.model).get(user_id)

@classmethod
def permissions(cls, instance, db_session=None):
""" returns all non-resource permissions based on what groups user
Expand Down
7 changes: 7 additions & 0 deletions ziggurat_foundations/models/services/user_permission.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@


class UserPermissionService(BaseService):
@classmethod
def get(cls, user_id, perm_name, db_session=None):
db_session = get_db_session(db_session)
lvars = locals()
pkey = [lvars[c.name] for c in cls.model.get_primary_key()]
return db_session.query(cls.model).get(pkey)

@classmethod
def by_user_and_perm(cls, user_id, perm_name, db_session=None):
""" return by user and permission name"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@


class UserResourcePermissionService(BaseService):
@classmethod
def get(cls, user_id, resource_id, perm_name, db_session=None):
db_session = get_db_session(db_session)
lvars = locals()
pkey = [lvars[c.name] for c in cls.model.get_primary_key()]
return db_session.query(cls.model).get(pkey)

@classmethod
def by_resource_user_and_perm(cls, user_id, perm_name, resource_id,
db_session=None):
Expand Down
6 changes: 6 additions & 0 deletions ziggurat_foundations/tests/test_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
BaseTestCase)
from ziggurat_foundations.tests.conftest import (
Group)
from ziggurat_foundations.models.services.group import GroupService


class TestGroup(BaseTestCase):
Expand Down Expand Up @@ -65,6 +66,11 @@ def test_users_dynamic(self, db_session):
assert group_users[0] == user1
assert group_users[1] == user2

def test_get(self, db_session):
group1 = add_group(db_session, 'group1')
group = GroupService.get(group_id=group1.id, db_session=db_session)
assert group.id == group1.id

def test_all(self, db_session):
group1 = add_group(db_session, 'group1')
group2 = add_group(db_session, 'group2')
Expand Down
14 changes: 14 additions & 0 deletions ziggurat_foundations/tests/test_other.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ def test_by_external_id_and_provider(self, db_session):
db_session=db_session)
assert identity == found

def test_get(self, db_session):
user = add_user(db_session)
identity = ExternalIdentity(external_user_name='foo',
external_id='foo',
provider_name='facebook')
user.external_identities.append(identity)
db_session.flush()
found = ExternalIdentityService.get(
provider_name='facebook',
external_id='foo',
local_user_id=user.id,
db_session=db_session)
assert identity == found

def test_user_by_external_id_and_provider(self, db_session):
user = add_user(db_session)
identity = ExternalIdentity(external_user_name='foo',
Expand Down
50 changes: 45 additions & 5 deletions ziggurat_foundations/tests/test_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
GroupPermission,
UserResourcePermission,
GroupResourcePermission, TestResourceB)
from ziggurat_foundations.models.services.group_permission import \
GroupPermissionService
from ziggurat_foundations.models.services.user_permission import \
UserPermissionService
from ziggurat_foundations.models.services.user_resource_permission import \
UserResourcePermissionService


class TestUserPermissions(BaseTestCase):
Expand Down Expand Up @@ -670,28 +676,55 @@ def test_resource_users_for_any_perm_limited_group_perms_empty_group(

check_one_in_other(perms, second)

def test_get_resource_permission(self, db_session):
created_user = add_user(db_session)
resource = add_resource(db_session, 1, 'test_resource')
permission = UserResourcePermission(
perm_name='test_perm', user_id=created_user.id,
resource_id=resource.resource_id)
resource.user_permissions.append(permission)
db_session.flush()
perm = UserResourcePermissionService.get(
user_id=created_user.id,
resource_id=resource.resource_id,
perm_name='test_perm',
db_session=db_session
)
assert perm.perm_name == 'test_perm'
assert perm.resource_id == resource.resource_id
assert perm.user_id == created_user.id


class TestGroupPermission(BaseTestCase):
def test_repr(self, db_session):
group_permission = GroupPermission(group_id=1,
perm_name='perm')
assert repr(group_permission) == '<GroupPermission: perm>'

def test_get(self, db_session):
org_group = add_group(db_session, 'group1')
group = GroupPermissionService.get(
group_id=org_group.id, perm_name='manage_apps',
db_session=db_session)
assert group.group_id == 1
assert group.perm_name == 'manage_apps'

def test_by_group_and_perm(self, db_session):
add_group(db_session,)
add_group(db_session, )
queried = GroupPermission.by_group_and_perm(1, 'manage_apps',
db_session=db_session)
assert queried.group_id == 1
assert queried.perm_name == 'manage_apps'

def test_by_group_and_perm_wrong_group(self, db_session):
add_group(db_session,)
add_group(db_session, )
queried = GroupPermission.by_group_and_perm(2,
'manage_apps',
db_session=db_session)
assert queried is None

def test_by_group_and_perm_wrong_perm(self, db_session):
add_group(db_session,)
add_group(db_session, )
queried = GroupPermission.by_group_and_perm(1, 'wrong_perm',
db_session=db_session)
assert queried is None
Expand All @@ -707,7 +740,7 @@ def test_resources_with_possible_perms(self, db_session):

def test_resources_with_possible_perms_group2(self, db_session):
self.set_up_user_group_and_perms(db_session)
resource3 = add_resource_b(db_session,3, 'other resource')
resource3 = add_resource_b(db_session, 3, 'other resource')
self.group2.resources.append(resource3)
group_permission2 = GroupResourcePermission(
perm_name='group_perm2',
Expand All @@ -732,6 +765,13 @@ def test_repr(self, db_session):
user_permission = UserPermission(user_id=1, perm_name='perm')
assert repr(user_permission) == '<UserPermission: perm>'

def test_get(self, db_session):
user = add_user(db_session)
perm = UserPermissionService.get(
user_id=user.id, perm_name='root', db_session=db_session)
assert perm.user_id == user.id
assert perm.perm_name == 'root'

def test_by_user_and_perm(self, db_session):
add_user(db_session)
user_permission = UserPermission.by_user_and_perm(1, 'root',
Expand All @@ -752,4 +792,4 @@ def test_by_user_and_perm_wrong_permname(self, db_session):
user_permission = UserPermission.by_user_and_perm(1, 'wrong',
db_session=db_session)

assert user_permission is None
assert user_permission is None
6 changes: 6 additions & 0 deletions ziggurat_foundations/tests/test_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ def create_default_tree(db_session):


class TestResources(BaseTestCase):
def test_get(self, db_session):
add_resource(db_session, 1, 'root')
resource = ResourceService.get(resource_id=1, db_session=db_session)
assert resource.resource_id == 1
assert resource.resource_name == 'root'

@pytest.mark.skipif(not_postgres, reason="requires postgres")
def test_root_nesting(self, db_session):
root = create_default_tree(db_session)
Expand Down
26 changes: 16 additions & 10 deletions ziggurat_foundations/tests/test_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@

from ziggurat_foundations.tests import BaseTestCase, add_user
from ziggurat_foundations.tests.conftest import (User)
from ziggurat_foundations.models.services.user import UserService


class TestUser(BaseTestCase):
def test_get(self, db_session):
org_user = add_user(db_session)
user = UserService.get(user_id=org_user.id, db_session=db_session)
assert org_user.id == user.id

def test_add_user(self, db_session):
user = User(user_name='username', email='email', status=0)
db_session.add(user)
Expand Down Expand Up @@ -98,9 +104,9 @@ def test_by_username_andsecurity_code_none(self, db_session):
assert found is None

def test_by_user_names(self, db_session):
user1 = add_user(db_session,'user1', 'email1')
add_user(db_session,'user2', 'email2')
user3 = add_user(db_session,'user3', 'email3')
user1 = add_user(db_session, 'user1', 'email1')
add_user(db_session, 'user2', 'email2')
user3 = add_user(db_session, 'user3', 'email3')

queried_users = User.by_user_names(['user1', 'user3'],
db_session=db_session).all()
Expand All @@ -110,9 +116,9 @@ def test_by_user_names(self, db_session):
assert user3 == queried_users[1]

def test_by_user_names_one_none(self, db_session):
user1 = add_user(db_session,'user1', 'email1')
add_user(db_session,'user2', 'email2')
user3 = add_user(db_session,'user3', 'email3')
user1 = add_user(db_session, 'user1', 'email1')
add_user(db_session, 'user2', 'email2')
user3 = add_user(db_session, 'user3', 'email3')

queried_users = User.by_user_names(['user1', None, 'user3'],
db_session=db_session).all()
Expand All @@ -122,9 +128,9 @@ def test_by_user_names_one_none(self, db_session):
assert user3 == queried_users[1]

def test_by_user_names_like(self, db_session):
user1 = add_user(db_session,'user1', 'email1')
add_user(db_session,'luser2', 'email2')
add_user(db_session,'noname', 'email3')
user1 = add_user(db_session, 'user1', 'email1')
add_user(db_session, 'luser2', 'email2')
add_user(db_session, 'noname', 'email3')

queried_users = User.user_names_like('user%',
db_session=db_session).all()
Expand Down Expand Up @@ -217,4 +223,4 @@ def test_regenerate_security_code(self, db_session):
new_code = created_user.security_code

assert old_code != new_code
assert len(new_code) == 64
assert len(new_code) == 64

0 comments on commit d6c4f51

Please sign in to comment.