Merge pull request #3 from cnheider/master
Add support for lambda functions
cnheider committed Apr 1, 2019
2 parents f087f29 + 3f53fee commit 161a942
Showing 4 changed files with 48 additions and 16 deletions.
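
The substance of the change is that task callables are now serialized with cloudpickle rather than plain pickle before being handed to worker processes, which is what makes lambda tasks possible. A minimal standalone sketch of the underlying difference (assuming only that the cloudpickle package, the dependency this diff imports in warg/pooled_queue_processor.py, is installed):

import pickle

import cloudpickle

square = lambda x: x * x

# The standard library pickles functions by reference (module plus qualified
# name); a lambda's qualified name is '<lambda>', so the lookup fails.
try:
    pickle.dumps(square)
except pickle.PicklingError as error:
    print("pickle failed:", error)

# cloudpickle serializes the function by value, and the resulting bytes can be
# loaded again with plain pickle -- the same round trip CloudPickleBase performs
# in __getstate__/__setstate__ further down in this diff.
payload = cloudpickle.dumps(square)
restored = pickle.loads(payload)
print(restored(4))  # 16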
9 changes: 9 additions & 0 deletions tests/test_pqp.py
@@ -40,6 +40,15 @@ def test_integration_func():
print(a)


@pytest.mark.slow
def test_lambda_func():
task = lambda x: x

with PooledQueueProcessor(task, [2], max_queue_size=10) as processor:
for a, _ in zip(processor, range(30)):
print(a)


# @pytest.mark.slow
def test_integration_except():
task = Exc()
2 changes: 1 addition & 1 deletion version.py
@@ -19,7 +19,7 @@ def get_version():
# Most git tags are prefixed with 'v' (example: v1.2.3); this is
# never desirable for artifact repositories, so we strip the
# leading 'v' if it's present.
version = version[1:] if type(version) is str and version.startswith("v") else version
version = version[1:] if isinstance(version, str) and version.startswith("v") else version
else:
# Default version is an ISO8601 compliant datetime. PyPI doesn't allow
# the colon ':' character in its versions, and time is required to allow
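
The same pattern recurs in warg/named_ordered_dictionary.py below: exact-type checks (type(x) is str, type(x) is dict) are replaced with isinstance, which also accepts subclasses. A small standalone illustration of the behavioural difference:

from collections import OrderedDict

candidate = OrderedDict(a=1)

print(type(candidate) is dict)      # False: an exact-type check rejects subclasses
print(isinstance(candidate, dict))  # True: subclasses such as OrderedDict pass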
4 changes: 2 additions & 2 deletions warg/named_ordered_dictionary.py
@@ -92,7 +92,7 @@ class NamedOrderedDictionary(Mapping):
def __init__(self, *args: Any, **kwargs: Any) -> None:
# super().__init__(**kwargs)

if len(args) == 1 and type(args[0]) is dict:
if len(args) == 1 and isinstance(args[0], dict):
args_dict = args[0]
else:
args_dict = {}
@@ -205,7 +205,7 @@ def update(self, *args: Any, **kwargs: Any) -> None:
items (dict): Python dictionary containing updated values.
"""

if len(args) == 1 and type(args[0]) is dict:
if len(args) == 1 and isinstance(args[0], dict):
args_dict = args[0]
else:
args_dict = {}
49 changes: 36 additions & 13 deletions warg/pooled_queue_processor.py
@@ -1,18 +1,39 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import multiprocessing
import sys
import pickle
import time
from abc import abstractmethod
from typing import Iterable, Mapping
from abc import abstractmethod, ABC
from typing import Iterable, Mapping, Any

from cloudpickle import cloudpickle

__author__ = "cnheider"

import multiprocessing as mp
import queue


class PooledQueueTask(object):
class CloudPickleBase(object):
"""
Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle)
:param x: (Any) the variable you wish to wrap for pickling with cloudpickle
"""

def __init__(self, x: Any):
self._x = x

def __getstate__(self):
return cloudpickle.dumps(self._x)

def __setstate__(self, x):
self._x = pickle.loads(x)

def __call__(self, *args, **kwargs):
return self._x(*args, **kwargs)


class PooledQueueTask(ABC):
def __call__(self, *args, **kwargs):
return self.call(*args, **kwargs)

@@ -23,32 +44,34 @@ def call(self, *args, **kwargs):

class PooledQueueProcessor(object):
"""
This is a workaround of Pythons extremely slow interprocess communication pipes.
The ideal solution would be to use a multiprocessing.queue, but it apparently communication is band
limited.
This solution has processes complete tasks (batches) and a thread add the results to a queue.queue.
"""
This is a workaround of Pythons extremely slow interprocess communication pipes.
The ideal solution would be to use a multiprocessing.queue, but it apparently communication is band
limited.
This solution has processes complete tasks (batches) and a thread add the results to a queue.queue.
"""

def __init__(
self,
func,
args: Iterable = (),
kwargs: Mapping = {},
max_queue_size=100,
n_proc=4,
n_proc=None,
max_tasks_per_child=None,
fill_at_construction=True,
blocking=True,
):
self._max_queue_size = max_queue_size
if isinstance(func, type):
func = func()
self._func = func
self._func = CloudPickleBase(func)
self.args = args
self.kwargs = kwargs
self.blocking = blocking
if max_tasks_per_child is None:
max_tasks_per_child = max_queue_size // 4
if n_proc is None:
n_proc = multiprocessing.cpu_count()

self._queue = queue.Queue(maxsize=max_queue_size)
self._pool = mp.Pool(n_proc, maxtasksperchild=max_tasks_per_child)
@@ -90,8 +113,8 @@ def raise_error(self, excptn):
def get(self):
"""
:return:
"""
:return:
"""
if self.queue_size < 1: # self._queue.empty():
if len(multiprocessing.active_children()) == 0:
if self.blocking:
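
Taken together, the changes let PooledQueueProcessor be driven by a bare lambda, as the new test above exercises. A usage sketch mirroring tests/test_pqp.py (the import path is assumed from the file layout, and warg plus cloudpickle must be installed):

from warg.pooled_queue_processor import PooledQueueProcessor

# Identity task as a lambda; PooledQueueProcessor wraps it in CloudPickleBase
# so the worker pool can serialize and deserialize it.
task = lambda x: x

with PooledQueueProcessor(task, [2], max_queue_size=10) as processor:
    for result, _ in zip(processor, range(30)):
        print(result)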
