-
Notifications
You must be signed in to change notification settings - Fork 2k
/
test_mailer.py
118 lines (103 loc) · 4.74 KB
/
test_mailer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import time
from nose.tools import assert_equal, assert_raises
from pylons import config
from email.mime.text import MIMEText
import hashlib
from ckan import model
from ckan.tests.pylons_controller import PylonsTestCase
from ckan.tests.mock_mail_server import SmtpServerHarness
from ckan.lib.mailer import mail_recipient, mail_user, send_reset_link, add_msg_niceties, MailerException, get_reset_link_body, get_reset_link
from ckan.lib.create_test_data import CreateTestData
from ckan.lib.base import g
class TestMailer(SmtpServerHarness, PylonsTestCase):
@classmethod
def setup_class(cls):
smtp_server = config.get('test_smtp_server')
if smtp_server:
host, port = smtp_server.split(':')
port = int(port) + int(str(hashlib.md5(cls.__name__).hexdigest())[0], 16)
config['test_smtp_server'] = '%s:%s' % (host, port)
CreateTestData.create_user(name='bob', email='bob@bob.net')
CreateTestData.create_user(name='mary') #NB No email addr provided
SmtpServerHarness.setup_class()
PylonsTestCase.setup_class()
@classmethod
def teardown_class(cls):
SmtpServerHarness.teardown_class()
model.repo.rebuild_db()
def setup(self):
self.clear_smtp_messages()
def mime_encode(self, msg, recipient_name):
sender_name = g.site_title
sender_url = g.site_url
body = add_msg_niceties(recipient_name, msg, sender_name, sender_url)
encoded_body = MIMEText(body.encode('utf-8'), 'plain', 'utf-8').get_payload().strip()
return encoded_body
def test_mail_recipient(self):
msgs = self.get_smtp_messages()
assert_equal(msgs, [])
# send email
test_email = {'recipient_name': 'Bob',
'recipient_email':'bob@bob.net',
'subject': 'Meeting',
'body': 'The meeting is cancelled.',
'headers': {'header1': 'value1'}}
mail_recipient(**test_email)
time.sleep(0.1)
# check it went to the mock smtp server
msgs = self.get_smtp_messages()
assert_equal(len(msgs), 1)
msg = msgs[0]
assert_equal(msg[1], config['ckan.mail_from'])
assert_equal(msg[2], [test_email['recipient_email']])
assert test_email['headers'].keys()[0] in msg[3], msg[3]
assert test_email['headers'].values()[0] in msg[3], msg[3]
assert test_email['subject'] in msg[3], msg[3]
expected_body = self.mime_encode(test_email['body'],
test_email['recipient_name'])
assert expected_body in msg[3], '%r not in %r' % (expected_body, msg[3])
def test_mail_user(self):
msgs = self.get_smtp_messages()
assert_equal(msgs, [])
# send email
test_email = {'recipient': model.User.by_name(u'bob'),
'subject': 'Meeting',
'body': 'The meeting is cancelled.',
'headers': {'header1': 'value1'}}
mail_user(**test_email)
time.sleep(0.1)
# check it went to the mock smtp server
msgs = self.get_smtp_messages()
assert_equal(len(msgs), 1)
msg = msgs[0]
assert_equal(msg[1], config['ckan.mail_from'])
assert_equal(msg[2], [model.User.by_name(u'bob').email])
assert test_email['headers'].keys()[0] in msg[3], msg[3]
assert test_email['headers'].values()[0] in msg[3], msg[3]
assert test_email['subject'] in msg[3], msg[3]
expected_body = self.mime_encode(test_email['body'],
'bob')
assert expected_body in msg[3], '%r not in %r' % (expected_body, msg[3])
def test_mail_user_without_email(self):
# send email
test_email = {'recipient': model.User.by_name(u'mary'),
'subject': 'Meeting',
'body': 'The meeting is cancelled.',
'headers': {'header1': 'value1'}}
assert_raises(MailerException, mail_user, **test_email)
def test_send_reset_email(self):
# send email
send_reset_link(model.User.by_name(u'bob'))
time.sleep(0.1)
# check it went to the mock smtp server
msgs = self.get_smtp_messages()
assert_equal(len(msgs), 1)
msg = msgs[0]
assert_equal(msg[1], config['ckan.mail_from'])
assert_equal(msg[2], [model.User.by_name(u'bob').email])
assert 'Reset' in msg[3], msg[3]
test_msg = get_reset_link_body(model.User.by_name(u'bob'))
expected_body = self.mime_encode(test_msg,
u'bob')
assert expected_body in msg[3], '%r not in %r' % (expected_body, msg[3])
# reset link tested in user functional test