/
test_signals.py
311 lines (250 loc) · 12.2 KB
/
test_signals.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
# -*- coding: utf-8 -*-
import sys
# Insert "" at the very front of the module search path *before* mongoengine
# is imported (presumably so a local checkout in the working directory wins
# over any installed copy -- confirm against the project's test layout).
sys.path[0:0] = [""]
import unittest
from mongoengine import *
from mongoengine import signals
# Shared log that the signal receivers defined in SignalTests.setUp append to.
# SignalTests.get_signal_output rebinds it to a fresh list before each
# measured call, which is why it is a module-level global.
signal_output = []
class SignalTests(unittest.TestCase):
    """
    Testing signals before/after saving and deleting.

    The receivers registered in ``setUp`` append human-readable lines to the
    module-level ``signal_output`` list; each test compares those lines with
    the sequence of messages it expects.
    """

    @staticmethod
    def _receiver_counts():
        """Return a tuple with the receiver count of every signal used here.

        ``setUp`` and ``tearDown`` compare two snapshots of this tuple to make
        sure every receiver connected by a test was disconnected again.
        """
        return (
            len(signals.pre_init.receivers),
            len(signals.post_init.receivers),
            len(signals.pre_save.receivers),
            len(signals.pre_save_post_validation.receivers),
            len(signals.post_save.receivers),
            len(signals.pre_delete.receivers),
            len(signals.post_delete.receivers),
            len(signals.pre_bulk_insert.receivers),
            len(signals.post_bulk_insert.receivers),
        )

    def get_signal_output(self, fn, *args, **kwargs):
        """Call ``fn(*args, **kwargs)`` and return the signal output it caused.

        The global list is replaced by a fresh one before the call and is
        looked up again afterwards.  If ``fn`` itself calls this method, the
        list created by that most recent reset is what gets returned, so the
        result must not be cached in a local before ``fn`` runs.
        """
        # Flush any existing signal output
        global signal_output
        signal_output = []
        fn(*args, **kwargs)
        return signal_output

    def setUp(self):
        """Define the test documents and connect their signal receivers."""
        connect(db='mongoenginetest')

        class Author(Document):
            """Document with a receiver for every signal under test."""

            name = StringField()

            def __unicode__(self):
                return self.name

            @classmethod
            def pre_init(cls, sender, document, *args, **kwargs):
                signal_output.append('pre_init signal, %s' % cls.__name__)
                signal_output.append(str(kwargs['values']))

            @classmethod
            def post_init(cls, sender, document, **kwargs):
                signal_output.append(
                    'post_init signal, %s, document._created = %s'
                    % (document, document._created))

            @classmethod
            def pre_save(cls, sender, document, **kwargs):
                signal_output.append('pre_save signal, %s' % document)

            @classmethod
            def pre_save_post_validation(cls, sender, document, **kwargs):
                signal_output.append(
                    'pre_save_post_validation signal, %s' % document)
                if 'created' in kwargs:
                    if kwargs['created']:
                        signal_output.append('Is created')
                    else:
                        signal_output.append('Is updated')

            @classmethod
            def post_save(cls, sender, document, **kwargs):
                # dict views cannot be concatenated with ``+`` on Python 3,
                # so materialise both key sets as lists first.
                delta = document._delta()
                dirty_keys = list(delta[0].keys()) + list(delta[1].keys())
                signal_output.append('post_save signal, %s' % document)
                signal_output.append('post_save dirty keys, %s' % dirty_keys)
                if 'created' in kwargs:
                    if kwargs['created']:
                        signal_output.append('Is created')
                    else:
                        signal_output.append('Is updated')

            @classmethod
            def pre_delete(cls, sender, document, **kwargs):
                signal_output.append('pre_delete signal, %s' % document)

            @classmethod
            def post_delete(cls, sender, document, **kwargs):
                signal_output.append('post_delete signal, %s' % document)

            @classmethod
            def pre_bulk_insert(cls, sender, documents, **kwargs):
                signal_output.append('pre_bulk_insert signal, %s' % documents)

            @classmethod
            def post_bulk_insert(cls, sender, documents, **kwargs):
                signal_output.append('post_bulk_insert signal, %s' % documents)
                if kwargs.get('loaded', False):
                    signal_output.append('Is loaded')
                else:
                    signal_output.append('Not loaded')

        self.Author = Author
        Author.drop_collection()

        class Another(Document):
            """Document that only listens to the delete signals."""

            name = StringField()

            def __unicode__(self):
                return self.name

            @classmethod
            def pre_delete(cls, sender, document, **kwargs):
                signal_output.append('pre_delete signal, %s' % document)

            @classmethod
            def post_delete(cls, sender, document, **kwargs):
                signal_output.append('post_delete signal, %s' % document)

        self.Another = Another
        Another.drop_collection()

        class ExplicitId(Document):
            """Document with a caller-supplied integer primary key."""

            id = IntField(primary_key=True)

            @classmethod
            def post_save(cls, sender, document, **kwargs):
                if 'created' in kwargs:
                    if kwargs['created']:
                        signal_output.append('Is created')
                    else:
                        signal_output.append('Is updated')

        self.ExplicitId = ExplicitId
        ExplicitId.drop_collection()

        # Save up the number of connected signals so that we can check at the
        # end that all the signals we register get properly unregistered
        self.pre_signals = self._receiver_counts()

        signals.pre_init.connect(Author.pre_init, sender=Author)
        signals.post_init.connect(Author.post_init, sender=Author)
        signals.pre_save.connect(Author.pre_save, sender=Author)
        signals.pre_save_post_validation.connect(
            Author.pre_save_post_validation, sender=Author)
        signals.post_save.connect(Author.post_save, sender=Author)
        signals.pre_delete.connect(Author.pre_delete, sender=Author)
        signals.post_delete.connect(Author.post_delete, sender=Author)
        signals.pre_bulk_insert.connect(Author.pre_bulk_insert, sender=Author)
        signals.post_bulk_insert.connect(Author.post_bulk_insert, sender=Author)

        signals.pre_delete.connect(Another.pre_delete, sender=Another)
        signals.post_delete.connect(Another.post_delete, sender=Another)

        signals.post_save.connect(ExplicitId.post_save, sender=ExplicitId)

    def tearDown(self):
        """Disconnect every receiver and verify none were left behind."""
        signals.pre_init.disconnect(self.Author.pre_init)
        signals.post_init.disconnect(self.Author.post_init)
        signals.post_delete.disconnect(self.Author.post_delete)
        signals.pre_delete.disconnect(self.Author.pre_delete)
        signals.post_save.disconnect(self.Author.post_save)
        signals.pre_save_post_validation.disconnect(
            self.Author.pre_save_post_validation)
        signals.pre_save.disconnect(self.Author.pre_save)
        signals.pre_bulk_insert.disconnect(self.Author.pre_bulk_insert)
        signals.post_bulk_insert.disconnect(self.Author.post_bulk_insert)

        signals.post_delete.disconnect(self.Another.post_delete)
        signals.pre_delete.disconnect(self.Another.pre_delete)

        signals.post_save.disconnect(self.ExplicitId.post_save)

        # Check that all our signals got disconnected properly.
        post_signals = self._receiver_counts()

        self.ExplicitId.objects.delete()

        self.assertEqual(self.pre_signals, post_signals)

    def test_model_signals(self):
        """ Model saves should throw some signals. """

        def create_author():
            self.Author(name='Bill Shakespeare')

        def bulk_create_author_with_load():
            a1 = self.Author(name='Bill Shakespeare')
            self.Author.objects.insert([a1], load_bulk=True)

        def bulk_create_author_without_load():
            a1 = self.Author(name='Bill Shakespeare')
            self.Author.objects.insert([a1], load_bulk=False)

        def load_existing_author():
            a = self.Author(name='Bill Shakespeare')
            a.save()
            self.get_signal_output(lambda: None)  # eliminate signal output
            # Only the signals fired while loading matter; the loaded
            # document itself is not needed.
            self.Author.objects(name='Bill Shakespeare')[0]

        self.assertEqual(self.get_signal_output(create_author), [
            "pre_init signal, Author",
            "{'name': 'Bill Shakespeare'}",
            "post_init signal, Bill Shakespeare, document._created = True",
        ])

        a1 = self.Author(name='Bill Shakespeare')
        self.assertEqual(self.get_signal_output(a1.save), [
            "pre_save signal, Bill Shakespeare",
            "pre_save_post_validation signal, Bill Shakespeare",
            "Is created",
            "post_save signal, Bill Shakespeare",
            "post_save dirty keys, ['name']",
            "Is created",
        ])

        a1.reload()
        a1.name = 'William Shakespeare'
        self.assertEqual(self.get_signal_output(a1.save), [
            "pre_save signal, William Shakespeare",
            "pre_save_post_validation signal, William Shakespeare",
            "Is updated",
            "post_save signal, William Shakespeare",
            "post_save dirty keys, ['name']",
            "Is updated",
        ])

        self.assertEqual(self.get_signal_output(a1.delete), [
            'pre_delete signal, William Shakespeare',
            'post_delete signal, William Shakespeare',
        ])

        output = self.get_signal_output(load_existing_author)
        # Check the output lines separately, because of the random ObjectID
        # present after the object is loaded
        self.assertEqual(output[0], "pre_init signal, Author")
        self.assertEqual(
            output[2],
            "post_init signal, Bill Shakespeare, document._created = False")

        output = self.get_signal_output(bulk_create_author_with_load)
        # The output of this signal is not entirely deterministic. The reloaded
        # object will have an object ID. Hence, we only check part of the output
        self.assertEqual(
            output[3],
            "pre_bulk_insert signal, [<Author: Bill Shakespeare>]")
        self.assertEqual(output[-2:], [
            "post_bulk_insert signal, [<Author: Bill Shakespeare>]",
            "Is loaded",
        ])

        self.assertEqual(
            self.get_signal_output(bulk_create_author_without_load), [
                "pre_init signal, Author",
                "{'name': 'Bill Shakespeare'}",
                "post_init signal, Bill Shakespeare, document._created = True",
                "pre_bulk_insert signal, [<Author: Bill Shakespeare>]",
                "post_bulk_insert signal, [<Author: Bill Shakespeare>]",
                "Not loaded",
            ])

    def test_queryset_delete_signals(self):
        """ Queryset delete should throw some signals. """

        self.Another(name='Bill Shakespeare').save()
        self.assertEqual(self.get_signal_output(self.Another.objects.delete), [
            'pre_delete signal, Bill Shakespeare',
            'post_delete signal, Bill Shakespeare',
        ])

    def test_signals_with_explicit_doc_ids(self):
        """ Model saves must have a created flag the first time."""
        ei = self.ExplicitId(id=123)
        # post save must receive the created flag, even if there's already
        # an object id present
        self.assertEqual(self.get_signal_output(ei.save), ['Is created'])
        # second time, it must be an update
        self.assertEqual(self.get_signal_output(ei.save), ['Is updated'])

    def test_signals_with_switch_collection(self):
        """ The created flag must follow switch_collection's keep_created."""
        ei = self.ExplicitId(id=123)
        ei.switch_collection("explicit__1")
        self.assertEqual(self.get_signal_output(ei.save), ['Is created'])
        ei.switch_collection("explicit__1")
        self.assertEqual(self.get_signal_output(ei.save), ['Is updated'])

        # With keep_created=False every save is reported as a creation.
        ei.switch_collection("explicit__1", keep_created=False)
        self.assertEqual(self.get_signal_output(ei.save), ['Is created'])
        ei.switch_collection("explicit__1", keep_created=False)
        self.assertEqual(self.get_signal_output(ei.save), ['Is created'])

    def test_signals_with_switch_db(self):
        """ The created flag must follow switch_db's keep_created."""
        connect('mongoenginetest')
        register_connection('testdb-1', 'mongoenginetest2')

        ei = self.ExplicitId(id=123)
        ei.switch_db("testdb-1")
        self.assertEqual(self.get_signal_output(ei.save), ['Is created'])
        ei.switch_db("testdb-1")
        self.assertEqual(self.get_signal_output(ei.save), ['Is updated'])

        # With keep_created=False every save is reported as a creation.
        ei.switch_db("testdb-1", keep_created=False)
        self.assertEqual(self.get_signal_output(ei.save), ['Is created'])
        ei.switch_db("testdb-1", keep_created=False)
        self.assertEqual(self.get_signal_output(ei.save), ['Is created'])
# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()