-
Notifications
You must be signed in to change notification settings - Fork 4
/
test_multiprocessing.py
37 lines (29 loc) · 1.13 KB
/
test_multiprocessing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import unittest
from .feature import Feature, JSONFeature
from .lmdbstore import LmdbDatabase
from .model import BaseModel
from .persistence import PersistenceSettings, UuidProvider, \
StringDelimitedKeyBuilder
from .test_integration import TextStream, Tokenizer, WordCount
from tempfile import mkdtemp
from multiprocessing import Pool
class Settings(PersistenceSettings):
    # Persistence configuration mixed into the model(s) defined in this
    # test module.

    # The key builder is created first because the database below is
    # constructed with a reference to it.
    key_builder = StringDelimitedKeyBuilder()
    id_provider = UuidProvider()

    # LMDB-backed store rooted in a temporary directory that is created once,
    # when this class body is executed (i.e. at import time).
    database = LmdbDatabase(
        path=mkdtemp(), map_size=10000000, key_builder=key_builder)
class D(BaseModel, Settings):
    # Model persisted through ``Settings``.  The features form a chain:
    # ``stream`` -> ``words`` -> ``count``; each one names its predecessor
    # via ``needs``, so the declaration order below must be preserved.

    # Raw input; persisted (store=True).
    stream = Feature(TextStream, store=True)

    # Intermediate result derived from ``stream``; not persisted.
    words = Feature(Tokenizer, needs=stream, store=False)

    # Final result derived from ``words``; persisted as a JSON feature.
    count = JSONFeature(WordCount, needs=words, store=True)
def get_count(_):
    """Return the number of ids yielded by ``D.database.iter_ids()``.

    The single positional argument is ignored; it is only there so that the
    function can be used as the callable for ``Pool.map``, which passes one
    item of the input iterable per call.
    """
    ids = list(D.database.iter_ids())
    return len(ids)
class MultiProcessTests(unittest.TestCase):
    """Checks that ids written by this process can be listed from workers."""

    def test_can_list_ids_from_multiple_processes(self):
        # Write two documents from the parent process before any worker
        # process is started.
        D.process(stream='Here is some text')
        D.process(stream='Here is some more')

        pool = Pool(4)
        try:
            # Ten independent calls spread over the four workers; the
            # argument values themselves are ignored by get_count.
            counts = pool.map(get_count, range(10))
        finally:
            # Always reap the worker processes, even if map() raised, so
            # they do not outlive this test.  map() has already collected
            # every result by the time it returns, so terminate() cannot
            # discard pending work.
            pool.terminate()
            pool.join()

        # Every call must have seen exactly the two documents stored above.
        self.assertSequenceEqual([2] * 10, counts)