Skip to content

Commit

Permalink
round of HTTP parallelism to next factor of two
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Sep 6, 2021
1 parent 19cc0e9 commit db57622
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,15 @@ private static ConnectionPoolSettings disambiguateByConnectionId(final ActorSyst
.withParserSettings(parserSettings.withHeaderValueCacheLimits(disambiguator)));
}

private static int parseParallelism(final Map<String, String> specificConfig) {
return Optional.ofNullable(specificConfig.get(HttpPushFactory.PARALLELISM))
static int parseParallelism(final Map<String, String> specificConfig) {
final int parsedParallelism = Optional.ofNullable(specificConfig.get(HttpPushFactory.PARALLELISM_JSON_KEY))
.map(Integer::valueOf)
.orElse(1);
return determineNextPowerOfTwo(parsedParallelism);
}

private static int determineNextPowerOfTwo(final int parallelism) {
return parallelism == 1 ? 1 : Integer.highestOneBit(parallelism - 1) * 2;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.service.config.HttpPushConfig;
import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelState;
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger;
import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelState;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.PreparedTimer;

import akka.actor.ActorSystem;
Expand All @@ -40,7 +40,7 @@ public interface HttpPushFactory {
/**
* Specific config name for the amount of concurrent HTTP requests to make.
*/
String PARALLELISM = "parallelism";
String PARALLELISM_JSON_KEY = "parallelism";

/**
* Create a request template without headers or payload for an HTTP publish target.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private void validateHttpMethod(final String methodName, final DittoHeaders ditt

private void validateParallelism(final Map<String, String> specificConfig, final DittoHeaders dittoHeaders) {

final String parallelismString = specificConfig.get(HttpPushFactory.PARALLELISM);
final String parallelismString = specificConfig.get(HttpPushFactory.PARALLELISM_JSON_KEY);
if (parallelismString != null) {
try {
final int parallelism = Integer.parseInt(parallelismString);
Expand All @@ -150,7 +150,7 @@ private static ConnectionConfigurationInvalidException parallelismValidationFail
final String errorMessage = String.format("The configured value '%s' of '%s' is invalid. " +
"It must be a positive integer.",
parallelismString,
HttpPushFactory.PARALLELISM);
HttpPushFactory.PARALLELISM_JSON_KEY);
return ConnectionConfigurationInvalidException.newBuilder(errorMessage)
.dittoHeaders(headers)
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.httppush;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;

import org.junit.Test;

/**
* Unit tests for {@link DefaultHttpPushFactory}.
*/
public final class DefaultHttpPushFactoryTest {

@Test
public void ensureParsedParallelismIsAlwaysFactorOfTwo() {
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "1")))
.isEqualTo(1);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "2")))
.isEqualTo(2);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "3")))
.isEqualTo(4);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "4")))
.isEqualTo(4);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "5")))
.isEqualTo(8);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "8")))
.isEqualTo(8);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "9")))
.isEqualTo(16);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "10")))
.isEqualTo(16);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "16")))
.isEqualTo(16);
}

}

0 comments on commit db57622

Please sign in to comment.