Skip to content

Commit

Permalink
Test for custom loads/dumps. Version 1.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-petrenko committed Feb 8, 2022
1 parent f15e62a commit 234f1d4
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 18 deletions.
43 changes: 28 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,20 @@ Completely mimics the interface of the standard multiprocessing.Queue, so can be

Adds `get_many()` and `put_many()` methods to receive/send multiple messages at once for the price of a single lock.

## Recent releases

##### v1.3.0
* Now support custom serializers and deserializers instead of Pickle (thank you @beasteers!):
```Python
q = Queue(max_size_bytes=100000, loads=custom_deserializer, dumps=custom_serializer)
```

## Requirements

- Linux or MacOS
- Python 3.6 or newer
- GCC 4.9.0 or newer


## Installation

```pip install faster-fifo```
Expand Down Expand Up @@ -78,28 +85,28 @@ except Empty:

*(measured execution times in seconds)*

| | multiprocessing.Queue | faster-fifo, get() | faster-fifo, get_many() |
| | multiprocessing.Queue | faster-fifo, get() | faster-fifo, get_many() |
|---------------------------------------------------|:---------------------:|:-----------------------:|:-------------------------:|
| 1 producer 1 consumer (200K msgs per producer) | 2.54 | 0.86 | 0.92 |
| 1 producer 10 consumers (200K msgs per producer) | 4.00 | 1.39 | 1.36 |
| 10 producers 1 consumer (100K msgs per producer) | 13.19 | 6.74 | 0.94 |
| 3 producers 20 consumers (100K msgs per producer) | 9.30 | 2.22 | 2.17 |
| 20 producers 3 consumers (50K msgs per producer) | 18.62 | 7.41 | 0.64 |
| 20 producers 20 consumers (50K msgs per producer) | 36.51 | 1.32 | 3.79 |
| 1 producer 1 consumer (200K msgs per producer) | 2.54 | 0.86 | 0.92 |
| 1 producer 10 consumers (200K msgs per producer) | 4.00 | 1.39 | 1.36 |
| 10 producers 1 consumer (100K msgs per producer) | 13.19 | 6.74 | 0.94 |
| 3 producers 20 consumers (100K msgs per producer) | 9.30 | 2.22 | 2.17 |
| 20 producers 3 consumers (50K msgs per producer) | 18.62 | 7.41 | 0.64 |
| 20 producers 20 consumers (50K msgs per producer) | 36.51 | 1.32 | 3.79 |


##### System #2 (Intel(R) Core(TM) i5-4200U CPU @ 1.60GHz, 2 cores, Ubuntu 18.04)

*(measured execution times in seconds)*

| | multiprocessing.Queue | faster-fifo, get() | faster-fifo, get_many() |
| | multiprocessing.Queue | faster-fifo, get() | faster-fifo, get_many() |
|---------------------------------------------------|:---------------------:|:-----------------------:|:-------------------------:|
| 1 producer 1 consumer (200K msgs per producer) | 7.86 | 2.09 | 2.2 |
| 1 producer 10 consumers (200K msgs per producer) | 11.68 | 4.01 | 3.88 |
| 10 producers 1 consumer (100K msgs per producer) | 44.48 | 16.68 | 5.98 |
| 3 producers 20 consumers (100K msgs per producer) | 22.59 | 7.83 | 7.49 |
| 20 producers 3 consumers (50K msgs per producer) | 66.3 | 22.3 | 6.35 |
| 20 producers 20 consumers (50K msgs per producer) | 78.75 | 14.39 | 15.78 |
| 1 producer 1 consumer (200K msgs per producer) | 7.86 | 2.09 | 2.2 |
| 1 producer 10 consumers (200K msgs per producer) | 11.68 | 4.01 | 3.88 |
| 10 producers 1 consumer (100K msgs per producer) | 44.48 | 16.68 | 5.98 |
| 3 producers 20 consumers (100K msgs per producer) | 22.59 | 7.83 | 7.49 |
| 20 producers 3 consumers (50K msgs per producer) | 66.3 | 22.3 | 6.35 |
| 20 producers 20 consumers (50K msgs per producer) | 78.75 | 14.39 | 15.78 |


## Using multiprocessing.get_context('spawn')
Expand All @@ -110,6 +117,12 @@ In order to use faster_fifo with 'spawn' make sure to add `import faster_fifo_re
PicklingError: Can't pickle <class '__main__.c_ubyte_Array_2'>: attribute lookup c_ubyte_Array_2
```

## Run tests

`python -m unittest`

(there are also C++ unit tests, should run them if C++ code was altered)

## Footnote

Originally designed for SampleFactory, a high-throughput asynchronous RL codebase https://github.com/alex-petrenko/sample-factory.
Expand Down
19 changes: 19 additions & 0 deletions cpp_faster_fifo/tests/test_faster_fifo.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,22 @@ def test_spawn_ctx(self):
p.start()
for p in procs:
p.join()


# this can actually be used instead of Pickle if we know that we need to support only specific data types
# should be significantly faster
def custom_int_deserializer(msg_bytes):
return int.from_bytes(msg_bytes, 'big')


def custom_int_serializer(x):
return x.to_bytes(4, 'big')


class TestCustomSerializer(TestCase):
def test_custom_loads_dumps(self):
q = Queue(max_size_bytes=100000, loads=custom_int_deserializer, dumps=custom_int_serializer)
for i in range(32767):
q.put(i)
deserialized_i = q.get()
assert i == deserialized_i
4 changes: 2 additions & 2 deletions faster_fifo.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ class Queue:
self.message_buffer_memview = None

# allow class level serializers
def loads(self, obj):
return _ForkingPickler.loads(obj)
def loads(self, msg_bytes):
return _ForkingPickler.loads(msg_bytes)

def dumps(self, obj):
return _ForkingPickler.dumps(obj).tobytes()
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
setup(
# Information
name='faster-fifo',
version='1.2.0',
version='1.3.0',
url='https://github.com/alex-petrenko/faster-fifo',
author='Aleksei Petrenko & Tushar Kumar',
license='MIT',
Expand Down

0 comments on commit 234f1d4

Please sign in to comment.