From 69883390a99590d5ef0305a0faf527ab24958c47 Mon Sep 17 00:00:00 2001 From: amercader Date: Wed, 12 Nov 2014 10:28:37 +0000 Subject: [PATCH] [#2037] Make resource_create auth work against package_update Right now it was deferring to package_create, which doesn't make much sense. Basically if you can update this particular dataset, you should be able to add a resource to it. Added auth tests. Also fixed the authorization for resource views creation. This is being fully tested on #1852 --- ckan/logic/auth/create.py | 37 +++++++-- ckan/new_tests/logic/auth/test_create.py | 101 +++++++++++++++++++++++ 2 files changed, 131 insertions(+), 7 deletions(-) diff --git a/ckan/logic/auth/create.py b/ckan/logic/auth/create.py index 8df9c964fcf..d7f9f598cb2 100644 --- a/ckan/logic/auth/create.py +++ b/ckan/logic/auth/create.py @@ -63,17 +63,40 @@ def related_create(context, data_dict=None): def resource_create(context, data_dict): - # resource_create runs through package_update, no need to - # check users eligibility to add resource to package here. + model = context['model'] + user = context.get('user') + + package_id = data_dict.get('package_id') + if not package_id and data_dict.get('id'): + # This can happen when auth is deferred, eg from `resource_view_create` + resource = logic_auth.get_resource_object(context, data_dict) + package_id = resource.package_id + + if not package_id: + raise logic.NotFound( + _('No dataset id provided, cannot check auth.') + ) + + # check authentication against package + pkg = model.Package.get(package_id) + if not pkg: + raise logic.NotFound( + _('No package found for this resource, cannot check auth.') + ) + + pkg_dict = {'id': pkg.id} + authorized = new_authz.is_authorized('package_update', context, pkg_dict).get('success') - # FIXME This is identical behaviour to what existed but feels like we - # should be using package_update permissions and have better errors. I - # am also not sure about the need for the group issue - return new_authz.is_authorized('package_create', context, data_dict) + if not authorized: + return {'success': False, + 'msg': _('User %s not authorized to create resources on dataset %s') % + (str(user), package_id)} + else: + return {'success': True} def resource_view_create(context, data_dict): - return resource_create(context, data_dict) + return resource_create(context, {'id': data_dict['resource_id']}) def package_relationship_create(context, data_dict): diff --git a/ckan/new_tests/logic/auth/test_create.py b/ckan/new_tests/logic/auth/test_create.py index 030ebed5f0c..15a0f8266c2 100644 --- a/ckan/new_tests/logic/auth/test_create.py +++ b/ckan/new_tests/logic/auth/test_create.py @@ -105,3 +105,104 @@ def test_user_invite_delegates_correctly_to_group_member_create(self, gmc): gmc.return_value = {'success': True} result = helpers.call_auth('user_invite', context=context, **data_dict) assert result is True + + +class TestCreateResources(object): + + @classmethod + def setup_class(cls): + + helpers.reset_db() + + def test_authorized_if_user_has_permissions_on_dataset(self): + + user = factories.User() + + dataset = factories.Dataset(user=user) + + resource = {'package_id': dataset['id'], + 'title': 'Resource', + 'url': 'http://test', + 'format': 'csv'} + + context = {'user': user['name'], 'model': core_model} + response = helpers.call_auth('resource_create', + context=context, **resource) + assert_equals(response, True) + + def test_not_authorized_if_user_has_no_permissions_on_dataset(self): + + org = factories.Organization() + + user = factories.User() + + member = {'username': user['name'], + 'role': 'admin', + 'id': org['id']} + helpers.call_action('organization_member_create', **member) + + user_2 = factories.User() + + dataset = factories.Dataset(user=user, owner_org=org['id']) + + resource = {'package_id': dataset['id'], + 'title': 'Resource', + 'url': 'http://test', + 'format': 'csv'} + + context = {'user': user_2['name'], 'model': core_model} + nose.tools.assert_raises(logic.NotAuthorized, helpers.call_auth, + 'resource_create', context=context, + **resource) + + def test_not_authorized_if_not_logged_in(self): + + resource = {'title': 'Resource', + 'url': 'http://test', + 'format': 'csv'} + + context = {'user': None, 'model': core_model} + nose.tools.assert_raises(logic.NotAuthorized, helpers.call_auth, + 'resource_create', context=context, + **resource) + + def test_sysadmin_is_authorized(self): + + sysadmin = factories.Sysadmin() + + resource = {'title': 'Resource', + 'url': 'http://test', + 'format': 'csv'} + + context = {'user': sysadmin['name'], 'model': core_model} + response = helpers.call_auth('resource_create', + context=context, **resource) + assert_equals(response, True) + + def test_raises_not_found_if_no_package_id_provided(self): + + user = factories.User() + + resource = {'title': 'Resource', + 'url': 'http://test', + 'format': 'csv'} + + context = {'user': user['name'], 'model': core_model} + nose.tools.assert_raises(logic.NotFound, helpers.call_auth, + 'resource_create', context=context, + **resource) + + def test_raises_not_found_if_dataset_was_not_found(self): + + user = factories.User() + + resource = {'package_id': 'does_not_exist', + 'title': 'Resource', + 'url': 'http://test', + 'format': 'csv'} + + context = {'user': user['name'], 'model': core_model} + nose.tools.assert_raises(logic.NotFound, helpers.call_auth, + 'resource_create', context=context, + **resource) +