diff --git a/.github/trigger_files/beam_PostCommit_Python_Versions.json b/.github/trigger_files/beam_PostCommit_Python_Versions.json index 541dc4ea8e87..8ed972c9f579 100644 --- a/.github/trigger_files/beam_PostCommit_Python_Versions.json +++ b/.github/trigger_files/beam_PostCommit_Python_Versions.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "revision": 2 + "revision": 3 } diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_main.py b/sdks/python/apache_beam/runners/portability/expansion_service_main.py index f2d03e0e898c..269d02b3efbd 100644 --- a/sdks/python/apache_beam/runners/portability/expansion_service_main.py +++ b/sdks/python/apache_beam/runners/portability/expansion_service_main.py @@ -55,9 +55,7 @@ def main(argv): with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter( known_args.fully_qualified_name_glob): - # Bind to localhost instead of 0.0.0.0 to ensure compatibility with loopback - # connections on dual-stack (IPv4/IPv6) systems. - address = 'localhost:{}'.format(known_args.port) + address = '0.0.0.0:{}'.format(known_args.port) server = grpc.server(thread_pool_executor.shared_unbounded_instance()) if known_args.serve_loopback_worker: beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server( @@ -73,15 +71,9 @@ def main(argv): artifact_service.ArtifactRetrievalService( artifact_service.BeamFilesystemHandler(None).file_reader), server) - # Ensure gRPC server successfully binds. If this fails (e.g., due to port collision), - # add_insecure_port returns 0. We raise an error to crash the subprocess immediately, - # allowing the parent process to detect it and fail fast rather than hanging. - bound_port = server.add_insecure_port(address) - if not bound_port: - raise RuntimeError( - "Failed to bind expansion service to {}".format(address)) + server.add_insecure_port(address) server.start() - _LOGGER.info('Listening for expansion requests at %d', bound_port) + _LOGGER.info('Listening for expansion requests at %d', known_args.port) def cleanup(unused_signum, unused_frame): _LOGGER.info('Shutting down expansion service.') diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index e862fde4efef..136c320da009 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -732,9 +732,11 @@ def _get_platform_for_default_sdk_container(): # addressed, download wheel based on glibc version in Beam's Python # Base image pip_version = distribution('pip').version - if version.parse(pip_version) >= version.parse('19.3'): - # pip can only recognize manylinux2014_x86_64 wheels - # from version 19.3. + # See more information about manylinux at + # https://github.com/pypa/manylinux + if version.parse(pip_version) >= version.parse('20.3'): + return 'manylinux_2_28_x86_64' + elif version.parse(pip_version) >= version.parse('19.3'): return 'manylinux2014_x86_64' else: return 'manylinux2010_x86_64' diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index b22e6badb5e7..0b09b364362f 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -118,6 +118,12 @@ def get(self, *key): self._cache[key].owners.add(owner) return self._cache[key].obj + def force_remove(self, *key): + with self._lock: + entry = self._cache.pop(key, None) + if entry is not None: + self._destructor(entry.obj) + class JavaHelper: @classmethod @@ -186,53 +192,45 @@ def __exit__(self, *unused_args): self.stop() def start(self): - max_attempts = 3 - for attempt in range(max_attempts): - try: - process, endpoint = self.start_process() - wait_secs = .1 - channel_options = [ - ("grpc.max_receive_message_length", -1), - ("grpc.max_send_message_length", -1), - # Default: 20000ms (20s), increased to 10 minutes for stability - ("grpc.keepalive_timeout_ms", 600_000), - # Default: 2, set to 0 to allow unlimited pings without data - ("grpc.http2.max_pings_without_data", 0), - # Default: False, set to True to allow keepalive pings when no calls - ("grpc.keepalive_permit_without_calls", True), - # Default: 2, set to 0 to allow unlimited ping strikes - ("grpc.http2.max_ping_strikes", 0), - # Default: 0 (disabled), enable socket reuse for better handling - ("grpc.so_reuseport", 1), - ] - self._grpc_channel = grpc.insecure_channel( - endpoint, options=channel_options) - channel_ready = grpc.channel_ready_future(self._grpc_channel) - while True: - if process is not None and process.poll() is not None: - _LOGGER.error("Started job service with %s", process.args) - raise RuntimeError( - 'Service failed to start up with error %s' % process.poll()) - try: - channel_ready.result(timeout=wait_secs) - break - except (grpc.FutureTimeoutError, grpc.RpcError): - wait_secs *= 1.2 - logging.log( - logging.WARNING if wait_secs > 1 else logging.DEBUG, - 'Waiting for grpc channel to be ready at %s.', - endpoint) - return self._stub_class(self._grpc_channel) - except Exception as e: - _LOGGER.warning( - "Error bringing up service on attempt %d: %s", - attempt + 1, - e, - exc_info=True) - self.stop() - if attempt == max_attempts - 1: - raise - time.sleep(1) + try: + process, endpoint = self.start_process() + wait_secs = .1 + channel_options = [ + ("grpc.max_receive_message_length", -1), + ("grpc.max_send_message_length", -1), + # Default: 20000ms (20s), increased to 10 minutes for stability + ("grpc.keepalive_timeout_ms", 600_000), + # Default: 2, set to 0 to allow unlimited pings without data + ("grpc.http2.max_pings_without_data", 0), + # Default: False, set to True to allow keepalive pings when no calls + ("grpc.keepalive_permit_without_calls", True), + # Default: 2, set to 0 to allow unlimited ping strikes + ("grpc.http2.max_ping_strikes", 0), + # Default: 0 (disabled), enable socket reuse for better handling + ("grpc.so_reuseport", 1), + ] + self._grpc_channel = grpc.insecure_channel( + endpoint, options=channel_options) + channel_ready = grpc.channel_ready_future(self._grpc_channel) + while True: + if process is not None and process.poll() is not None: + _LOGGER.error("Started job service with %s", process.args) + raise RuntimeError( + 'Service failed to start up with error %s' % process.poll()) + try: + channel_ready.result(timeout=wait_secs) + break + except (grpc.FutureTimeoutError, grpc.RpcError): + wait_secs *= 1.2 + logging.log( + logging.WARNING if wait_secs > 1 else logging.DEBUG, + 'Waiting for grpc channel to be ready at %s.', + endpoint) + return self._stub_class(self._grpc_channel) + except: # pylint: disable=bare-except + _LOGGER.exception("Error bringing up service") + self.stop_force() + raise def start_process(self): if self._owner_id is not None: @@ -282,6 +280,20 @@ def stop_process(self): finally: self._grpc_channel = None + def stop_force(self): + try: + self._cache.force_remove(tuple(self._cmd), self._port, self._logger) + finally: + self._owner_id = None + if self._grpc_channel: + try: + self._grpc_channel.close() + except: # pylint: disable=bare-except + _LOGGER.error( + "Could not close the gRPC channel started with cmd %s", self._cmd) + finally: + self._grpc_channel = None + def _really_stop_process(process_and_endpoint): process, _ = process_and_endpoint # pylint: disable=unpacking-non-sequence if not process: diff --git a/sdks/python/apache_beam/utils/subprocess_server_test.py b/sdks/python/apache_beam/utils/subprocess_server_test.py index 073b8b3bcbe8..a44b89b17e37 100644 --- a/sdks/python/apache_beam/utils/subprocess_server_test.py +++ b/sdks/python/apache_beam/utils/subprocess_server_test.py @@ -464,6 +464,93 @@ def __init__(self): # without raising ValueError. server.stop_process() + def test_force_remove(self): + destructor_calls = [] + + def custom_destructor(obj): + destructor_calls.append(obj) + + cache = subprocess_server._SharedCache(self.with_prefix, custom_destructor) + + owner1 = cache.register() + owner2 = cache.register() + + # Get object 'a' under both active owners + a = cache.get('a') + self.assertEqual(a[0], 'a') + self.assertIn(('a', ), cache._cache) + + # force_remove on a non-existent key should be a safe no-op + cache.force_remove('non_existent') + + # Call force_remove, which should bypass the owners check and delete it immediately + cache.force_remove('a') + + # The cache entry should be gone + self.assertNotIn(('a', ), cache._cache) + + # Destructor must be called on 'a' + self.assertEqual(destructor_calls, [a]) + + # Retrieving 'a' again under the active owners should construct a new object + new_a = cache.get('a') + self.assertNotEqual(new_a, a) + self.assertEqual(new_a[0], 'a') + + # Clean up + cache.purge(owner1) + cache.purge(owner2) + + def test_subprocess_server_start_failed_no_leak(self): + destructor_calls = [] + + def custom_destructor(obj): + destructor_calls.append(obj) + + class DummyProcess: + def __init__(self): + self.args = ["dummy_cmd"] + + def poll(self): + return 1 # Simulate that process exited/failed + + dummy_process = DummyProcess() + cache = subprocess_server._SharedCache( + lambda *args: (dummy_process, "localhost:12345"), custom_destructor) + + # 1. Register an independent, unrelated owner in the cache first. + other_owner = cache.register() + + class CustomServer(subprocess_server.SubprocessServer): + _cache = cache + + def __init__(self): + super().__init__(lambda channel: None, ["dummy_cmd"], port=12345) + + server = CustomServer() + # Fetch the process using other_owner, creating the cache entry and registering other_owner on it. + cache.get(tuple(server._cmd), server._port, server._logger) + + cache_key = (tuple(server._cmd), server._port, server._logger) + self.assertIn(cache_key, cache._cache) + self.assertEqual(cache._cache[cache_key].owners, {other_owner}) + + # 2. Verify starting the server (which registers its own owner and retrieves from cache) raises RuntimeError + with self.assertRaises(RuntimeError): + server.start() + + # 3. Verify that the destructor was called on the process, meaning no leak (even though other_owner was still registered!) + self.assertEqual(destructor_calls, [(dummy_process, "localhost:12345")]) + + # 4. Verify that the server has cleaned up its owner_id + self.assertIsNone(server._owner_id) + + # 5. Verify the cache entry has been removed completely + self.assertNotIn(cache_key, cache._cache) + + # Clean up the other owner + cache.purge(other_owner) + if __name__ == '__main__': unittest.main()