Skip to content

Commit

Permalink
account: Added bulk_create_from_emails().
Browse files Browse the repository at this point in the history
Needed for #787.
  • Loading branch information
espenak committed Aug 28, 2015
1 parent 18671e2 commit 3ccb40e
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 43 deletions.
6 changes: 6 additions & 0 deletions devilry/devilry_account/exceptions.py
@@ -0,0 +1,6 @@
class IllegalOperationError(Exception):
"""
Raised if performing an illegal operation with
the ``devilry.devilry_account`` models.
"""
pass
92 changes: 92 additions & 0 deletions devilry/devilry_account/models.py
@@ -1,9 +1,12 @@
from django.conf import settings
from django.contrib.auth.models import AbstractBaseUser, BaseUserManager, PermissionsMixin
from django.core.exceptions import ValidationError
from django.db import models
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _

from devilry.devilry_account.exceptions import IllegalOperationError


class UserQuerySet(models.QuerySet):
def prefetch_related_notification_emails(self):
Expand Down Expand Up @@ -206,6 +209,67 @@ def get_by_username(self, username):
"""
return self.get_queryset().filter(username__username=username).get()

def __create_primary_useremail_objects_from_users(self, users):
"""
Create :class:`.UserEmail` objects for the given iterable of
:class:`.User` objects.
Uses the ``shortname`` as email, and the UserEmail objects
is all marked as primary emails, and as notification
emails.
"""
new_useremail_objects = []
for user in users:
new_username_object = UserEmail(user=user,
email=user.shortname,
is_primary=True,
use_for_notifications=True)
new_useremail_objects.append(new_username_object)
UserEmail.objects.bulk_create(new_useremail_objects)

def bulk_create_from_emails(self, emails):
"""
Bulk create users for all the emails
in the given ``emails`` iterator.
All users is created with unusable password.
We create a :class:`.UserEmail` object for each of the created
users. This UserEmail object has ``is_primary`` set to ``True``.
Raises:
devilry_account.exceptions.ConfigurationError: If the
``DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND``-setting is ``False``.
Returns:
A ``(created_users, excluded_emails)``-tuple.
``created_users`` is a queryset with the created users.
``excluded_email`` is a set of the emails that already existed.
"""
if not settings.DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND:
raise IllegalOperationError('You can not use bulk_create_from_emails() when '
'DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND is False.')
existing_emails = set(UserEmail.objects.filter(email__in=emails).values_list('email', flat=True))
existing_shortnames = set(User.objects.filter(shortname__in=emails).values_list('shortname', flat=True))
existing_emails = existing_emails.union(existing_shortnames)

all_emails_set = set(emails)
new_emails_set = all_emails_set.difference(existing_emails)

new_user_objects = []
for email in new_emails_set:
new_user = User(shortname=email)
new_user.set_unusable_password()
new_user_objects.append(new_user)
User.objects.bulk_create(new_user_objects)
created_users = User.objects.filter(shortname__in=new_emails_set)

self.__create_primary_useremail_objects_from_users(created_users)

return created_users, existing_emails

def __create_primary_username_objects_from_users(self, users):
"""
Create :class:`.UserName` objects for the given iterable of
Expand All @@ -220,6 +284,25 @@ def __create_primary_username_objects_from_users(self, users):
new_username_objects.append(new_username_object)
UserName.objects.bulk_create(new_username_objects)

def __create_primary_useremail_objects_from_users_via_suffix(self, users):
"""
Create :class:`.UserEmail` objects for the given iterable of
:class:`.User` objects.
Uses the ``shortname@<settings.DEVILRY_DEFAULT_EMAIL_SUFFIX>`` as email,
and the UserEmail objects is all marked as primary emails, and as notification
emails.
"""
new_useremail_objects = []
for user in users:
new_username_object = UserEmail(
user=user,
email=u'{}{}'.format(user.shortname, settings.DEVILRY_DEFAULT_EMAIL_SUFFIX),
is_primary=True,
use_for_notifications=True)
new_useremail_objects.append(new_username_object)
UserEmail.objects.bulk_create(new_useremail_objects)

def bulk_create_from_usernames(self, usernames):
"""
Bulk create users for all the usernames
Expand All @@ -230,13 +313,20 @@ def bulk_create_from_usernames(self, usernames):
We create a :class:`.UserName` object for each of the created
users. This UserName object has ``is_primary`` set to ``True``.
Raises:
devilry_account.exceptions.IllegalOperationError: If the
``DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND``-setting is ``True``.
Returns:
A ``(created_users, excluded_usernames)``-tuple.
``created_users`` is a queryset with the created users.
``excluded_username`` is a set of the usernames that already existed.
"""
if settings.DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND:
raise IllegalOperationError('You can not use bulk_create_from_usernames() when '
'DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND is True.')
existing_usernames = set(UserName.objects.filter(username__in=usernames).values_list('username', flat=True))
existing_shortnames = set(User.objects.filter(shortname__in=usernames).values_list('shortname', flat=True))
existing_usernames = existing_usernames.union(existing_shortnames)
Expand All @@ -253,6 +343,8 @@ def bulk_create_from_usernames(self, usernames):
created_users = User.objects.filter(shortname__in=new_usernames_set)

self.__create_primary_username_objects_from_users(created_users)
if settings.DEVILRY_DEFAULT_EMAIL_SUFFIX:
self.__create_primary_useremail_objects_from_users_via_suffix(created_users)

return created_users, existing_usernames

Expand Down
182 changes: 139 additions & 43 deletions devilry/devilry_account/tests/test_models.py
Expand Up @@ -3,6 +3,7 @@
from django.test import TestCase
from django.utils import timezone
from model_mommy import mommy
from devilry.devilry_account.exceptions import IllegalOperationError

from devilry.devilry_account.models import User, UserEmail, UserName

Expand Down Expand Up @@ -206,59 +207,154 @@ def test_get_by_username_doesnotexist(self):
with self.assertRaises(User.DoesNotExist):
User.objects.get_by_username(username='test@example.com')

def test_bulk_create_from_usernames_not_allowed_with_email_auth_backend(self):
with self.settings(DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND=True):
with self.assertRaises(IllegalOperationError):
User.objects.bulk_create_from_usernames([])

def test_bulk_create_from_usernames_empty_input_list(self):
created_users, existing_usernames = User.objects.bulk_create_from_usernames([])
self.assertEqual(created_users.count(), 0)
self.assertEqual(User.objects.count(), 0)
self.assertEqual(UserName.objects.count(), 0)
self.assertEqual(set(), existing_usernames)
with self.settings(DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND=False):
created_users, existing_usernames = User.objects.bulk_create_from_usernames([])
self.assertEqual(created_users.count(), 0)
self.assertEqual(User.objects.count(), 0)
self.assertEqual(UserName.objects.count(), 0)
self.assertEqual(set(), existing_usernames)

def test_bulk_create_from_usernames_single_new_user(self):
created_users, existing_usernames = User.objects.bulk_create_from_usernames(['testuser'])

self.assertEqual(created_users.count(), 1)
self.assertEqual(User.objects.count(), 1)
created_user = created_users.first()
self.assertEqual(created_user.shortname, 'testuser')

self.assertEqual(UserName.objects.count(), 1)
created_username_object = UserName.objects.first()
self.assertEqual(created_username_object.username, 'testuser')
self.assertTrue(created_username_object.is_primary)

self.assertEqual(set(), existing_usernames)
with self.settings(DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND=False):
created_users, existing_usernames = User.objects.bulk_create_from_usernames(['testuser'])

self.assertEqual(created_users.count(), 1)
self.assertEqual(User.objects.count(), 1)
created_user = created_users.first()
self.assertEqual(created_user.shortname, 'testuser')
self.assertFalse(created_user.has_usable_password())

self.assertEqual(UserName.objects.count(), 1)
created_username_object = UserName.objects.first()
self.assertEqual(created_username_object.username, 'testuser')
self.assertTrue(created_username_object.is_primary)

self.assertEqual(set(), existing_usernames)

def test_bulk_create_from_usernames_create_no_useremail_if_default_email_suffix_is_not_defined(self):
with self.settings(DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND=False,
DEVILRY_DEFAULT_EMAIL_SUFFIX=''):
User.objects.bulk_create_from_usernames(['testuser'])
self.assertEqual(UserEmail.objects.count(), 0)

def test_bulk_create_from_usernames_create_useremail_if_a_default_email_suffix_is_defined(self):
with self.settings(DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND=False,
DEVILRY_DEFAULT_EMAIL_SUFFIX='@example.com'):
User.objects.bulk_create_from_usernames(['testuser'])
self.assertEqual(UserEmail.objects.count(), 1)
created_useremail_object = UserEmail.objects.first()
self.assertEqual(created_useremail_object.email, 'testuser@example.com')
self.assertTrue(created_useremail_object.is_primary)
self.assertTrue(created_useremail_object.use_for_notifications)

def test_bulk_create_from_usernames_multiple_new_users(self):
created_users, existing_usernames = User.objects.bulk_create_from_usernames(
['testuser1', 'testuser2', 'testuser3'])
with self.settings(DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND=False):
created_users, existing_usernames = User.objects.bulk_create_from_usernames(
['testuser1', 'testuser2', 'testuser3'])

self.assertEqual(created_users.count(), 3)
self.assertEqual(User.objects.count(), 3)
self.assertEqual({'testuser1', 'testuser2', 'testuser3'},
set(User.objects.all().values_list('shortname', flat=True)))
self.assertEqual(created_users.count(), 3)
self.assertEqual(User.objects.count(), 3)
self.assertEqual({'testuser1', 'testuser2', 'testuser3'},
set(User.objects.all().values_list('shortname', flat=True)))

self.assertEqual(UserName.objects.count(), 3)
self.assertEqual({'testuser1', 'testuser2', 'testuser3'},
set(UserName.objects.all().values_list('username', flat=True)))
self.assertEqual(3, UserName.objects.filter(is_primary=True).count())
self.assertEqual(UserName.objects.count(), 3)
self.assertEqual({'testuser1', 'testuser2', 'testuser3'},
set(UserName.objects.all().values_list('username', flat=True)))
self.assertEqual(3, UserName.objects.filter(is_primary=True).count())

self.assertEqual(set(), existing_usernames)
self.assertEqual(set(), existing_usernames)

def test_bulk_create_from_usernames_exclude_existing(self):
mommy.make('devilry_account.User', shortname='testuser1')
mommy.make('devilry_account.UserName', username='testuser2')

created_users, existing_usernames = User.objects.bulk_create_from_usernames(
['testuser1', 'testuser2', 'testuser3'])

self.assertEqual(User.objects.count(), 3)
self.assertEqual(created_users.count(), 1)
self.assertEqual(created_users.first().shortname, 'testuser3')

self.assertEqual(UserName.objects.count(), 2) # We only have 2, since ``testuser1`` did not have a UserName
self.assertEqual(created_users.first().username_set.first().username, 'testuser3')

self.assertEqual({'testuser1', 'testuser2'}, existing_usernames)
with self.settings(DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND=False):
mommy.make('devilry_account.User', shortname='testuser1')
mommy.make('devilry_account.UserName', username='testuser2')

created_users, existing_usernames = User.objects.bulk_create_from_usernames(
['testuser1', 'testuser2', 'testuser3'])

self.assertEqual(User.objects.count(), 3)
self.assertEqual(created_users.count(), 1)
self.assertEqual(created_users.first().shortname, 'testuser3')

# We only have 2, since ``testuser1`` did not have a UserName
self.assertEqual(UserName.objects.count(), 2)
self.assertEqual(created_users.first().username_set.first().username, 'testuser3')

self.assertEqual({'testuser1', 'testuser2'}, existing_usernames)

def test_bulk_create_from_emails_not_allowed_with_username_auth_backend(self):
with self.settings(DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND=False):
with self.assertRaises(IllegalOperationError):
User.objects.bulk_create_from_emails([])

def test_bulk_create_from_emails_empty_input_list(self):
with self.settings(DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND=True):
created_users, existing_emails = User.objects.bulk_create_from_emails([])
self.assertEqual(created_users.count(), 0)
self.assertEqual(User.objects.count(), 0)
self.assertEqual(UserEmail.objects.count(), 0)
self.assertEqual(set(), existing_emails)

def test_bulk_create_from_emails_single_new_user(self):
with self.settings(DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND=True):
created_users, existing_emails = User.objects.bulk_create_from_emails(
['testuser@example.com'])

self.assertEqual(created_users.count(), 1)
self.assertEqual(User.objects.count(), 1)
created_user = created_users.first()
self.assertEqual(created_user.shortname, 'testuser@example.com')
self.assertFalse(created_user.has_usable_password())

self.assertEqual(UserEmail.objects.count(), 1)
created_useremail_object = UserEmail.objects.first()
self.assertEqual(created_useremail_object.email, 'testuser@example.com')
self.assertTrue(created_useremail_object.is_primary)
self.assertTrue(created_useremail_object.use_for_notifications)

self.assertEqual(UserName.objects.count(), 0)
self.assertEqual(set(), existing_emails)

def test_bulk_create_from_emails_multiple_new_users(self):
with self.settings(DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND=True):
created_users, existing_emails = User.objects.bulk_create_from_emails(
['testuser1@example.com', 'testuser2@example.com', 'testuser3@example.com'])

self.assertEqual(created_users.count(), 3)
self.assertEqual(User.objects.count(), 3)
self.assertEqual({'testuser1@example.com', 'testuser2@example.com', 'testuser3@example.com'},
set(User.objects.all().values_list('shortname', flat=True)))

self.assertEqual(UserEmail.objects.count(), 3)
self.assertEqual({'testuser1@example.com', 'testuser2@example.com', 'testuser3@example.com'},
set(UserEmail.objects.all().values_list('email', flat=True)))
self.assertEqual(3, UserEmail.objects.filter(is_primary=True).count())

self.assertEqual(set(), existing_emails)

def test_bulk_create_from_emails_exclude_existing(self):
with self.settings(DJANGO_CRADMIN_USE_EMAIL_AUTH_BACKEND=True):
mommy.make('devilry_account.User', shortname='testuser1@example.com')
mommy.make('devilry_account.UserEmail', email='testuser2@example.com')

created_users, existing_emails = User.objects.bulk_create_from_emails(
['testuser1@example.com', 'testuser2@example.com', 'testuser3@example.com'])

self.assertEqual(User.objects.count(), 3)
self.assertEqual(created_users.count(), 1)
self.assertEqual(created_users.first().shortname, 'testuser3@example.com')

# We only have 2, since ``testuser1@example.com`` did not have a UserEmail
self.assertEqual(UserEmail.objects.count(), 2)
self.assertEqual(created_users.first().useremail_set.first().email, 'testuser3@example.com')

self.assertEqual({'testuser1@example.com', 'testuser2@example.com'}, existing_emails)


class TestUserEmail(TestCase):
Expand Down

0 comments on commit 3ccb40e

Please sign in to comment.