Skip to content

Commit

Permalink
[#2037] Make resource_create auth work against package_update
Browse files Browse the repository at this point in the history
Right now it was deferring to package_create, which doesn't make much
sense. Basically if you can update this particular dataset, you should
be able to add a resource to it. Added auth tests.

Also fixed the authorization for resource views creation. This is being
fully tested on #1852
  • Loading branch information
amercader committed Nov 12, 2014
1 parent 814edd8 commit 6988339
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 7 deletions.
37 changes: 30 additions & 7 deletions ckan/logic/auth/create.py
Expand Up @@ -63,17 +63,40 @@ def related_create(context, data_dict=None):


def resource_create(context, data_dict):
# resource_create runs through package_update, no need to
# check users eligibility to add resource to package here.
model = context['model']
user = context.get('user')

package_id = data_dict.get('package_id')
if not package_id and data_dict.get('id'):
# This can happen when auth is deferred, eg from `resource_view_create`
resource = logic_auth.get_resource_object(context, data_dict)
package_id = resource.package_id

if not package_id:
raise logic.NotFound(
_('No dataset id provided, cannot check auth.')
)

# check authentication against package
pkg = model.Package.get(package_id)
if not pkg:
raise logic.NotFound(
_('No package found for this resource, cannot check auth.')
)

pkg_dict = {'id': pkg.id}
authorized = new_authz.is_authorized('package_update', context, pkg_dict).get('success')

# FIXME This is identical behaviour to what existed but feels like we
# should be using package_update permissions and have better errors. I
# am also not sure about the need for the group issue
return new_authz.is_authorized('package_create', context, data_dict)
if not authorized:
return {'success': False,
'msg': _('User %s not authorized to create resources on dataset %s') %
(str(user), package_id)}
else:
return {'success': True}


def resource_view_create(context, data_dict):
return resource_create(context, data_dict)
return resource_create(context, {'id': data_dict['resource_id']})


def package_relationship_create(context, data_dict):
Expand Down
101 changes: 101 additions & 0 deletions ckan/new_tests/logic/auth/test_create.py
Expand Up @@ -105,3 +105,104 @@ def test_user_invite_delegates_correctly_to_group_member_create(self, gmc):
gmc.return_value = {'success': True}
result = helpers.call_auth('user_invite', context=context, **data_dict)
assert result is True


class TestCreateResources(object):

@classmethod
def setup_class(cls):

helpers.reset_db()

def test_authorized_if_user_has_permissions_on_dataset(self):

user = factories.User()

dataset = factories.Dataset(user=user)

resource = {'package_id': dataset['id'],
'title': 'Resource',
'url': 'http://test',
'format': 'csv'}

context = {'user': user['name'], 'model': core_model}
response = helpers.call_auth('resource_create',
context=context, **resource)
assert_equals(response, True)

def test_not_authorized_if_user_has_no_permissions_on_dataset(self):

org = factories.Organization()

user = factories.User()

member = {'username': user['name'],
'role': 'admin',
'id': org['id']}
helpers.call_action('organization_member_create', **member)

user_2 = factories.User()

dataset = factories.Dataset(user=user, owner_org=org['id'])

resource = {'package_id': dataset['id'],
'title': 'Resource',
'url': 'http://test',
'format': 'csv'}

context = {'user': user_2['name'], 'model': core_model}
nose.tools.assert_raises(logic.NotAuthorized, helpers.call_auth,
'resource_create', context=context,
**resource)

def test_not_authorized_if_not_logged_in(self):

resource = {'title': 'Resource',
'url': 'http://test',
'format': 'csv'}

context = {'user': None, 'model': core_model}
nose.tools.assert_raises(logic.NotAuthorized, helpers.call_auth,
'resource_create', context=context,
**resource)

def test_sysadmin_is_authorized(self):

sysadmin = factories.Sysadmin()

resource = {'title': 'Resource',
'url': 'http://test',
'format': 'csv'}

context = {'user': sysadmin['name'], 'model': core_model}
response = helpers.call_auth('resource_create',
context=context, **resource)
assert_equals(response, True)

def test_raises_not_found_if_no_package_id_provided(self):

user = factories.User()

resource = {'title': 'Resource',
'url': 'http://test',
'format': 'csv'}

context = {'user': user['name'], 'model': core_model}
nose.tools.assert_raises(logic.NotFound, helpers.call_auth,
'resource_create', context=context,
**resource)

def test_raises_not_found_if_dataset_was_not_found(self):

user = factories.User()

resource = {'package_id': 'does_not_exist',
'title': 'Resource',
'url': 'http://test',
'format': 'csv'}

context = {'user': user['name'], 'model': core_model}
nose.tools.assert_raises(logic.NotFound, helpers.call_auth,
'resource_create', context=context,
**resource)

0 comments on commit 6988339

Please sign in to comment.