Skip to content

Commit

Permalink
pythongh-89240: limit multiprocessing.Pool to 61 workers on windows
Browse files Browse the repository at this point in the history
  • Loading branch information
aisk committed Mar 22, 2023
1 parent d1a89ce commit 441ada9
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Doc/library/multiprocessing.rst
Expand Up @@ -2209,6 +2209,10 @@ with the :class:`Pool` class.

*processes* is the number of worker processes to use. If *processes* is
``None`` then the number returned by :func:`os.cpu_count` is used.
On Windows, *processes* must be equal or lower than ``61``. If it is not
then :exc:`ValueError` will be raised. If *processes* is ``None``, then
the default chosen will be at most ``61``, even if more processors are
available.

If *initializer* is not ``None`` then each worker process will call
``initializer(*initargs)`` when it starts.
Expand Down
10 changes: 10 additions & 0 deletions Lib/multiprocessing/pool.py
Expand Up @@ -17,6 +17,7 @@
import itertools
import os
import queue
import sys
import threading
import time
import traceback
Expand Down Expand Up @@ -175,6 +176,10 @@ class Pool(object):
Class which supports an async version of applying functions to arguments.
'''
_wrap_exception = True
# On Windows, WaitForMultipleObjects is used to wait for processes to
# finish. It can wait on, at most, 64 objects. There is an overhead of three
# objects.
_MAX_WINDOWS_WORKERS = 64 - 3

@staticmethod
def Process(ctx, *args, **kwds):
Expand All @@ -201,8 +206,12 @@ def __init__(self, processes=None, initializer=None, initargs=(),

if processes is None:
processes = os.cpu_count() or 1
if sys.platform == 'win32':
processes = min(processes, self._MAX_WINDOWS_WORKERS)
if processes < 1:
raise ValueError("Number of processes must be at least 1")
if sys.platform == 'win32' and processes > self._MAX_WINDOWS_WORKERS:
raise ValueError(f"max_workers must be <= {self._MAX_WINDOWS_WORKERS}")
if maxtasksperchild is not None:
if not isinstance(maxtasksperchild, int) or maxtasksperchild <= 0:
raise ValueError("maxtasksperchild must be a positive int or None")
Expand Down Expand Up @@ -920,6 +929,7 @@ def _set(self, i, obj):

class ThreadPool(Pool):
_wrap_exception = False
_MAX_WINDOWS_WORKERS = float("inf")

@staticmethod
def Process(ctx, *args, **kwds):
Expand Down
12 changes: 12 additions & 0 deletions Lib/test/_test_multiprocessing.py
Expand Up @@ -2772,6 +2772,18 @@ def test_resource_warning(self):
pool = None
support.gc_collect()

class TestPoolMaxWorkers(unittest.TestCase):
@unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
def test_max_workers_too_large(self):
with self.assertRaisesRegex(ValueError, "max_workers must be <= 61"):
multiprocessing.pool.Pool(62)

# ThreadPool have no limit.
p = multiprocessing.pool.ThreadPool(62)
p.close()
p.join()


def raising():
raise KeyError("key")

Expand Down
@@ -0,0 +1,2 @@
Limit ``processes`` in :class:`multiprocessing.Pool` to 61 to work around a
WaitForMultipleObjects limitation.

0 comments on commit 441ada9

Please sign in to comment.