From cd286bcf9114900138fb81893f75e6900cf6bcde Mon Sep 17 00:00:00 2001 From: Ross Jones Date: Wed, 1 Feb 2012 10:07:20 +0000 Subject: [PATCH] [1669] New test_publisher_auth functions (currently for Group) added and fixes to auth based on failing tests. --- ckan/logic/auth/publisher/create.py | 13 ++- ckan/logic/auth/publisher/delete.py | 11 +- ckan/logic/auth/publisher/update.py | 1 - ckan/tests/functional/test_publisher_auth.py | 102 +++++++++++++++++++ 4 files changed, 119 insertions(+), 8 deletions(-) create mode 100644 ckan/tests/functional/test_publisher_auth.py diff --git a/ckan/logic/auth/publisher/create.py b/ckan/logic/auth/publisher/create.py index 7cd515ca600..5e9c1613c8e 100644 --- a/ckan/logic/auth/publisher/create.py +++ b/ckan/logic/auth/publisher/create.py @@ -33,16 +33,21 @@ def package_relationship_create(context, data_dict): def group_create(context, data_dict=None): model = context['model'] - user = context['user'] + user = context['user'] + + if not user: + return {'success': False, 'msg': _('User is not authorized to create groups') } - # TODO: We need to check whether this group is being created within another group try: group = get_group_object( context ) except NotFound: return { 'success' : True } - usergrps = User.get( user ).get_groups('publisher') - authorized = _groups_intersect( usergrps, group.get_groups('publisher') ) + userobj = model.User.get( user ) + if not userobj: + return {'success': False, 'msg': _('User %s not authorized to create groups') % str(user)} + + authorized = _groups_intersect( userobj.get_groups('publisher'), [group] ) if not authorized: return {'success': False, 'msg': _('User %s not authorized to create groups') % str(user)} else: diff --git a/ckan/logic/auth/publisher/delete.py b/ckan/logic/auth/publisher/delete.py index d0196d6d96f..be08033ff56 100644 --- a/ckan/logic/auth/publisher/delete.py +++ b/ckan/logic/auth/publisher/delete.py @@ -39,11 +39,16 @@ def relationship_delete(context, data_dict): def group_delete(context, data_dict): model = context['model'] user = context['user'] + + if not user: + return {'success': False, 'msg': _('Only members of this group are authorized to delete this group')} group = get_group_object(context, data_dict) - usergrps = model.User.get( user ).get_groups('publisher', 'admin') - - authorized = _groups_intersect( usergrps, group.get_groups('publisher') ) + userobj = model.User.get( user ) + if not userobj: + return {'success': False, 'msg': _('Only members of this group are authorized to delete this group')} + + authorized = _groups_intersect( userobj.get_groups('publisher', 'admin'), [group] ) if not authorized: return {'success': False, 'msg': _('User %s not authorized to delete group %s') % (str(user),group.id)} else: diff --git a/ckan/logic/auth/publisher/update.py b/ckan/logic/auth/publisher/update.py index 691d965d502..ea88ea87d93 100644 --- a/ckan/logic/auth/publisher/update.py +++ b/ckan/logic/auth/publisher/update.py @@ -53,7 +53,6 @@ def group_update(context, data_dict): # Only allow package update if the user and package groups intersect userobj = model.User.get( user ) - if not userobj: return {'success': False, 'msg': _('Could not find user %s') % str(user)} if not _groups_intersect( userobj.get_groups('publisher', 'admin'), [group] ): diff --git a/ckan/tests/functional/test_publisher_auth.py b/ckan/tests/functional/test_publisher_auth.py new file mode 100644 index 00000000000..614c6446a5f --- /dev/null +++ b/ckan/tests/functional/test_publisher_auth.py @@ -0,0 +1,102 @@ +import re + +from nose.tools import assert_equal + +import ckan.model as model +from ckan.lib.create_test_data import CreateTestData +from ckan.logic import NotAuthorized + + +from ckan.tests import * +from ckan.tests import setup_test_search_index +from base import FunctionalTestCase +from ckan.tests import search_related, is_search_supported + + +class TestPublisherGroups(FunctionalTestCase): + + @classmethod + def setup_class(self): + from ckan.tests.mock_publisher_auth import MockPublisherAuth + self.auth = MockPublisherAuth() + + model.Session.remove() + CreateTestData.create(auth_profile='publisher') + self.groupname = u'david' + self.packagename = u'testpkg' + model.repo.new_revision() + model.Session.add(model.Package(name=self.packagename)) + model.repo.commit_and_remove() + + @classmethod + def teardown_class(self): + model.Session.remove() + model.repo.rebuild_db() + model.Session.remove() + + def _run_fail_test( self, username, action): + grp = model.Group.by_name(self.groupname) + context = { 'group': grp, 'model': model, 'user': username } + try: + self.auth.check_access(action,context, {}) + assert False, "The user should not have access" + except NotAuthorized, e: + pass + + def _run_success_test( self, username, action): + userobj = model.User.get(username) + grp = model.Group.by_name(self.groupname) + f = model.User.get_groups + def gg(*args, **kwargs): + return [grp] + model.User.get_groups = gg + + context = { 'group': grp, 'model': model, 'user': username } + try: + self.auth.check_access(action, context, {}) + except NotAuthorized, e: + assert False, "The user should have %s access: %r." % (action, e.extra_msg) + model.User.get_groups = f + + def test_new_success(self): + self._run_success_test( 'russianfan', 'group_create' ) + + def test_new_fail(self): + self._run_fail_test( 'russianfan', 'group_create' ) + + def test_new_anon_fail(self): + self._run_fail_test( '', 'group_create' ) + + def test_new_unknown_fail(self): + self._run_fail_test( 'nosuchuser', 'group_create' ) + + def test_edit_success(self): + """ Success because user in group """ + self._run_success_test( 'russianfan', 'group_update' ) + + def test_edit_fail(self): + """ Fail because user not in group """ + self._run_fail_test( 'russianfan', 'group_update' ) + + def test_edit_anon_fail(self): + """ Fail because user is anon """ + self._run_fail_test( '', 'group_update' ) + + def test_edit_unknown_fail(self): + self._run_fail_test( 'nosuchuser', 'group_update' ) + + def test_delete_success(self): + """ Success because user in group """ + self._run_success_test( 'russianfan', 'group_delete' ) + + def test_delete_fail(self): + """ Fail because user not in group """ + self._run_fail_test( 'russianfan', 'group_delete' ) + + def test_delete_anon_fail(self): + """ Fail because user is anon """ + self._run_fail_test( '', 'group_delete' ) + + def test_delete_unknown_fail(self): + self._run_fail_test( 'nosuchuser', 'group_delete' ) +