From 5900348c01496c1147e9d11faef41da03bfbcd56 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 22 May 2026 19:24:51 -0400 Subject: [PATCH 1/6] Revert "Fix test hang in subprocess expansion service on port bind failure (#38572)" This reverts commit 930b94cceb69e33bd9a9a2f1287ebe5c75533536. --- .../portability/expansion_service_main.py | 14 +-- .../apache_beam/utils/subprocess_server.py | 86 +++++++++---------- 2 files changed, 42 insertions(+), 58 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_main.py b/sdks/python/apache_beam/runners/portability/expansion_service_main.py index f2d03e0e898c..269d02b3efbd 100644 --- a/sdks/python/apache_beam/runners/portability/expansion_service_main.py +++ b/sdks/python/apache_beam/runners/portability/expansion_service_main.py @@ -55,9 +55,7 @@ def main(argv): with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter( known_args.fully_qualified_name_glob): - # Bind to localhost instead of 0.0.0.0 to ensure compatibility with loopback - # connections on dual-stack (IPv4/IPv6) systems. - address = 'localhost:{}'.format(known_args.port) + address = '0.0.0.0:{}'.format(known_args.port) server = grpc.server(thread_pool_executor.shared_unbounded_instance()) if known_args.serve_loopback_worker: beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server( @@ -73,15 +71,9 @@ def main(argv): artifact_service.ArtifactRetrievalService( artifact_service.BeamFilesystemHandler(None).file_reader), server) - # Ensure gRPC server successfully binds. If this fails (e.g., due to port collision), - # add_insecure_port returns 0. We raise an error to crash the subprocess immediately, - # allowing the parent process to detect it and fail fast rather than hanging. - bound_port = server.add_insecure_port(address) - if not bound_port: - raise RuntimeError( - "Failed to bind expansion service to {}".format(address)) + server.add_insecure_port(address) server.start() - _LOGGER.info('Listening for expansion requests at %d', bound_port) + _LOGGER.info('Listening for expansion requests at %d', known_args.port) def cleanup(unused_signum, unused_frame): _LOGGER.info('Shutting down expansion service.') diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index b22e6badb5e7..d21cb486b8f4 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -186,53 +186,45 @@ def __exit__(self, *unused_args): self.stop() def start(self): - max_attempts = 3 - for attempt in range(max_attempts): - try: - process, endpoint = self.start_process() - wait_secs = .1 - channel_options = [ - ("grpc.max_receive_message_length", -1), - ("grpc.max_send_message_length", -1), - # Default: 20000ms (20s), increased to 10 minutes for stability - ("grpc.keepalive_timeout_ms", 600_000), - # Default: 2, set to 0 to allow unlimited pings without data - ("grpc.http2.max_pings_without_data", 0), - # Default: False, set to True to allow keepalive pings when no calls - ("grpc.keepalive_permit_without_calls", True), - # Default: 2, set to 0 to allow unlimited ping strikes - ("grpc.http2.max_ping_strikes", 0), - # Default: 0 (disabled), enable socket reuse for better handling - ("grpc.so_reuseport", 1), - ] - self._grpc_channel = grpc.insecure_channel( - endpoint, options=channel_options) - channel_ready = grpc.channel_ready_future(self._grpc_channel) - while True: - if process is not None and process.poll() is not None: - _LOGGER.error("Started job service with %s", process.args) - raise RuntimeError( - 'Service failed to start up with error %s' % process.poll()) - try: - channel_ready.result(timeout=wait_secs) - break - except (grpc.FutureTimeoutError, grpc.RpcError): - wait_secs *= 1.2 - logging.log( - logging.WARNING if wait_secs > 1 else logging.DEBUG, - 'Waiting for grpc channel to be ready at %s.', - endpoint) - return self._stub_class(self._grpc_channel) - except Exception as e: - _LOGGER.warning( - "Error bringing up service on attempt %d: %s", - attempt + 1, - e, - exc_info=True) - self.stop() - if attempt == max_attempts - 1: - raise - time.sleep(1) + try: + process, endpoint = self.start_process() + wait_secs = .1 + channel_options = [ + ("grpc.max_receive_message_length", -1), + ("grpc.max_send_message_length", -1), + # Default: 20000ms (20s), increased to 10 minutes for stability + ("grpc.keepalive_timeout_ms", 600_000), + # Default: 2, set to 0 to allow unlimited pings without data + ("grpc.http2.max_pings_without_data", 0), + # Default: False, set to True to allow keepalive pings when no calls + ("grpc.keepalive_permit_without_calls", True), + # Default: 2, set to 0 to allow unlimited ping strikes + ("grpc.http2.max_ping_strikes", 0), + # Default: 0 (disabled), enable socket reuse for better handling + ("grpc.so_reuseport", 1), + ] + self._grpc_channel = grpc.insecure_channel( + endpoint, options=channel_options) + channel_ready = grpc.channel_ready_future(self._grpc_channel) + while True: + if process is not None and process.poll() is not None: + _LOGGER.error("Started job service with %s", process.args) + raise RuntimeError( + 'Service failed to start up with error %s' % process.poll()) + try: + channel_ready.result(timeout=wait_secs) + break + except (grpc.FutureTimeoutError, grpc.RpcError): + wait_secs *= 1.2 + logging.log( + logging.WARNING if wait_secs > 1 else logging.DEBUG, + 'Waiting for grpc channel to be ready at %s.', + endpoint) + return self._stub_class(self._grpc_channel) + except: # pylint: disable=bare-except + _LOGGER.exception("Error bringing up service") + self.stop() + raise def start_process(self): if self._owner_id is not None: From b67db210e464b68c68a75eb02ae1cd2dfd011ec9 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 22 May 2026 20:24:27 -0400 Subject: [PATCH 2/6] Ensure immediate cleanup of subprocess server on start failure When a SubprocessServer fails to start (e.g., due to a process exit or startup error), the server process could leak if standard purging is blocked by other active owners sharing the cached subprocess. To fix this: - Implement `_SharedCache.force_remove()` to immediately remove a key from the cache and run its destructor regardless of active owners. - Add `SubprocessServer.stop_force()` which calls `force_remove()` to completely terminate the server's process. - Call `stop_force()` in the `except` block of `SubprocessServer.start()` --- .../apache_beam/utils/subprocess_server.py | 22 ++++- .../utils/subprocess_server_test.py | 84 +++++++++++++++++++ 2 files changed, 105 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index d21cb486b8f4..0b09b364362f 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -118,6 +118,12 @@ def get(self, *key): self._cache[key].owners.add(owner) return self._cache[key].obj + def force_remove(self, *key): + with self._lock: + entry = self._cache.pop(key, None) + if entry is not None: + self._destructor(entry.obj) + class JavaHelper: @classmethod @@ -223,7 +229,7 @@ def start(self): return self._stub_class(self._grpc_channel) except: # pylint: disable=bare-except _LOGGER.exception("Error bringing up service") - self.stop() + self.stop_force() raise def start_process(self): @@ -274,6 +280,20 @@ def stop_process(self): finally: self._grpc_channel = None + def stop_force(self): + try: + self._cache.force_remove(tuple(self._cmd), self._port, self._logger) + finally: + self._owner_id = None + if self._grpc_channel: + try: + self._grpc_channel.close() + except: # pylint: disable=bare-except + _LOGGER.error( + "Could not close the gRPC channel started with cmd %s", self._cmd) + finally: + self._grpc_channel = None + def _really_stop_process(process_and_endpoint): process, _ = process_and_endpoint # pylint: disable=unpacking-non-sequence if not process: diff --git a/sdks/python/apache_beam/utils/subprocess_server_test.py b/sdks/python/apache_beam/utils/subprocess_server_test.py index 073b8b3bcbe8..c9fe45ff24e0 100644 --- a/sdks/python/apache_beam/utils/subprocess_server_test.py +++ b/sdks/python/apache_beam/utils/subprocess_server_test.py @@ -464,6 +464,90 @@ def __init__(self): # without raising ValueError. server.stop_process() + def test_force_remove(self): + destructor_calls = [] + def custom_destructor(obj): + destructor_calls.append(obj) + + cache = subprocess_server._SharedCache(self.with_prefix, custom_destructor) + + owner1 = cache.register() + owner2 = cache.register() + + # Get object 'a' under both active owners + a = cache.get('a') + self.assertEqual(a[0], 'a') + self.assertIn(('a',), cache._cache) + + # force_remove on a non-existent key should be a safe no-op + cache.force_remove('non_existent') + + # Call force_remove, which should bypass the owners check and delete it immediately + cache.force_remove('a') + + # The cache entry should be gone + self.assertNotIn(('a',), cache._cache) + + # Destructor must be called on 'a' + self.assertEqual(destructor_calls, [a]) + + # Retrieving 'a' again under the active owners should construct a new object + new_a = cache.get('a') + self.assertNotEqual(new_a, a) + self.assertEqual(new_a[0], 'a') + + # Clean up + cache.purge(owner1) + cache.purge(owner2) + + def test_subprocess_server_start_failed_no_leak(self): + destructor_calls = [] + def custom_destructor(obj): + destructor_calls.append(obj) + + class DummyProcess: + def __init__(self): + self.args = ["dummy_cmd"] + def poll(self): + return 1 # Simulate that process exited/failed + + dummy_process = DummyProcess() + cache = subprocess_server._SharedCache( + lambda *args: (dummy_process, "localhost:12345"), custom_destructor) + + # 1. Register an independent, unrelated owner in the cache first. + other_owner = cache.register() + + class CustomServer(subprocess_server.SubprocessServer): + _cache = cache + + def __init__(self): + super().__init__(lambda channel: None, ["dummy_cmd"], port=12345) + + server = CustomServer() + # Fetch the process using other_owner, creating the cache entry and registering other_owner on it. + cache.get(tuple(server._cmd), server._port, server._logger) + + cache_key = (tuple(server._cmd), server._port, server._logger) + self.assertIn(cache_key, cache._cache) + self.assertEqual(cache._cache[cache_key].owners, {other_owner}) + + # 2. Verify starting the server (which registers its own owner and retrieves from cache) raises RuntimeError + with self.assertRaises(RuntimeError): + server.start() + + # 3. Verify that the destructor was called on the process, meaning no leak (even though other_owner was still registered!) + self.assertEqual(destructor_calls, [(dummy_process, "localhost:12345")]) + + # 4. Verify that the server has cleaned up its owner_id + self.assertIsNone(server._owner_id) + + # 5. Verify the cache entry has been removed completely + self.assertNotIn(cache_key, cache._cache) + + # Clean up the other owner + cache.purge(other_owner) + if __name__ == '__main__': unittest.main() From 1ec7f71768f528049390e5ebad5d28c20fbedc88 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 22 May 2026 20:30:35 -0400 Subject: [PATCH 3/6] Support modern manylinux tags based on pip version in Stager This ensures we can download pre-built wheels for environment staging rather than relying on tarball building, which is sometimes slow. --- sdks/python/apache_beam/runners/portability/stager.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index e862fde4efef..320b23749c51 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -732,10 +732,12 @@ def _get_platform_for_default_sdk_container(): # addressed, download wheel based on glibc version in Beam's Python # Base image pip_version = distribution('pip').version - if version.parse(pip_version) >= version.parse('19.3'): - # pip can only recognize manylinux2014_x86_64 wheels - # from version 19.3. - return 'manylinux2014_x86_64' + # See more information about manylinux at + # https://github.com/pypa/manylinux + if version.parse(pip_version) >= version.parse('20.3'): + return 'manylinux_2_28_x86_64' + elif version.parse(pip_version) >= version.parse('19.3'): + return 'manylinux2014' else: return 'manylinux2010_x86_64' From 5559da12e92fdc7c79dde23f46c1c2475d9e4829 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 22 May 2026 21:11:38 -0400 Subject: [PATCH 4/6] Formatting --- sdks/python/apache_beam/utils/subprocess_server_test.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/utils/subprocess_server_test.py b/sdks/python/apache_beam/utils/subprocess_server_test.py index c9fe45ff24e0..a44b89b17e37 100644 --- a/sdks/python/apache_beam/utils/subprocess_server_test.py +++ b/sdks/python/apache_beam/utils/subprocess_server_test.py @@ -466,6 +466,7 @@ def __init__(self): def test_force_remove(self): destructor_calls = [] + def custom_destructor(obj): destructor_calls.append(obj) @@ -477,7 +478,7 @@ def custom_destructor(obj): # Get object 'a' under both active owners a = cache.get('a') self.assertEqual(a[0], 'a') - self.assertIn(('a',), cache._cache) + self.assertIn(('a', ), cache._cache) # force_remove on a non-existent key should be a safe no-op cache.force_remove('non_existent') @@ -486,7 +487,7 @@ def custom_destructor(obj): cache.force_remove('a') # The cache entry should be gone - self.assertNotIn(('a',), cache._cache) + self.assertNotIn(('a', ), cache._cache) # Destructor must be called on 'a' self.assertEqual(destructor_calls, [a]) @@ -502,12 +503,14 @@ def custom_destructor(obj): def test_subprocess_server_start_failed_no_leak(self): destructor_calls = [] + def custom_destructor(obj): destructor_calls.append(obj) class DummyProcess: def __init__(self): self.args = ["dummy_cmd"] + def poll(self): return 1 # Simulate that process exited/failed From f8e2507bee220ec242da29b42924ffdedda3fa78 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 22 May 2026 21:12:29 -0400 Subject: [PATCH 5/6] Trigger more python tests. --- .github/trigger_files/beam_PostCommit_Python_Versions.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Python_Versions.json b/.github/trigger_files/beam_PostCommit_Python_Versions.json index 541dc4ea8e87..8ed972c9f579 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Versions.json +++ b/.github/trigger_files/beam_PostCommit_Python_Versions.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "revision": 2 + "revision": 3 } From 95ebb12b20029f074b2a62ac79deb2e02950fbd8 Mon Sep 17 00:00:00 2001 From: Shunping Huang Date: Fri, 22 May 2026 23:46:47 -0400 Subject: [PATCH 6/6] Typo --- sdks/python/apache_beam/runners/portability/stager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 320b23749c51..136c320da009 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -737,7 +737,7 @@ def _get_platform_for_default_sdk_container(): if version.parse(pip_version) >= version.parse('20.3'): return 'manylinux_2_28_x86_64' elif version.parse(pip_version) >= version.parse('19.3'): - return 'manylinux2014' + return 'manylinux2014_x86_64' else: return 'manylinux2010_x86_64'