[FLINK-32811][runtime] Add port range support for "taskmanager.data.bind-port" #23183

Merged
merged 1 commit into from
Aug 29, 2023

Conversation

ferenc-csaky
Contributor

What is the purpose of the change

Adds port range support for taskmanager.data.bind-port, which can be helpful in a restrictive network setup.

Brief change log

  • Changed taskmanager.data.bind-port option type from Integer to String.
  • Introduced PortRange utility to be able to carry the port range string and its generated iterator together.
  • Updated NettyConfig to handle a PortRange instead of a port as the port of the server.
  • Updated NettyServer to be able to initialize itself from a PortRange.
  • Adapted relevant code pieces and documentation.
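
To illustrate the idea behind the PortRange utility (keeping the configured string and the ports it expands to together), here is a minimal sketch. The class name `PortRangeSketch`, its accepted syntax (comma-separated single ports or `LOW-HIGH` ranges), and its method names are assumptions made for illustration; this is not the class added by the PR.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a port range holder; not Flink's actual PortRange class. */
final class PortRangeSketch {
    private final String spec;         // original config string, kept for logging
    private final List<Integer> ports; // expanded ports, in the order they will be tried

    PortRangeSketch(String spec) {
        this.spec = spec.trim();
        this.ports = new ArrayList<>();
        // Assumed syntax: comma-separated items, each either "N" or "LOW-HIGH".
        for (String item : this.spec.split(",")) {
            String part = item.trim();
            int dash = part.indexOf('-');
            int low = Integer.parseInt(dash < 0 ? part : part.substring(0, dash).trim());
            int high = dash < 0 ? low : Integer.parseInt(part.substring(dash + 1).trim());
            if (low < 0 || high > 65535 || low > high) {
                throw new IllegalArgumentException("Invalid port range: " + part);
            }
            for (int p = low; p <= high; p++) {
                ports.add(p);
            }
        }
    }

    /** Returns a fresh iterator over the candidate ports. */
    Iterator<Integer> getPortsIterator() {
        return ports.iterator();
    }

    /** Returns the original string, e.g. for a "server port range: ..." log entry. */
    @Override
    public String toString() {
        return spec;
    }

    public static void main(String[] args) {
        PortRangeSketch range = new PortRangeSketch("55000-55002");
        Iterator<Integer> it = range.getPortsIterator();
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

Running `main` prints 55000, 55001 and 55002 on separate lines. A single value such as `0` is treated as a one-element range, so a plain integer setting still parses after the option type changes to String.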

Verifying this change

This change added tests and can be verified as follows:

  • Added NettyServerFromPortRangeTest to test Netty server init from a port range.
  • Manually verified by setting taskmanager.data.bind-port to a port range (e.g. 55000-55100) while taskmanager.data.port is set to 0; the TM should then bind to the first available port in the given range, both locally and externally.
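
The "first available port from the given range" behavior can be sketched without Netty by using a plain `java.net.ServerSocket`. The class `FirstFreePortSketch` and the method `bindFirstAvailable` are hypothetical names, and the PR's NettyServer works on a Netty bootstrap rather than a ServerSocket, so treat this only as an illustration of the retry loop.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

/** Hypothetical illustration of "bind to the first free port in a range". */
final class FirstFreePortSketch {

    /** Tries each port in [low, high] in order and returns the first socket that binds. */
    static ServerSocket bindFirstAvailable(InetAddress address, int low, int high)
            throws IOException {
        IOException lastFailure = null;
        for (int port = low; port <= high; port++) {
            ServerSocket socket = new ServerSocket(); // created unbound
            try {
                socket.bind(new InetSocketAddress(address, port));
                return socket;
            } catch (IOException e) {
                // Port is unavailable: release the socket and try the next one.
                socket.close();
                lastFailure = e;
            }
        }
        throw new IOException("No free port in range " + low + "-" + high, lastFailure);
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = bindFirstAvailable(loopback, 55000, 55100)) {
            System.out.println("Listening on port " + server.getLocalPort());
        }
    }
}
```

Starting two such listeners against the same range should give them two different ports, which mirrors the manual check described above for several TaskManagers on one host.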

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no

@flinkbot
Collaborator

flinkbot commented Aug 9, 2023

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Contributor

@mbalassi mbalassi left a comment

I ran this locally, and it makes sense to add this improvement. It would be great if we could make it explicit in the logs (maybe by adding an extra log line) that the following is for the data bind port:

2023-08-26 21:30:45,211 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig        [] - NettyConfig [server address: localhost/127.0.0.1, server port range: 55000-55100, ssl enabled: false, memory segment size (bytes): 32768, transport type: AUTO, number of server threads: 1 (manual), number of client threads: 1 (manual), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
2023-08-26 21:30:45,246 INFO  org.apache.flink.runtime.io.network.NettyShuffleServiceFactory [] - Created a new FileChannelManager for storing result partitions of BLOCKING shuffles. Used directories:
        /var/folders/yh/t9bt8gwj4zsd949jnr55vx980000gn/T/flink-netty-shuffle-9c342614-dc5f-467f-9a9d-1736246bdc62
2023-08-26 21:30:45,280 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool [] - Allocated 128 MB for network buffer pool (number of memory segments: 4096, bytes per segment: 32768).
2023-08-26 21:30:45,289 INFO  org.apache.flink.runtime.io.network.NettyShuffleEnvironment  [] - Starting the network environment and its components.
2023-08-26 21:30:45,309 INFO  org.apache.flink.runtime.io.network.netty.NettyClient        [] - Transport type 'auto': using NIO.
2023-08-26 21:30:45,310 INFO  org.apache.flink.runtime.io.network.netty.NettyClient        [] - Successful initialization (took 20 ms).
2023-08-26 21:30:45,312 INFO  org.apache.flink.runtime.io.network.netty.NettyServer        [] - Transport type 'auto': using NIO.
2023-08-26 21:30:45,341 INFO  org.apache.flink.runtime.io.network.netty.NettyServer        [] - Successful initialization (took 30 ms). Listening on SocketAddress /127.0.0.1:55000.

bootstrap.localAddress(config.getServerAddress(), port);
try {
    bindFuture = bootstrap.bind().syncUninterruptibly();
} catch (Exception e) {
Contributor

Can we be more specific than just Exception here?

Contributor Author

Probably yes, but Netty wraps and throws any kind of exception via FailedChannelFuture, and that behavior is not specified very well, so narrowing the catch would take a lot more digging.
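
As an illustration of what a narrower check could look like, the sketch below keeps the broad `catch (Exception e)` but inspects the cause chain for a `java.net.BindException`. It is an assumption that a port clash surfaces as a `BindException` somewhere in that chain; as noted above, Netty does not specify this clearly, and other transports may report bind failures with different exception types. `BindFailureSketch` and `isBindFailure` are hypothetical names.

```java
import java.net.BindException;

/** Hypothetical helper for telling a bind failure apart from other failures. */
final class BindFailureSketch {

    /** Returns true if t, or any exception in its cause chain, is a BindException. */
    static boolean isBindFailure(Throwable t) {
        int depth = 0;
        // The depth limit guards against cyclic cause chains.
        for (Throwable cur = t; cur != null && depth < 32; cur = cur.getCause(), depth++) {
            if (cur instanceof BindException) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Exception wrapped =
                new RuntimeException("bind failed", new BindException("Address already in use"));
        System.out.println(isBindFailure(wrapped));
        System.out.println(isBindFailure(new IllegalStateException("other")));
    }
}
```

Running `main` prints `true` and then `false`. Under this assumption, a caller could move on to the next port only when the helper returns true and rethrow anything else.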

Furthermore I copied this "pattern" from the codebase, so I considered it good enough:

@ferenc-csaky
Contributor Author

I'm not sure where you think it would be appropriate. I think the Netty-specific classes are too general to add that information explicitly. What can be done easily is to highlight the listening port explicitly once the connection is successfully established (the last line is the newly added log):

2023-08-27 09:42:16,255 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Starting TaskManager with ResourceID: localhost:59695-945f3f
2023-08-27 09:42:16,286 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices    [] - Temporary file directory '/var/folders/wp/ccy48gw1255bswh9bx9svxjc0000gn/T': total 460 GB, usable 125 GB (27.17% usable)
2023-08-27 09:42:16,287 INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager         [] - Created a new FileChannelManager for spilling of task related data to disk (joins, sorting, ...). Used directories:
        /var/folders/wp/ccy48gw1255bswh9bx9svxjc0000gn/T/flink-io-6c07c533-5a9f-42e2-abeb-a3b35042bd57
2023-08-27 09:42:16,291 INFO  org.apache.flink.runtime.io.network.netty.NettyConfig        [] - NettyConfig [server address: localhost/127.0.0.1, server port range: 55000-55100, ssl enabled: false, memory segment size (bytes): 32768, transport type: AUTO, number of server threads: 1 (manual), number of client threads: 1 (manual), server connect backlog: 0 (use Netty's default), client connect timeout (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
2023-08-27 09:42:16,324 INFO  org.apache.flink.runtime.io.network.NettyShuffleServiceFactory [] - Created a new FileChannelManager for storing result partitions of BLOCKING shuffles. Used directories:
        /var/folders/wp/ccy48gw1255bswh9bx9svxjc0000gn/T/flink-netty-shuffle-2e98bd72-3306-4ddc-b551-3d3c77d95cfa
2023-08-27 09:42:16,345 INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool [] - Allocated 128 MB for network buffer pool (number of memory segments: 4096, bytes per segment: 32768).
2023-08-27 09:42:16,352 INFO  org.apache.flink.runtime.io.network.NettyShuffleEnvironment  [] - Starting the network environment and its components.
2023-08-27 09:42:16,374 INFO  org.apache.flink.runtime.io.network.netty.NettyClient        [] - Transport type 'auto': using NIO.
2023-08-27 09:42:16,374 INFO  org.apache.flink.runtime.io.network.netty.NettyClient        [] - Successful initialization (took 21 ms).
2023-08-27 09:42:16,376 INFO  org.apache.flink.runtime.io.network.netty.NettyServer        [] - Transport type 'auto': using NIO.
2023-08-27 09:42:16,395 INFO  org.apache.flink.runtime.io.network.netty.NettyServer        [] - Successful initialization (took 19 ms). Listening on SocketAddress /127.0.0.1:55000.
2023-08-27 09:42:16,395 INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices    [] - TaskManager connection initialized successully; listening internally on port: 55000

IMO the other option would be to log it before the process starts, but since the whole thing is started by the TaskManagerRunner, that would be more or less redundant information up front. Tying the actual internal listening port to the TM explicitly can be useful and makes things more obvious.

WDYT?

@mbalassi
Contributor

Thanks, this makes sense imho. Unless there are any objections I will merge this tomorrow.
