Skip to content

Commit

Permalink
[#2838] fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
calexandr committed Jan 27, 2020
1 parent cb0e838 commit aef247b
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 34 deletions.
28 changes: 27 additions & 1 deletion ckan/cli/db.py
Expand Up @@ -4,9 +4,10 @@
import logging
import ckan.migration as migration_repo
import click
from itertools import groupby

from ckan.cli import error_shout

import ckan.model as model
log = logging.getLogger(__name__)


Expand Down Expand Up @@ -86,6 +87,31 @@ def version(hash):
)


@db.command(u"duplicate_emails", short_help=u"Check users email for duplicate")
def duplicate_emails():
u'''Check users email for duplicate'''
log.info(u"Searching for accounts with duplicate emails.")

q = model.Session.query(model.User.email,
model.User.name) \
.filter(model.User.state == u"active") \
.filter(model.User.email != u"") \
.order_by(model.User.email).all()

if not q:
log.info(u"No duplicate emails found")
try:
for k, grp in groupby(q, lambda x: x[0]):
users = [user[1] for user in grp]
if len(users) > 1:
s = u"{} appears {} time(s). Users: {}"
click.secho(
s.format(k, len(users), u", ".join(users)),
fg=u"green", bold=True)
except Exception as e:
error_shout(e)


def _version_hash_to_ordinal(version):
if u'base' == version:
return 0
Expand Down
16 changes: 11 additions & 5 deletions ckan/logic/schema.py
Expand Up @@ -384,7 +384,7 @@ def default_update_relationship_schema(
@validator_args
def default_user_schema(
ignore_missing, unicode_safe, name_validator, user_name_validator,
user_password_validator, user_password_not_empty,
user_password_validator, user_password_not_empty, email_is_unique,
ignore_not_sysadmin, not_empty, email_validator,
user_about_validator, ignore, boolean_validator):
return {
Expand All @@ -395,7 +395,7 @@ def default_user_schema(
'password': [user_password_validator, user_password_not_empty,
ignore_missing, unicode_safe],
'password_hash': [ignore_missing, ignore_not_sysadmin, unicode_safe],
'email': [not_empty, unicode_safe, email_validator],
'email': [not_empty, email_validator, email_is_unique, unicode_safe],
'about': [ignore_missing, user_about_validator, unicode_safe],
'created': [ignore],
'sysadmin': [ignore_missing, ignore_not_sysadmin],
Expand All @@ -410,9 +410,11 @@ def default_user_schema(
@validator_args
def user_new_form_schema(
unicode_safe, user_both_passwords_entered,
user_password_validator, user_passwords_match):
user_password_validator, user_passwords_match,
email_is_unique):
schema = default_user_schema()

schema['email'] = [email_is_unique]
schema['password1'] = [text_type, user_both_passwords_entered,
user_password_validator, user_passwords_match]
schema['password2'] = [text_type]
Expand All @@ -423,9 +425,10 @@ def user_new_form_schema(
@validator_args
def user_edit_form_schema(
ignore_missing, unicode_safe, user_both_passwords_entered,
user_password_validator, user_passwords_match):
user_password_validator, user_passwords_match, email_is_unique):
schema = default_user_schema()

schema['email'] = [email_is_unique]
schema['password'] = [ignore_missing]
schema['password1'] = [ignore_missing, unicode_safe,
user_password_validator, user_passwords_match]
Expand All @@ -437,11 +440,14 @@ def user_edit_form_schema(
@validator_args
def default_update_user_schema(
ignore_missing, name_validator, user_name_validator,
unicode_safe, user_password_validator):
unicode_safe, user_password_validator, email_is_unique,
not_empty, email_validator):
schema = default_user_schema()

schema['name'] = [
ignore_missing, name_validator, user_name_validator, unicode_safe]
schema['email'] = [
not_empty, email_validator, email_is_unique, unicode_safe]
schema['password'] = [
user_password_validator, ignore_missing, unicode_safe]

Expand Down
20 changes: 20 additions & 0 deletions ckan/logic/validators.py
Expand Up @@ -859,6 +859,26 @@ def email_validator(value, context):
raise Invalid(_('Email {email} is not a valid format').format(email=value))
return value

def email_is_unique(key, data, errors, context):
'''Validate email is unique'''
model = context['model']
session = context['session']

users = session.query(model.User) \
.filter(model.User.email == data[key]).all()
# is there is no users with this email it's free
if not users:
return
else:
# allow user to update their own email
for user in users:
if user.id == data[("id",)]:
return

raise Invalid(
_('The email address \'{email}\' \
belongs to a registered user.').
format(email=data[key]))

def one_of(list_of_value):
''' Checks if the provided value is present in a list '''
Expand Down
31 changes: 15 additions & 16 deletions ckan/tests/controllers/test_user.py
Expand Up @@ -259,6 +259,21 @@ def test_email_change_with_password(self, app):
})
assert "Profile updated" in response

def test_email_change_on_existed_email(self, app):
user1 = factories.User(email='existed@email.com')
user2 = factories.User()
env = {"REMOTE_USER": six.ensure_str(user2["name"])}

response = app.post(url=url_for("user.edit"), extra_environ=env, data={
"email": "existed@email.com",
"save": "",
"old_password": "RandomPassword123",
"password1": "",
"password2": "",
"name": user2['name'],
})
assert 'belongs to a registered user' in response

def test_edit_user_logged_in_username_change(self, app):

user_pass = "TestPassword1"
Expand Down Expand Up @@ -769,22 +784,6 @@ def test_request_reset_by_name(self, send_reset_link, app):
assert "A reset link has been emailed to you" in response
assert send_reset_link.call_args[0][0].id == user["id"]

@mock.patch("ckan.lib.mailer.send_reset_link")
def test_request_reset_when_duplicate_emails(self, send_reset_link, app):
user_a = factories.User(email="me@example.com")
user_b = factories.User(email="me@example.com")

offset = url_for("user.request_reset")
response = app.post(
offset, data=dict(user="me@example.com")
)

assert "A reset link has been emailed to you" in response
emailed_users = [
call[0][0].name for call in send_reset_link.call_args_list
]
assert sorted(emailed_users) == sorted([user_a["name"], user_b["name"]])

def test_request_reset_without_param(self, app):

offset = url_for("user.request_reset")
Expand Down
2 changes: 1 addition & 1 deletion ckan/tests/legacy/logic/test_action.py
Expand Up @@ -306,7 +306,7 @@ def test_01_package_list(self, app):
sysadmin_user_dict = {
"id": self.sysadmin_user.id,
"fullname": "Updated sysadmin user full name",
"email": "me@test.org",
"email": "sys@test.org",
"about": "Updated sysadmin user about",
}

Expand Down
2 changes: 1 addition & 1 deletion ckan/tests/legacy/logic/test_auth.py
Expand Up @@ -64,7 +64,7 @@ def create(name):
user = {
"name": name,
"password": "TestPassword1",
"email": "moo@moo.com",
"email": "{}@moo.com".format(name),
}
res = call_api("user_create", user, "sysadmin", 200)
apikeys[name] = str(json.loads(res.body)["result"]["apikey"])
Expand Down
3 changes: 2 additions & 1 deletion ckan/tests/logic/action/test_create.py
Expand Up @@ -88,7 +88,8 @@ def test_works_even_if_username_already_exists(self, rand, _):
rand.return_value.choice.side_effect = "TestPassword1" * 3

for _ in range(3):
invited_user = self._invite_user_to_group(email="same@email.com")
invited_user = self._invite_user_to_group(
email="same{}@email.com".format(_))
assert invited_user is not None, invited_user

@mock.patch("ckan.lib.mailer.send_invite")
Expand Down
15 changes: 6 additions & 9 deletions ckan/tests/logic/action/test_update.py
Expand Up @@ -160,7 +160,6 @@ def test_user_update_with_invalid_name(self, name):
user = factories.User()
user["name"] = name
with pytest.raises(logic.ValidationError):

helpers.call_action("user_update", **user)

def test_user_update_to_name_that_already_exists(self):
Expand All @@ -182,7 +181,8 @@ def test_user_update_password(self):
# we're not updating it, otherwise validation fails.
helpers.call_action(
"user_update",
id=user["name"],
id=user["id"],
name=user["name"],
email=user["email"],
password="new password",
)
Expand Down Expand Up @@ -270,7 +270,8 @@ def test_user_update_activity_stream(self):
# fails.
helpers.call_action(
"user_update",
id=user["name"],
id=user["id"],
name=user["name"],
email=user["email"],
password=factories.User.attributes()["password"],
fullname="updated full name",
Expand Down Expand Up @@ -317,17 +318,13 @@ def test_user_update_with_custom_schema(self):
helpers.call_action(
"user_update",
context={"schema": schema},
id=user["name"],
id=user["id"],
name=user["name"],
email=user["email"],
password=factories.User.attributes()["password"],
fullname="updated full name",
)

# Since we passed user['name'] to user_update as the 'id' param,
# our mock validator method should have been called once with
# user['name'] as arg.
mock_validator.assert_called_once_with(user["name"])

def test_user_update_multiple(self):
"""Test that updating multiple user attributes at once works."""

Expand Down
50 changes: 50 additions & 0 deletions ckan/tests/logic/test_validators.py
Expand Up @@ -10,12 +10,15 @@
import mock
import pytest

from collections import namedtuple

import ckan.lib.navl.dictization_functions as df
import ckan.logic.validators as validators
import ckan.model as model
import ckan.tests.factories as factories
import ckan.tests.helpers as helpers
import ckan.tests.lib.navl.test_validators as t
import ckan.logic as logic


def returns_arg(function):
Expand Down Expand Up @@ -160,6 +163,53 @@ def call_and_assert(key, data, errors, context):
return decorator


@pytest.mark.usefixtures("clean_db")
def test_email_is_unique_validator_with_existed_value():
user1 = factories.User(username="user01", email="user01@email.com")

# try to create new user with occupied email
with pytest.raises(logic.ValidationError):
factories.User(username="new_user", email="user01@email.com")


@pytest.mark.usefixtures("clean_db")
def test_email_is_unique_validator_user_update_email_unchanged():
user = factories.User(username="user01", email="user01@email.com")

# try to update user1 and leave email unchanged
old_email = "user01@email.com"

helpers.call_action("user_update", **user)
updated_user = model.User.get(user["id"])

assert updated_user.email == old_email


@pytest.mark.usefixtures("clean_db")
def test_email_is_unique_validator_user_update_email_new():
user = factories.User(username="user01", email="user01@email.com")

# try to update user1 email to unoccupied one
new_email = "user_new@email.com"
user["email"] = new_email

helpers.call_action("user_update", **user)
updated_user = model.User.get(user["id"])

assert updated_user.email == new_email


@pytest.mark.usefixtures("clean_db")
def test_email_is_unique_validator_user_update_to_existed_email():
user1 = factories.User(username="user01", email="user01@email.com")
user2 = factories.User(username="user02", email="user02@email.com")

# try to update user1 email to existed one
user1["email"] = user2["email"]
with pytest.raises(logic.ValidationError):
helpers.call_action("user_update", **user1)


def test_name_validator_with_invalid_value():
"""If given an invalid value name_validator() should do raise Invalid.
Expand Down

0 comments on commit aef247b

Please sign in to comment.