Skip to content

Commit

Permalink
Allow dynamic update of read and write limits set for Servers
Browse files Browse the repository at this point in the history
  • Loading branch information
sarankk authored and vietj committed Feb 6, 2024
1 parent 3ab73e0 commit 6ddadec
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 17 deletions.
12 changes: 12 additions & 0 deletions src/main/asciidoc/net.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,18 @@ through {@link io.vertx.core.net.NetServerOptions} and for HttpServer it can be
{@link examples.NetExamples#configureTrafficShapingForHttpServer}
----

These traffic shaping options can also be dynamically updated after server start.

[source,$lang]
----
{@link examples.NetExamples#dynamicallyUpdateTrafficShapingForNetServer}
----

[source,$lang]
----
{@link examples.NetExamples#dynamicallyUpdateTrafficShapingForHttpServer}
----

[[ssl]]
=== Configuring servers and clients to work with SSL/TLS

Expand Down
35 changes: 35 additions & 0 deletions src/main/java/examples/NetExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,23 @@ public void configureTrafficShapingForNetServer(Vertx vertx) {
NetServer server = vertx.createNetServer(options);
}

public void dynamicallyUpdateTrafficShapingForNetServer(Vertx vertx) {
NetServerOptions options = new NetServerOptions()
.setHost("localhost")
.setPort(1234)
.setTrafficShapingOptions(new TrafficShapingOptions()
.setInboundGlobalBandwidth(64 * 1024)
.setOutboundGlobalBandwidth(128 * 1024));
NetServer server = vertx.createNetServer(options);
TrafficShapingOptions update = new TrafficShapingOptions()
.setInboundGlobalBandwidth(2 * 64 * 1024) // twice
.setOutboundGlobalBandwidth(128 * 1024); // unchanged
server
.listen(1234, "localhost")
// wait until traffic shaping handler is created for updates
.onSuccess(v -> server.updateTrafficShapingOptions(update));
}

public void configureTrafficShapingForHttpServer(Vertx vertx) {
HttpServerOptions options = new HttpServerOptions()
.setHost("localhost")
Expand All @@ -731,4 +748,22 @@ public void configureTrafficShapingForHttpServer(Vertx vertx) {

HttpServer server = vertx.createHttpServer(options);
}


public void dynamicallyUpdateTrafficShapingForHttpServer(Vertx vertx) {
HttpServerOptions options = new HttpServerOptions()
.setHost("localhost")
.setPort(1234)
.setTrafficShapingOptions(new TrafficShapingOptions()
.setInboundGlobalBandwidth(64 * 1024)
.setOutboundGlobalBandwidth(128 * 1024));
HttpServer server = vertx.createHttpServer(options);
TrafficShapingOptions update = new TrafficShapingOptions()
.setInboundGlobalBandwidth(2 * 64 * 1024) // twice
.setOutboundGlobalBandwidth(128 * 1024); // unchanged
server
.listen(1234, "localhost")
// wait until traffic shaping handler is created for updates
.onSuccess(v -> server.updateTrafficShapingOptions(update));
}
}
9 changes: 9 additions & 0 deletions src/main/java/io/vertx/core/http/HttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.vertx.core.metrics.Measured;
import io.vertx.core.net.SSLOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.TrafficShapingOptions;
import io.vertx.core.net.impl.SocketAddressImpl;
import io.vertx.core.streams.ReadStream;

Expand Down Expand Up @@ -182,6 +183,14 @@ default void updateSSLOptions(SSLOptions options, boolean force, Handler<AsyncRe
}
}

/**
* Update traffic shaping options {@code options}, the update happens if valid values are passed for traffic
* shaping options. This update happens synchronously and at best effort for rate update to take effect immediately.
*
* @param options the new traffic shaping options
*/
void updateTrafficShapingOptions(TrafficShapingOptions options);

/**
* Tell the server to start listening. The server will listen on the port and host specified in the
* {@link io.vertx.core.http.HttpServerOptions} that was used when creating the server.
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/io/vertx/core/net/NetServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,4 +256,12 @@ default void updateSSLOptions(SSLOptions options, boolean force, Handler<AsyncRe
fut.onComplete(handler);
}
}

/**
* Update traffic shaping options {@code options}, the update happens if valid values are passed for traffic
* shaping options. This update happens synchronously and at best effort for rate update to take effect immediately.
*
* @param options the new traffic shaping options
*/
void updateTrafficShapingOptions(TrafficShapingOptions options);
}
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/net/TrafficShapingOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ public TrafficShapingOptions setMaxDelayToWaitUnit(TimeUnit maxDelayToWaitTimeUn
}

/**
* Set the delay between two computations of performances for channels or 0 if no stats are to be computed
* Set the delay between two computations of performances for channels
*
* @param checkIntervalForStats delay between two computations of performances
* @return a reference to this, so the API can be used fluently
*/
public TrafficShapingOptions setCheckIntervalForStats(long checkIntervalForStats) {
this.checkIntervalForStats = checkIntervalForStats;
ObjectUtil.checkPositive(this.checkIntervalForStats, "checkIntervalForStats");
ObjectUtil.checkPositiveOrZero(this.checkIntervalForStats, "checkIntervalForStats");
return this;
}

Expand Down
31 changes: 27 additions & 4 deletions src/main/java/io/vertx/core/net/impl/TCPServerBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,13 @@ private GlobalTrafficShapingHandler createTrafficShapingHandler(EventLoopGroup e
return null;
}
GlobalTrafficShapingHandler trafficShapingHandler;
if (options.getMaxDelayToWait() != 0 && options.getCheckIntervalForStats() != 0) {
if (options.getMaxDelayToWait() != 0) {
long maxDelayToWaitInMillis = options.getMaxDelayToWaitTimeUnit().toMillis(options.getMaxDelayToWait());
long checkIntervalForStatsInMillis = options.getCheckIntervalForStatsTimeUnit().toMillis(options.getCheckIntervalForStats());
trafficShapingHandler = new GlobalTrafficShapingHandler(eventLoopGroup, options.getOutboundGlobalBandwidth(), options.getInboundGlobalBandwidth(), checkIntervalForStatsInMillis, maxDelayToWaitInMillis);
} else if (options.getCheckIntervalForStats() != 0) {
} else {
long checkIntervalForStatsInMillis = options.getCheckIntervalForStatsTimeUnit().toMillis(options.getCheckIntervalForStats());
trafficShapingHandler = new GlobalTrafficShapingHandler(eventLoopGroup, options.getOutboundGlobalBandwidth(), options.getInboundGlobalBandwidth(), checkIntervalForStatsInMillis);
} else {
trafficShapingHandler = new GlobalTrafficShapingHandler(eventLoopGroup, options.getOutboundGlobalBandwidth(), options.getInboundGlobalBandwidth());
}
if (options.getPeakOutboundGlobalBandwidth() != 0) {
trafficShapingHandler.setMaxGlobalWriteSize(options.getPeakOutboundGlobalBandwidth());
Expand Down Expand Up @@ -147,6 +145,31 @@ public Future<Boolean> updateSSLOptions(SSLOptions options, boolean force) {
}
}

public void updateTrafficShapingOptions(TrafficShapingOptions options) {
if (options == null) {
throw new IllegalArgumentException("Invalid null value passed for traffic shaping options update");
}
if (trafficShapingHandler == null) {
throw new IllegalStateException("Unable to update traffic shaping options because the server was not configured " +
"to use traffic shaping during startup");
}
TCPServerBase server = actualServer;
if (server != null && server != this) {
server.updateTrafficShapingOptions(options);
} else {
long checkIntervalForStatsInMillis = options.getCheckIntervalForStatsTimeUnit().toMillis(options.getCheckIntervalForStats());
trafficShapingHandler.configure(options.getOutboundGlobalBandwidth(), options.getInboundGlobalBandwidth(), checkIntervalForStatsInMillis);

if (options.getPeakOutboundGlobalBandwidth() != 0) {
trafficShapingHandler.setMaxGlobalWriteSize(options.getPeakOutboundGlobalBandwidth());
}
if (options.getMaxDelayToWait() != 0) {
long maxDelayToWaitInMillis = options.getMaxDelayToWaitTimeUnit().toMillis(options.getMaxDelayToWait());
trafficShapingHandler.setMaxWriteDelay(maxDelayToWaitInMillis);
}
}
}

public Future<TCPServerBase> bind(SocketAddress address) {
ContextInternal listenContext = vertx.getOrCreateContext();
return listen(address, listenContext).map(this);
Expand Down
94 changes: 83 additions & 11 deletions src/test/java/io/vertx/core/http/HttpBandwidthLimitingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,27 @@ public static Iterable<Object[]> data() {

Function<Vertx, HttpServer> http1ServerFactory = (v) -> Providers.http1Server(v, INBOUND_LIMIT, OUTBOUND_LIMIT);
Function<Vertx, HttpServer> http2ServerFactory = (v) -> Providers.http2Server(v, INBOUND_LIMIT, OUTBOUND_LIMIT);
Function<Vertx, HttpServer> http1NonTrafficShapedServerFactory = (v) -> Providers.http1Server(v, 0, 0);
Function<Vertx, HttpServer> http2NonTrafficShapedServerFactory = (v) -> Providers.http1Server(v, 0, 0);
Function<Vertx, HttpClient> http1ClientFactory = (v) -> v.createHttpClient();
Function<Vertx, HttpClient> http2ClientFactory = (v) -> v.createHttpClient(createHttp2ClientOptions());

return Arrays.asList(new Object[][] {
{ 1.1, http1ServerFactory, http1ClientFactory },
{ 2.0, http2ServerFactory, http2ClientFactory }
{ 1.1, http1ServerFactory, http1ClientFactory, http1NonTrafficShapedServerFactory },
{ 2.0, http2ServerFactory, http2ClientFactory, http2NonTrafficShapedServerFactory }
});
}

private Function<Vertx, HttpServer> serverFactory;
private Function<Vertx, HttpClient> clientFactory;
private Function<Vertx, HttpServer> nonTrafficShapedServerFactory;

public HttpBandwidthLimitingTest(double protoVersion, Function<Vertx, HttpServer> serverFactory,
Function<Vertx, HttpClient> clientFactory) {
Function<Vertx, HttpClient> clientFactory,
Function<Vertx, HttpServer> nonTrafficShapedServerFactory) {
this.serverFactory = serverFactory;
this.clientFactory = clientFactory;
this.nonTrafficShapedServerFactory = nonTrafficShapedServerFactory;
}

@Before
Expand Down Expand Up @@ -199,6 +204,63 @@ public void start(Promise<Void> startPromise) {
Assert.assertTrue(elapsedMillis > expectedTimeMillis(totalReceivedLength.get(), OUTBOUND_LIMIT)); // because there are simultaneous 2 requests
}

@Test
public void testDynamicOutboundRateUpdate() throws Exception {
Buffer expectedBuffer = TestUtils.randomBuffer(TEST_CONTENT_SIZE);

HttpServer testServer = serverFactory.apply(vertx);
testServer.requestHandler(HANDLERS.bufferRead(expectedBuffer));
startServer(testServer);

// update outbound rate to twice the limit
TrafficShapingOptions trafficOptions = new TrafficShapingOptions()
.setInboundGlobalBandwidth(INBOUND_LIMIT) // unchanged
.setOutboundGlobalBandwidth(2 * OUTBOUND_LIMIT);
testServer.updateTrafficShapingOptions(trafficOptions);

long startTime = System.nanoTime();
HttpClient testClient = clientFactory.apply(vertx);
read(expectedBuffer, testServer, testClient);
await();
long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);

Assert.assertTrue(elapsedMillis < expectedUpperBoundTimeMillis(TEST_CONTENT_SIZE, OUTBOUND_LIMIT));
}

@Test
public void testDynamicInboundRateUpdate() throws Exception {
Buffer expectedBuffer = TestUtils.randomBuffer((TEST_CONTENT_SIZE));

HttpServer testServer = serverFactory.apply(vertx);
testServer.requestHandler(HANDLERS.bufferWrite(expectedBuffer));
startServer(testServer);

// update inbound rate to twice the limit
TrafficShapingOptions trafficOptions = new TrafficShapingOptions()
.setOutboundGlobalBandwidth(OUTBOUND_LIMIT) // unchanged
.setInboundGlobalBandwidth(2 * INBOUND_LIMIT);
testServer.updateTrafficShapingOptions(trafficOptions);

long startTime = System.nanoTime();
HttpClient testClient = clientFactory.apply(vertx);
write(expectedBuffer, testServer, testClient);
await();
long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);

Assert.assertTrue(elapsedMillis < expectedUpperBoundTimeMillis(TEST_CONTENT_SIZE, INBOUND_LIMIT));
}

@Test(expected = IllegalStateException.class)
public void testRateUpdateWhenServerStartedWithoutTrafficShaping() {
HttpServer testServer = nonTrafficShapedServerFactory.apply(vertx);

// update inbound rate to twice the limit
TrafficShapingOptions trafficOptions = new TrafficShapingOptions()
.setOutboundGlobalBandwidth(OUTBOUND_LIMIT)
.setInboundGlobalBandwidth(2 * INBOUND_LIMIT);
testServer.updateTrafficShapingOptions(trafficOptions);
}

/**
* The throttling takes a while to kick in so the expected time cannot be strict especially
* for small data sizes in these tests.
Expand All @@ -211,6 +273,10 @@ private long expectedTimeMillis(long size, int rate) {
return (long) (TimeUnit.MILLISECONDS.convert(( size / rate), TimeUnit.SECONDS) * 0.5); // multiplied by 0.5 to be more tolerant of time pauses during CI runs
}

private long expectedUpperBoundTimeMillis(long size, int rate) {
return TimeUnit.MILLISECONDS.convert(( size / rate), TimeUnit.SECONDS); // Since existing rate will be upperbound, runs should complete by this time
}

private void read(Buffer expected, HttpServer server, HttpClient client) {
client.request(HttpMethod.GET, server.actualPort(), DEFAULT_HTTP_HOST,"/buffer-read")
.compose(req -> req.send()
Expand Down Expand Up @@ -280,19 +346,25 @@ static class Providers {
private static HttpServer http1Server(Vertx vertx, int inboundLimit, int outboundLimit) {
HttpServerOptions options = new HttpServerOptions()
.setHost(DEFAULT_HTTP_HOST)
.setPort(DEFAULT_HTTP_PORT)
.setTrafficShapingOptions(new TrafficShapingOptions()
.setInboundGlobalBandwidth(inboundLimit)
.setOutboundGlobalBandwidth(outboundLimit));
.setPort(DEFAULT_HTTP_PORT);

if (inboundLimit != 0 || outboundLimit != 0) {
options.setTrafficShapingOptions(new TrafficShapingOptions()
.setInboundGlobalBandwidth(inboundLimit)
.setOutboundGlobalBandwidth(outboundLimit));
}

return vertx.createHttpServer(options);
}

private static HttpServer http2Server(Vertx vertx, int inboundLimit, int outboundLimit) {
HttpServerOptions options = createHttp2ServerOptions(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST)
.setTrafficShapingOptions(new TrafficShapingOptions()
.setInboundGlobalBandwidth(inboundLimit)
.setOutboundGlobalBandwidth(outboundLimit));
HttpServerOptions options = createHttp2ServerOptions(DEFAULT_HTTP_PORT, DEFAULT_HTTP_HOST);

if (inboundLimit != 0 || outboundLimit != 0) {
options.setTrafficShapingOptions(new TrafficShapingOptions()
.setInboundGlobalBandwidth(inboundLimit)
.setOutboundGlobalBandwidth(outboundLimit));
}

return vertx.createHttpServer(options);
}
Expand Down
Loading

0 comments on commit 6ddadec

Please sign in to comment.