Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Adding a concurrency test case

  • Loading branch information...
commit e04ca26cf4020c5b61202c4ebb78189825dd86e5 1 parent 3597298
@coleifer authored
Showing with 44 additions and 2 deletions.
  1. +44 −2 tests.py
View
46 tests.py
@@ -4,6 +4,7 @@
import datetime
import logging
import os
+import Queue
import unittest
from decimal import Decimal
@@ -1291,8 +1292,49 @@ def do_will_succeed2():
class ConcurrencyTestCase(ModelTestCase):
- requires = []
- # TODO
+ requires = [User]
+
+ def setUp(self):
+ self._orig_db = test_db
+ User._meta.database = database_class(database_name, threadlocals=True)
+ super(ConcurrencyTestCase, self).setUp()
+
+ def tearDown(self):
+ User._meta.database = self._orig_db
+ super(ConcurrencyTestCase, self).tearDown()
+
+ def test_multiple_writers(self):
+ def create_user_thread(low, hi):
+ for i in range(low, hi):
+ User.create(username='u%d' % i)
+ User._meta.database.close()
+
+ threads = []
+
+ for i in range(5):
+ threads.append(threading.Thread(target=create_user_thread, args=(i*10, i * 10 + 10)))
+
+ [t.start() for t in threads]
+ [t.join() for t in threads]
+
+ self.assertEqual(User.select().count(), 50)
+
+ def test_multiple_readers(self):
+ data_queue = Queue.Queue()
+
+ def reader_thread(q, num):
+ for i in range(num):
+ data_queue.put(User.select().count())
+
+ threads = []
+
+ for i in range(5):
+ threads.append(threading.Thread(target=reader_thread, args=(data_queue, 20)))
+
+ [t.start() for t in threads]
+ [t.join() for t in threads]
+
+ self.assertEqual(data_queue.qsize(), 100)
class ModelInheritanceTestCase(BasePeeweeTestCase):
Please sign in to comment.
Something went wrong with that request. Please try again.