diff --git a/ckan/authz.py b/ckan/authz.py new file mode 100644 index 00000000000..550a88ac429 --- /dev/null +++ b/ckan/authz.py @@ -0,0 +1,442 @@ +import sys +import re +from logging import getLogger + +from pylons import config +from paste.deploy.converters import asbool + +import ckan.plugins as p +import ckan.model as model +from ckan.common import OrderedDict, _, c + +import ckan.lib.maintain as maintain + +log = getLogger(__name__) + + +class AuthFunctions: + ''' This is a private cache used by get_auth_function() and should never be + accessed directly we will create an instance of it and then remove it.''' + _functions = {} + + def clear(self): + ''' clear any stored auth functions. ''' + self._functions.clear() + + def keys(self): + ''' Return a list of known auth functions.''' + if not self._functions: + self._build() + return self._functions.keys() + + def get(self, function): + ''' Return the requested auth function. ''' + if not self._functions: + self._build() + return self._functions.get(function) + + def _build(self): + ''' Gather the auth functions. + + First get the default ones in the ckan/logic/auth directory Rather than + writing them out in full will use __import__ to load anything from + ckan.auth that looks like it might be an authorisation function''' + + module_root = 'ckan.logic.auth' + + for auth_module_name in ['get', 'create', 'update', 'delete', 'patch']: + module_path = '%s.%s' % (module_root, auth_module_name,) + try: + module = __import__(module_path) + except ImportError: + log.debug('No auth module for action "%s"' % auth_module_name) + continue + + for part in module_path.split('.')[1:]: + module = getattr(module, part) + + for key, v in module.__dict__.items(): + if not key.startswith('_'): + # Whitelist all auth functions defined in + # logic/auth/get.py as not requiring an authorized user, + # as well as ensuring that the rest do. In both cases, do + # nothing if a decorator has already been used to define + # the behaviour + if not hasattr(v, 'auth_allow_anonymous_access'): + if auth_module_name == 'get': + v.auth_allow_anonymous_access = True + else: + v.auth_allow_anonymous_access = False + self._functions[key] = v + + # Then overwrite them with any specific ones in the plugins: + resolved_auth_function_plugins = {} + fetched_auth_functions = {} + for plugin in p.PluginImplementations(p.IAuthFunctions): + for name, auth_function in plugin.get_auth_functions().items(): + if name in resolved_auth_function_plugins: + raise Exception( + 'The auth function %r is already implemented in %r' % ( + name, + resolved_auth_function_plugins[name] + ) + ) + log.debug('Auth function {0} from plugin {1} was inserted'.format(name, plugin.name)) + resolved_auth_function_plugins[name] = plugin.name + fetched_auth_functions[name] = auth_function + # Use the updated ones in preference to the originals. + self._functions.update(fetched_auth_functions) + +_AuthFunctions = AuthFunctions() +#remove the class +del AuthFunctions + + +def clear_auth_functions_cache(): + _AuthFunctions.clear() + + +def auth_functions_list(): + '''Returns a list of the names of the auth functions available. Currently + this is to allow the Auth Audit to know if an auth function is available + for a given action.''' + return _AuthFunctions.keys() + + +def is_sysadmin(username): + ''' Returns True is username is a sysadmin ''' + user = _get_user(username) + return user and user.sysadmin + + +def _get_user(username): + ''' Try to get the user from c, if possible, and fallback to using the DB ''' + if not username: + return None + # See if we can get the user without touching the DB + try: + if c.userobj and c.userobj.name == username: + return c.userobj + except TypeError: + # c is not available + pass + # Get user from the DB + return model.User.get(username) + + +def get_group_or_org_admin_ids(group_id): + if not group_id: + return [] + group_id = model.Group.get(group_id).id + q = model.Session.query(model.Member) \ + .filter(model.Member.group_id == group_id) \ + .filter(model.Member.table_name == 'user') \ + .filter(model.Member.state == 'active') \ + .filter(model.Member.capacity == 'admin') + return [a.table_id for a in q.all()] + + +def is_authorized_boolean(action, context, data_dict=None): + ''' runs the auth function but just returns True if allowed else False + ''' + outcome = is_authorized(action, context, data_dict=data_dict) + return outcome.get('success', False) + + +def is_authorized(action, context, data_dict=None): + if context.get('ignore_auth'): + return {'success': True} + + auth_function = _AuthFunctions.get(action) + if auth_function: + username = context.get('user') + user = _get_user(username) + + if user: + # deleted users are always unauthorized + if user.is_deleted(): + return {'success': False} + # sysadmins can do anything unless the auth_sysadmins_check + # decorator was used in which case they are treated like all other + # users. + elif user.sysadmin: + if not getattr(auth_function, 'auth_sysadmins_check', False): + return {'success': True} + + # If the auth function is flagged as not allowing anonymous access, + # and an existing user object is not provided in the context, deny + # access straight away + if not getattr(auth_function, 'auth_allow_anonymous_access', False) \ + and not context.get('auth_user_obj'): + return {'success': False, + 'msg': '{0} requires an authenticated user' + .format(auth_function) + } + + return auth_function(context, data_dict) + else: + raise ValueError(_('Authorization function not found: %s' % action)) + + +# these are the permissions that roles have +ROLE_PERMISSIONS = OrderedDict([ + ('admin', ['admin']), + ('editor', ['read', 'delete_dataset', 'create_dataset', 'update_dataset', 'manage_group']), + ('member', ['read', 'manage_group']), +]) + + +def _trans_role_admin(): + return _('Admin') + + +def _trans_role_editor(): + return _('Editor') + + +def _trans_role_member(): + return _('Member') + + +def trans_role(role): + module = sys.modules[__name__] + return getattr(module, '_trans_role_%s' % role)() + + +def roles_list(): + ''' returns list of roles for forms ''' + roles = [] + for role in ROLE_PERMISSIONS: + roles.append(dict(text=trans_role(role), value=role)) + return roles + + +def roles_trans(): + ''' return dict of roles with translation ''' + roles = {} + for role in ROLE_PERMISSIONS: + roles[role] = trans_role(role) + return roles + + +def get_roles_with_permission(permission): + ''' returns the roles with the permission requested ''' + roles = [] + for role in ROLE_PERMISSIONS: + permissions = ROLE_PERMISSIONS[role] + if permission in permissions or 'admin' in permissions: + roles.append(role) + return roles + + +def has_user_permission_for_group_or_org(group_id, user_name, permission): + ''' Check if the user has the given permissions for the group, allowing for + sysadmin rights and permission cascading down a group hierarchy. + + ''' + if not group_id: + return False + group = model.Group.get(group_id) + if not group: + return False + group_id = group.id + + # Sys admins can do anything + if is_sysadmin(user_name): + return True + + user_id = get_user_id_for_username(user_name, allow_none=True) + if not user_id: + return False + if _has_user_permission_for_groups(user_id, permission, [group_id]): + return True + # Handle when permissions cascade. Check the user's roles on groups higher + # in the group hierarchy for permission. + for capacity in check_config_permission('roles_that_cascade_to_sub_groups'): + parent_groups = group.get_parent_group_hierarchy(type=group.type) + group_ids = [group_.id for group_ in parent_groups] + if _has_user_permission_for_groups(user_id, permission, group_ids, + capacity=capacity): + return True + return False + + +def _has_user_permission_for_groups(user_id, permission, group_ids, + capacity=None): + ''' Check if the user has the given permissions for the particular + group (ignoring permissions cascading in a group hierarchy). + Can also be filtered by a particular capacity. + ''' + if not group_ids: + return False + # get any roles the user has for the group + q = model.Session.query(model.Member) \ + .filter(model.Member.group_id.in_(group_ids)) \ + .filter(model.Member.table_name == 'user') \ + .filter(model.Member.state == 'active') \ + .filter(model.Member.table_id == user_id) + if capacity: + q = q.filter(model.Member.capacity == capacity) + # see if any role has the required permission + # admin permission allows anything for the group + for row in q.all(): + perms = ROLE_PERMISSIONS.get(row.capacity, []) + if 'admin' in perms or permission in perms: + return True + return False + + +def users_role_for_group_or_org(group_id, user_name): + ''' Returns the user's role for the group. (Ignores privileges that cascade + in a group hierarchy.) + + ''' + if not group_id: + return None + group_id = model.Group.get(group_id).id + + user_id = get_user_id_for_username(user_name, allow_none=True) + if not user_id: + return None + # get any roles the user has for the group + q = model.Session.query(model.Member) \ + .filter(model.Member.group_id == group_id) \ + .filter(model.Member.table_name == 'user') \ + .filter(model.Member.state == 'active') \ + .filter(model.Member.table_id == user_id) + # return the first role we find + for row in q.all(): + return row.capacity + return None + + +def has_user_permission_for_some_org(user_name, permission): + ''' Check if the user has the given permission for any organization. ''' + user_id = get_user_id_for_username(user_name, allow_none=True) + if not user_id: + return False + roles = get_roles_with_permission(permission) + + if not roles: + return False + # get any groups the user has with the needed role + q = model.Session.query(model.Member) \ + .filter(model.Member.table_name == 'user') \ + .filter(model.Member.state == 'active') \ + .filter(model.Member.capacity.in_(roles)) \ + .filter(model.Member.table_id == user_id) + group_ids = [] + for row in q.all(): + group_ids.append(row.group_id) + # if not in any groups has no permissions + if not group_ids: + return False + + # see if any of the groups are orgs + q = model.Session.query(model.Group) \ + .filter(model.Group.is_organization == True) \ + .filter(model.Group.state == 'active') \ + .filter(model.Group.id.in_(group_ids)) + + return bool(q.count()) + + +def get_user_id_for_username(user_name, allow_none=False): + ''' Helper function to get user id ''' + # first check if we have the user object already and get from there + try: + if c.userobj and c.userobj.name == user_name: + return c.userobj.id + except TypeError: + # c is not available + pass + user = model.User.get(user_name) + if user: + return user.id + if allow_none: + return None + raise Exception('Not logged in user') + + +CONFIG_PERMISSIONS_DEFAULTS = { + # permission and default + # these are prefixed with ckan.auth. in config to override + 'anon_create_dataset': False, + 'create_dataset_if_not_in_organization': True, + 'create_unowned_dataset': True, + 'user_create_groups': True, + 'user_create_organizations': True, + 'user_delete_groups': True, + 'user_delete_organizations': True, + 'create_user_via_api': False, + 'create_user_via_web': True, + 'roles_that_cascade_to_sub_groups': 'admin', +} + + +def check_config_permission(permission): + '''Returns the configuration value for the provided permission + + Permission is a string indentifying the auth permission (eg + `anon_create_dataset`), optionally prefixed with `ckan.auth.`. + + The possible values for `permission` are the keys of + CONFIG_PERMISSIONS_DEFAULTS. These can be overriden in the config file + by prefixing them with `ckan.auth.`. + + Returns the permission value, generally True or False, except on + `roles_that_cascade_to_sub_groups` which is a list of strings. + + ''' + + key = permission.replace('ckan.auth.', '') + + if key not in CONFIG_PERMISSIONS_DEFAULTS: + return False + + default_value = CONFIG_PERMISSIONS_DEFAULTS.get(key) + + config_key = 'ckan.auth.' + key + + value = config.get(config_key, default_value) + + if key == 'roles_that_cascade_to_sub_groups': + # This permission is set as a list of strings (space separated) + value = value.split() if value else [] + else: + value = asbool(value) + + return value + + +@maintain.deprecated('Use auth_is_loggedin_user instead') +def auth_is_registered_user(): + ''' + This function is deprecated, please use the auth_is_loggedin_user instead + ''' + return auth_is_loggedin_user() + +def auth_is_loggedin_user(): + ''' Do we have a logged in user ''' + try: + context_user = c.user + except TypeError: + context_user = None + return bool(context_user) + +def auth_is_anon_user(context): + ''' Is this an anonymous user? + eg Not logged in if a web request and not user defined in context + if logic functions called directly + + See ckan/lib/base.py:232 for pylons context object logic + ''' + try: + is_anon_user = (not bool(c.user) and bool(c.author)) + except TypeError: + # No c object set, this is not a call done via the web interface, + # but directly, eg from an extension + context_user = context.get('user') + is_anon_user = not bool(context_user) + + return is_anon_user diff --git a/ckan/config/environment.py b/ckan/config/environment.py index 1148726070e..54eaf815fca 100644 --- a/ckan/config/environment.py +++ b/ckan/config/environment.py @@ -20,7 +20,7 @@ import ckan.lib.render as render import ckan.lib.search as search import ckan.logic as logic -import ckan.new_authz as new_authz +import ckan.authz as authz import ckan.lib.jinja_extensions as jinja_extensions from ckan.common import _, ungettext @@ -367,7 +367,7 @@ def template_loaded(template): # clear other caches logic.clear_actions_cache() logic.clear_validators_cache() - new_authz.clear_auth_functions_cache() + authz.clear_auth_functions_cache() # Here we create the site user if they are not already in the database try: diff --git a/ckan/controllers/admin.py b/ckan/controllers/admin.py index 56b0face2b3..3ff7eca70ad 100644 --- a/ckan/controllers/admin.py +++ b/ckan/controllers/admin.py @@ -5,7 +5,6 @@ import ckan.lib.app_globals as app_globals import ckan.model as model import ckan.logic as logic -import ckan.new_authz c = base.c request = base.request diff --git a/ckan/controllers/group.py b/ckan/controllers/group.py index 0f5bc28aad5..ef37375cf6d 100644 --- a/ckan/controllers/group.py +++ b/ckan/controllers/group.py @@ -15,7 +15,7 @@ import ckan.logic as logic import ckan.lib.search as search import ckan.model as model -import ckan.new_authz as new_authz +import ckan.authz as authz import ckan.lib.plugins import ckan.plugins as plugins from ckan.common import OrderedDict, c, g, request, _ @@ -215,7 +215,7 @@ def _read(self, id, limit): # c.group_admins is used by CKAN's legacy (Genshi) templates only, # if we drop support for those then we can delete this line. - c.group_admins = new_authz.get_group_or_org_admin_ids(c.group.id) + c.group_admins = authz.get_group_or_org_admin_ids(c.group.id) page = self._get_page_number(request.params) @@ -675,7 +675,7 @@ def member_new(self, id): user = request.params.get('user') if user: c.user_dict = get_action('user_show')(context, {'id': user}) - c.user_role = new_authz.users_role_for_group_or_org(id, user) or 'member' + c.user_role = authz.users_role_for_group_or_org(id, user) or 'member' else: c.user_role = 'member' except NotAuthorized: @@ -861,7 +861,7 @@ def followers(self, id): def admins(self, id): c.group_dict = self._get_group_dict(id) - c.admins = new_authz.get_group_or_org_admin_ids(id) + c.admins = authz.get_group_or_org_admin_ids(id) return render(self._admins_template(c.group_dict['type'])) def about(self, id): diff --git a/ckan/controllers/user.py b/ckan/controllers/user.py index 1e7692bb027..2c81f6a1bea 100644 --- a/ckan/controllers/user.py +++ b/ckan/controllers/user.py @@ -6,7 +6,7 @@ import ckan.lib.base as base import ckan.model as model import ckan.lib.helpers as h -import ckan.new_authz as new_authz +import ckan.authz as authz import ckan.logic as logic import ckan.logic.schema as schema import ckan.lib.captcha as captcha @@ -63,7 +63,7 @@ def _db_to_edit_form_schema(self): into a format suitable for the form (optional)''' def _setup_template_variables(self, context, data_dict): - c.is_sysadmin = new_authz.is_sysadmin(c.user) + c.is_sysadmin = authz.is_sysadmin(c.user) try: user_dict = get_action('user_show')(context, data_dict) except NotFound: @@ -177,7 +177,7 @@ def new(self, data=None, errors=None, error_summary=None): error_summary = error_summary or {} vars = {'data': data, 'errors': errors, 'error_summary': error_summary} - c.is_sysadmin = new_authz.is_sysadmin(c.user) + c.is_sysadmin = authz.is_sysadmin(c.user) c.form = render(self.new_user_form, extra_vars=vars) return render('user/new.html') @@ -297,7 +297,7 @@ def edit(self, id=None, data=None, errors=None, error_summary=None): user_obj = context.get('user_obj') - if not (new_authz.is_sysadmin(c.user) + if not (authz.is_sysadmin(c.user) or c.user == user_obj.name): abort(401, _('User %s not authorized to edit %s') % (str(c.user), id)) diff --git a/ckan/lib/create_test_data.py b/ckan/lib/create_test_data.py index bdb056dc1e8..42b5a937e92 100644 --- a/ckan/lib/create_test_data.py +++ b/ckan/lib/create_test_data.py @@ -529,7 +529,7 @@ def create(cls, auth_profile="", package_type=None): model.setup_default_user_roles(david, [russianfan]) model.setup_default_user_roles(roger, [russianfan]) - # in new_authz you can't give a visitor permissions to a + # in authz you can't give a visitor permissions to a # group it seems, so this is a bit meaningless model.add_user_to_role(visitor, model.Role.ADMIN, roger) model.repo.commit_and_remove() diff --git a/ckan/lib/dictization/model_dictize.py b/ckan/lib/dictization/model_dictize.py index 26e7149f2aa..3bf56ab6d31 100644 --- a/ckan/lib/dictization/model_dictize.py +++ b/ckan/lib/dictization/model_dictize.py @@ -19,7 +19,7 @@ import ckan.plugins as plugins import ckan.lib.helpers as h import ckan.lib.dictization as d -import ckan.new_authz as new_authz +import ckan.authz as authz import ckan.lib.search as search import ckan.lib.munge as munge @@ -398,7 +398,7 @@ def get_packages_for_this_group(group_, just_the_count=False): # Allow members of organizations to see private datasets. if group_.is_organization: is_group_member = (context.get('user') and - new_authz.has_user_permission_for_group_or_org( + authz.has_user_permission_for_group_or_org( group_.id, context.get('user'), 'read')) if is_group_member: context['ignore_capacity_check'] = True @@ -594,7 +594,7 @@ def user_dictize(user, context): result_dict['email'] = email ## this should not really really be needed but tests need it - if new_authz.is_sysadmin(requester): + if authz.is_sysadmin(requester): result_dict['apikey'] = apikey result_dict['email'] = email diff --git a/ckan/lib/dictization/model_save.py b/ckan/lib/dictization/model_save.py index 8e28d4ec6be..ad0c1e8e454 100644 --- a/ckan/lib/dictization/model_save.py +++ b/ckan/lib/dictization/model_save.py @@ -6,7 +6,7 @@ import ckan.lib.dictization as d import ckan.lib.helpers as h -import ckan.new_authz as new_authz +import ckan.authz as authz log = logging.getLogger(__name__) @@ -228,7 +228,7 @@ def package_membership_list_save(group_dicts, package, context): member_obj = group_member[group] if member_obj and member_obj.state == 'deleted': continue - if new_authz.has_user_permission_for_group_or_org( + if authz.has_user_permission_for_group_or_org( member_obj.group_id, user, 'read'): member_obj.capacity = capacity member_obj.state = 'deleted' @@ -239,7 +239,7 @@ def package_membership_list_save(group_dicts, package, context): member_obj = group_member.get(group) if member_obj and member_obj.state == 'active': continue - if new_authz.has_user_permission_for_group_or_org( + if authz.has_user_permission_for_group_or_org( group.id, user, 'read'): member_obj = group_member.get(group) if member_obj: diff --git a/ckan/lib/helpers.py b/ckan/lib/helpers.py index 25205665eb6..1d5d07e7da1 100644 --- a/ckan/lib/helpers.py +++ b/ckan/lib/helpers.py @@ -41,7 +41,7 @@ import ckan.lib.datapreview as datapreview import ckan.logic as logic import ckan.lib.uploader as uploader -import ckan.new_authz as new_authz +import ckan.authz as authz from ckan.common import ( _, ungettext, g, c, request, session, json, OrderedDict @@ -1984,7 +1984,7 @@ def unified_resource_format(format): return format_new def check_config_permission(permission): - return new_authz.check_config_permission(permission) + return authz.check_config_permission(permission) def get_organization(org=None, include_datasets=False): diff --git a/ckan/lib/plugins.py b/ckan/lib/plugins.py index b6ba6e2ec8f..1aebd98fdeb 100644 --- a/ckan/lib/plugins.py +++ b/ckan/lib/plugins.py @@ -6,7 +6,7 @@ from ckan import logic import logic.schema from ckan import plugins -import ckan.new_authz +import ckan.authz import ckan.plugins.toolkit as toolkit log = logging.getLogger(__name__) @@ -233,7 +233,7 @@ def setup_template_variables(self, context, data_dict): # CS: bad_spelling ignore 2 lines c.licences = c.licenses maintain.deprecate_context_item('licences', 'Use `c.licenses` instead') - c.is_sysadmin = ckan.new_authz.is_sysadmin(c.user) + c.is_sysadmin = ckan.authz.is_sysadmin(c.user) if c.pkg: c.related_count = c.pkg.related_count @@ -435,7 +435,7 @@ def check_data_dict(self, data_dict): pass def setup_template_variables(self, context, data_dict): - c.is_sysadmin = ckan.new_authz.is_sysadmin(c.user) + c.is_sysadmin = ckan.authz.is_sysadmin(c.user) ## This is messy as auths take domain object not data_dict context_group = context.get('group', None) diff --git a/ckan/logic/__init__.py b/ckan/logic/__init__.py index 2e63303b754..204bb0f5dc7 100644 --- a/ckan/logic/__init__.py +++ b/ckan/logic/__init__.py @@ -6,7 +6,7 @@ import formencode.validators import ckan.model as model -import ckan.new_authz as new_authz +import ckan.authz as authz import ckan.lib.navl.dictization_functions as df import ckan.plugins as p @@ -287,8 +287,8 @@ def check_access(action, context, data_dict=None): context = _prepopulate_context(context) - logic_authorization = new_authz.is_authorized(action, context, - data_dict) + logic_authorization = authz.is_authorized(action, context, + data_dict) if not logic_authorization['success']: msg = logic_authorization.get('msg', '') raise NotAuthorized(msg) @@ -425,7 +425,7 @@ def wrapped(context=None, data_dict=None, **kw): try: audit = context['__auth_audit'][-1] if audit[0] == action_name and audit[1] == id(_action): - if action_name not in new_authz.auth_functions_list(): + if action_name not in authz.auth_functions_list(): log.debug('No auth function for %s' % action_name) elif not getattr(_action, 'auth_audit_exempt', False): raise Exception( diff --git a/ckan/logic/action/get.py b/ckan/logic/action/get.py index 45da3e62eb1..6bb193de1d6 100644 --- a/ckan/logic/action/get.py +++ b/ckan/logic/action/get.py @@ -23,7 +23,7 @@ import ckan.lib.plugins as lib_plugins import ckan.lib.activity_streams as activity_streams import ckan.lib.datapreview as datapreview -import ckan.new_authz as new_authz +import ckan.authz as authz from ckan.common import _ @@ -178,7 +178,7 @@ def current_package_list_with_resources(context, data_dict): _check_access('current_package_list_with_resources', context, data_dict) - is_sysadmin = new_authz.is_sysadmin(user) + is_sysadmin = authz.is_sysadmin(user) q = '+capacity:public' if not is_sysadmin else '*:*' context['ignore_capacity_check'] = True search = package_search(context, {'q': q, 'rows': limit, 'start': offset}) @@ -349,7 +349,7 @@ def member_list(context, data_dict=None): if capacity: q = q.filter(model.Member.capacity == capacity) - trans = new_authz.roles_trans() + trans = authz.roles_trans() def translated_capacity(capacity): try: @@ -537,11 +537,11 @@ def group_list_authz(context, data_dict): _check_access('group_list_authz', context, data_dict) - sysadmin = new_authz.is_sysadmin(user) - roles = ckan.new_authz.get_roles_with_permission('manage_group') + sysadmin = authz.is_sysadmin(user) + roles = authz.get_roles_with_permission('manage_group') if not roles: return [] - user_id = new_authz.get_user_id_for_username(user, allow_none=True) + user_id = authz.get_user_id_for_username(user, allow_none=True) if not user_id: return [] @@ -614,7 +614,7 @@ def organization_list_for_user(context, data_dict): user = context['user'] _check_access('organization_list_for_user', context, data_dict) - sysadmin = new_authz.is_sysadmin(user) + sysadmin = authz.is_sysadmin(user) orgs_q = model.Session.query(model.Group) \ .filter(model.Group.is_organization == True) \ @@ -625,11 +625,11 @@ def organization_list_for_user(context, data_dict): permission = data_dict.get('permission', 'edit_group') - roles = ckan.new_authz.get_roles_with_permission(permission) + roles = authz.get_roles_with_permission(permission) if not roles: return [] - user_id = new_authz.get_user_id_for_username(user, allow_none=True) + user_id = authz.get_user_id_for_username(user, allow_none=True) if not user_id: return [] @@ -1339,7 +1339,7 @@ def user_show(context, data_dict): if requester: requester_looking_at_own_account = requester == user_obj.name include_private_and_draft_datasets = \ - new_authz.is_sysadmin(requester) or \ + authz.is_sysadmin(requester) or \ requester_looking_at_own_account else: include_private_and_draft_datasets = False @@ -3301,7 +3301,7 @@ def member_roles_list(context, data_dict): ''' group_type = data_dict.get('group_type', 'organization') - roles_list = new_authz.roles_list() + roles_list = authz.roles_list() if group_type == 'group': roles_list = [role for role in roles_list if role['value'] != 'editor'] diff --git a/ckan/logic/auth/create.py b/ckan/logic/auth/create.py index 25ca975b1d8..f814ac6a4cb 100644 --- a/ckan/logic/auth/create.py +++ b/ckan/logic/auth/create.py @@ -1,5 +1,5 @@ import ckan.logic as logic -import ckan.new_authz as new_authz +import ckan.authz as authz import ckan.logic.auth as logic_auth from ckan.common import _ @@ -8,17 +8,17 @@ def package_create(context, data_dict=None): user = context['user'] - if new_authz.auth_is_anon_user(context): - check1 = all(new_authz.check_config_permission(p) for p in ( + if authz.auth_is_anon_user(context): + check1 = all(authz.check_config_permission(p) for p in ( 'anon_create_dataset', 'create_dataset_if_not_in_organization', 'create_unowned_dataset', )) else: - check1 = all(new_authz.check_config_permission(p) for p in ( + check1 = all(authz.check_config_permission(p) for p in ( 'create_dataset_if_not_in_organization', 'create_unowned_dataset', - )) or new_authz.has_user_permission_for_some_org( + )) or authz.has_user_permission_for_some_org( user, 'create_dataset') if not check1: @@ -31,7 +31,7 @@ def package_create(context, data_dict=None): # If an organization is given are we able to add a dataset to it? data_dict = data_dict or {} org_id = data_dict.get('owner_org') - if org_id and not new_authz.has_user_permission_for_group_or_org( + if org_id and not authz.has_user_permission_for_group_or_org( org_id, user, 'create_dataset'): return {'success': False, 'msg': _('User %s not authorized to add dataset to this organization') % user} return {'success': True} @@ -39,7 +39,7 @@ def package_create(context, data_dict=None): def file_upload(context, data_dict=None): user = context['user'] - if new_authz.auth_is_anon_user(context): + if authz.auth_is_anon_user(context): return {'success': False, 'msg': _('User %s not authorized to create packages') % user} return {'success': True} @@ -85,7 +85,7 @@ def resource_create(context, data_dict): ) pkg_dict = {'id': pkg.id} - authorized = new_authz.is_authorized('package_update', context, pkg_dict).get('success') + authorized = authz.is_authorized('package_update', context, pkg_dict).get('success') if not authorized: return {'success': False, @@ -104,8 +104,8 @@ def resource_create_default_resource_views(context, data_dict): def package_create_default_resource_views(context, data_dict): - return new_authz.is_authorized('package_update', context, - data_dict['package']) + return authz.is_authorized('package_update', context, + data_dict['package']) def package_relationship_create(context, data_dict): @@ -115,9 +115,9 @@ def package_relationship_create(context, data_dict): id2 = data_dict['object'] # If we can update each package we can see the relationships - authorized1 = new_authz.is_authorized_boolean( + authorized1 = authz.is_authorized_boolean( 'package_update', context, {'id': id}) - authorized2 = new_authz.is_authorized_boolean( + authorized2 = authz.is_authorized_boolean( 'package_update', context, {'id': id2}) if not authorized1 and authorized2: @@ -127,9 +127,9 @@ def package_relationship_create(context, data_dict): def group_create(context, data_dict=None): user = context['user'] - user = new_authz.get_user_id_for_username(user, allow_none=True) + user = authz.get_user_id_for_username(user, allow_none=True) - if user and new_authz.check_config_permission('user_create_groups'): + if user and authz.check_config_permission('user_create_groups'): return {'success': True} return {'success': False, 'msg': _('User %s not authorized to create groups') % user} @@ -137,9 +137,9 @@ def group_create(context, data_dict=None): def organization_create(context, data_dict=None): user = context['user'] - user = new_authz.get_user_id_for_username(user, allow_none=True) + user = authz.get_user_id_for_username(user, allow_none=True) - if user and new_authz.check_config_permission('user_create_organizations'): + if user and authz.check_config_permission('user_create_organizations'): return {'success': True} return {'success': False, 'msg': _('User %s not authorized to create organizations') % user} @@ -152,9 +152,9 @@ def rating_create(context, data_dict): @logic.auth_allow_anonymous_access def user_create(context, data_dict=None): using_api = 'api_version' in context - create_user_via_api = new_authz.check_config_permission( + create_user_via_api = authz.check_config_permission( 'create_user_via_api') - create_user_via_web = new_authz.check_config_permission( + create_user_via_web = authz.check_config_permission( 'create_user_via_web') if using_api and not create_user_via_api: @@ -213,7 +213,7 @@ def _check_group_auth(context, data_dict): groups = groups - set(pkg_groups) for group in groups: - if not new_authz.has_user_permission_for_group_or_org(group.id, user, 'update'): + if not authz.has_user_permission_for_group_or_org(group.id, user, 'update'): return False return True @@ -251,7 +251,7 @@ def tag_create(context, data_dict): def _group_or_org_member_create(context, data_dict): user = context['user'] group_id = data_dict['id'] - if not new_authz.has_user_permission_for_group_or_org(group_id, user, 'membership'): + if not authz.has_user_permission_for_group_or_org(group_id, user, 'membership'): return {'success': False, 'msg': _('User %s not authorized to add members') % user} return {'success': True} @@ -271,7 +271,7 @@ def member_create(context, data_dict): if not group.is_organization and data_dict.get('object_type') == 'package': permission = 'manage_group' - authorized = new_authz.has_user_permission_for_group_or_org(group.id, + authorized = authz.has_user_permission_for_group_or_org(group.id, user, permission) if not authorized: diff --git a/ckan/logic/auth/delete.py b/ckan/logic/auth/delete.py index dd72bba911c..1c94918c394 100644 --- a/ckan/logic/auth/delete.py +++ b/ckan/logic/auth/delete.py @@ -1,5 +1,5 @@ import ckan.logic as logic -import ckan.new_authz as new_authz +import ckan.authz as authz from ckan.logic.auth import get_group_object, get_related_object from ckan.logic.auth import get_resource_object import ckan.logic.auth.create as _auth_create @@ -86,7 +86,7 @@ def package_relationship_delete(context, data_dict): relationship = context['relationship'] # If you can create this relationship the you can also delete it - authorized = new_authz.is_authorized_boolean('package_relationship_create', context, data_dict) + authorized = authz.is_authorized_boolean('package_relationship_create', context, data_dict) if not authorized: return {'success': False, 'msg': _('User %s not authorized to delete relationship %s') % (user ,relationship.id)} else: @@ -95,10 +95,10 @@ def package_relationship_delete(context, data_dict): def group_delete(context, data_dict): group = get_group_object(context, data_dict) user = context['user'] - if not new_authz.check_config_permission('user_delete_groups'): + if not authz.check_config_permission('user_delete_groups'): return {'success': False, 'msg': _('User %s not authorized to delete groups') % user} - authorized = new_authz.has_user_permission_for_group_or_org( + authorized = authz.has_user_permission_for_group_or_org( group.id, user, 'delete') if not authorized: return {'success': False, 'msg': _('User %s not authorized to delete group %s') % (user ,group.id)} @@ -116,10 +116,10 @@ def organization_purge(context, data_dict): def organization_delete(context, data_dict): group = get_group_object(context, data_dict) user = context['user'] - if not new_authz.check_config_permission('user_delete_organizations'): + if not authz.check_config_permission('user_delete_organizations'): return {'success': False, 'msg': _('User %s not authorized to delete organizations') % user} - authorized = new_authz.has_user_permission_for_group_or_org( + authorized = authz.has_user_permission_for_group_or_org( group.id, user, 'delete') if not authorized: return {'success': False, 'msg': _('User %s not authorized to delete organization %s') % (user ,group.id)} diff --git a/ckan/logic/auth/get.py b/ckan/logic/auth/get.py index dad9261ee15..4018c683180 100644 --- a/ckan/logic/auth/get.py +++ b/ckan/logic/auth/get.py @@ -1,5 +1,5 @@ import ckan.logic as logic -import ckan.new_authz as new_authz +import ckan.authz as authz from ckan.lib.base import _ from ckan.logic.auth import (get_package_object, get_group_object, get_resource_object, get_related_object) @@ -85,10 +85,10 @@ def package_relationships_list(context, data_dict): id2 = data_dict.get('id2') # If we can see each package we can see the relationships - authorized1 = new_authz.is_authorized_boolean( + authorized1 = authz.is_authorized_boolean( 'package_show', context, {'id': id}) if id2: - authorized2 = new_authz.is_authorized_boolean( + authorized2 = authz.is_authorized_boolean( 'package_show', context, {'id': id2}) else: authorized2 = True @@ -104,7 +104,7 @@ def package_show(context, data_dict): # draft state indicates package is still in the creation process # so we need to check we have creation rights. if package.state.startswith('draft'): - auth = new_authz.is_authorized('package_update', + auth = authz.is_authorized('package_update', context, data_dict) authorized = auth.get('success') elif package.owner_org is None and package.state == 'active': @@ -113,7 +113,7 @@ def package_show(context, data_dict): # anyone can see a public package if not package.private and package.state == 'active': return {'success': True} - authorized = new_authz.has_user_permission_for_group_or_org( + authorized = authz.has_user_permission_for_group_or_org( package.owner_org, user, 'read') if not authorized: return {'success': False, 'msg': _('User %s not authorized to read package %s') % (user, package.id)} @@ -135,7 +135,7 @@ def resource_show(context, data_dict): raise logic.NotFound(_('No package found for this resource, cannot check auth.')) pkg_dict = {'id': pkg.id} - authorized = new_authz.is_authorized('package_show', context, pkg_dict).get('success') + authorized = authz.is_authorized('package_show', context, pkg_dict).get('success') if not authorized: return {'success': False, 'msg': _('User %s not authorized to read resource %s') % (user, resource.id)} @@ -158,7 +158,7 @@ def group_show(context, data_dict): group = get_group_object(context, data_dict) if group.state == 'active': return {'success': True} - authorized = new_authz.has_user_permission_for_group_or_org( + authorized = authz.has_user_permission_for_group_or_org( group.id, user, 'read') if authorized: return {'success': True} @@ -238,7 +238,7 @@ def dashboard_new_activities_count(context, data_dict): # FIXME: This should go through check_access() not call is_authorized() # directly, but wait until 2939-orgs is merged before fixing this. # This is so a better not authourized message can be sent. - return new_authz.is_authorized('dashboard_activity_list', + return authz.is_authorized('dashboard_activity_list', context, data_dict) diff --git a/ckan/logic/auth/update.py b/ckan/logic/auth/update.py index de2f45a6890..383ca8abaef 100644 --- a/ckan/logic/auth/update.py +++ b/ckan/logic/auth/update.py @@ -1,5 +1,5 @@ import ckan.logic as logic -import ckan.new_authz as new_authz +import ckan.authz as authz import ckan.logic.auth as logic_auth from ckan.common import _ @@ -15,22 +15,22 @@ def package_update(context, data_dict): if package.owner_org: # if there is an owner org then we must have update_dataset # permission for that organization - check1 = new_authz.has_user_permission_for_group_or_org( + check1 = authz.has_user_permission_for_group_or_org( package.owner_org, user, 'update_dataset' ) else: # If dataset is not owned then we can edit if config permissions allow - if new_authz.auth_is_anon_user(context): - check1 = all(new_authz.check_config_permission(p) for p in ( + if authz.auth_is_anon_user(context): + check1 = all(authz.check_config_permission(p) for p in ( 'anon_create_dataset', 'create_dataset_if_not_in_organization', 'create_unowned_dataset', )) else: - check1 = all(new_authz.check_config_permission(p) for p in ( + check1 = all(authz.check_config_permission(p) for p in ( 'create_dataset_if_not_in_organization', 'create_unowned_dataset', - )) or new_authz.has_user_permission_for_some_org( + )) or authz.has_user_permission_for_some_org( user, 'create_dataset') if not check1: return {'success': False, @@ -62,7 +62,7 @@ def resource_update(context, data_dict): ) pkg_dict = {'id': pkg.id} - authorized = new_authz.is_authorized('package_update', context, pkg_dict).get('success') + authorized = authz.is_authorized('package_update', context, pkg_dict).get('success') if not authorized: return {'success': False, @@ -79,7 +79,7 @@ def resource_view_reorder(context, data_dict): return resource_update(context, {'id': data_dict['resource_id']}) def package_relationship_update(context, data_dict): - return new_authz.is_authorized('package_relationship_create', + return authz.is_authorized('package_relationship_create', context, data_dict) @@ -89,7 +89,7 @@ def package_change_state(context, data_dict): package = logic_auth.get_package_object(context, data_dict) # use the logic for package_update - authorized = new_authz.is_authorized_boolean('package_update', + authorized = authz.is_authorized_boolean('package_update', context, data_dict) if not authorized: @@ -105,7 +105,7 @@ def package_change_state(context, data_dict): def group_update(context, data_dict): group = logic_auth.get_group_object(context, data_dict) user = context['user'] - authorized = new_authz.has_user_permission_for_group_or_org(group.id, + authorized = authz.has_user_permission_for_group_or_org(group.id, user, 'update') if not authorized: @@ -119,7 +119,7 @@ def group_update(context, data_dict): def organization_update(context, data_dict): group = logic_auth.get_group_object(context, data_dict) user = context['user'] - authorized = new_authz.has_user_permission_for_group_or_org( + authorized = authz.has_user_permission_for_group_or_org( group.id, user, 'update') if not authorized: return {'success': False, @@ -156,7 +156,7 @@ def group_change_state(context, data_dict): group = logic_auth.get_group_object(context, data_dict) # use logic for group_update - authorized = new_authz.is_authorized_boolean('group_update', + authorized = authz.is_authorized_boolean('group_update', context, data_dict) if not authorized: @@ -173,7 +173,7 @@ def group_edit_permissions(context, data_dict): user = context['user'] group = logic_auth.get_group_object(context, data_dict) - authorized = new_authz.has_user_permission_for_group_or_org(group.id, + authorized = authz.has_user_permission_for_group_or_org(group.id, user, 'update') @@ -261,7 +261,7 @@ def term_translation_update(context, data_dict): def dashboard_mark_activities_old(context, data_dict): - return new_authz.is_authorized('dashboard_activity_list', + return authz.is_authorized('dashboard_activity_list', context, data_dict) @@ -280,7 +280,7 @@ def package_update_rest(context, data_dict): return {'success': False, 'msg': _('Valid API key needed to edit a package')} - return new_authz.is_authorized('package_update', context, data_dict) + return authz.is_authorized('package_update', context, data_dict) def group_update_rest(context, data_dict): @@ -301,7 +301,7 @@ def package_owner_org_update(context, data_dict): def bulk_update_private(context, data_dict): org_id = data_dict.get('org_id') user = context['user'] - authorized = new_authz.has_user_permission_for_group_or_org( + authorized = authz.has_user_permission_for_group_or_org( org_id, user, 'update') if not authorized: return {'success': False} @@ -311,7 +311,7 @@ def bulk_update_private(context, data_dict): def bulk_update_public(context, data_dict): org_id = data_dict.get('org_id') user = context['user'] - authorized = new_authz.has_user_permission_for_group_or_org( + authorized = authz.has_user_permission_for_group_or_org( org_id, user, 'update') if not authorized: return {'success': False} @@ -321,7 +321,7 @@ def bulk_update_public(context, data_dict): def bulk_update_delete(context, data_dict): org_id = data_dict.get('org_id') user = context['user'] - authorized = new_authz.has_user_permission_for_group_or_org( + authorized = authz.has_user_permission_for_group_or_org( org_id, user, 'update') if not authorized: return {'success': False} diff --git a/ckan/logic/validators.py b/ckan/logic/validators.py index 178beefc876..49134ea0b27 100644 --- a/ckan/logic/validators.py +++ b/ckan/logic/validators.py @@ -12,7 +12,7 @@ PACKAGE_VERSION_MAX_LENGTH, VOCABULARY_NAME_MAX_LENGTH, VOCABULARY_NAME_MIN_LENGTH) -import ckan.new_authz as new_authz +import ckan.authz as authz from ckan.common import _ @@ -26,7 +26,7 @@ def owner_org_validator(key, data, errors, context): value = data.get(key) if value is missing or value is None: - if not new_authz.check_config_permission('create_unowned_dataset'): + if not authz.check_config_permission('create_unowned_dataset'): raise Invalid(_('A organization must be supplied')) data.pop(key, None) raise df.StopOnError @@ -35,7 +35,7 @@ def owner_org_validator(key, data, errors, context): user = context['user'] user = model.User.get(user) if value == '': - if not new_authz.check_config_permission('create_unowned_dataset'): + if not authz.check_config_permission('create_unowned_dataset'): raise Invalid(_('A organization must be supplied')) return @@ -44,7 +44,7 @@ def owner_org_validator(key, data, errors, context): raise Invalid(_('Organization does not exist')) group_id = group.id if not(user.sysadmin or - new_authz.has_user_permission_for_group_or_org( + authz.has_user_permission_for_group_or_org( group_id, user.name, 'create_dataset')): raise Invalid(_('You cannot add a dataset to this organization')) data[key] = group_id @@ -484,7 +484,7 @@ def ignore_not_package_admin(key, data, errors, context): if 'ignore_auth' in context: return - if user and new_authz.is_sysadmin(user): + if user and authz.is_sysadmin(user): return authorized = False @@ -512,7 +512,7 @@ def ignore_not_sysadmin(key, data, errors, context): user = context.get('user') ignore_auth = context.get('ignore_auth') - if ignore_auth or (user and new_authz.is_sysadmin(user)): + if ignore_auth or (user and authz.is_sysadmin(user)): return data.pop(key) @@ -524,7 +524,7 @@ def ignore_not_group_admin(key, data, errors, context): model = context['model'] user = context.get('user') - if user and new_authz.is_sysadmin(user): + if user and authz.is_sysadmin(user): return authorized = False @@ -717,7 +717,7 @@ def user_name_exists(user_name, context): def role_exists(role, context): - if role not in new_authz.ROLE_PERMISSIONS: + if role not in authz.ROLE_PERMISSIONS: raise Invalid(_('role does not exist.')) return role @@ -836,7 +836,7 @@ def empty_if_not_sysadmin(key, data, errors, context): user = context.get('user') ignore_auth = context.get('ignore_auth') - if ignore_auth or (user and new_authz.is_sysadmin(user)): + if ignore_auth or (user and authz.is_sysadmin(user)): return empty(key, data, errors, context) diff --git a/ckan/model/user.py b/ckan/model/user.py index 17c43ed2f39..2d2a37f4348 100644 --- a/ckan/model/user.py +++ b/ckan/model/user.py @@ -273,8 +273,8 @@ def search(cls, querystr, sqlalchemy_query=None, user_name=None): cls.openid.ilike(qstr), ] # sysadmins can search on user emails - import ckan.new_authz as new_authz - if user_name and new_authz.is_sysadmin(user_name): + import ckan.authz as authz + if user_name and authz.is_sysadmin(user_name): filters.append(cls.email.ilike(qstr)) query = query.filter(or_(*filters)) diff --git a/ckan/new_authz.py b/ckan/new_authz.py index 550a88ac429..7ea3f0f1194 100644 --- a/ckan/new_authz.py +++ b/ckan/new_authz.py @@ -1,442 +1,8 @@ -import sys -import re -from logging import getLogger +import warnings -from pylons import config -from paste.deploy.converters import asbool +warnings.warn( + "ckan.new_authz has been renamed to ckan.authz. " + "The ckan.new_authz module will be removed in a future release.", + FutureWarning) -import ckan.plugins as p -import ckan.model as model -from ckan.common import OrderedDict, _, c - -import ckan.lib.maintain as maintain - -log = getLogger(__name__) - - -class AuthFunctions: - ''' This is a private cache used by get_auth_function() and should never be - accessed directly we will create an instance of it and then remove it.''' - _functions = {} - - def clear(self): - ''' clear any stored auth functions. ''' - self._functions.clear() - - def keys(self): - ''' Return a list of known auth functions.''' - if not self._functions: - self._build() - return self._functions.keys() - - def get(self, function): - ''' Return the requested auth function. ''' - if not self._functions: - self._build() - return self._functions.get(function) - - def _build(self): - ''' Gather the auth functions. - - First get the default ones in the ckan/logic/auth directory Rather than - writing them out in full will use __import__ to load anything from - ckan.auth that looks like it might be an authorisation function''' - - module_root = 'ckan.logic.auth' - - for auth_module_name in ['get', 'create', 'update', 'delete', 'patch']: - module_path = '%s.%s' % (module_root, auth_module_name,) - try: - module = __import__(module_path) - except ImportError: - log.debug('No auth module for action "%s"' % auth_module_name) - continue - - for part in module_path.split('.')[1:]: - module = getattr(module, part) - - for key, v in module.__dict__.items(): - if not key.startswith('_'): - # Whitelist all auth functions defined in - # logic/auth/get.py as not requiring an authorized user, - # as well as ensuring that the rest do. In both cases, do - # nothing if a decorator has already been used to define - # the behaviour - if not hasattr(v, 'auth_allow_anonymous_access'): - if auth_module_name == 'get': - v.auth_allow_anonymous_access = True - else: - v.auth_allow_anonymous_access = False - self._functions[key] = v - - # Then overwrite them with any specific ones in the plugins: - resolved_auth_function_plugins = {} - fetched_auth_functions = {} - for plugin in p.PluginImplementations(p.IAuthFunctions): - for name, auth_function in plugin.get_auth_functions().items(): - if name in resolved_auth_function_plugins: - raise Exception( - 'The auth function %r is already implemented in %r' % ( - name, - resolved_auth_function_plugins[name] - ) - ) - log.debug('Auth function {0} from plugin {1} was inserted'.format(name, plugin.name)) - resolved_auth_function_plugins[name] = plugin.name - fetched_auth_functions[name] = auth_function - # Use the updated ones in preference to the originals. - self._functions.update(fetched_auth_functions) - -_AuthFunctions = AuthFunctions() -#remove the class -del AuthFunctions - - -def clear_auth_functions_cache(): - _AuthFunctions.clear() - - -def auth_functions_list(): - '''Returns a list of the names of the auth functions available. Currently - this is to allow the Auth Audit to know if an auth function is available - for a given action.''' - return _AuthFunctions.keys() - - -def is_sysadmin(username): - ''' Returns True is username is a sysadmin ''' - user = _get_user(username) - return user and user.sysadmin - - -def _get_user(username): - ''' Try to get the user from c, if possible, and fallback to using the DB ''' - if not username: - return None - # See if we can get the user without touching the DB - try: - if c.userobj and c.userobj.name == username: - return c.userobj - except TypeError: - # c is not available - pass - # Get user from the DB - return model.User.get(username) - - -def get_group_or_org_admin_ids(group_id): - if not group_id: - return [] - group_id = model.Group.get(group_id).id - q = model.Session.query(model.Member) \ - .filter(model.Member.group_id == group_id) \ - .filter(model.Member.table_name == 'user') \ - .filter(model.Member.state == 'active') \ - .filter(model.Member.capacity == 'admin') - return [a.table_id for a in q.all()] - - -def is_authorized_boolean(action, context, data_dict=None): - ''' runs the auth function but just returns True if allowed else False - ''' - outcome = is_authorized(action, context, data_dict=data_dict) - return outcome.get('success', False) - - -def is_authorized(action, context, data_dict=None): - if context.get('ignore_auth'): - return {'success': True} - - auth_function = _AuthFunctions.get(action) - if auth_function: - username = context.get('user') - user = _get_user(username) - - if user: - # deleted users are always unauthorized - if user.is_deleted(): - return {'success': False} - # sysadmins can do anything unless the auth_sysadmins_check - # decorator was used in which case they are treated like all other - # users. - elif user.sysadmin: - if not getattr(auth_function, 'auth_sysadmins_check', False): - return {'success': True} - - # If the auth function is flagged as not allowing anonymous access, - # and an existing user object is not provided in the context, deny - # access straight away - if not getattr(auth_function, 'auth_allow_anonymous_access', False) \ - and not context.get('auth_user_obj'): - return {'success': False, - 'msg': '{0} requires an authenticated user' - .format(auth_function) - } - - return auth_function(context, data_dict) - else: - raise ValueError(_('Authorization function not found: %s' % action)) - - -# these are the permissions that roles have -ROLE_PERMISSIONS = OrderedDict([ - ('admin', ['admin']), - ('editor', ['read', 'delete_dataset', 'create_dataset', 'update_dataset', 'manage_group']), - ('member', ['read', 'manage_group']), -]) - - -def _trans_role_admin(): - return _('Admin') - - -def _trans_role_editor(): - return _('Editor') - - -def _trans_role_member(): - return _('Member') - - -def trans_role(role): - module = sys.modules[__name__] - return getattr(module, '_trans_role_%s' % role)() - - -def roles_list(): - ''' returns list of roles for forms ''' - roles = [] - for role in ROLE_PERMISSIONS: - roles.append(dict(text=trans_role(role), value=role)) - return roles - - -def roles_trans(): - ''' return dict of roles with translation ''' - roles = {} - for role in ROLE_PERMISSIONS: - roles[role] = trans_role(role) - return roles - - -def get_roles_with_permission(permission): - ''' returns the roles with the permission requested ''' - roles = [] - for role in ROLE_PERMISSIONS: - permissions = ROLE_PERMISSIONS[role] - if permission in permissions or 'admin' in permissions: - roles.append(role) - return roles - - -def has_user_permission_for_group_or_org(group_id, user_name, permission): - ''' Check if the user has the given permissions for the group, allowing for - sysadmin rights and permission cascading down a group hierarchy. - - ''' - if not group_id: - return False - group = model.Group.get(group_id) - if not group: - return False - group_id = group.id - - # Sys admins can do anything - if is_sysadmin(user_name): - return True - - user_id = get_user_id_for_username(user_name, allow_none=True) - if not user_id: - return False - if _has_user_permission_for_groups(user_id, permission, [group_id]): - return True - # Handle when permissions cascade. Check the user's roles on groups higher - # in the group hierarchy for permission. - for capacity in check_config_permission('roles_that_cascade_to_sub_groups'): - parent_groups = group.get_parent_group_hierarchy(type=group.type) - group_ids = [group_.id for group_ in parent_groups] - if _has_user_permission_for_groups(user_id, permission, group_ids, - capacity=capacity): - return True - return False - - -def _has_user_permission_for_groups(user_id, permission, group_ids, - capacity=None): - ''' Check if the user has the given permissions for the particular - group (ignoring permissions cascading in a group hierarchy). - Can also be filtered by a particular capacity. - ''' - if not group_ids: - return False - # get any roles the user has for the group - q = model.Session.query(model.Member) \ - .filter(model.Member.group_id.in_(group_ids)) \ - .filter(model.Member.table_name == 'user') \ - .filter(model.Member.state == 'active') \ - .filter(model.Member.table_id == user_id) - if capacity: - q = q.filter(model.Member.capacity == capacity) - # see if any role has the required permission - # admin permission allows anything for the group - for row in q.all(): - perms = ROLE_PERMISSIONS.get(row.capacity, []) - if 'admin' in perms or permission in perms: - return True - return False - - -def users_role_for_group_or_org(group_id, user_name): - ''' Returns the user's role for the group. (Ignores privileges that cascade - in a group hierarchy.) - - ''' - if not group_id: - return None - group_id = model.Group.get(group_id).id - - user_id = get_user_id_for_username(user_name, allow_none=True) - if not user_id: - return None - # get any roles the user has for the group - q = model.Session.query(model.Member) \ - .filter(model.Member.group_id == group_id) \ - .filter(model.Member.table_name == 'user') \ - .filter(model.Member.state == 'active') \ - .filter(model.Member.table_id == user_id) - # return the first role we find - for row in q.all(): - return row.capacity - return None - - -def has_user_permission_for_some_org(user_name, permission): - ''' Check if the user has the given permission for any organization. ''' - user_id = get_user_id_for_username(user_name, allow_none=True) - if not user_id: - return False - roles = get_roles_with_permission(permission) - - if not roles: - return False - # get any groups the user has with the needed role - q = model.Session.query(model.Member) \ - .filter(model.Member.table_name == 'user') \ - .filter(model.Member.state == 'active') \ - .filter(model.Member.capacity.in_(roles)) \ - .filter(model.Member.table_id == user_id) - group_ids = [] - for row in q.all(): - group_ids.append(row.group_id) - # if not in any groups has no permissions - if not group_ids: - return False - - # see if any of the groups are orgs - q = model.Session.query(model.Group) \ - .filter(model.Group.is_organization == True) \ - .filter(model.Group.state == 'active') \ - .filter(model.Group.id.in_(group_ids)) - - return bool(q.count()) - - -def get_user_id_for_username(user_name, allow_none=False): - ''' Helper function to get user id ''' - # first check if we have the user object already and get from there - try: - if c.userobj and c.userobj.name == user_name: - return c.userobj.id - except TypeError: - # c is not available - pass - user = model.User.get(user_name) - if user: - return user.id - if allow_none: - return None - raise Exception('Not logged in user') - - -CONFIG_PERMISSIONS_DEFAULTS = { - # permission and default - # these are prefixed with ckan.auth. in config to override - 'anon_create_dataset': False, - 'create_dataset_if_not_in_organization': True, - 'create_unowned_dataset': True, - 'user_create_groups': True, - 'user_create_organizations': True, - 'user_delete_groups': True, - 'user_delete_organizations': True, - 'create_user_via_api': False, - 'create_user_via_web': True, - 'roles_that_cascade_to_sub_groups': 'admin', -} - - -def check_config_permission(permission): - '''Returns the configuration value for the provided permission - - Permission is a string indentifying the auth permission (eg - `anon_create_dataset`), optionally prefixed with `ckan.auth.`. - - The possible values for `permission` are the keys of - CONFIG_PERMISSIONS_DEFAULTS. These can be overriden in the config file - by prefixing them with `ckan.auth.`. - - Returns the permission value, generally True or False, except on - `roles_that_cascade_to_sub_groups` which is a list of strings. - - ''' - - key = permission.replace('ckan.auth.', '') - - if key not in CONFIG_PERMISSIONS_DEFAULTS: - return False - - default_value = CONFIG_PERMISSIONS_DEFAULTS.get(key) - - config_key = 'ckan.auth.' + key - - value = config.get(config_key, default_value) - - if key == 'roles_that_cascade_to_sub_groups': - # This permission is set as a list of strings (space separated) - value = value.split() if value else [] - else: - value = asbool(value) - - return value - - -@maintain.deprecated('Use auth_is_loggedin_user instead') -def auth_is_registered_user(): - ''' - This function is deprecated, please use the auth_is_loggedin_user instead - ''' - return auth_is_loggedin_user() - -def auth_is_loggedin_user(): - ''' Do we have a logged in user ''' - try: - context_user = c.user - except TypeError: - context_user = None - return bool(context_user) - -def auth_is_anon_user(context): - ''' Is this an anonymous user? - eg Not logged in if a web request and not user defined in context - if logic functions called directly - - See ckan/lib/base.py:232 for pylons context object logic - ''' - try: - is_anon_user = (not bool(c.user) and bool(c.author)) - except TypeError: - # No c object set, this is not a call done via the web interface, - # but directly, eg from an extension - context_user = context.get('user') - is_anon_user = not bool(context_user) - - return is_anon_user +from ckan.authz import * diff --git a/ckan/new_tests/helpers.py b/ckan/new_tests/helpers.py index 7cb632aa120..e9cb98f6bfe 100644 --- a/ckan/new_tests/helpers.py +++ b/ckan/new_tests/helpers.py @@ -25,7 +25,7 @@ import ckan.config.middleware import ckan.model as model import ckan.logic as logic -import ckan.new_authz as new_authz +import ckan.authz as authz try: diff --git a/ckan/new_tests/test_authz.py b/ckan/new_tests/test_authz.py index e963821afbc..0c57ca398a6 100644 --- a/ckan/new_tests/test_authz.py +++ b/ckan/new_tests/test_authz.py @@ -1,6 +1,6 @@ import nose -from ckan import new_authz as auth +from ckan import authz as auth from ckan.new_tests import helpers diff --git a/ckan/tests/legacy/functional/api/test_user.py b/ckan/tests/legacy/functional/api/test_user.py index dc14caa51b6..48d2b6f53b0 100644 --- a/ckan/tests/legacy/functional/api/test_user.py +++ b/ckan/tests/legacy/functional/api/test_user.py @@ -3,7 +3,7 @@ from nose.tools import assert_equal import ckan.logic as logic -import ckan.new_authz as new_authz +import ckan.authz as authz from ckan import model from ckan.lib.create_test_data import CreateTestData from ckan.tests.legacy import TestController as ControllerTestCase diff --git a/ckan/tests/legacy/logic/test_auth.py b/ckan/tests/legacy/logic/test_auth.py index 1741630a8f8..1b70c62d93e 100644 --- a/ckan/tests/legacy/logic/test_auth.py +++ b/ckan/tests/legacy/logic/test_auth.py @@ -4,7 +4,7 @@ import ckan.tests.legacy as tests from ckan.logic import get_action import ckan.model as model -import ckan.new_authz as new_authz +import ckan.authz as authz from ckan.lib.create_test_data import CreateTestData import json @@ -75,8 +75,8 @@ def test_only_sysadmins_can_delete_users(self): def test_auth_deleted_users_are_always_unauthorized(self): always_success = lambda x,y: {'success': True} - new_authz._AuthFunctions._build() - new_authz._AuthFunctions._functions['always_success'] = always_success + authz._AuthFunctions._build() + authz._AuthFunctions._functions['always_success'] = always_success # We can't reuse the username with the other tests because we can't # rebuild_db(), because in the setup_class we get the sysadmin. If we # rebuild the DB, we would delete the sysadmin as well. @@ -84,8 +84,8 @@ def test_auth_deleted_users_are_always_unauthorized(self): self.create_user(username) user = model.User.get(username) user.delete() - assert not new_authz.is_authorized_boolean('always_success', {'user': username}) - del new_authz._AuthFunctions._functions['always_success'] + assert not authz.is_authorized_boolean('always_success', {'user': username}) + del authz._AuthFunctions._functions['always_success'] class TestAuthOrgs(TestAuth): diff --git a/ckan/tests/legacy/test_coding_standards.py b/ckan/tests/legacy/test_coding_standards.py index b5dccc1bb49..1dc7fdbe78c 100644 --- a/ckan/tests/legacy/test_coding_standards.py +++ b/ckan/tests/legacy/test_coding_standards.py @@ -290,6 +290,7 @@ class TestImportStar(object): 'ckan/migration/versions/063_org_changes.py', 'ckan/migration/versions/064_add_email_last_sent_column.py', 'ckan/migration/versions/065_add_email_notifications_preference.py', + 'ckan/new_authz.py', 'ckan/plugins/__init__.py', 'ckan/tests/__init__.py', 'ckan/tests/functional/__init__.py', @@ -547,7 +548,7 @@ class TestPep8(object): 'ckan/model/types.py', 'ckan/model/user.py', 'ckan/model/vocabulary.py', - 'ckan/new_authz.py', + 'ckan/authz.py', 'ckan/pastertemplates/__init__.py', 'ckan/plugins/interfaces.py', 'ckan/plugins/toolkit.py', @@ -915,7 +916,7 @@ class TestBadExceptions(object): 'ckan/logic/auth/create.py', 'ckan/logic/auth/delete.py', 'ckan/logic/auth/get.py', - 'ckan/new_authz.py', + 'ckan/authz.py', 'ckanext/datastore/logic/action.py', ] fails = {} diff --git a/ckan/tests/legacy/test_plugins.py b/ckan/tests/legacy/test_plugins.py index 319dfff2154..401e6eb5f13 100644 --- a/ckan/tests/legacy/test_plugins.py +++ b/ckan/tests/legacy/test_plugins.py @@ -7,7 +7,7 @@ from pylons import config import ckan.logic as logic -import ckan.new_authz as new_authz +import ckan.authz as authz import ckan.plugins as plugins from ckan.plugins.core import find_system_plugins from ckan.lib.create_test_data import CreateTestData @@ -161,10 +161,10 @@ def test_action_plugin_override(self): assert logic.get_action('status_show')(None, {}) == status_show_original def test_auth_plugin_override(self): - package_list_original = new_authz.is_authorized('package_list', {}) + package_list_original = authz.is_authorized('package_list', {}) with plugins.use_plugin('auth_plugin'): - assert new_authz.is_authorized('package_list', {}) != package_list_original - assert new_authz.is_authorized('package_list', {}) == package_list_original + assert authz.is_authorized('package_list', {}) != package_list_original + assert authz.is_authorized('package_list', {}) == package_list_original @raises(plugins.PluginNotFoundException) def test_inexistent_plugin_loading(self):