Skip to content
Permalink
Browse files
improve k8s resource allocation (#100)
* add DEFAULT_TRANSPORT_PORT
* split master and worker resource quantity
* transport threads don't exceed the count of workers
  • Loading branch information
coderzc committed Sep 13, 2021
1 parent 3f2961a commit 8cfc0ed329346d53b9c1615d9ad2e7c6c5467c21
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 7 deletions.
@@ -61,11 +61,15 @@ public int serverPort() {
}

public int serverThreads() {
return this.config.get(ComputerOptions.TRANSPORT_SERVER_THREADS);
return Math.min(
this.config.get(ComputerOptions.TRANSPORT_SERVER_THREADS),
this.config.get(ComputerOptions.JOB_WORKERS_COUNT));
}

public int clientThreads() {
return this.config.get(ComputerOptions.TRANSPORT_CLIENT_THREADS);
return Math.min(
this.config.get(ComputerOptions.TRANSPORT_CLIENT_THREADS),
this.config.get(ComputerOptions.JOB_WORKERS_COUNT));
}

public TransportProvider transportProvider() {
@@ -84,6 +84,7 @@ public class ComputerJobDeployer {
private static final String RPC_PORT_NAME = "rpc-port";
private static final int DEFAULT_TRANSPORT_PORT = 8099;
private static final int DEFAULT_RPC_PORT = 8090;
private static final int DEFAULT_TRANSPORT_THREADS = 8;

private static final String CONFIG_MAP_VOLUME = "config-map-volume";

@@ -196,6 +197,19 @@ private Set<ContainerPort> handleConfig(ComputerJobSpec spec) {
config.put(ComputerOptions.RPC_SERVER_PORT.name(), rpcPort);
}

/*
Set a default number of transport threads,
if the number of CPU quantity of the worker is not specified
*/
if (spec.getWorkerCpu() == null) {
String defaultThreads = String.valueOf(DEFAULT_TRANSPORT_THREADS);

config.putIfAbsent(ComputerOptions.TRANSPORT_CLIENT_THREADS.name(),
defaultThreads);
config.putIfAbsent(ComputerOptions.TRANSPORT_SERVER_THREADS.name(),
defaultThreads);
}

ContainerPort transportContainerPort = new ContainerPortBuilder()
.withName(TRANSPORT_PORT_NAME)
.withContainerPort(Integer.valueOf(transportPort))
@@ -412,8 +426,15 @@ private Container getContainer(String name, ComputerJobSpec spec,
.build();
envVars.add(jvmOptionsEnv);

Quantity masterCpu = spec.getMasterCpu();
Quantity masterMemory = spec.getMasterMemory();
Quantity cpu;
Quantity memory;
if (name.contains("master")) {
cpu = spec.getMasterCpu();
memory = spec.getMasterMemory();
} else {
cpu = spec.getWorkerCpu();
memory = spec.getWorkerMemory();
}

List<VolumeMount> volumeMounts = spec.getVolumeMounts();
if (volumeMounts == null) {
@@ -435,8 +456,8 @@ private Container getContainer(String name, ComputerJobSpec spec,
.addAllToArgs(args)
.addAllToPorts(ports)
.withNewResources()
.addToLimits(ResourceName.CPU.value(), masterCpu)
.addToLimits(ResourceName.MEMORY.value(), masterMemory)
.addToLimits(ResourceName.CPU.value(), cpu)
.addToLimits(ResourceName.MEMORY.value(), memory)
.endResources()
.build();
}
@@ -78,7 +78,7 @@ public void testListenWithDefaultPort() {
int port = this.server.listen(config, messageHandler);

TransportConf conf = this.server.conf();
Assert.assertEquals(3, conf.serverThreads());
Assert.assertLte(3, conf.serverThreads());
Assert.assertEquals(IOMode.NIO, conf.ioMode());
Assert.assertEquals("127.0.0.1",
conf.serverAddress().getHostAddress());

0 comments on commit 8cfc0ed

Please sign in to comment.