Skip to content

Commit

Permalink
[FLINK-33532][rpc] Support configured akka remote dispatcher thread p…
Browse files Browse the repository at this point in the history
…ool size
  • Loading branch information
caodizhou authored and KarmaGYZ committed Nov 16, 2023
1 parent 012704d commit 8ef71ba
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 0 deletions.
18 changes: 18 additions & 0 deletions docs/layouts/shortcodes/generated/akka_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,24 @@
<td>Integer</td>
<td>Min number of threads to cap factor-based parallelism number to.</td>
</tr>
<tr>
<td><h5>pekko.remote-fork-join-executor.parallelism-factor</h5></td>
<td style="word-wrap: break-word;">2.0</td>
<td>Double</td>
<td>The parallelism factor is used to determine thread pool size using the following formula: ceil(available processors * factor). Resulting size is then bounded by the parallelism-min and parallelism-max values.</td>
</tr>
<tr>
<td><h5>pekko.remote-fork-join-executor.parallelism-max</h5></td>
<td style="word-wrap: break-word;">16</td>
<td>Integer</td>
<td>Max number of threads to cap factor-based parallelism number to.</td>
</tr>
<tr>
<td><h5>pekko.remote-fork-join-executor.parallelism-min</h5></td>
<td style="word-wrap: break-word;">8</td>
<td>Integer</td>
<td>Min number of threads to cap factor-based parallelism number to.</td>
</tr>
<tr>
<td><h5>pekko.framesize</h5></td>
<td style="word-wrap: break-word;">"10485760b"</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,38 @@ public static boolean isForceRpcInvocationSerializationEnabled(Configuration con
"Max number of threads to cap factor-based parallelism number to.")
.build());

public static final ConfigOption<Double> REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR =
ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-factor")
.doubleType()
.defaultValue(2.0)
.withDescription(
Description.builder()
.text(
"The parallelism factor is used to determine thread pool size using the"
+ " following formula: ceil(available processors * factor). Resulting size"
+ " is then bounded by the parallelism-min and parallelism-max values.")
.build());

public static final ConfigOption<Integer> REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN =
ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-min")
.intType()
.defaultValue(8)
.withDescription(
Description.builder()
.text(
"Min number of threads to cap factor-based parallelism number to.")
.build());

public static final ConfigOption<Integer> REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX =
ConfigOptions.key("pekko.remote-fork-join-executor.parallelism-max")
.intType()
.defaultValue(16)
.withDescription(
Description.builder()
.text(
"Max number of threads to cap factor-based parallelism number to.")
.build());

// ==================================================
// Configurations for client-socket-work-pool.
// ==================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,4 +273,17 @@ public static RpcSystem.ForkJoinExecutorConfiguration getForkJoinExecutorConfigu
return new RpcSystem.ForkJoinExecutorConfiguration(
parallelismFactor, minParallelism, maxParallelism);
}

public static RpcSystem.ForkJoinExecutorConfiguration getRemoteForkJoinExecutorConfiguration(
final Configuration configuration) {
final double parallelismFactor =
configuration.getDouble(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR);
final int minParallelism =
configuration.getInteger(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MIN);
final int maxParallelism =
configuration.getInteger(AkkaOptions.REMOTE_FORK_JOIN_EXECUTOR_PARALLELISM_MAX);

return new RpcSystem.ForkJoinExecutorConfiguration(
parallelismFactor, minParallelism, maxParallelism);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ private static Config getRemoteConfig(
addBaseRemoteConfig(builder, configuration, port, externalPort);
addHostnameRemoteConfig(builder, bindAddress, externalHostname);
addSslRemoteConfig(builder, configuration);
addRemoteForkJoinExecutorConfig(
builder,
ActorSystemBootstrapTools.getRemoteForkJoinExecutorConfiguration(configuration));

return builder.build();
}
Expand Down Expand Up @@ -384,6 +387,27 @@ private static void addSslRemoteConfig(
.add("}");
}

private static Config addRemoteForkJoinExecutorConfig(
ConfigBuilder builder, RpcSystem.ForkJoinExecutorConfiguration configuration) {
final double parallelismFactor = configuration.getParallelismFactor();
final int minNumThreads = configuration.getMinParallelism();
final int maxNumThreads = configuration.getMaxParallelism();

return builder.add("pekko {")
.add(" remote {")
.add(" default-remote-dispatcher {")
.add(" executor = fork-join-executor")
.add(" fork-join-executor {")
.add(" parallelism-factor = " + parallelismFactor)
.add(" parallelism-min = " + minNumThreads)
.add(" parallelism-max = " + maxNumThreads)
.add(" }")
.add(" }")
.add(" }")
.add("}")
.build();
}

/**
* Creates a local actor system without remoting.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,15 @@ void getConfigDefaultsToForkJoinExecutor() {
.isEqualTo("fork-join-executor");
}

@Test
void getConfigDefaultsToRemoteForkJoinExecutor() {
final Config config =
PekkoUtils.getConfig(new Configuration(), new HostAndPort("localhost", 1234));

assertThat(config.getString("pekko.remote.default-remote-dispatcher.executor"))
.isEqualTo("fork-join-executor");
}

@Test
void getConfigSetsExecutorWithThreadPriority() {
final int threadPriority = 3;
Expand Down

0 comments on commit 8ef71ba

Please sign in to comment.