Skip to content
Permalink
Browse files Browse the repository at this point in the history
Perform checks on provided id when creating user
  • Loading branch information
amercader committed Oct 20, 2022
1 parent 5f6849a commit 4c22c13
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 1 deletion.
8 changes: 7 additions & 1 deletion ckan/logic/action/create.py
Expand Up @@ -1038,6 +1038,13 @@ def user_create(context, data_dict):

_check_access('user_create', context, data_dict)

author_obj = model.User.get(context.get('user'))
if data_dict.get("id"):
is_sysadmin = (author_obj and author_obj.sysadmin)
if not is_sysadmin or model.User.get(data_dict["id"]):
data_dict.pop("id", None)
context.pop("user_obj", None)

upload = uploader.get_uploader('user')
upload.update_data_dict(data_dict, 'image_url',
'image_upload', 'clear_upload')
Expand Down Expand Up @@ -1085,7 +1092,6 @@ def user_create(context, data_dict):
user_dictize_context['keep_apikey'] = True
user_dictize_context['keep_email'] = True

author_obj = model.User.get(context.get('user'))
include_plugin_extras = False
if author_obj:
include_plugin_extras = author_obj.sysadmin and 'plugin_extras' in data
Expand Down
104 changes: 104 additions & 0 deletions ckan/tests/logic/action/test_create.py
Expand Up @@ -1100,6 +1100,110 @@ def test_user_create_password_hash_not_for_normal_users(self):
user_obj = model.User.get(user["id"])
assert user_obj.password != "pretend-this-is-a-valid-hash"

def test_anon_user_create_does_not_update(self):
user1 = factories.User(about="This is user 1")
user_dict = {
"id": user1["id"],
"name": "some_name",
"email": "some_email@example.com",
"password": "test1234",
}

context = {
"user": None,
"ignore_auth": False,
}

user2 = helpers.call_action("user_create", context=context, **user_dict)
assert user2["id"] != user1["id"]
assert user2["about"] != "This is user 1"

def test_normal_user_create_does_not_update(self):
user1 = factories.User(about="This is user 1")
user_dict = {
"id": user1["id"],
"name": "some_name",
"email": "some_email@example.com",
"password": "test1234",
}

context = {
"user": factories.User()["name"],
"ignore_auth": False,
}

user2 = helpers.call_action("user_create", context=context, **user_dict)
assert user2["id"] != user1["id"]
assert user2["about"] != "This is user 1"

def test_sysadmin_user_create_does_not_update(self):
user1 = factories.User(about="This is user 1")
user_dict = {
"id": user1["id"],
"name": "some_name",
"email": "some_email@example.com",
"password": "test1234",
}

context = {
"user": factories.Sysadmin()["name"],
"ignore_auth": False,
}

user2 = helpers.call_action("user_create", context=context, **user_dict)
assert user2["id"] != user1["id"]
assert user2["about"] != "This is user 1"

def test_anon_users_can_not_provide_custom_id(self):

user_dict = {
"id": "custom_id",
"name": "some_name",
"email": "some_email@example.com",
"password": "test1234",
}

context = {
"user": None,
"ignore_auth": False,
}

user = helpers.call_action("user_create", context=context, **user_dict)
assert user["id"] != "custom_id"

def test_normal_users_can_not_provide_custom_id(self):

user_dict = {
"id": "custom_id",
"name": "some_name",
"email": "some_email@example.com",
"password": "test1234",
}

context = {
"user": factories.User()["name"],
"ignore_auth": False,
}

user = helpers.call_action("user_create", context=context, **user_dict)
assert user["id"] != "custom_id"

def test_sysadmin_can_provide_custom_id(self):

user_dict = {
"id": "custom_id",
"name": "some_name",
"email": "some_email@example.com",
"password": "test1234",
}
context = {
"user": factories.Sysadmin()["name"],
"ignore_auth": False,
}

user = helpers.call_action("user_create", context=context, **user_dict)
assert user["id"] == "custom_id"


def _clear_activities():
from ckan import model
Expand Down

0 comments on commit 4c22c13

Please sign in to comment.