Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
55cb276
increase grpc keepalive timeout and adjust ping settings
liferoad Oct 15, 2025
e268dc3
yapf
liferoad Oct 15, 2025
0520967
perf(subprocess_server): add grpc keepalive options to improve connec…
liferoad Oct 15, 2025
0ca7bcb
perf(grpc): increase keepalive and ping intervals to reduce frequency
liferoad Oct 15, 2025
c617972
format
liferoad Oct 15, 2025
4196239
more changes
liferoad Oct 16, 2025
3f59e83
fix(milvus): increase timeout to 60s for container startup
liferoad Oct 16, 2025
1b785f9
fix(io): handle empty init_result in FileBasedSink by falling back to…
liferoad Oct 16, 2025
8da691d
Merge branch 'master' into grpc-options
liferoad Oct 16, 2025
079b5d7
retry Milvus
liferoad Oct 16, 2025
23a8609
style: use string formatting in milvus search logging
liferoad Oct 16, 2025
e606e24
Merge branch 'master' into grpc-options
liferoad Oct 16, 2025
633cf93
fixed external tests
liferoad Oct 17, 2025
811e0e2
Merge branch 'master' into grpc-options
liferoad Oct 17, 2025
9234046
tests
liferoad Oct 17, 2025
f56221c
Merge branch 'master' into grpc-options
liferoad Oct 17, 2025
0b444e6
fix(enrichment_test): sort output and expected values before comparison
liferoad Oct 17, 2025
9f65988
docs(filebasedsink): add TODO comment for prism issue
liferoad Oct 20, 2025
ff2f13a
Merge branch 'master' into grpc-options
liferoad Oct 20, 2025
aec1d2a
more tunes on the grpc options
liferoad Oct 20, 2025
16d93fa
Merge branch 'master' into grpc-options
liferoad Oct 21, 2025
82ce53a
Merge branch 'master' into grpc-options
liferoad Oct 31, 2025
4a5ddce
addressed some comments
liferoad Oct 31, 2025
774ff13
removed some options
liferoad Oct 31, 2025
600fed4
Merge branch 'master' into grpc-options
liferoad Nov 6, 2025
8853082
keep 300000 for keepalive_timeout_ms
liferoad Nov 6, 2025
a5822ec
fixed the comments
liferoad Nov 7, 2025
603bbf0
added keepalive_time_ms back
liferoad Nov 7, 2025
5659767
Update sdks/python/apache_beam/utils/subprocess_server.py
tvalentyn Nov 14, 2025
49c851b
address comments.
tvalentyn Nov 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions sdks/python/apache_beam/runners/worker/channel_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@

class GRPCChannelFactory(grpc.StreamStreamClientInterceptor):
DEFAULT_OPTIONS = [
("grpc.keepalive_time_ms", 20000),
("grpc.keepalive_timeout_ms", 300000),
# Setting keepalive_time_ms is needed for other options to work.
("grpc.keepalive_time_ms", 20_000),
# Default: 20s. Increasing to 5 min.
("grpc.keepalive_timeout_ms", 300_000),
# Default: 2, set to 0 to allow unlimited pings without data
("grpc.http2.max_pings_without_data", 0),
# Default: False, set to True to allow keepalive pings when no calls
("grpc.keepalive_permit_without_calls", True),
]

def __init__(self):
Expand Down
16 changes: 14 additions & 2 deletions sdks/python/apache_beam/utils/subprocess_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,20 @@ def start(self):
try:
process, endpoint = self.start_process()
wait_secs = .1
channel_options = [("grpc.max_receive_message_length", -1),
("grpc.max_send_message_length", -1)]
channel_options = [
("grpc.max_receive_message_length", -1),
("grpc.max_send_message_length", -1),
# Default: 20000ms (20s), increased to 10 minutes for stability
("grpc.keepalive_timeout_ms", 600_000),
# Default: 2, set to 0 to allow unlimited pings without data
("grpc.http2.max_pings_without_data", 0),
# Default: False, set to True to allow keepalive pings when no calls
("grpc.keepalive_permit_without_calls", True),
# Default: 2, set to 0 to allow unlimited ping strikes
("grpc.http2.max_ping_strikes", 0),
# Default: 0 (disabled), enable socket reuse for better handling
("grpc.so_reuseport", 1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not going to do anything usefull unless you specifically bind multiple server listeners to the same port. See https://github.com/grpc/grpc/tree/master/examples/python/multiprocessing#calculating-prime-numbers-with-multiple-processes.

This also can be useful when you want to bind a socket to a random (:0) port, get the actual port number, and only then bind the server to it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For prism runner, if the job_port pipeline option is not specified, its default value is 0:

parser.add_argument(
'--job_port',
default=0,

This port number is used by prism to call pick_port:

job_port, = subprocess_server.pick_port(self._job_port)

Inside pick_port, we will first bind socket to 0 and then retrieve the port number if the port argument is 0.

s.bind(('localhost', 0))
return s.getsockname()[1]

This is exactly the case where this grpc option is useful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! With this option, you don't need to close the socket for the found port anymore, as you'll be able to bind and serve on it:

# Close sockets only now to avoid the same port to be chosen twice
for s in sockets:
s.close()

You'll need to bind the initial socket with SO_REUSEPORT, ideally with SO_REUSEADDR as well.

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

This approach addresses the race condition where an unused port was found by one process, closes the socket, but before this process starts listening on the found port, it's acquired by another process, resulting in EADDRINUSE.

By not closing the socket until the server stops listening, you'll prevent other processes from seeing that port as unused.

Note that this only applies to systems where SO_REUSEPORT is supported.

]
self._grpc_channel = grpc.insecure_channel(
endpoint, options=channel_options)
channel_ready = grpc.channel_ready_future(self._grpc_channel)
Expand Down
Loading