Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python_Versions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"revision": 2
"revision": 3
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ def main(argv):
with fully_qualified_named_transform.FullyQualifiedNamedTransform.with_filter(
known_args.fully_qualified_name_glob):

# Bind to localhost instead of 0.0.0.0 to ensure compatibility with loopback
# connections on dual-stack (IPv4/IPv6) systems.
address = 'localhost:{}'.format(known_args.port)
address = '0.0.0.0:{}'.format(known_args.port)
server = grpc.server(thread_pool_executor.shared_unbounded_instance())
if known_args.serve_loopback_worker:
beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
Expand All @@ -73,15 +71,9 @@ def main(argv):
artifact_service.ArtifactRetrievalService(
artifact_service.BeamFilesystemHandler(None).file_reader),
server)
# Ensure gRPC server successfully binds. If this fails (e.g., due to port collision),
# add_insecure_port returns 0. We raise an error to crash the subprocess immediately,
# allowing the parent process to detect it and fail fast rather than hanging.
bound_port = server.add_insecure_port(address)
if not bound_port:
raise RuntimeError(
"Failed to bind expansion service to {}".format(address))
server.add_insecure_port(address)
Comment thread
shunping marked this conversation as resolved.
server.start()
_LOGGER.info('Listening for expansion requests at %d', bound_port)
_LOGGER.info('Listening for expansion requests at %d', known_args.port)
Comment thread
shunping marked this conversation as resolved.
Comment thread
shunping marked this conversation as resolved.

def cleanup(unused_signum, unused_frame):
_LOGGER.info('Shutting down expansion service.')
Expand Down
8 changes: 5 additions & 3 deletions sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -732,9 +732,11 @@ def _get_platform_for_default_sdk_container():
# addressed, download wheel based on glibc version in Beam's Python
# Base image
pip_version = distribution('pip').version
if version.parse(pip_version) >= version.parse('19.3'):
# pip can only recognize manylinux2014_x86_64 wheels
# from version 19.3.
# See more information about manylinux at
# https://github.com/pypa/manylinux
if version.parse(pip_version) >= version.parse('20.3'):
return 'manylinux_2_28_x86_64'
elif version.parse(pip_version) >= version.parse('19.3'):
return 'manylinux2014_x86_64'
else:
return 'manylinux2010_x86_64'
Expand Down
106 changes: 59 additions & 47 deletions sdks/python/apache_beam/utils/subprocess_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ def get(self, *key):
self._cache[key].owners.add(owner)
return self._cache[key].obj

def force_remove(self, *key):
  """Evict the entry stored under ``key`` and destroy its object.

  The owners recorded on the entry are not consulted. A key that is not
  in the cache is ignored.

  Args:
    *key: the positional components that make up the cache key.
  """
  with self._lock:
    evicted = self._cache.pop(key, None)
  if evicted is None:
    return
  # The entry is already detached from the cache at this point, so the
  # destructor is invoked after the lock has been released.
  self._destructor(evicted.obj)


class JavaHelper:
@classmethod
Expand Down Expand Up @@ -186,53 +192,45 @@ def __exit__(self, *unused_args):
self.stop()

def start(self):
# NOTE(review): this span is diff residue, not a single coherent function
# body. It holds two alternative bodies for start() back to back, the
# leading indentation has been stripped, and review-UI lines are mixed in.
# Both bodies launch the process via self.start_process(), open an insecure
# gRPC channel to the returned endpoint, wait for it to become ready, and
# return self._stub_class(channel); they differ only in failure handling.
# Presumably the first body is the removed revision (it is 47 lines long,
# matching the "47 deletions" reported for this file) -- confirm against
# the actual commit before relying on either one.
Comment thread
shunping marked this conversation as resolved.
# Body A: up to max_attempts (3) tries. A failed attempt is logged as a
# warning, followed by self.stop() and a 1 second sleep; only the final
# failure is re-raised. `except Exception` lets KeyboardInterrupt and
# SystemExit propagate without cleanup.
max_attempts = 3
for attempt in range(max_attempts):
try:
process, endpoint = self.start_process()
wait_secs = .1
channel_options = [
("grpc.max_receive_message_length", -1),
("grpc.max_send_message_length", -1),
# Default: 20000ms (20s), increased to 10 minutes for stability
("grpc.keepalive_timeout_ms", 600_000),
# Default: 2, set to 0 to allow unlimited pings without data
("grpc.http2.max_pings_without_data", 0),
# Default: False, set to True to allow keepalive pings when no calls
("grpc.keepalive_permit_without_calls", True),
# Default: 2, set to 0 to allow unlimited ping strikes
("grpc.http2.max_ping_strikes", 0),
# Default: 0 (disabled), enable socket reuse for better handling
("grpc.so_reuseport", 1),
]
self._grpc_channel = grpc.insecure_channel(
endpoint, options=channel_options)
channel_ready = grpc.channel_ready_future(self._grpc_channel)
# Poll for readiness, growing the timeout by 20% per round. A process
# whose poll() is no longer None has exited, so stop waiting and fail.
while True:
if process is not None and process.poll() is not None:
_LOGGER.error("Started job service with %s", process.args)
raise RuntimeError(
'Service failed to start up with error %s' % process.poll())
try:
channel_ready.result(timeout=wait_secs)
break
except (grpc.FutureTimeoutError, grpc.RpcError):
wait_secs *= 1.2
# Stay at DEBUG while the wait is short; switch to WARNING once the
# timeout has grown past one second.
logging.log(
logging.WARNING if wait_secs > 1 else logging.DEBUG,
'Waiting for grpc channel to be ready at %s.',
endpoint)
return self._stub_class(self._grpc_channel)
except Exception as e:
_LOGGER.warning(
"Error bringing up service on attempt %d: %s",
attempt + 1,
e,
exc_info=True)
self.stop()
if attempt == max_attempts - 1:
raise
time.sleep(1)
# Body B: a single attempt. Any exception -- the bare except also catches
# KeyboardInterrupt and SystemExit -- is logged with its traceback, then
# self.stop_force() is called and the exception is re-raised unchanged.
try:
process, endpoint = self.start_process()
wait_secs = .1
channel_options = [
("grpc.max_receive_message_length", -1),
("grpc.max_send_message_length", -1),
# Default: 20000ms (20s), increased to 10 minutes for stability
("grpc.keepalive_timeout_ms", 600_000),
# Default: 2, set to 0 to allow unlimited pings without data
("grpc.http2.max_pings_without_data", 0),
# Default: False, set to True to allow keepalive pings when no calls
("grpc.keepalive_permit_without_calls", True),
# Default: 2, set to 0 to allow unlimited ping strikes
("grpc.http2.max_ping_strikes", 0),
# Default: 0 (disabled), enable socket reuse for better handling
("grpc.so_reuseport", 1),
]
self._grpc_channel = grpc.insecure_channel(
endpoint, options=channel_options)
channel_ready = grpc.channel_ready_future(self._grpc_channel)
# Same readiness poll as in body A.
while True:
if process is not None and process.poll() is not None:
_LOGGER.error("Started job service with %s", process.args)
raise RuntimeError(
'Service failed to start up with error %s' % process.poll())
try:
channel_ready.result(timeout=wait_secs)
break
except (grpc.FutureTimeoutError, grpc.RpcError):
wait_secs *= 1.2
logging.log(
logging.WARNING if wait_secs > 1 else logging.DEBUG,
'Waiting for grpc channel to be ready at %s.',
endpoint)
return self._stub_class(self._grpc_channel)
except: # pylint: disable=bare-except
_LOGGER.exception("Error bringing up service")
self.stop_force()
raise
Comment thread
shunping marked this conversation as resolved.

def start_process(self):
if self._owner_id is not None:
Expand Down Expand Up @@ -282,6 +280,20 @@ def stop_process(self):
finally:
self._grpc_channel = None

def stop_force(self):
  """Tear this server down unconditionally after a failed startup.

  Evicts the shared cache entry keyed by ``(tuple(cmd), port, logger)``
  through ``force_remove`` -- which does not consult the entry's owners --
  then releases this server's owner registration and closes its gRPC
  channel. Each cleanup step runs even if an earlier one raised; the
  earliest exception still propagates to the caller.

  Raises:
    Whatever ``force_remove`` or ``purge`` raise. Failures while closing
    the channel are logged and suppressed.
  """
  try:
    self._cache.force_remove(tuple(self._cmd), self._port, self._logger)
  finally:
    # Clear the attribute first so a failing purge can never leave a stale
    # id behind for a later stop/start to trip over.
    owner_id, self._owner_id = self._owner_id, None
    try:
      if owner_id is not None:
        # Forgetting the id without purging it would leave this owner
        # registered in the shared cache. NOTE(review): get() appears to
        # attach registered owners to the entries it returns, so a stale
        # owner would keep later entries alive -- confirm against
        # _SharedCache.
        self._cache.purge(owner_id)
    finally:
      channel, self._grpc_channel = self._grpc_channel, None
      if channel:
        try:
          channel.close()
        except Exception:  # pylint: disable=broad-except
          # Best effort: a channel that refuses to close must not mask the
          # startup error being handled by the caller. Unlike a bare
          # except, this lets KeyboardInterrupt/SystemExit through.
          _LOGGER.exception(
              "Could not close the gRPC channel started with cmd %s",
              self._cmd)

def _really_stop_process(process_and_endpoint):
process, _ = process_and_endpoint # pylint: disable=unpacking-non-sequence
if not process:
Expand Down
87 changes: 87 additions & 0 deletions sdks/python/apache_beam/utils/subprocess_server_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,93 @@ def __init__(self):
# without raising ValueError.
server.stop_process()

def test_force_remove(self):
  """force_remove() destroys an entry at once, even with live owners."""
  destroyed = []
  cache = subprocess_server._SharedCache(self.with_prefix, destroyed.append)
  owners = [cache.register(), cache.register()]

  # Look up 'a' while both owners are registered.
  first = cache.get('a')
  self.assertEqual(first[0], 'a')
  self.assertIn(('a', ), cache._cache)

  # A key that was never cached must be tolerated silently.
  cache.force_remove('non_existent')

  # Evicting 'a' must not wait for its owners to be purged: the entry
  # disappears and the destructor sees exactly the cached object.
  cache.force_remove('a')
  self.assertNotIn(('a', ), cache._cache)
  self.assertEqual(destroyed, [first])

  # The next lookup has to construct a replacement object.
  second = cache.get('a')
  self.assertNotEqual(second, first)
  self.assertEqual(second[0], 'a')

  # Release both owners so nothing outlives the test.
  for owner in owners:
    cache.purge(owner)

def test_subprocess_server_start_failed_no_leak(self):
  """A failed start() destroys the cached process despite other owners."""
  destroyed = []

  class DummyProcess:
    def __init__(self):
      self.args = ["dummy_cmd"]

    def poll(self):
      # A non-None return code tells start() the process already exited.
      return 1

  proc = DummyProcess()

  def build_entry(*unused_key):
    return (proc, "localhost:12345")

  cache = subprocess_server._SharedCache(build_entry, destroyed.append)

  # An unrelated owner is registered before the server touches the cache.
  bystander = cache.register()

  class CustomServer(subprocess_server.SubprocessServer):
    _cache = cache

    def __init__(self):
      super().__init__(lambda channel: None, ["dummy_cmd"], port=12345)

  server = CustomServer()
  cache_key = (tuple(server._cmd), server._port, server._logger)

  # Populate the entry while the bystander is the only registered owner.
  cache.get(*cache_key)
  self.assertIn(cache_key, cache._cache)
  self.assertEqual(cache._cache[cache_key].owners, {bystander})

  # start() registers its own owner and fetches the cached process; since
  # poll() reports an exit code it has to give up with RuntimeError.
  with self.assertRaises(RuntimeError):
    server.start()

  # The process was destroyed even though the bystander never let go ...
  self.assertEqual(destroyed, [(proc, "localhost:12345")])
  # ... the server no longer holds an owner id ...
  self.assertIsNone(server._owner_id)
  # ... and the cache entry is gone.
  self.assertNotIn(cache_key, cache._cache)

  # Release the bystander.
  cache.purge(bystander)


if __name__ == '__main__':
unittest.main()
Loading