Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Conflicts: tests.py
- Loading branch information
Showing
6 changed files
with
257 additions
and
9 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Original file line | Diff line number | Diff line change |
---|---|---|---|
@@ -0,0 +1,71 @@ | |||
from peewee import Model as _Model | |||
|
|||
|
|||
class Signal(object): | |||
def __init__(self): | |||
self._flush() | |||
|
|||
def connect(self, receiver, name=None, sender=None): | |||
name = name or receiver.__name__ | |||
if name not in self._receivers: | |||
self._receivers[name] = (receiver, sender) | |||
self._receiver_list.append(name) | |||
else: | |||
raise ValueError('receiver named %s already connected' % name) | |||
|
|||
def disconnect(self, receiver=None, name=None): | |||
if receiver: | |||
name = receiver.__name__ | |||
if name: | |||
del self._receivers[name] | |||
self._receiver_list.remove(name) | |||
else: | |||
raise ValueError('a receiver or a name must be provided') | |||
|
|||
def send(self, instance, *args, **kwargs): | |||
sender = type(instance) | |||
responses = [] | |||
for name in self._receiver_list: | |||
r, s = self._receivers[name] | |||
if s is None or sender is s: | |||
responses.append((r, r(sender, instance, *args, **kwargs))) | |||
return responses | |||
|
|||
def _flush(self): | |||
self._receivers = {} | |||
self._receiver_list = [] | |||
|
|||
|
|||
pre_save = Signal() | |||
post_save = Signal() | |||
pre_delete = Signal() | |||
post_delete = Signal() | |||
pre_init = Signal() | |||
post_init = Signal() | |||
|
|||
|
|||
class Model(_Model): | |||
def __init__(self, *args, **kwargs): | |||
super(Model, self).__init__(*args, **kwargs) | |||
pre_init.send(self) | |||
|
|||
def prepared(self): | |||
super(Model, self).prepared() | |||
post_init.send(self) | |||
|
|||
def save(self, *args, **kwargs): | |||
created = not bool(self.get_pk()) | |||
pre_save.send(self, created=created) | |||
super(Model, self).save(*args, **kwargs) | |||
post_save.send(self, created=created) | |||
|
|||
def delete_instance(self, *args, **kwargs): | |||
pre_delete.send(self) | |||
super(Model, self).delete_instance(*args, **kwargs) | |||
post_delete.send(self) | |||
|
|||
def connect(signal, name=None, sender=None): | |||
def decorator(fn): | |||
signal.connect(fn, name, sender) | |||
return fn | |||
return decorator |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Original file line | Diff line number | Diff line change |
---|---|---|---|
@@ -0,0 +1,139 @@ | |||
import unittest | |||
|
|||
from peewee import * | |||
import signals | |||
|
|||
|
|||
db = SqliteDatabase(':memory:') | |||
|
|||
class BaseSignalModel(signals.Model): | |||
class Meta: | |||
database = db | |||
|
|||
class ModelA(BaseSignalModel): | |||
a = CharField() | |||
|
|||
class ModelB(BaseSignalModel): | |||
b = CharField() | |||
|
|||
|
|||
class SignalsTestCase(unittest.TestCase): | |||
def setUp(self): | |||
ModelA.create_table(True) | |||
ModelB.create_table(True) | |||
|
|||
def tearDown(self): | |||
ModelA.drop_table() | |||
ModelB.drop_table() | |||
signals.pre_save._flush() | |||
signals.post_save._flush() | |||
signals.pre_delete._flush() | |||
signals.post_delete._flush() | |||
signals.pre_init._flush() | |||
signals.post_init._flush() | |||
|
|||
def test_pre_save(self): | |||
state = [] | |||
|
|||
@signals.connect(signals.pre_save) | |||
def pre_save(sender, instance, created): | |||
state.append((sender, instance, instance.get_pk(), created)) | |||
m = ModelA() | |||
m.save() | |||
self.assertEqual(state, [(ModelA, m, None, True)]) | |||
|
|||
m.save() | |||
self.assertTrue(m.id is not None) | |||
self.assertEqual(state[-1], (ModelA, m, m.id, False)) | |||
|
|||
def test_post_save(self): | |||
state = [] | |||
|
|||
@signals.connect(signals.post_save) | |||
def post_save(sender, instance, created): | |||
state.append((sender, instance, instance.get_pk(), created)) | |||
m = ModelA() | |||
m.save() | |||
|
|||
self.assertTrue(m.id is not None) | |||
self.assertEqual(state, [(ModelA, m, m.id, True)]) | |||
|
|||
m.save() | |||
self.assertEqual(state[-1], (ModelA, m, m.id, False)) | |||
|
|||
def test_pre_delete(self): | |||
state = [] | |||
|
|||
m = ModelA() | |||
m.save() | |||
|
|||
@signals.connect(signals.pre_delete) | |||
def pre_delete(sender, instance): | |||
state.append((sender, instance, ModelA.select().count())) | |||
m.delete_instance() | |||
self.assertEqual(state, [(ModelA, m, 1)]) | |||
|
|||
def test_post_delete(self): | |||
state = [] | |||
|
|||
m = ModelA() | |||
m.save() | |||
|
|||
@signals.connect(signals.post_delete) | |||
def post_delete(sender, instance): | |||
state.append((sender, instance, ModelA.select().count())) | |||
m.delete_instance() | |||
self.assertEqual(state, [(ModelA, m, 0)]) | |||
|
|||
def test_pre_init(self): | |||
state = [] | |||
|
|||
m = ModelA(a='a') | |||
m.save() | |||
|
|||
@signals.connect(signals.pre_init) | |||
def pre_init(sender, instance): | |||
state.append((sender, instance.a)) | |||
|
|||
ModelA.get() | |||
self.assertEqual(state, [(ModelA, None)]) | |||
|
|||
def test_post_init(self): | |||
state = [] | |||
|
|||
m = ModelA(a='a') | |||
m.save() | |||
|
|||
@signals.connect(signals.post_init) | |||
def post_init(sender, instance): | |||
state.append((sender, instance.a)) | |||
|
|||
ModelA.get() | |||
self.assertEqual(state, [(ModelA, 'a')]) | |||
|
|||
def test_sender(self): | |||
state = [] | |||
|
|||
@signals.connect(signals.post_save, sender=ModelA) | |||
def post_save(sender, instance, created): | |||
state.append(instance) | |||
|
|||
m = ModelA.create() | |||
self.assertEqual(state, [m]) | |||
|
|||
m2 = ModelB.create() | |||
self.assertEqual(state, [m]) | |||
|
|||
def test_connect_disconnect(self): | |||
state = [] | |||
|
|||
@signals.connect(signals.post_save, sender=ModelA) | |||
def post_save(sender, instance, created): | |||
state.append(instance) | |||
|
|||
m = ModelA.create() | |||
self.assertEqual(state, [m]) | |||
|
|||
signals.post_save.disconnect(post_save) | |||
m2 = ModelA.create() | |||
self.assertEqual(state, [m]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Original file line | Diff line number | Diff line change |
---|---|---|---|
@@ -1,16 +1,50 @@ | |||
#!/usr/bin/env python | #!/usr/bin/env python | ||
import optparse | |||
import os | import os | ||
import sys | |||
import unittest | import unittest | ||
|
|
||
|
|
||
def collect(): | def runtests(module, verbosity): | ||
start_dir = os.path.abspath(os.path.dirname(__file__)) | suite = unittest.TestLoader().loadTestsFromModule(module) | ||
return unittest.defaultTestLoader.discover(start_dir) | results = unittest.TextTestRunner(verbosity=verbosity).run(suite) | ||
return results.failures, results.errors | |||
|
|
||
def get_option_parser(): | |||
parser = optparse.OptionParser() | |||
parser.add_option('-e', '--engine', dest='engine', default='sqlite', help='Database engine to test, one of [sqlite3, postgres, mysql]') | |||
parser.add_option('-v', '--verbosity', dest='verbosity', default=1, type='int', help='Verbosity of output') | |||
parser.add_option('-a', '--all', dest='all', default=False, action='store_true', help='Run all tests, including extras') | |||
parser.add_option('-x', '--extra', dest='extra', default=False, action='store_true', help='Run only extras tests') | |||
return parser | |||
|
|
||
if __name__ == '__main__': | if __name__ == '__main__': | ||
backend = os.environ.get('PEEWEE_TEST_BACKEND') or 'sqlite' | parser = get_option_parser() | ||
print 'RUNNING PEEWEE TESTS WITH [%s]' % backend | options, args = parser.parse_args() | ||
print '==============================================' | |||
unittest.main(module='tests') | |||
|
|
||
os.environ['PEEWEE_TEST_BACKEND'] = options.engine | |||
os.environ['PEEWEE_TEST_VERBOSITY'] = str(options.verbosity) | |||
|
|||
import tests | |||
from extras import tests as extras_tests | |||
|
|||
if options.all: | |||
modules = [tests, extras_tests] | |||
elif options.extra: | |||
modules = [extras_tests] | |||
else: | |||
modules = [tests] | |||
|
|||
results = [] | |||
any_failures = False | |||
any_errors = False | |||
for module in modules: | |||
failures, errors = runtests(module, options.verbosity) | |||
any_failures = any_failures or bool(failures) | |||
any_errors = any_errors or bool(errors) | |||
|
|||
if any_errors: | |||
sys.exit(2) | |||
elif any_failures: | |||
sys.exit(1) | |||
sys.exit(0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters