Skip to content

Commit

Permalink
Merge pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExe…
Browse files Browse the repository at this point in the history
…cutor performance
  • Loading branch information
mxm committed May 7, 2020
2 parents 7e0be24 + b740404 commit 79a66fd
Showing 1 changed file with 32 additions and 70 deletions.
102 changes: 32 additions & 70 deletions sdks/python/apache_beam/utils/thread_pool_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,11 @@ def run(self):


class _Worker(threading.Thread):
def __init__(
self, idle_worker_queue, permitted_thread_age_in_seconds, work_item):
def __init__(self, idle_worker_queue, work_item):
super(_Worker, self).__init__()
self._idle_worker_queue = idle_worker_queue
self._permitted_thread_age_in_seconds = permitted_thread_age_in_seconds
self._work_item = work_item
self._wake_event = threading.Event()
self._wake_semaphore = threading.Semaphore(0)
self._lock = threading.Lock()
self._shutdown = False

Expand All @@ -70,62 +68,28 @@ def run(self):
self._work_item.run()
self._work_item = None

# If we are explicitly awake then don't add ourselves back to the
# idle queue. This occurs in case 3 described below.
if not self._wake_event.is_set():
self._idle_worker_queue.put(self)

self._wake_event.wait(self._permitted_thread_age_in_seconds)
with self._lock:
# When we are awoken, we may be in one of three states:
# 1) _work_item is set and _shutdown is False.
# This represents the case when we have accepted work.
# 2) _work_item is unset and _shutdown is True.
# This represents the case where either we timed out before
# accepting work or explicitly were shutdown without accepting
# any work.
# 3) _work_item is set and _shutdown is True.
# This represents a race where we accepted work and also
# were shutdown before the worker thread started processing
# that work. In this case we guarantee to process the work
# but we don't clear the event ensuring that the next loop
# around through to the wait() won't block and we will exit
# since _work_item will be unset.

# We only exit when _work_item is unset to prevent dropping of
# submitted work.
if self._work_item is None:
self._shutdown = True
return
if not self._shutdown:
self._wake_event.clear()

def accepted_work(self, work_item):
"""Returns True if the work was accepted.
self._idle_worker_queue.put(self)
self._wake_semaphore.acquire()
if self._work_item is None:
return

def assign_work(self, work_item):
"""Assigns the work item and wakes up the thread.
This method must only be called while the worker is idle.
"""
with self._lock:
if self._shutdown:
return False

self._work_item = work_item
self._wake_event.set()
return True
self._work_item = work_item
self._wake_semaphore.release()

def shutdown(self):
"""Marks this thread as shutdown possibly waking it up if it is idle."""
with self._lock:
if self._shutdown:
return
self._shutdown = True
self._wake_event.set()
"""Wakes up this thread with a 'None' work item, signalling it to shut down."""
self._wake_semaphore.release()


class UnboundedThreadPoolExecutor(_base.Executor):
def __init__(self, permitted_thread_age_in_seconds=30):
self._permitted_thread_age_in_seconds = permitted_thread_age_in_seconds
def __init__(self):
self._idle_worker_queue = queue.Queue()
self._max_idle_threads = 16
self._workers = weakref.WeakSet()
self._shutdown = False
self._lock = threading.Lock() # Guards access to _workers and _shutdown
Expand All @@ -137,35 +101,33 @@ def submit(self, fn, *args, **kwargs):
"""
future = _base.Future()
work_item = _WorkItem(future, fn, args, kwargs)
try:
# Keep trying to get an idle worker from the queue until we find one
# that accepts the work.
while not self._idle_worker_queue.get(
block=False).accepted_work(work_item):
pass
return future
except queue.Empty:
with self._lock:
if self._shutdown:
raise RuntimeError(
'Cannot schedule new tasks after thread pool '
'has been shutdown.')

worker = _Worker(
self._idle_worker_queue,
self._permitted_thread_age_in_seconds,
work_item)
with self._lock:
if self._shutdown:
raise RuntimeError(
'Cannot schedule new tasks after thread pool has been shutdown.')
try:
self._idle_worker_queue.get(block=False).assign_work(work_item)

# If we have more idle threads than the max allowed, shut down a thread.
if self._idle_worker_queue.qsize() > self._max_idle_threads:
try:
self._idle_worker_queue.get(block=False).shutdown()
except queue.Empty:
pass
except queue.Empty:
worker = _Worker(self._idle_worker_queue, work_item)
worker.daemon = True
worker.start()
self._workers.add(worker)
return future
return future

def shutdown(self, wait=True):
with self._lock:
if self._shutdown:
return

self._shutdown = True

for worker in self._workers:
worker.shutdown()

Expand Down

0 comments on commit 79a66fd

Please sign in to comment.