Skip to content

Commit

Permalink
Multiprocess Queue Support MacOS (open-mmlab#33)
Browse files Browse the repository at this point in the history
* Queue for Mac OS

* add queue
  • Loading branch information
zhanghang1989 committed Jun 3, 2019
1 parent 59ac4f3 commit 012b44c
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 7 deletions.
8 changes: 3 additions & 5 deletions autogluon/resource/dist_manager.py
@@ -1,6 +1,7 @@
import logging
import multiprocessing as mp
from .resource import *
from ..utils import Queue

__all__ = ['DistributedResourceManager', 'NodeResourceManager']

Expand Down Expand Up @@ -116,14 +117,12 @@ def __init__(self, remote):
self.LOCK = mp.Lock()
self.MAX_CPU_COUNT = get_remote_cpu_count(remote)
self.MAX_GPU_COUNT = get_remote_gpu_count(remote)
self.CPU_QUEUE = mp.Queue()
self.GPU_QUEUE = mp.Queue()
self.CPU_QUEUE = Queue()
self.GPU_QUEUE = Queue()
for cid in range(self.MAX_CPU_COUNT):
self.CPU_QUEUE.put(cid)
for gid in range(self.MAX_GPU_COUNT):
self.GPU_QUEUE.put(gid)
#logger.debug('\n\self.CPU_QUEUE.qsize() {}'.format(self.CPU_QUEUE.qsize()) + \
# ', self.GPU_QUEUE.qsize() {}'.format(self.GPU_QUEUE.qsize()))

def _request(self, remote, resource):
"""ResourceManager, we recommand using scheduler instead of creating your own
Expand Down Expand Up @@ -153,7 +152,6 @@ def _release(self, resource):
self.GPU_QUEUE.put(gid)

def get_all_resources(self):
gpu_count = self.GPU_QUEUE.qsize()
return self.MAX_CPU_COUNT, self.MAX_GPU_COUNT

def check_availability(self, resource):
Expand Down
5 changes: 3 additions & 2 deletions autogluon/resource/manager.py
@@ -1,6 +1,7 @@
import logging
import multiprocessing as mp
from .resource import *
from ..utils import Queue

__all__ = ['ResourceManager']

Expand All @@ -10,8 +11,8 @@ class ResourceManager(object):
"""Resource Manager to keep track of the cpu and gpu usage
"""
LOCK = mp.Lock()
CPU_QUEUE = mp.Queue()
GPU_QUEUE = mp.Queue()
CPU_QUEUE = Queue()
GPU_QUEUE = Queue()
MAX_CPU_COUNT = get_cpu_count()
MAX_GPU_COUNT = get_gpu_count()
for cid in range(MAX_CPU_COUNT):
Expand Down
1 change: 1 addition & 0 deletions autogluon/utils/__init__.py
@@ -1,5 +1,6 @@
from .files import *
from .data_analyzer import *
from .visualizer import *
from .queue import Queue

__all__ = data_analyzer.__all__ + visualizer.__all__ + files.__all__
52 changes: 52 additions & 0 deletions autogluon/utils/queue.py
@@ -0,0 +1,52 @@
# Adapted from: http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
import multiprocessing

class SharedCounter(object):
""" A synchronized shared counter.
"""

def __init__(self, n = 0):
self.count = multiprocessing.Value('i', n)

def increment(self, n = 1):
""" Increment the counter by n (default = 1) """
with self.count.get_lock():
self.count.value += n

@property
def value(self):
""" Return the value of the counter """
return self.count.value


class Queue(multiprocessing.queues.Queue):
""" A portable implementation of multiprocessing.Queue.
Because of multithreading / multiprocessing semantics, Queue.qsize() may
raise the NotImplementedError exception on Unix platforms like Mac OS X
where sem_getvalue() is not implemented. This subclass addresses this
problem by using a synchronized shared counter (initialized to zero) and
increasing / decreasing its value every time the put() and get() methods
are called, respectively. This not only prevents NotImplementedError from
being raised, but also allows us to implement a reliable version of both
qsize() and empty().
"""
def __init__(self, *args, **kwargs):
super(Queue, self).__init__(*args, ctx=multiprocessing.get_context(), **kwargs)
self.size = SharedCounter(0)

def put(self, *args, **kwargs):
self.size.increment(1)
super(Queue, self).put(*args, **kwargs)

def get(self, *args, **kwargs):
self.size.increment(-1)
return super(Queue, self).get(*args, **kwargs)

def qsize(self):
""" Reliable implementation of multiprocessing.Queue.qsize() """
return self.size.value

def empty(self):
""" Reliable implementation of multiprocessing.Queue.empty() """
return not self.qsize()

0 comments on commit 012b44c

Please sign in to comment.