Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 94 additions & 9 deletions mongoengine/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import decimal
import itertools
import re
import socket
import time
import uuid
import warnings
Expand Down Expand Up @@ -154,21 +155,105 @@ class EmailField(StringField):

.. versionadded:: 0.4
"""
USER_REGEX = re.compile(
# `dot-atom` defined in RFC 5322 Section 3.2.3.
r"(^[-!#$%&'*+/=?^_`{}|~0-9A-Z]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z]+)*\Z"
# `quoted-string` defined in RFC 5322 Section 3.2.4.
r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-\011\013\014\016-\177])*"\Z)',
re.IGNORECASE
)

UTF8_USER_REGEX = re.compile(
six.u(
# RFC 6531 Section 3.3 extends `atext` (used by dot-atom) to
# include `UTF8-non-ascii`.
r"(^[-!#$%&'*+/=?^_`{}|~0-9A-Z\u0080-\U0010FFFF]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z\u0080-\U0010FFFF]+)*\Z"
# `quoted-string`
r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-\011\013\014\016-\177])*"\Z)'
), re.IGNORECASE | re.UNICODE
)

EMAIL_REGEX = re.compile(
# dot-atom
r"(^[-!#$%&'*+/=?^_`{}|~0-9A-Z]+(\.[-!#$%&'*+/=?^_`{}|~0-9A-Z]+)*"
# quoted-string
r'|^"([\001-\010\013\014\016-\037!#-\[\]-\177]|\\[\001-011\013\014\016-\177])*"'
# domain (max length of an ICAAN TLD is 22 characters)
r')@(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}|[A-Z0-9-]{2,}(?<!-))$', re.IGNORECASE
DOMAIN_REGEX = re.compile(
r'((?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+)(?:[A-Z0-9-]{2,63}(?<!-))\Z',
re.IGNORECASE
)

error_msg = u'Invalid email address: %s'

def __init__(self, domain_whitelist=None, allow_utf8_user=False,
allow_ip_domain=False, *args, **kwargs):
"""Initialize the EmailField.

Args:
domain_whitelist (list) - list of otherwise invalid domain
names which you'd like to support.
allow_utf8_user (bool) - if True, the user part of the email
address can contain UTF8 characters.
False by default.
allow_ip_domain (bool) - if True, the domain part of the email
can be a valid IPv4 or IPv6 address.
"""
self.domain_whitelist = domain_whitelist or []
self.allow_utf8_user = allow_utf8_user
self.allow_ip_domain = allow_ip_domain
super(EmailField, self).__init__(*args, **kwargs)

def validate_user_part(self, user_part):
"""Validate the user part of the email address. Return True if
valid and False otherwise.
"""
if self.allow_utf8_user:
return self.UTF8_USER_REGEX.match(user_part)
return self.USER_REGEX.match(user_part)

def validate_domain_part(self, domain_part):
"""Validate the domain part of the email address. Return True if
valid and False otherwise.
"""
# Skip domain validation if it's in the whitelist.
if domain_part in self.domain_whitelist:
return True

if self.DOMAIN_REGEX.match(domain_part):
return True

# Validate IPv4/IPv6, e.g. user@[192.168.0.1]
if (
self.allow_ip_domain and
domain_part[0] == '[' and
domain_part[-1] == ']'
):
for addr_family in (socket.AF_INET, socket.AF_INET6):
try:
socket.inet_pton(addr_family, domain_part[1:-1])
return True
except (socket.error, UnicodeEncodeError):
pass

return False

def validate(self, value):
if not EmailField.EMAIL_REGEX.match(value):
self.error('Invalid email address: %s' % value)
super(EmailField, self).validate(value)

if '@' not in value:
self.error(self.error_msg % value)

user_part, domain_part = value.rsplit('@', 1)

# Validate the user part.
if not self.validate_user_part(user_part):
self.error(self.error_msg % value)

# Validate the domain and, if invalid, see if it's IDN-encoded.
if not self.validate_domain_part(domain_part):
try:
domain_part = domain_part.encode('idna').decode('ascii')
except UnicodeError:
self.error(self.error_msg % value)
else:
if not self.validate_domain_part(domain_part):
self.error(self.error_msg % value)


class IntField(BaseField):
"""32-bit integer field."""
Expand Down
2 changes: 1 addition & 1 deletion tests/document/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ def test_save(self):
class Recipient(Document):
email = EmailField(required=True)

recipient = Recipient(email='root@localhost')
recipient = Recipient(email='not-an-email')
self.assertRaises(ValidationError, recipient.save)
recipient.save(validate=False)

Expand Down
101 changes: 87 additions & 14 deletions tests/fields/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import itertools
import re
import pymongo
import sys

from nose.plugins.skip import SkipTest
from collections import OrderedDict
Expand Down Expand Up @@ -342,8 +343,6 @@ def test_url_validation(self):
class Link(Document):
url = URLField()

Link.drop_collection()

link = Link()
link.url = 'google'
self.assertRaises(ValidationError, link.validate)
Expand All @@ -356,8 +355,6 @@ def test_unicode_url_validation(self):
class Link(Document):
url = URLField()

Link.drop_collection()

link = Link()
link.url = u'http://привет.com'

Expand Down Expand Up @@ -3456,23 +3453,99 @@ def test_email_field(self):
class User(Document):
email = EmailField()

user = User(email="ross@example.com")
self.assertTrue(user.validate() is None)
user = User(email='ross@example.com')
user.validate()

user = User(email="ross@example.co.uk")
self.assertTrue(user.validate() is None)
user = User(email='ross@example.co.uk')
user.validate()

user = User(email=("Kofq@rhom0e4klgauOhpbpNdogawnyIKvQS0wk2mjqrgGQ5S"
"aJIazqqWkm7.net"))
self.assertTrue(user.validate() is None)
user = User(email=('Kofq@rhom0e4klgauOhpbpNdogawnyIKvQS0wk2mjqrgGQ5S'
'aJIazqqWkm7.net'))
user.validate()

user = User(email="new-tld@example.technology")
self.assertTrue(user.validate() is None)
user = User(email='new-tld@example.technology')
user.validate()

user = User(email='ross@example.com.')
self.assertRaises(ValidationError, user.validate)

# unicode domain
user = User(email=u'user@пример.рф')
user.validate()

# invalid unicode domain
user = User(email=u'user@пример')
self.assertRaises(ValidationError, user.validate)

# invalid data type
user = User(email=123)
self.assertRaises(ValidationError, user.validate)

def test_email_field_unicode_user(self):
# Don't run this test on pypy3, which doesn't support unicode regex:
# https://bitbucket.org/pypy/pypy/issues/1821/regular-expression-doesnt-find-unicode
if sys.version_info[:2] == (3, 2):
raise SkipTest('unicode email addresses are not supported on PyPy 3')

class User(Document):
email = EmailField()

# unicode user shouldn't validate by default...
user = User(email=u'Dörte@Sörensen.example.com')
self.assertRaises(ValidationError, user.validate)

# ...but it should be fine with allow_utf8_user set to True
class User(Document):
email = EmailField(allow_utf8_user=True)

user = User(email=u'Dörte@Sörensen.example.com')
user.validate()

def test_email_field_domain_whitelist(self):
class User(Document):
email = EmailField()

# localhost domain shouldn't validate by default...
user = User(email='me@localhost')
self.assertRaises(ValidationError, user.validate)

user = User(email="ross@example.com.")
# ...but it should be fine if it's whitelisted
class User(Document):
email = EmailField(domain_whitelist=['localhost'])

user = User(email='me@localhost')
user.validate()

def test_email_field_ip_domain(self):
class User(Document):
email = EmailField()

valid_ipv4 = 'email@[127.0.0.1]'
valid_ipv6 = 'email@[2001:dB8::1]'
invalid_ip = 'email@[324.0.0.1]'

# IP address as a domain shouldn't validate by default...
user = User(email=valid_ipv4)
self.assertRaises(ValidationError, user.validate)

user = User(email=valid_ipv6)
self.assertRaises(ValidationError, user.validate)

user = User(email=invalid_ip)
self.assertRaises(ValidationError, user.validate)

# ...but it should be fine with allow_ip_domain set to True
class User(Document):
email = EmailField(allow_ip_domain=True)

user = User(email=valid_ipv4)
user.validate()

user = User(email=valid_ipv6)
user.validate()

# invalid IP should still fail validation
user = User(email=invalid_ip)
self.assertRaises(ValidationError, user.validate)

def test_email_field_honors_regex(self):
Expand Down