Skip to content

Commit

Permalink
pythongh-89240: Enable multiprocessing on Windows to use large proces…
Browse files Browse the repository at this point in the history
…s pools (pythonGH-107873)

We add _winapi.BatchedWaitForMultipleObjects to wait for larger numbers of handles.
This is an internal module, hence undocumented, and should be used with caution.
Check the docstring for info before using BatchedWaitForMultipleObjects.
  • Loading branch information
zooba authored and fsc-eriker committed Feb 14, 2024
1 parent 571e83b commit 91fe49a
Show file tree
Hide file tree
Showing 12 changed files with 1,195 additions and 6 deletions.
10 changes: 10 additions & 0 deletions Include/internal/pycore_global_objects_fini_generated.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions Include/internal/pycore_global_strings.h
Expand Up @@ -372,6 +372,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(defaultaction)
STRUCT_FOR_ID(delete)
STRUCT_FOR_ID(depth)
STRUCT_FOR_ID(desired_access)
STRUCT_FOR_ID(detect_types)
STRUCT_FOR_ID(deterministic)
STRUCT_FOR_ID(device)
Expand Down Expand Up @@ -462,6 +463,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(groups)
STRUCT_FOR_ID(h)
STRUCT_FOR_ID(handle)
STRUCT_FOR_ID(handle_seq)
STRUCT_FOR_ID(hash_name)
STRUCT_FOR_ID(header)
STRUCT_FOR_ID(headers)
Expand All @@ -479,9 +481,12 @@ struct _Py_global_strings {
STRUCT_FOR_ID(indexgroup)
STRUCT_FOR_ID(inf)
STRUCT_FOR_ID(infer_variance)
STRUCT_FOR_ID(inherit_handle)
STRUCT_FOR_ID(inheritable)
STRUCT_FOR_ID(initial)
STRUCT_FOR_ID(initial_bytes)
STRUCT_FOR_ID(initial_owner)
STRUCT_FOR_ID(initial_state)
STRUCT_FOR_ID(initial_value)
STRUCT_FOR_ID(initval)
STRUCT_FOR_ID(inner_size)
Expand Down Expand Up @@ -537,6 +542,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(locals)
STRUCT_FOR_ID(logoption)
STRUCT_FOR_ID(loop)
STRUCT_FOR_ID(manual_reset)
STRUCT_FOR_ID(mapping)
STRUCT_FOR_ID(match)
STRUCT_FOR_ID(max_length)
Expand All @@ -553,6 +559,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(metadata)
STRUCT_FOR_ID(method)
STRUCT_FOR_ID(microsecond)
STRUCT_FOR_ID(milliseconds)
STRUCT_FOR_ID(minute)
STRUCT_FOR_ID(mod)
STRUCT_FOR_ID(mode)
Expand All @@ -562,6 +569,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(month)
STRUCT_FOR_ID(mro)
STRUCT_FOR_ID(msg)
STRUCT_FOR_ID(mutex)
STRUCT_FOR_ID(mycmp)
STRUCT_FOR_ID(n)
STRUCT_FOR_ID(n_arg)
Expand Down Expand Up @@ -665,6 +673,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(sched_priority)
STRUCT_FOR_ID(scheduler)
STRUCT_FOR_ID(second)
STRUCT_FOR_ID(security_attributes)
STRUCT_FOR_ID(seek)
STRUCT_FOR_ID(seekable)
STRUCT_FOR_ID(selectors)
Expand Down Expand Up @@ -752,6 +761,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(values)
STRUCT_FOR_ID(version)
STRUCT_FOR_ID(volume)
STRUCT_FOR_ID(wait_all)
STRUCT_FOR_ID(warnings)
STRUCT_FOR_ID(warnoptions)
STRUCT_FOR_ID(wbits)
Expand Down
10 changes: 10 additions & 0 deletions Include/internal/pycore_runtime_init_generated.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions Include/internal/pycore_unicodeobject_generated.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion Lib/multiprocessing/connection.py
Expand Up @@ -1011,8 +1011,20 @@ def _exhaustive_wait(handles, timeout):
# returning the first signalled might create starvation issues.)
L = list(handles)
ready = []
# Windows limits WaitForMultipleObjects at 64 handles, and we use a
# few for synchronisation, so we switch to batched waits at 60.
if len(L) > 60:
try:
res = _winapi.BatchedWaitForMultipleObjects(L, False, timeout)
except TimeoutError:
return []
ready.extend(L[i] for i in res)
if res:
L = [h for i, h in enumerate(L) if i > res[0] & i not in res]
timeout = 0
while L:
res = _winapi.WaitForMultipleObjects(L, False, timeout)
short_L = L[:60] if len(L) > 60 else L
res = _winapi.WaitForMultipleObjects(short_L, False, timeout)
if res == WAIT_TIMEOUT:
break
elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
Expand Down
18 changes: 18 additions & 0 deletions Lib/test/_test_multiprocessing.py
Expand Up @@ -6113,6 +6113,24 @@ def test_spawn_sys_executable_none_allows_import(self):
self.assertEqual(rc, 0)
self.assertFalse(err, msg=err.decode('utf-8'))

def test_large_pool(self):
#
# gh-89240: Check that large pools are always okay
#
testfn = os_helper.TESTFN
self.addCleanup(os_helper.unlink, testfn)
with open(testfn, 'w', encoding='utf-8') as f:
f.write(textwrap.dedent('''\
import multiprocessing
def f(x): return x*x
if __name__ == '__main__':
with multiprocessing.Pool(200) as p:
print(sum(p.map(f, range(1000))))
'''))
rc, out, err = script_helper.assert_python_ok(testfn)
self.assertEqual("332833500", out.decode('utf-8').strip())
self.assertFalse(err, msg=err.decode('utf-8'))


#
# Mixins
Expand Down

0 comments on commit 91fe49a

Please sign in to comment.