From e2fedb6a6870b4c7329899f10c897addd216d9b5 Mon Sep 17 00:00:00 2001 From: David Read Date: Thu, 10 Sep 2015 17:28:42 +0100 Subject: [PATCH 1/5] [#2631] Also purge Members by table_id. * purge members instead of delete - I see no reason to keep the Member as state=deleted. * a blank revision was being created - comment explains the removal. --- ckan/logic/action/delete.py | 12 ++++-- ckan/tests/logic/action/test_delete.py | 52 ++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/ckan/logic/action/delete.py b/ckan/logic/action/delete.py index dd562201f19..791b34a3d3e 100644 --- a/ckan/logic/action/delete.py +++ b/ckan/logic/action/delete.py @@ -1,5 +1,7 @@ '''API functions for deleting data from CKAN.''' +import sqlalchemy as sqla + import ckan.logic import ckan.logic.action import ckan.plugins as plugins @@ -397,12 +399,14 @@ def _group_or_org_purge(context, data_dict, is_org=False): else: _check_access('group_purge', context, data_dict) - members = model.Session.query(model.Member) - members = members.filter(model.Member.group_id == group.id) + members = model.Session.query(model.Member) \ + .filter(sqla.or_(model.Member.group_id == group.id, + model.Member.table_id == group.id)) if members.count() > 0: - model.repo.new_revision() + # no need to do new_revision() because Member is not revisioned, nor + # does it cascade delete any revisioned objects for m in members.all(): - m.delete() + m.purge() model.repo.commit_and_remove() group = model.Group.get(id) diff --git a/ckan/tests/logic/action/test_delete.py b/ckan/tests/logic/action/test_delete.py index d8079968bee..0fec0ee58cc 100644 --- a/ckan/tests/logic/action/test_delete.py +++ b/ckan/tests/logic/action/test_delete.py @@ -140,3 +140,55 @@ def test_tag_delete_with_unicode_returns_unicode_error(self): assert u'Delta symbol: \u0394' in unicode(e) else: assert 0, 'Should have raised NotFound' + + +class TestGroupPurge(object): + def setup(self): + helpers.reset_db() + + def test_purged_group_does_not_show(self): + group = factories.Group() + + helpers.call_action('group_purge', + context={'ignore_auth': True}, + id=group['name']) + + assert_raises(logic.NotFound, helpers.call_action, 'group_show', + context={}, id=group['name']) + + def test_purged_group_leaves_no_trace_in_the_model(self): + factories.Group(name='parent') + user = factories.User() + group1 = factories.Group(name='group1', + extras=[{'key': 'key1', 'value': 'val1'}], + users=[{'name': user['name']}], + groups=[{'name': 'parent'}]) + factories.Group(name='child', groups=[{'name': 'group1'}]) + num_revisions_before = model.Session.query(model.Revision).count() + + helpers.call_action('group_purge', + context={'ignore_auth': True}, + id=group1['name']) + num_revisions_after = model.Session.query(model.Revision).count() + + # the Group and related objects are gone + assert_equals(sorted([g.name for g in + model.Session.query(model.Group).all()]), + ['child', 'parent']) + assert_equals(model.Session.query(model.GroupExtra).all(), []) + # the only members left are the users for the parent and child + assert_equals(sorted([ + (m.table_name, m.group.name) + for m in model.Session.query(model.Member).join(model.Group)]), + [('user', 'child'), ('user', 'parent')]) + + # the group's object revisions were purged too + assert_equals(sorted( + [gr.name for gr in model.Session.query(model.GroupRevision)]), + ['child', 'parent']) + assert_equals(model.Session.query(model.GroupExtraRevision).all(), + []) + # Member is not revisioned + + # No Revision objects were purged, in fact 1 is created for the purge + assert_equals(num_revisions_after - num_revisions_before, 1) From 3bbef1b029dcec224c54bafddc00655ac234916d Mon Sep 17 00:00:00 2001 From: David Read Date: Thu, 10 Sep 2015 17:39:36 +0100 Subject: [PATCH 2/5] [#2631] Tests added for auth and orgs. --- ckan/tests/logic/action/test_delete.py | 73 ++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/ckan/tests/logic/action/test_delete.py b/ckan/tests/logic/action/test_delete.py index 0fec0ee58cc..1303fd3b1cc 100644 --- a/ckan/tests/logic/action/test_delete.py +++ b/ckan/tests/logic/action/test_delete.py @@ -146,6 +146,16 @@ class TestGroupPurge(object): def setup(self): helpers.reset_db() + def test_a_non_sysadmin_cant_purge_group(self): + user = factories.User() + group = factories.Group(user=user) + + assert_raises(logic.NotAuthorized, + helpers.call_action, + 'group_purge', + context={'user': user['name'], 'ignore_auth': False}, + id=group['name']) + def test_purged_group_does_not_show(self): group = factories.Group() @@ -192,3 +202,66 @@ def test_purged_group_leaves_no_trace_in_the_model(self): # No Revision objects were purged, in fact 1 is created for the purge assert_equals(num_revisions_after - num_revisions_before, 1) + + +class TestOrganizationPurge(object): + def setup(self): + helpers.reset_db() + + def test_a_non_sysadmin_cant_purge_org(self): + user = factories.User() + org = factories.Organization(user=user) + + assert_raises(logic.NotAuthorized, + helpers.call_action, + 'organization_purge', + context={'user': user['name'], 'ignore_auth': False}, + id=org['name']) + + def test_purged_org_does_not_show(self): + org = factories.Organization() + + helpers.call_action('organization_purge', + context={'ignore_auth': True}, + id=org['name']) + + assert_raises(logic.NotFound, helpers.call_action, 'organization_show', + context={}, id=org['name']) + + def test_purged_organization_leaves_no_trace_in_the_model(self): + factories.Organization(name='parent') + user = factories.User() + org1 = factories.Organization( + name='org1', + extras=[{'key': 'key1', 'value': 'val1'}], + users=[{'name': user['name']}], + groups=[{'name': 'parent'}]) + factories.Organization(name='child', groups=[{'name': 'group1'}]) + num_revisions_before = model.Session.query(model.Revision).count() + + helpers.call_action('organization_purge', + context={'ignore_auth': True}, + id=org1['name']) + num_revisions_after = model.Session.query(model.Revision).count() + + # the Organization and related objects are gone + assert_equals(sorted([o.name for o in + model.Session.query(model.Group).all()]), + ['child', 'parent']) + assert_equals(model.Session.query(model.GroupExtra).all(), []) + # the only members left are the users for the parent and child + assert_equals(sorted([ + (m.table_name, m.group.name) + for m in model.Session.query(model.Member).join(model.Group)]), + [('user', 'child'), ('user', 'parent')]) + + # the organization's object revisions were purged too + assert_equals(sorted( + [gr.name for gr in model.Session.query(model.GroupRevision)]), + ['child', 'parent']) + assert_equals(model.Session.query(model.GroupExtraRevision).all(), + []) + # Member is not revisioned + + # No Revision objects were purged, in fact 1 is created for the purge + assert_equals(num_revisions_after - num_revisions_before, 1) From 2f6b9039533ad2c218e22153c19b04edf693cea3 Mon Sep 17 00:00:00 2001 From: David Read Date: Thu, 10 Sep 2015 21:22:36 +0100 Subject: [PATCH 3/5] [#2632] Delete the owner_org field of orphaned datasets. More tests. --- ckan/logic/action/delete.py | 26 ++++++++ ckan/tests/logic/action/test_delete.py | 89 ++++++++++++++++++++++---- 2 files changed, 102 insertions(+), 13 deletions(-) diff --git a/ckan/logic/action/delete.py b/ckan/logic/action/delete.py index 791b34a3d3e..bf4b4f14416 100644 --- a/ckan/logic/action/delete.py +++ b/ckan/logic/action/delete.py @@ -6,6 +6,7 @@ import ckan.logic.action import ckan.plugins as plugins import ckan.lib.dictization.model_dictize as model_dictize +from ckan import authz from ckan.common import _ @@ -399,6 +400,26 @@ def _group_or_org_purge(context, data_dict, is_org=False): else: _check_access('group_purge', context, data_dict) + if is_org: + # Clear the owner_org field + datasets = model.Session.query(model.Package) \ + .filter_by(owner_org=group.id) \ + .filter(model.Package.state != 'deleted') \ + .count() + if datasets: + if not authz.check_config_permission('ckan.auth.create_unowned_dataset'): + raise ValidationError('Organization cannot be purged while it ' + 'still has datasets') + pkg_table = model.package_table + # using Core SQLA instead of the ORM should be faster + model.Session.execute( + pkg_table.update().where( + sqla.and_(pkg_table.c.owner_org == group.id, + pkg_table.c.state != 'deleted') + ).values(owner_org=None) + ) + + # Delete related Memberships members = model.Session.query(model.Member) \ .filter(sqla.or_(model.Member.group_id == group.id, model.Member.table_id == group.id)) @@ -423,6 +444,8 @@ def group_purge(context, data_dict): whereas deleting a group simply marks the group as deleted (it will no longer show up in the frontend, but is still in the db). + Datasets in the organization will remain, just not in the purged group. + You must be authorized to purge the group. :param id: the name or id of the group to be purged @@ -441,6 +464,9 @@ def organization_purge(context, data_dict): deleted (it will no longer show up in the frontend, but is still in the db). + Datasets owned by the organization will remain, just not in an + organization any more. + You must be authorized to purge the organization. :param id: the name or id of the organization to be purged diff --git a/ckan/tests/logic/action/test_delete.py b/ckan/tests/logic/action/test_delete.py index 1303fd3b1cc..b7640854160 100644 --- a/ckan/tests/logic/action/test_delete.py +++ b/ckan/tests/logic/action/test_delete.py @@ -5,6 +5,7 @@ import ckan.logic as logic import ckan.model as model import ckan.plugins as p +import ckan.lib.search as search assert_equals = nose.tools.assert_equals assert_raises = nose.tools.assert_raises @@ -159,13 +160,42 @@ def test_a_non_sysadmin_cant_purge_group(self): def test_purged_group_does_not_show(self): group = factories.Group() - helpers.call_action('group_purge', - context={'ignore_auth': True}, - id=group['name']) + helpers.call_action('group_purge', id=group['name']) assert_raises(logic.NotFound, helpers.call_action, 'group_show', context={}, id=group['name']) + def test_purged_group_is_not_listed(self): + group = factories.Group() + + helpers.call_action('group_purge', id=group['name']) + + assert_equals(helpers.call_action('group_list', context={}), []) + + def test_dataset_in_a_purged_group_no_longer_shows_that_group(self): + group = factories.Group() + dataset = factories.Dataset(groups=[{'name': group['name']}]) + + helpers.call_action('group_purge', id=group['name']) + + dataset_shown = helpers.call_action('package_show', context={}, + id=dataset['id']) + assert_equals(dataset_shown['groups'], []) + + def test_purged_group_is_not_in_search_results_for_its_ex_dataset(self): + search.clear() + group = factories.Group() + dataset = factories.Dataset(groups=[{'name': group['name']}]) + def get_search_result_groups(): + results = helpers.call_action('package_search', + q=dataset['title'])['results'] + return [g['name'] for g in results[0]['groups']] + assert_equals(get_search_result_groups(), [group['name']]) + + helpers.call_action('group_purge', id=group['name']) + + assert_equals(get_search_result_groups(), []) + def test_purged_group_leaves_no_trace_in_the_model(self): factories.Group(name='parent') user = factories.User() @@ -173,12 +203,11 @@ def test_purged_group_leaves_no_trace_in_the_model(self): extras=[{'key': 'key1', 'value': 'val1'}], users=[{'name': user['name']}], groups=[{'name': 'parent'}]) + factories.Dataset(name='ds', groups=[{'name': 'group1'}]) factories.Group(name='child', groups=[{'name': 'group1'}]) num_revisions_before = model.Session.query(model.Revision).count() - helpers.call_action('group_purge', - context={'ignore_auth': True}, - id=group1['name']) + helpers.call_action('group_purge', id=group1['name']) num_revisions_after = model.Session.query(model.Revision).count() # the Group and related objects are gone @@ -191,6 +220,9 @@ def test_purged_group_leaves_no_trace_in_the_model(self): (m.table_name, m.group.name) for m in model.Session.query(model.Member).join(model.Group)]), [('user', 'child'), ('user', 'parent')]) + # the dataset is still there though + assert_equals([p.name for p in model.Session.query(model.Package)], + ['ds']) # the group's object revisions were purged too assert_equals(sorted( @@ -221,13 +253,42 @@ def test_a_non_sysadmin_cant_purge_org(self): def test_purged_org_does_not_show(self): org = factories.Organization() - helpers.call_action('organization_purge', - context={'ignore_auth': True}, - id=org['name']) + helpers.call_action('organization_purge', id=org['name']) assert_raises(logic.NotFound, helpers.call_action, 'organization_show', context={}, id=org['name']) + def test_purged_org_is_not_listed(self): + org = factories.Organization() + + helpers.call_action('organization_purge', id=org['name']) + + assert_equals(helpers.call_action('organization_list', context={}), []) + + def test_dataset_in_a_purged_org_no_longer_shows_that_org(self): + org = factories.Organization() + dataset = factories.Dataset(owner_org=org['id']) + + helpers.call_action('organization_purge', id=org['name']) + + dataset_shown = helpers.call_action('package_show', context={}, + id=dataset['id']) + assert_equals(dataset_shown['owner_org'], None) + + def test_purged_org_is_not_in_search_results_for_its_ex_dataset(self): + search.clear() + org = factories.Organization() + dataset = factories.Dataset(owner_org=org['id']) + def get_search_result_owner_org(): + results = helpers.call_action('package_search', + q=dataset['title'])['results'] + return results[0]['owner_org'] + assert_equals(get_search_result_owner_org(), org['id']) + + helpers.call_action('organization_purge', id=org['name']) + + assert_equals(get_search_result_owner_org(), None) + def test_purged_organization_leaves_no_trace_in_the_model(self): factories.Organization(name='parent') user = factories.User() @@ -236,12 +297,11 @@ def test_purged_organization_leaves_no_trace_in_the_model(self): extras=[{'key': 'key1', 'value': 'val1'}], users=[{'name': user['name']}], groups=[{'name': 'parent'}]) - factories.Organization(name='child', groups=[{'name': 'group1'}]) + factories.Dataset(name='ds', owner_org=org1['id']) + factories.Organization(name='child', groups=[{'name': 'org1'}]) num_revisions_before = model.Session.query(model.Revision).count() - helpers.call_action('organization_purge', - context={'ignore_auth': True}, - id=org1['name']) + helpers.call_action('organization_purge', id=org1['name']) num_revisions_after = model.Session.query(model.Revision).count() # the Organization and related objects are gone @@ -254,6 +314,9 @@ def test_purged_organization_leaves_no_trace_in_the_model(self): (m.table_name, m.group.name) for m in model.Session.query(model.Member).join(model.Group)]), [('user', 'child'), ('user', 'parent')]) + # the dataset is still there though + assert_equals([p.name for p in model.Session.query(model.Package)], + ['ds']) # the organization's object revisions were purged too assert_equals(sorted( From d5d275d399be99664c653b2cc5d1ca198823326f Mon Sep 17 00:00:00 2001 From: David Read Date: Thu, 10 Sep 2015 21:30:22 +0100 Subject: [PATCH 4/5] [#2632/#2631] All tests migrated now from legacy. --- .../test_group_and_organization_purge.py | 409 ------------------ ckan/tests/logic/action/test_delete.py | 16 + 2 files changed, 16 insertions(+), 409 deletions(-) delete mode 100644 ckan/tests/legacy/functional/api/model/test_group_and_organization_purge.py diff --git a/ckan/tests/legacy/functional/api/model/test_group_and_organization_purge.py b/ckan/tests/legacy/functional/api/model/test_group_and_organization_purge.py deleted file mode 100644 index b7f6b626851..00000000000 --- a/ckan/tests/legacy/functional/api/model/test_group_and_organization_purge.py +++ /dev/null @@ -1,409 +0,0 @@ -'''Functional tests for the group_ and organization_purge APIs. - -''' -import ckan.model as model -import ckan.tests.legacy as tests - -import paste -import pylons.test - - -class TestGroupAndOrganizationPurging(object): - '''Tests for the group_ and organization_purge APIs. - - ''' - @classmethod - def setup_class(cls): - cls.app = paste.fixture.TestApp(pylons.test.pylonsapp) - - # Make a sysadmin user. - cls.sysadmin = model.User(name='test_sysadmin', sysadmin=True) - model.Session.add(cls.sysadmin) - model.Session.commit() - model.Session.remove() - - # A package that will be added to our test groups and organizations. - cls.package = tests.call_action_api(cls.app, 'package_create', - name='test_package', - apikey=cls.sysadmin.apikey) - - # A user who will not be a member of our test groups or organizations. - cls.visitor = tests.call_action_api(cls.app, 'user_create', - name='non_member', - email='blah', - password='farm', - apikey=cls.sysadmin.apikey) - - # A user who will become a member of our test groups and organizations. - cls.member = tests.call_action_api(cls.app, 'user_create', - name='member', - email='blah', - password='farm', - apikey=cls.sysadmin.apikey) - - # A user who will become an editor of our test groups and - # organizations. - cls.editor = tests.call_action_api(cls.app, 'user_create', - name='editor', - email='blah', - password='farm', - apikey=cls.sysadmin.apikey) - - # A user who will become an admin of our test groups and organizations. - cls.admin = tests.call_action_api(cls.app, 'user_create', - name='admin', - email='blah', - password='farm', - apikey=cls.sysadmin.apikey) - - @classmethod - def teardown_class(cls): - model.repo.rebuild_db() - - def _organization_create(self, organization_name): - '''Return an organization with some users and a dataset.''' - - # Make an organization with some users. - users = [{'name': self.member['name'], 'capacity': 'member'}, - {'name': self.editor['name'], 'capacity': 'editor'}, - {'name': self.admin['name'], 'capacity': 'admin'}] - organization = tests.call_action_api(self.app, 'organization_create', - apikey=self.sysadmin.apikey, - name=organization_name, - users=users) - - # Add a dataset to the organization (have to do this separately - # because the packages param of organization_create doesn't work). - tests.call_action_api(self.app, 'package_update', - name=self.package['name'], - owner_org=organization['name'], - apikey=self.sysadmin.apikey) - - return organization - - def _group_create(self, group_name): - '''Return a group with some users and a dataset.''' - - # Make a group with some users and a dataset. - group = tests.call_action_api(self.app, 'group_create', - apikey=self.sysadmin.apikey, - name=group_name, - users=[ - {'name': self.member['name'], - 'capacity': 'member', - }, - {'name': self.editor['name'], - 'capacity': 'editor', - }, - {'name': self.admin['name'], - 'capacity': 'admin', - }], - packages=[ - {'id': self.package['name']}], - ) - - return group - - def _test_group_or_organization_purge(self, name, by_id, is_org): - '''Create a group or organization with the given name, and test - purging it. - - :param name: the name of the group or organization to create and purge - :param by_id: if True, pass the organization's id to - organization_purge, otherwise pass its name - :type by_id: boolean - :param is_org: if True create and purge an organization, if False a - group - :type is_org: boolean - - ''' - if is_org: - group_or_org = self._organization_create(name) - else: - group_or_org = self._group_create(name) - - # Purge the group or organization. - if is_org: - action = 'organization_purge' - else: - action = 'group_purge' - if by_id: - identifier = group_or_org['id'] - else: - identifier = group_or_org['name'] - result = tests.call_action_api(self.app, action, id=identifier, - apikey=self.sysadmin.apikey, - ) - assert result is None - - # Now trying to show the group or organization should give a 404. - if is_org: - action = 'organization_show' - else: - action = 'group_show' - result = tests.call_action_api(self.app, action, id=name, status=404) - assert result == {'__type': 'Not Found Error', 'message': 'Not found'} - - # The group or organization should not appear in group_list or - # organization_list. - if is_org: - action = 'organization_list' - else: - action = 'group_list' - assert name not in tests.call_action_api(self.app, action) - - # The package should no longer belong to the group or organization. - package = tests.call_action_api(self.app, 'package_show', - id=self.package['name']) - if is_org: - assert package['organization'] is None - else: - assert group_or_org['name'] not in [group_['name'] for group_ - in package['groups']] - - # TODO: Also want to assert that user is not in group or organization - # anymore, but how to get a user's groups or organizations? - - # It should be possible to create a new group or organization with the - # same name as the purged one (you would not be able to do this if you - # had merely deleted the original group or organization). - if is_org: - action = 'organization_create' - else: - action = 'group_create' - new_group_or_org = tests.call_action_api(self.app, action, name=name, - apikey=self.sysadmin.apikey, - ) - assert new_group_or_org['name'] == name - - # TODO: Should we do a model-level check, to check that the group or - # org is really purged? - - def test_organization_purge_by_name(self): - '''A sysadmin should be able to purge an organization by name.''' - - self._test_group_or_organization_purge('organization-to-be-purged', - by_id=False, is_org=True) - - def test_group_purge_by_name(self): - '''A sysadmin should be able to purge a group by name.''' - self._test_group_or_organization_purge('group-to-be-purged', - by_id=False, is_org=False) - - def test_organization_purge_by_id(self): - '''A sysadmin should be able to purge an organization by id.''' - self._test_group_or_organization_purge('organization-to-be-purged-2', - by_id=True, is_org=True) - - def test_group_purge_by_id(self): - '''A sysadmin should be able to purge a group by id.''' - self._test_group_or_organization_purge('group-to-be-purged-2', - by_id=True, is_org=False) - - def _test_group_or_org_purge_with_invalid_id(self, is_org): - - if is_org: - action = 'organization_purge' - else: - action = 'group_purge' - - for name in ('foo', 'invalid name', None, ''): - # Try to purge an organization, but pass an invalid name. - result = tests.call_action_api(self.app, action, - apikey=self.sysadmin.apikey, - id=name, - status=404, - ) - if is_org: - message = 'Not found: Organization was not found' - else: - message = 'Not found: Group was not found' - assert result == {'__type': 'Not Found Error', 'message': message} - - def test_organization_purge_with_invalid_id(self): - ''' - Trying to purge an organization with an invalid ID should give a 404. - - ''' - self._test_group_or_org_purge_with_invalid_id(is_org=True) - - def test_group_purge_with_invalid_id(self): - '''Trying to purge a group with an invalid ID should give a 404.''' - self._test_group_or_org_purge_with_invalid_id(is_org=False) - - def _test_group_or_org_purge_with_missing_id(self, is_org): - if is_org: - action = 'organization_purge' - else: - action = 'group_purge' - result = tests.call_action_api(self.app, action, - apikey=self.sysadmin.apikey, - status=409, - ) - assert result == {'__type': 'Validation Error', - 'id': ['Missing value']} - - def test_organization_purge_with_missing_id(self): - '''Trying to purge an organization without passing an id should give - a 409.''' - self._test_group_or_org_purge_with_missing_id(is_org=True) - - def test_group_purge_with_missing_id(self): - '''Trying to purge a group without passing an id should give a 409.''' - self._test_group_or_org_purge_with_missing_id(is_org=False) - - def _test_visitors_cannot_purge_groups_or_orgs(self, is_org): - if is_org: - group_or_org = self._organization_create('org-to-be-purged-3') - else: - group_or_org = self._group_create('group-to-be-purged-3') - - # Try to purge the group or organization without an API key. - if is_org: - action = 'organization_purge' - else: - action = 'group_purge' - result = tests.call_action_api(self.app, action, id=group_or_org['id'], - status=403, - ) - assert result['__type'] == 'Authorization Error' - - def test_visitors_cannot_purge_organizations(self): - '''Visitors (who aren't logged in) should not be authorized to purge - organizations. - - ''' - self._test_visitors_cannot_purge_groups_or_orgs(is_org=True) - - def test_visitors_cannot_purge_groups(self): - '''Visitors (who aren't logged in) should not be authorized to purge - groups. - - ''' - self._test_visitors_cannot_purge_groups_or_orgs(is_org=False) - - def _test_users_cannot_purge_groups_or_orgs(self, is_org): - if is_org: - group_or_org = self._organization_create('org-to-be-purged-4') - else: - group_or_org = self._group_create('group-to-be-purged-4') - - # Try to purge the group or organization with a non-member's API key. - if is_org: - action = 'organization_purge' - else: - action = 'group_purge' - result = tests.call_action_api(self.app, action, id=group_or_org['id'], - apikey=self.visitor['apikey'], - status=403, - ) - assert result == {'__type': 'Authorization Error', - 'message': 'Access denied'} - - def test_users_cannot_purge_organizations(self): - '''Users who are not members of an organization should not be - authorized to purge the organization. - - ''' - self._test_users_cannot_purge_groups_or_orgs(is_org=True) - - def test_users_cannot_purge_groups(self): - '''Users who are not members of a group should not be authorized to - purge the group. - - ''' - self._test_users_cannot_purge_groups_or_orgs(is_org=False) - - def _test_members_cannot_purge_groups_or_orgs(self, is_org): - if is_org: - group_or_org = self._organization_create('org-to-be-purged-5') - else: - group_or_org = self._group_create('group-to-be-purged-5') - - # Try to purge the organization with an organization member's API key. - if is_org: - action = 'organization_purge' - else: - action = 'group_purge' - result = tests.call_action_api(self.app, action, id=group_or_org['id'], - apikey=self.member['apikey'], - status=403, - ) - assert result == {'__type': 'Authorization Error', - 'message': 'Access denied'} - - def test_members_cannot_purge_organizations(self): - '''Members of an organization should not be authorized to purge the - organization. - - ''' - self._test_members_cannot_purge_groups_or_orgs(is_org=True) - - def test_members_cannot_purge_groups(self): - '''Members of a group should not be authorized to purge the group. - - ''' - self._test_members_cannot_purge_groups_or_orgs(is_org=False) - - def _test_editors_cannot_purge_groups_or_orgs(self, is_org): - if is_org: - group_or_org = self._organization_create('org-to-be-purged-6') - else: - group_or_org = self._group_create('group-to-be-purged-6') - - # Try to purge the group or organization with an editor's API key. - if is_org: - action = 'organization_purge' - else: - action = 'group_purge' - result = tests.call_action_api(self.app, action, id=group_or_org['id'], - apikey=self.editor['apikey'], - status=403, - ) - assert result == {'__type': 'Authorization Error', - 'message': 'Access denied'} - - def test_editors_cannot_purge_organizations(self): - '''Editors of an organization should not be authorized to purge the - organization. - - ''' - self._test_editors_cannot_purge_groups_or_orgs(is_org=True) - - def test_editors_cannot_purge_groups(self): - '''Editors of a group should not be authorized to purge the group. - - ''' - self._test_editors_cannot_purge_groups_or_orgs(is_org=False) - - def _test_admins_cannot_purge_groups_or_orgs(self, is_org): - if is_org: - group_or_org = self._organization_create('org-to-be-purged-7') - else: - group_or_org = self._group_create('group-to-be-purged-7') - - # Try to purge the group or organization with an admin's API key. - if is_org: - action = 'organization_purge' - else: - action = 'group_purge' - result = tests.call_action_api(self.app, action, - id=group_or_org['id'], - apikey=self.admin['apikey'], - status=403, - ) - assert result == {'__type': 'Authorization Error', - 'message': 'Access denied'} - - def test_admins_cannot_purge_organizations(self): - '''Admins of an organization should not be authorized to purge the - organization. - - ''' - self._test_admins_cannot_purge_groups_or_orgs(is_org=True) - - def test_admins_cannot_purge_groups(self): - '''Admins of a group should not be authorized to purge the group. - - ''' - self._test_admins_cannot_purge_groups_or_orgs(is_org=False) diff --git a/ckan/tests/logic/action/test_delete.py b/ckan/tests/logic/action/test_delete.py index b7640854160..00d263032bb 100644 --- a/ckan/tests/logic/action/test_delete.py +++ b/ckan/tests/logic/action/test_delete.py @@ -235,6 +235,14 @@ def test_purged_group_leaves_no_trace_in_the_model(self): # No Revision objects were purged, in fact 1 is created for the purge assert_equals(num_revisions_after - num_revisions_before, 1) + def test_missing_id_returns_error(self): + assert_raises(logic.ValidationError, + helpers.call_action, 'group_purge') + + def test_bad_id_returns_404(self): + assert_raises(logic.NotFound, + helpers.call_action, 'group_purge', id='123') + class TestOrganizationPurge(object): def setup(self): @@ -328,3 +336,11 @@ def test_purged_organization_leaves_no_trace_in_the_model(self): # No Revision objects were purged, in fact 1 is created for the purge assert_equals(num_revisions_after - num_revisions_before, 1) + + def test_missing_id_returns_error(self): + assert_raises(logic.ValidationError, + helpers.call_action, 'organization_purge') + + def test_bad_id_returns_404(self): + assert_raises(logic.NotFound, + helpers.call_action, 'organization_purge', id='123') From 7ca003ff0bbd156d2b8ba44f872a48d1de39b3a4 Mon Sep 17 00:00:00 2001 From: David Read Date: Thu, 10 Sep 2015 23:14:51 +0100 Subject: [PATCH 5/5] [#2632] PEP8 --- ckan/tests/logic/action/test_delete.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ckan/tests/logic/action/test_delete.py b/ckan/tests/logic/action/test_delete.py index 00d263032bb..6d3935a34ed 100644 --- a/ckan/tests/logic/action/test_delete.py +++ b/ckan/tests/logic/action/test_delete.py @@ -186,6 +186,7 @@ def test_purged_group_is_not_in_search_results_for_its_ex_dataset(self): search.clear() group = factories.Group() dataset = factories.Dataset(groups=[{'name': group['name']}]) + def get_search_result_groups(): results = helpers.call_action('package_search', q=dataset['title'])['results'] @@ -287,6 +288,7 @@ def test_purged_org_is_not_in_search_results_for_its_ex_dataset(self): search.clear() org = factories.Organization() dataset = factories.Dataset(owner_org=org['id']) + def get_search_result_owner_org(): results = helpers.call_action('package_search', q=dataset['title'])['results']