Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Fixed #15042 -- Ensured that email addresses without a domain can sti…

…ll be mail recipients. Patch also improves the IDN handling introduced by r15006, and refactors the test suite to ensure even feature coverage. Thanks to net147 for the report, and to Łukasz Rekucki for the awesome patch.

git-svn-id: http://code.djangoproject.com/svn/django/trunk@15211 bcc190cf-cafb-0310-a4f2-bffc1f526a37
  • Loading branch information...
commit 11997218eeed02c8be7a81b87db321464839a2cf 1 parent 0d70d29
Russell Keith-Magee authored January 15, 2011
13  django/core/mail/backends/smtp.py
... ...
@@ -1,5 +1,4 @@
1 1
 """SMTP email backend class."""
2  
-
3 2
 import smtplib
4 3
 import socket
5 4
 import threading
@@ -7,6 +6,8 @@
7 6
 from django.conf import settings
8 7
 from django.core.mail.backends.base import BaseEmailBackend
9 8
 from django.core.mail.utils import DNS_NAME
  9
+from django.core.mail.message import sanitize_address
  10
+
10 11
 
11 12
 class EmailBackend(BaseEmailBackend):
12 13
     """
@@ -91,17 +92,13 @@ def send_messages(self, email_messages):
91 92
             self._lock.release()
92 93
         return num_sent
93 94
 
94  
-    def _sanitize(self, email):
95  
-        name, domain = email.split('@', 1)
96  
-        email = '@'.join([name, domain.encode('idna')])
97  
-        return email
98  
-
99 95
     def _send(self, email_message):
100 96
         """A helper method that does the actual sending."""
101 97
         if not email_message.recipients():
102 98
             return False
103  
-        from_email = self._sanitize(email_message.from_email)
104  
-        recipients = map(self._sanitize, email_message.recipients())
  99
+        from_email = sanitize_address(email_message.from_email, email_message.encoding)
  100
+        recipients = [sanitize_address(addr, email_message.encoding)
  101
+                      for addr in email_message.recipients()]
105 102
         try:
106 103
             self.connection.sendmail(from_email, recipients,
107 104
                     email_message.message().as_string())
65  django/core/mail/message.py
@@ -12,6 +12,7 @@
12 12
 from django.conf import settings
13 13
 from django.core.mail.utils import DNS_NAME
14 14
 from django.utils.encoding import smart_str, force_unicode
  15
+from email.Utils import parseaddr
15 16
 
16 17
 # Don't BASE64-encode UTF-8 messages so that we avoid unwanted attention from
17 18
 # some spam filters.
@@ -54,6 +55,22 @@ def make_msgid(idstring=None):
54 55
     return msgid
55 56
 
56 57
 
  58
+# Header names that contain structured address data (RFC #5322)
  59
+ADDRESS_HEADERS = set([
  60
+    'from',
  61
+    'sender',
  62
+    'reply-to',
  63
+    'to',
  64
+    'cc',
  65
+    'bcc',
  66
+    'resent-from',
  67
+    'resent-sender',
  68
+    'resent-to',
  69
+    'resent-cc',
  70
+    'resent-bcc',
  71
+])
  72
+
  73
+
57 74
 def forbid_multi_line_headers(name, val, encoding):
58 75
     """Forbids multi-line headers, to prevent header injection."""
59 76
     encoding = encoding or settings.DEFAULT_CHARSET
@@ -63,43 +80,57 @@ def forbid_multi_line_headers(name, val, encoding):
63 80
     try:
64 81
         val = val.encode('ascii')
65 82
     except UnicodeEncodeError:
66  
-        if name.lower() in ('to', 'from', 'cc'):
67  
-            result = []
68  
-            for nm, addr in getaddresses((val,)):
69  
-                nm = str(Header(nm.encode(encoding), encoding))
70  
-                try:
71  
-                    addr = addr.encode('ascii')
72  
-                except UnicodeEncodeError:  # IDN
73  
-                    addr = str(Header(addr.encode(encoding), encoding))
74  
-                result.append(formataddr((nm, addr)))
75  
-            val = ', '.join(result)
  83
+        if name.lower() in ADDRESS_HEADERS:
  84
+            val = ', '.join(sanitize_address(addr, encoding)
  85
+                for addr in getaddresses((val,)))
76 86
         else:
77  
-            val = Header(val.encode(encoding), encoding)
  87
+            val = str(Header(val, encoding))
78 88
     else:
79 89
         if name.lower() == 'subject':
80 90
             val = Header(val)
81 91
     return name, val
82 92
 
  93
+
  94
+def sanitize_address(addr, encoding):
  95
+    if isinstance(addr, basestring):
  96
+        addr = parseaddr(force_unicode(addr))
  97
+    nm, addr = addr
  98
+    nm = str(Header(nm, encoding))
  99
+    try:
  100
+        addr = addr.encode('ascii')
  101
+    except UnicodeEncodeError:  # IDN
  102
+        if u'@' in addr:
  103
+            localpart, domain = addr.split(u'@', 1)
  104
+            localpart = str(Header(localpart, encoding))
  105
+            domain = domain.encode('idna')
  106
+            addr = '@'.join([localpart, domain])
  107
+        else:
  108
+            addr = str(Header(addr, encoding))
  109
+    return formataddr((nm, addr))
  110
+
  111
+
83 112
 class SafeMIMEText(MIMEText):
84  
-    
  113
+
85 114
     def __init__(self, text, subtype, charset):
86 115
         self.encoding = charset
87 116
         MIMEText.__init__(self, text, subtype, charset)
88  
-    
89  
-    def __setitem__(self, name, val):    
  117
+
  118
+    def __setitem__(self, name, val):
90 119
         name, val = forbid_multi_line_headers(name, val, self.encoding)
91 120
         MIMEText.__setitem__(self, name, val)
92 121
 
  122
+
93 123
 class SafeMIMEMultipart(MIMEMultipart):
94  
-    
  124
+
95 125
     def __init__(self, _subtype='mixed', boundary=None, _subparts=None, encoding=None, **_params):
96 126
         self.encoding = encoding
97 127
         MIMEMultipart.__init__(self, _subtype, boundary, _subparts, **_params)
98  
-        
  128
+
99 129
     def __setitem__(self, name, val):
100 130
         name, val = forbid_multi_line_headers(name, val, self.encoding)
101 131
         MIMEMultipart.__setitem__(self, name, val)
102 132
 
  133
+
103 134
 class EmailMessage(object):
104 135
     """
105 136
     A container for email information.
@@ -274,7 +305,7 @@ def __init__(self, subject='', body='', from_email=None, to=None, bcc=None,
274 305
         conversions.
275 306
         """
276 307
         super(EmailMultiAlternatives, self).__init__(subject, body, from_email, to, bcc, connection, attachments, headers, cc)
277  
-        self.alternatives=alternatives or []
  308
+        self.alternatives = alternatives or []
278 309
 
279 310
     def attach_alternative(self, content, mimetype):
280 311
         """Attach an alternative content representation."""
644  tests/regressiontests/mail/tests.py
... ...
@@ -1,21 +1,62 @@
1 1
 # coding: utf-8
  2
+import asyncore
2 3
 import email
3 4
 import os
4 5
 import shutil
  6
+import smtpd
5 7
 import sys
6  
-import tempfile
7 8
 from StringIO import StringIO
  9
+import tempfile
  10
+import threading
  11
+
8 12
 from django.conf import settings
9 13
 from django.core import mail
10 14
 from django.core.mail import EmailMessage, mail_admins, mail_managers, EmailMultiAlternatives
11 15
 from django.core.mail import send_mail, send_mass_mail
12  
-from django.core.mail.backends.base import BaseEmailBackend
13 16
 from django.core.mail.backends import console, dummy, locmem, filebased, smtp
14 17
 from django.core.mail.message import BadHeaderError
15 18
 from django.test import TestCase
16 19
 from django.utils.translation import ugettext_lazy
  20
+from django.utils.functional import wraps
  21
+
  22
+
  23
+def alter_django_settings(**kwargs):
  24
+    oldvalues = {}
  25
+    nonexistant = []
  26
+    for setting, newvalue in kwargs.iteritems():
  27
+        try:
  28
+            oldvalues[setting] = getattr(settings, setting)
  29
+        except AttributeError:
  30
+            nonexistant.append(setting)
  31
+        setattr(settings, setting, newvalue)
  32
+    return oldvalues, nonexistant
  33
+
  34
+
  35
+def restore_django_settings(state):
  36
+    oldvalues, nonexistant = state
  37
+    for setting, oldvalue in oldvalues.iteritems():
  38
+        setattr(settings, setting, oldvalue)
  39
+    for setting in nonexistant:
  40
+        delattr(settings, setting)
  41
+
  42
+
  43
+def with_django_settings(**kwargs):
  44
+    def decorator(test):
  45
+        @wraps(test)
  46
+        def decorated_test(self):
  47
+            state = alter_django_settings(**kwargs)
  48
+            try:
  49
+                return test(self)
  50
+            finally:
  51
+                restore_django_settings(state)
  52
+        return decorated_test
  53
+    return decorator
  54
+
17 55
 
18 56
 class MailTests(TestCase):
  57
+    """
  58
+    Non-backend specific tests.
  59
+    """
19 60
 
20 61
     def test_ascii(self):
21 62
         email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'])
@@ -26,7 +67,7 @@ def test_ascii(self):
26 67
         self.assertEqual(message['To'], 'to@example.com')
27 68
 
28 69
     def test_multiple_recipients(self):
29  
-        email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com','other@example.com'])
  70
+        email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'])
30 71
         message = email.message()
31 72
         self.assertEqual(message['Subject'].encode(), 'Subject')
32 73
         self.assertEqual(message.get_payload(), 'Content')
@@ -40,14 +81,6 @@ def test_cc(self):
40 81
         self.assertEqual(message['Cc'], 'cc@example.com')
41 82
         self.assertEqual(email.recipients(), ['to@example.com', 'cc@example.com'])
42 83
 
43  
-        # Verify headers
44  
-        old_stdout = sys.stdout
45  
-        sys.stdout = StringIO()
46  
-        connection = console.EmailBackend()
47  
-        connection.send_messages([email])
48  
-        self.assertTrue(sys.stdout.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nCc: cc@example.com\nDate: '))
49  
-        sys.stdout = old_stdout
50  
-
51 84
         # Test multiple CC with multiple To
52 85
         email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com', 'other@example.com'], cc=['cc@example.com', 'cc.other@example.com'])
53 86
         message = email.message()
@@ -83,33 +116,6 @@ def test_message_header_overrides(self):
83 116
         email = EmailMessage('subject', 'content', 'from@example.com', ['to@example.com'], headers=headers)
84 117
         self.assertEqual(email.message().as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: subject\nFrom: from@example.com\nTo: to@example.com\ndate: Fri, 09 Nov 2001 01:08:47 -0000\nMessage-ID: foo\n\ncontent')
85 118
 
86  
-    def test_empty_admins(self):
87  
-        """
88  
-        Test that mail_admins/mail_managers doesn't connect to the mail server
89  
-        if there are no recipients (#9383)
90  
-        """
91  
-        old_admins = settings.ADMINS
92  
-        old_managers = settings.MANAGERS
93  
-
94  
-        settings.ADMINS = settings.MANAGERS = [('nobody','nobody@example.com')]
95  
-        mail.outbox = []
96  
-        mail_admins('hi', 'there')
97  
-        self.assertEqual(len(mail.outbox), 1)
98  
-        mail.outbox = []
99  
-        mail_managers('hi', 'there')
100  
-        self.assertEqual(len(mail.outbox), 1)
101  
-
102  
-        settings.ADMINS = settings.MANAGERS = []
103  
-        mail.outbox = []
104  
-        mail_admins('hi', 'there')
105  
-        self.assertEqual(len(mail.outbox), 0)
106  
-        mail.outbox = []
107  
-        mail_managers('hi', 'there')
108  
-        self.assertEqual(len(mail.outbox), 0)
109  
-
110  
-        settings.ADMINS = old_admins
111  
-        settings.MANAGERS = old_managers
112  
-
113 119
     def test_from_header(self):
114 120
         """
115 121
         Make sure we can manually set the From header (#9214)
@@ -129,17 +135,26 @@ def test_multiple_message_call(self):
129 135
         message = email.message()
130 136
         self.assertEqual(message['From'], 'from@example.com')
131 137
 
132  
-    def test_unicode_header(self):
  138
+    def test_unicode_address_header(self):
133 139
         """
134 140
         Regression for #11144 - When a to/from/cc header contains unicode,
135 141
         make sure the email addresses are parsed correctly (especially with
136 142
         regards to commas)
137 143
         """
138  
-        email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Firstname Sürname" <to@example.com>','other@example.com'])
  144
+        email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Firstname Sürname" <to@example.com>', 'other@example.com'])
139 145
         self.assertEqual(email.message()['To'], '=?utf-8?q?Firstname_S=C3=BCrname?= <to@example.com>, other@example.com')
140  
-        email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Sürname, Firstname" <to@example.com>','other@example.com'])
  146
+        email = EmailMessage('Subject', 'Content', 'from@example.com', ['"Sürname, Firstname" <to@example.com>', 'other@example.com'])
141 147
         self.assertEqual(email.message()['To'], '=?utf-8?q?S=C3=BCrname=2C_Firstname?= <to@example.com>, other@example.com')
142 148
 
  149
+    def test_unicode_headers(self):
  150
+        email = EmailMessage(u"Gżegżółka", "Content", "from@example.com", ["to@example.com"],
  151
+                             headers={"Sender": '"Firstname Sürname" <sender@example.com>',
  152
+                                      "Comments": 'My Sürname is non-ASCII'})
  153
+        message = email.message()
  154
+        self.assertEqual(message['Subject'], '=?utf-8?b?R8W8ZWfFvMOzxYJrYQ==?=')
  155
+        self.assertEqual(message['Sender'], '=?utf-8?q?Firstname_S=C3=BCrname?= <sender@example.com>')
  156
+        self.assertEqual(message['Comments'], '=?utf-8?q?My_S=C3=BCrname_is_non-ASCII?=')
  157
+
143 158
     def test_safe_mime_multipart(self):
144 159
         """
145 160
         Make sure headers can be set with a different encoding than utf-8 in
@@ -193,26 +208,7 @@ def test_attachments(self):
193 208
         self.assertEqual(payload[0].get_content_type(), 'multipart/alternative')
194 209
         self.assertEqual(payload[1].get_content_type(), 'application/pdf')
195 210
 
196  
-    def test_arbitrary_stream(self):
197  
-        """
198  
-        Test that the console backend can be pointed at an arbitrary stream.
199  
-        """
200  
-        s = StringIO()
201  
-        connection = mail.get_connection('django.core.mail.backends.console.EmailBackend', stream=s)
202  
-        send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection)
203  
-        self.assertTrue(s.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: '))
204  
-
205  
-    def test_stdout(self):
206  
-        """Make sure that the console backend writes to stdout by default"""
207  
-        old_stdout = sys.stdout
208  
-        sys.stdout = StringIO()
209  
-        connection = console.EmailBackend()
210  
-        email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
211  
-        connection.send_messages([email])
212  
-        self.assertTrue(sys.stdout.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: '))
213  
-        sys.stdout = old_stdout
214  
-
215  
-    def test_dummy(self):
  211
+    def test_dummy_backend(self):
216 212
         """
217 213
         Make sure that dummy backends returns correct number of sent messages
218 214
         """
@@ -220,52 +216,6 @@ def test_dummy(self):
220 216
         email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
221 217
         self.assertEqual(connection.send_messages([email, email, email]), 3)
222 218
 
223  
-    def test_locmem(self):
224  
-        """
225  
-        Make sure that the locmen backend populates the outbox.
226  
-        """
227  
-        mail.outbox = []
228  
-        connection = locmem.EmailBackend()
229  
-        email1 = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
230  
-        email2 = EmailMessage('Subject 2', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
231  
-        connection.send_messages([email1, email2])
232  
-        self.assertEqual(len(mail.outbox), 2)
233  
-        self.assertEqual(mail.outbox[0].subject, 'Subject')
234  
-        self.assertEqual(mail.outbox[1].subject, 'Subject 2')
235  
-
236  
-        # Make sure that multiple locmem connections share mail.outbox
237  
-        mail.outbox = []
238  
-        connection2 = locmem.EmailBackend()
239  
-        email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
240  
-        connection.send_messages([email])
241  
-        connection2.send_messages([email])
242  
-        self.assertEqual(len(mail.outbox), 2)
243  
-
244  
-    def test_file_backend(self):
245  
-        tmp_dir = tempfile.mkdtemp()
246  
-        connection = filebased.EmailBackend(file_path=tmp_dir)
247  
-        email1 = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
248  
-        connection.send_messages([email1])
249  
-        self.assertEqual(len(os.listdir(tmp_dir)), 1)
250  
-        message = email.message_from_file(open(os.path.join(tmp_dir, os.listdir(tmp_dir)[0])))
251  
-        self.assertEqual(message.get_content_type(), 'text/plain')
252  
-        self.assertEqual(message.get('subject'), 'Subject')
253  
-        self.assertEqual(message.get('from'), 'from@example.com')
254  
-        self.assertEqual(message.get('to'), 'to@example.com')
255  
-        connection2 = filebased.EmailBackend(file_path=tmp_dir)
256  
-        connection2.send_messages([email1])
257  
-        self.assertEqual(len(os.listdir(tmp_dir)), 2)
258  
-        connection.send_messages([email1])
259  
-        self.assertEqual(len(os.listdir(tmp_dir)), 2)
260  
-        email1.connection = filebased.EmailBackend(file_path=tmp_dir)
261  
-        connection_created = connection.open()
262  
-        email1.send()
263  
-        self.assertEqual(len(os.listdir(tmp_dir)), 3)
264  
-        email1.send()
265  
-        self.assertEqual(len(os.listdir(tmp_dir)), 3)
266  
-        connection.close()
267  
-        shutil.rmtree(tmp_dir)
268  
-
269 219
     def test_arbitrary_keyword(self):
270 220
         """
271 221
         Make sure that get_connection() accepts arbitrary keyword that might be
@@ -289,144 +239,392 @@ def test_backend_arg(self):
289 239
         self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.dummy.EmailBackend'), dummy.EmailBackend))
290 240
         self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.console.EmailBackend'), console.EmailBackend))
291 241
         tmp_dir = tempfile.mkdtemp()
292  
-        self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.filebased.EmailBackend', file_path=tmp_dir), filebased.EmailBackend))
293  
-        shutil.rmtree(tmp_dir)
  242
+        try:
  243
+            self.assertTrue(isinstance(mail.get_connection('django.core.mail.backends.filebased.EmailBackend', file_path=tmp_dir), filebased.EmailBackend))
  244
+        finally:
  245
+            shutil.rmtree(tmp_dir)
294 246
         self.assertTrue(isinstance(mail.get_connection(), locmem.EmailBackend))
295 247
 
  248
+    @with_django_settings(
  249
+        EMAIL_BACKEND='django.core.mail.backends.locmem.EmailBackend',
  250
+        ADMINS=[('nobody', 'nobody@example.com')],
  251
+        MANAGERS=[('nobody', 'nobody@example.com')])
296 252
     def test_connection_arg(self):
297 253
         """Test connection argument to send_mail(), et. al."""
298  
-        connection = mail.get_connection('django.core.mail.backends.locmem.EmailBackend')
299  
-
300 254
         mail.outbox = []
  255
+
  256
+        # Send using non-default connection
  257
+        connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
301 258
         send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection)
302  
-        self.assertEqual(len(mail.outbox), 1)
303  
-        message = mail.outbox[0]
304  
-        self.assertEqual(message.subject, 'Subject')
305  
-        self.assertEqual(message.from_email, 'from@example.com')
306  
-        self.assertEqual(message.to, ['to@example.com'])
  259
+        self.assertEqual(mail.outbox, [])
  260
+        self.assertEqual(len(connection.test_outbox), 1)
  261
+        self.assertEqual(connection.test_outbox[0].subject, 'Subject')
307 262
 
308  
-        mail.outbox = []
  263
+        connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
309 264
         send_mass_mail([
310 265
                 ('Subject1', 'Content1', 'from1@example.com', ['to1@example.com']),
311  
-                ('Subject2', 'Content2', 'from2@example.com', ['to2@example.com'])
  266
+                ('Subject2', 'Content2', 'from2@example.com', ['to2@example.com']),
312 267
             ], connection=connection)
313  
-        self.assertEqual(len(mail.outbox), 2)
314  
-        message = mail.outbox[0]
315  
-        self.assertEqual(message.subject, 'Subject1')
316  
-        self.assertEqual(message.from_email, 'from1@example.com')
317  
-        self.assertEqual(message.to, ['to1@example.com'])
318  
-        message = mail.outbox[1]
319  
-        self.assertEqual(message.subject, 'Subject2')
320  
-        self.assertEqual(message.from_email, 'from2@example.com')
321  
-        self.assertEqual(message.to, ['to2@example.com'])
322  
-
323  
-        old_admins = settings.ADMINS
324  
-        old_managers = settings.MANAGERS
325  
-        settings.ADMINS = settings.MANAGERS = [('nobody','nobody@example.com')]
  268
+        self.assertEqual(mail.outbox, [])
  269
+        self.assertEqual(len(connection.test_outbox), 2)
  270
+        self.assertEqual(connection.test_outbox[0].subject, 'Subject1')
  271
+        self.assertEqual(connection.test_outbox[1].subject, 'Subject2')
326 272
 
327  
-        mail.outbox = []
328  
-        mail_admins('Subject', 'Content', connection=connection)
329  
-        self.assertEqual(len(mail.outbox), 1)
330  
-        message = mail.outbox[0]
331  
-        self.assertEqual(message.subject, '[Django] Subject')
332  
-        self.assertEqual(message.from_email, 'root@localhost')
333  
-        self.assertEqual(message.to, ['nobody@example.com'])
  273
+        connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
  274
+        mail_admins('Admin message', 'Content', connection=connection)
  275
+        self.assertEqual(mail.outbox, [])
  276
+        self.assertEqual(len(connection.test_outbox), 1)
  277
+        self.assertEqual(connection.test_outbox[0].subject, '[Django] Admin message')
334 278
 
335  
-        mail.outbox = []
336  
-        mail_managers('Subject', 'Content', connection=connection)
337  
-        self.assertEqual(len(mail.outbox), 1)
338  
-        message = mail.outbox[0]
339  
-        self.assertEqual(message.subject, '[Django] Subject')
340  
-        self.assertEqual(message.from_email, 'root@localhost')
341  
-        self.assertEqual(message.to, ['nobody@example.com'])
342  
-
343  
-        settings.ADMINS = old_admins
344  
-        settings.MANAGERS = old_managers
345  
-
346  
-    def test_mail_prefix(self):
347  
-        """Test prefix argument in manager/admin mail."""
348  
-        # Regression for #13494.
349  
-        old_admins = settings.ADMINS
350  
-        old_managers = settings.MANAGERS
351  
-        settings.ADMINS = settings.MANAGERS = [('nobody','nobody@example.com')]
  279
+        connection = mail.get_connection('regressiontests.mail.custombackend.EmailBackend')
  280
+        mail_managers('Manager message', 'Content', connection=connection)
  281
+        self.assertEqual(mail.outbox, [])
  282
+        self.assertEqual(len(connection.test_outbox), 1)
  283
+        self.assertEqual(connection.test_outbox[0].subject, '[Django] Manager message')
  284
+
  285
+
  286
+class BaseEmailBackendTests(object):
  287
+    email_backend = None
  288
+
  289
+    def setUp(self):
  290
+        self.__settings_state = alter_django_settings(EMAIL_BACKEND=self.email_backend)
  291
+
  292
+    def tearDown(self):
  293
+        restore_django_settings(self.__settings_state)
352 294
 
  295
+    def assertStartsWith(self, first, second):
  296
+        if not first.startswith(second):
  297
+            self.longMessage = True
  298
+            self.assertEqual(first[:len(second)], second, "First string doesn't start with the second.")
  299
+
  300
+    def get_mailbox_content(self):
  301
+        raise NotImplementedError
  302
+
  303
+    def flush_mailbox(self):
  304
+        raise NotImplementedError
  305
+
  306
+    def get_the_message(self):
  307
+        mailbox = self.get_mailbox_content()
  308
+        self.assertEqual(len(mailbox), 1,
  309
+            "Expected exactly one message, got %d.\n%r" % (len(mailbox), [
  310
+                m.as_string() for m in mailbox]))
  311
+        return mailbox[0]
  312
+
  313
+    def test_send(self):
  314
+        email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'])
  315
+        num_sent = mail.get_connection().send_messages([email])
  316
+        self.assertEqual(num_sent, 1)
  317
+        message = self.get_the_message()
  318
+        self.assertEqual(message["subject"], "Subject")
  319
+        self.assertEqual(message.get_payload(), "Content")
  320
+        self.assertEqual(message["from"], "from@example.com")
  321
+        self.assertEqual(message.get_all("to"), ["to@example.com"])
  322
+
  323
+    def test_send_many(self):
  324
+        email1 = EmailMessage('Subject', 'Content1', 'from@example.com', ['to@example.com'])
  325
+        email2 = EmailMessage('Subject', 'Content2', 'from@example.com', ['to@example.com'])
  326
+        num_sent = mail.get_connection().send_messages([email1, email2])
  327
+        self.assertEqual(num_sent, 2)
  328
+        messages = self.get_mailbox_content()
  329
+        self.assertEquals(len(messages), 2)
  330
+        self.assertEqual(messages[0].get_payload(), "Content1")
  331
+        self.assertEqual(messages[1].get_payload(), "Content2")
  332
+
  333
+    def test_send_verbose_name(self):
  334
+        email = EmailMessage("Subject", "Content", '"Firstname Sürname" <from@example.com>',
  335
+                             ["to@example.com"])
  336
+        email.send()
  337
+        message = self.get_the_message()
  338
+        self.assertEqual(message["subject"], "Subject")
  339
+        self.assertEqual(message.get_payload(), "Content")
  340
+        self.assertEqual(message["from"], "=?utf-8?q?Firstname_S=C3=BCrname?= <from@example.com>")
  341
+
  342
+    @with_django_settings(MANAGERS=[('nobody', 'nobody@example.com')])
  343
+    def test_html_mail_managers(self):
  344
+        """Test html_message argument to mail_managers"""
  345
+        mail_managers('Subject', 'Content', html_message='HTML Content')
  346
+        message = self.get_the_message()
  347
+
  348
+        self.assertEqual(message.get('subject'), '[Django] Subject')
  349
+        self.assertEqual(message.get_all('to'), ['nobody@example.com'])
  350
+        self.assertTrue(message.is_multipart())
  351
+        self.assertEqual(len(message.get_payload()), 2)
  352
+        self.assertEqual(message.get_payload(0).get_payload(), 'Content')
  353
+        self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain')
  354
+        self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content')
  355
+        self.assertEqual(message.get_payload(1).get_content_type(), 'text/html')
  356
+
  357
+    @with_django_settings(ADMINS=[('nobody', 'nobody@example.com')])
  358
+    def test_html_mail_admins(self):
  359
+        """Test html_message argument to mail_admins """
  360
+        mail_admins('Subject', 'Content', html_message='HTML Content')
  361
+        message = self.get_the_message()
  362
+
  363
+        self.assertEqual(message.get('subject'), '[Django] Subject')
  364
+        self.assertEqual(message.get_all('to'), ['nobody@example.com'])
  365
+        self.assertTrue(message.is_multipart())
  366
+        self.assertEqual(len(message.get_payload()), 2)
  367
+        self.assertEqual(message.get_payload(0).get_payload(), 'Content')
  368
+        self.assertEqual(message.get_payload(0).get_content_type(), 'text/plain')
  369
+        self.assertEqual(message.get_payload(1).get_payload(), 'HTML Content')
  370
+        self.assertEqual(message.get_payload(1).get_content_type(), 'text/html')
  371
+
  372
+    @with_django_settings(ADMINS=[('nobody', 'nobody+admin@example.com')],
  373
+                         MANAGERS=[('nobody', 'nobody+manager@example.com')])
  374
+    def test_manager_and_admin_mail_prefix(self):
  375
+        """
  376
+        String prefix + lazy translated subject = bad output
  377
+        Regression for #13494
  378
+        """
353 379
         mail_managers(ugettext_lazy('Subject'), 'Content')
354  
-        self.assertEqual(len(mail.outbox), 1)
355  
-        message = mail.outbox[0]
356  
-        self.assertEqual(message.subject, '[Django] Subject')
  380
+        message = self.get_the_message()
  381
+        self.assertEqual(message.get('subject'), '[Django] Subject')
357 382
 
358  
-        mail.outbox = []
  383
+        self.flush_mailbox()
359 384
         mail_admins(ugettext_lazy('Subject'), 'Content')
360  
-        self.assertEqual(len(mail.outbox), 1)
361  
-        message = mail.outbox[0]
362  
-        self.assertEqual(message.subject, '[Django] Subject')
  385
+        message = self.get_the_message()
  386
+        self.assertEqual(message.get('subject'), '[Django] Subject')
363 387
 
364  
-        settings.ADMINS = old_admins
365  
-        settings.MANAGERS = old_managers
  388
+    @with_django_settings(ADMINS=(), MANAGERS=())
  389
+    def test_empty_admins(self):
  390
+        """
  391
+        Test that mail_admins/mail_managers doesn't connect to the mail server
  392
+        if there are no recipients (#9383)
  393
+        """
  394
+        mail_admins('hi', 'there')
  395
+        self.assertEqual(self.get_mailbox_content(), [])
  396
+        mail_managers('hi', 'there')
  397
+        self.assertEqual(self.get_mailbox_content(), [])
366 398
 
367  
-    def test_html_mail_admins(self):
368  
-        """Test html_message argument to mail_admins and mail_managers"""
369  
-        old_admins = settings.ADMINS
370  
-        settings.ADMINS = [('nobody','nobody@example.com')]
  399
+    def test_message_cc_header(self):
  400
+        """
  401
+        Regression test for #7722
  402
+        """
  403
+        email = EmailMessage('Subject', 'Content', 'from@example.com', ['to@example.com'], cc=['cc@example.com'])
  404
+        mail.get_connection().send_messages([email])
  405
+        message = self.get_the_message()
  406
+        self.assertStartsWith(message.as_string(), 'Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nCc: cc@example.com\nDate: ')
371 407
 
372  
-        mail.outbox = []
373  
-        mail_admins('Subject', 'Content', html_message='HTML Content')
374  
-        self.assertEqual(len(mail.outbox), 1)
375  
-        message = mail.outbox[0]
376  
-        self.assertEqual(message.subject, '[Django] Subject')
377  
-        self.assertEqual(message.body, 'Content')
378  
-        self.assertEqual(message.alternatives, [('HTML Content', 'text/html')])
  408
+    def test_idn_send(self):
  409
+        """
  410
+        Regression test for #14301
  411
+        """
  412
+        self.assertTrue(send_mail('Subject', 'Content', 'from@öäü.com', [u'to@öäü.com']))
  413
+        message = self.get_the_message()
  414
+        self.assertEqual(message.get('subject'), 'Subject')
  415
+        self.assertEqual(message.get('from'), 'from@xn--4ca9at.com')
  416
+        self.assertEqual(message.get('to'), 'to@xn--4ca9at.com')
  417
+
  418
+        self.flush_mailbox()
  419
+        m = EmailMessage('Subject', 'Content', 'from@öäü.com',
  420
+                     [u'to@öäü.com'], cc=[u'cc@öäü.com'])
  421
+        m.send()
  422
+        message = self.get_the_message()
  423
+        self.assertEqual(message.get('subject'), 'Subject')
  424
+        self.assertEqual(message.get('from'), 'from@xn--4ca9at.com')
  425
+        self.assertEqual(message.get('to'), 'to@xn--4ca9at.com')
  426
+        self.assertEqual(message.get('cc'), 'cc@xn--4ca9at.com')
  427
+
  428
+    def test_recipient_without_domain(self):
  429
+        """
  430
+        Regression test for #15042
  431
+        """
  432
+        self.assertTrue(send_mail("Subject", "Content", "tester", ["django"]))
  433
+        message = self.get_the_message()
  434
+        self.assertEqual(message.get('subject'), 'Subject')
  435
+        self.assertEqual(message.get('from'), "tester")
  436
+        self.assertEqual(message.get('to'), "django")
379 437
 
380  
-        settings.ADMINS = old_admins
381 438
 
382  
-    def test_html_mail_managers(self):
383  
-        """Test html_message argument to mail_admins and mail_managers"""
384  
-        old_managers = settings.MANAGERS
385  
-        settings.MANAGERS = [('nobody','nobody@example.com')]
  439
+class LocmemBackendTests(BaseEmailBackendTests, TestCase):
  440
+    email_backend = 'django.core.mail.backends.locmem.EmailBackend'
386 441
 
387  
-        mail.outbox = []
388  
-        mail_managers('Subject', 'Content', html_message='HTML Content')
389  
-        self.assertEqual(len(mail.outbox), 1)
390  
-        message = mail.outbox[0]
391  
-        self.assertEqual(message.subject, '[Django] Subject')
392  
-        self.assertEqual(message.body, 'Content')
393  
-        self.assertEqual(message.alternatives, [('HTML Content', 'text/html')])
  442
+    def get_mailbox_content(self):
  443
+        return [m.message() for m in mail.outbox]
394 444
 
395  
-        settings.MANAGERS = old_managers
  445
+    def flush_mailbox(self):
  446
+        mail.outbox = []
396 447
 
397  
-    def test_idn_validation(self):
398  
-        """Test internationalized email adresses"""
399  
-        # Regression for #14301.
  448
+    def tearDown(self):
  449
+        super(LocmemBackendTests, self).tearDown()
400 450
         mail.outbox = []
401  
-        from_email = u'fröm@öäü.com'
402  
-        to_email = u'tö@öäü.com'
403  
-        connection = mail.get_connection('django.core.mail.backends.locmem.EmailBackend')
404  
-        send_mail('Subject', 'Content', from_email, [to_email], connection=connection)
405  
-        self.assertEqual(len(mail.outbox), 1)
406  
-        message = mail.outbox[0]
407  
-        self.assertEqual(message.subject, 'Subject')
408  
-        self.assertEqual(message.from_email, from_email)
409  
-        self.assertEqual(message.to, [to_email])
410  
-        self.assertTrue(message.message().as_string().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: =?utf-8?b?ZnLDtm1Aw7bDpMO8LmNvbQ==?=\nTo: =?utf-8?b?dMO2QMO2w6TDvC5jb20=?='))
411  
-
412  
-    def test_idn_smtp_send(self):
413  
-        import smtplib
414  
-        smtplib.SMTP = MockSMTP
415  
-        from_email = u'fröm@öäü.com'
416  
-        to_email = u'tö@öäü.com'
417  
-        connection = mail.get_connection('django.core.mail.backends.smtp.EmailBackend')
418  
-        self.assertTrue(send_mail('Subject', 'Content', from_email, [to_email], connection=connection))
419  
-
420  
-class MockSMTP(object):
421  
-    def __init__(self, host='', port=0, local_hostname=None,
422  
-                 timeout=1):
423  
-        pass
424  
-
425  
-    def sendmail(self, from_addr, to_addrs, msg, mail_options=[],
426  
-                 rcpt_options=[]):
427  
-        for addr in to_addrs:
428  
-            str(addr.split('@', 1)[-1])
429  
-        return {}
430  
-
431  
-    def quit(self):
432  
-        return 0
  451
+
  452
+    def test_locmem_shared_messages(self):
  453
+        """
  454
+        Make sure that the locmen backend populates the outbox.
  455
+        """
  456
+        connection = locmem.EmailBackend()
  457
+        connection2 = locmem.EmailBackend()
  458
+        email = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
  459
+        connection.send_messages([email])
  460
+        connection2.send_messages([email])
  461
+        self.assertEqual(len(mail.outbox), 2)
  462
+
  463
+
  464
+class FileBackendTests(BaseEmailBackendTests, TestCase):
  465
+    email_backend = 'django.core.mail.backends.filebased.EmailBackend'
  466
+
  467
+    def setUp(self):
  468
+        super(FileBackendTests, self).setUp()
  469
+        self.tmp_dir = tempfile.mkdtemp()
  470
+        self.__settings_state = alter_django_settings(EMAIL_FILE_PATH=self.tmp_dir)
  471
+
  472
+    def tearDown(self):
  473
+        restore_django_settings(self.__settings_state)
  474
+        shutil.rmtree(self.tmp_dir)
  475
+        super(FileBackendTests, self).tearDown()
  476
+
  477
+    def flush_mailbox(self):
  478
+        for filename in os.listdir(self.tmp_dir):
  479
+            os.unlink(os.path.join(self.tmp_dir, filename))
  480
+
  481
+    def get_mailbox_content(self):
  482
+        messages = []
  483
+        for filename in os.listdir(self.tmp_dir):
  484
+            session = open(os.path.join(self.tmp_dir, filename)).read().split('\n' + ('-' * 79) + '\n')
  485
+            messages.extend(email.message_from_string(m) for m in session if m)
  486
+        return messages
  487
+
  488
+    def test_file_sessions(self):
  489
+        """Make sure opening a connection creates a new file"""
  490
+        msg = EmailMessage('Subject', 'Content', 'bounce@example.com', ['to@example.com'], headers={'From': 'from@example.com'})
  491
+        connection = mail.get_connection()
  492
+        connection.send_messages([msg])
  493
+
  494
+        self.assertEqual(len(os.listdir(self.tmp_dir)), 1)
  495
+        message = email.message_from_file(open(os.path.join(self.tmp_dir, os.listdir(self.tmp_dir)[0])))
  496
+        self.assertEqual(message.get_content_type(), 'text/plain')
  497
+        self.assertEqual(message.get('subject'), 'Subject')
  498
+        self.assertEqual(message.get('from'), 'from@example.com')
  499
+        self.assertEqual(message.get('to'), 'to@example.com')
  500
+
  501
+        connection2 = mail.get_connection()
  502
+        connection2.send_messages([msg])
  503
+        self.assertEqual(len(os.listdir(self.tmp_dir)), 2)
  504
+
  505
+        connection.send_messages([msg])
  506
+        self.assertEqual(len(os.listdir(self.tmp_dir)), 2)
  507
+
  508
+        msg.connection = mail.get_connection()
  509
+        self.assertTrue(connection.open())
  510
+        msg.send()
  511
+        self.assertEqual(len(os.listdir(self.tmp_dir)), 3)
  512
+        msg.send()
  513
+        self.assertEqual(len(os.listdir(self.tmp_dir)), 3)
  514
+
  515
+
  516
+class ConsoleBackendTests(BaseEmailBackendTests, TestCase):
  517
+    email_backend = 'django.core.mail.backends.console.EmailBackend'
  518
+
  519
+    def setUp(self):
  520
+        super(ConsoleBackendTests, self).setUp()
  521
+        self.__stdout = sys.stdout
  522
+        self.stream = sys.stdout = StringIO()
  523
+
  524
+    def tearDown(self):
  525
+        del self.stream
  526
+        sys.stdout = self.__stdout
  527
+        del self.__stdout
  528
+        super(ConsoleBackendTests, self).tearDown()
  529
+
  530
+    def flush_mailbox(self):
  531
+        self.stream = sys.stdout = StringIO()
  532
+
  533
+    def get_mailbox_content(self):
  534
+        messages = self.stream.getvalue().split('\n' + ('-' * 79) + '\n')
  535
+        return [email.message_from_string(m) for m in messages if m]
  536
+
  537
+    def test_console_stream_kwarg(self):
  538
+        """
  539
+        Test that the console backend can be pointed at an arbitrary stream.
  540
+        """
  541
+        s = StringIO()
  542
+        connection = mail.get_connection('django.core.mail.backends.console.EmailBackend', stream=s)
  543
+        send_mail('Subject', 'Content', 'from@example.com', ['to@example.com'], connection=connection)
  544
+        self.assertTrue(s.getvalue().startswith('Content-Type: text/plain; charset="utf-8"\nMIME-Version: 1.0\nContent-Transfer-Encoding: quoted-printable\nSubject: Subject\nFrom: from@example.com\nTo: to@example.com\nDate: '))
  545
+
  546
+
  547
+class FakeSMTPServer(smtpd.SMTPServer, threading.Thread):
  548
+    """
  549
+    Asyncore SMTP server wrapped into a thread. Based on DummyFTPServer from:
  550
+    http://svn.python.org/view/python/branches/py3k/Lib/test/test_ftplib.py?revision=86061&view=markup
  551
+    """
  552
+
  553
+    def __init__(self, *args, **kwargs):
  554
+        threading.Thread.__init__(self)
  555
+        smtpd.SMTPServer.__init__(self, *args, **kwargs)
  556
+        self._sink = []
  557
+        self.active = False
  558
+        self.active_lock = threading.Lock()
  559
+        self.sink_lock = threading.Lock()
  560
+
  561
+    def process_message(self, peer, mailfrom, rcpttos, data):
  562
+        m = email.message_from_string(data)
  563
+        maddr = email.Utils.parseaddr(m.get('from'))[1]
  564
+        if mailfrom != maddr:
  565
+            return "553 '%s' != '%s'" % (mailfrom, maddr)
  566
+        self.sink_lock.acquire()
  567
+        self._sink.append(m)
  568
+        self.sink_lock.release()
  569
+
  570
+    def get_sink(self):
  571
+        self.sink_lock.acquire()
  572
+        try:
  573
+            return self._sink[:]
  574
+        finally:
  575
+            self.sink_lock.release()
  576
+
  577
+    def flush_sink(self):
  578
+        self.sink_lock.acquire()
  579
+        self._sink[:] = []
  580
+        self.sink_lock.release()
  581
+
  582
+    def start(self):
  583
+        assert not self.active
  584
+        self.__flag = threading.Event()
  585
+        threading.Thread.start(self)
  586
+        self.__flag.wait()
  587
+
  588
+    def run(self):
  589
+        self.active = True
  590
+        self.__flag.set()
  591
+        while self.active and asyncore.socket_map:
  592
+            self.active_lock.acquire()
  593
+            asyncore.loop(timeout=0.1, count=1)
  594
+            self.active_lock.release()
  595
+        asyncore.close_all()
  596
+
  597
+    def stop(self):
  598
+        assert self.active
  599
+        self.active = False
  600
+        self.join()
  601
+
  602
+
  603
+class SMTPBackendTests(BaseEmailBackendTests, TestCase):
  604
+    email_backend = 'django.core.mail.backends.smtp.EmailBackend'
  605
+
  606
+    @classmethod
  607
+    def setUpClass(cls):
  608
+        cls.server = FakeSMTPServer(('127.0.0.1', 0), None)
  609
+        cls.settings = alter_django_settings(
  610
+            EMAIL_HOST="127.0.0.1",
  611
+            EMAIL_PORT=cls.server.socket.getsockname()[1])
  612
+        cls.server.start()
  613
+
  614
+    @classmethod
  615
+    def tearDownClass(cls):
  616
+        cls.server.stop()
  617
+
  618
+    def setUp(self):
  619
+        super(SMTPBackendTests, self).setUp()
  620
+        self.server.flush_sink()
  621
+
  622
+    def tearDown(self):
  623
+        self.server.flush_sink()
  624
+        super(SMTPBackendTests, self).tearDown()
  625
+
  626
+    def flush_mailbox(self):
  627
+        self.server.flush_sink()
  628
+
  629
+    def get_mailbox_content(self):
  630
+        return self.server.get_sink()

0 notes on commit 1199721

Please sign in to comment.
Something went wrong with that request. Please try again.