diff --git a/autogluon/resource/dist_manager.py b/autogluon/resource/dist_manager.py index c0fd02b61f5..b09809c0652 100644 --- a/autogluon/resource/dist_manager.py +++ b/autogluon/resource/dist_manager.py @@ -1,6 +1,7 @@ import logging import multiprocessing as mp from .resource import * +from ..utils import Queue __all__ = ['DistributedResourceManager', 'NodeResourceManager'] @@ -116,14 +117,12 @@ def __init__(self, remote): self.LOCK = mp.Lock() self.MAX_CPU_COUNT = get_remote_cpu_count(remote) self.MAX_GPU_COUNT = get_remote_gpu_count(remote) - self.CPU_QUEUE = mp.Queue() - self.GPU_QUEUE = mp.Queue() + self.CPU_QUEUE = Queue() + self.GPU_QUEUE = Queue() for cid in range(self.MAX_CPU_COUNT): self.CPU_QUEUE.put(cid) for gid in range(self.MAX_GPU_COUNT): self.GPU_QUEUE.put(gid) - #logger.debug('\n\self.CPU_QUEUE.qsize() {}'.format(self.CPU_QUEUE.qsize()) + \ - # ', self.GPU_QUEUE.qsize() {}'.format(self.GPU_QUEUE.qsize())) def _request(self, remote, resource): """ResourceManager, we recommand using scheduler instead of creating your own @@ -153,7 +152,6 @@ def _release(self, resource): self.GPU_QUEUE.put(gid) def get_all_resources(self): - gpu_count = self.GPU_QUEUE.qsize() return self.MAX_CPU_COUNT, self.MAX_GPU_COUNT def check_availability(self, resource): diff --git a/autogluon/resource/manager.py b/autogluon/resource/manager.py index db337a0e04a..2fb7d0b7ba6 100644 --- a/autogluon/resource/manager.py +++ b/autogluon/resource/manager.py @@ -1,6 +1,7 @@ import logging import multiprocessing as mp from .resource import * +from ..utils import Queue __all__ = ['ResourceManager'] @@ -10,8 +11,8 @@ class ResourceManager(object): """Resource Manager to keep track of the cpu and gpu usage """ LOCK = mp.Lock() - CPU_QUEUE = mp.Queue() - GPU_QUEUE = mp.Queue() + CPU_QUEUE = Queue() + GPU_QUEUE = Queue() MAX_CPU_COUNT = get_cpu_count() MAX_GPU_COUNT = get_gpu_count() for cid in range(MAX_CPU_COUNT): diff --git a/autogluon/utils/__init__.py b/autogluon/utils/__init__.py 
# Adapted from: http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
#
# NOTE: autogluon/utils/__init__.py re-exports this class with
# ``from .queue import Queue``.
import multiprocessing
# ``import multiprocessing`` alone does not guarantee that the ``queues``
# submodule is loaded, so import it explicitly before subclassing from it.
import multiprocessing.queues


class SharedCounter(object):
    """A synchronized integer counter shared between processes.

    The value lives in a ``multiprocessing.Value('i', ...)`` and every
    update is performed while holding that value's lock.

    Parameters
    ----------
    n : int
        Initial value of the counter (default 0).
    """

    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """Add ``n`` (default 1, may be negative) to the counter."""
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """Return the current value of the counter."""
        return self.count.value


class Queue(multiprocessing.queues.Queue):
    """A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by keeping a synchronized shared counter (initialized to zero)
    that is increased by every successful put() and decreased by every
    successful get(). qsize() and empty() are answered from that counter.

    Parameters
    ----------
    *args, **kwargs
        Forwarded to ``multiprocessing.queues.Queue``. When no ``ctx`` keyword
        is given, ``multiprocessing.get_context()`` is used.
    """

    def __init__(self, *args, **kwargs):
        # setdefault (rather than a hard-coded keyword) lets callers pass
        # their own context without triggering a duplicate-keyword TypeError.
        kwargs.setdefault('ctx', multiprocessing.get_context())
        super(Queue, self).__init__(*args, **kwargs)
        self.size = SharedCounter(0)

    def __getstate__(self):
        # The base class only pickles its own fields; carry the counter along
        # so a queue handed to a spawned child process still has ``size``.
        return (super(Queue, self).__getstate__(), self.size)

    def __setstate__(self, state):
        base_state, self.size = state
        super(Queue, self).__setstate__(base_state)

    def put(self, *args, **kwargs):
        """Put an item on the queue and count it.

        Accepts the same arguments as ``multiprocessing.Queue.put``. If the
        underlying put fails (for example with ``queue.Full``), the counter
        is restored and the exception is re-raised.
        """
        # Count first so a concurrent get() never drives the counter negative,
        # then undo the increment if the item was not actually enqueued.
        self.size.increment(1)
        try:
            super(Queue, self).put(*args, **kwargs)
        except BaseException:
            self.size.increment(-1)
            raise

    def get(self, *args, **kwargs):
        """Remove and return an item from the queue.

        Accepts the same arguments as ``multiprocessing.Queue.get``. The
        counter is only decreased once an item has really been retrieved, so
        a ``queue.Empty`` from a timed-out or non-blocking get leaves
        ``qsize()`` untouched.
        """
        item = super(Queue, self).get(*args, **kwargs)
        self.size.increment(-1)
        return item

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self.size.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()