Skip to content

Commit

Permalink
Add existing HTTP Push specific config items to HttpPushSpecificConfig
Browse files Browse the repository at this point in the history
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Feb 7, 2022
1 parent efd663e commit 2fafd1a
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 97 deletions.
Expand Up @@ -15,7 +15,6 @@
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
Expand Down Expand Up @@ -93,8 +92,10 @@ private DefaultHttpPushFactory(final Connection connection, final int parallelis

static HttpPushFactory of(final Connection connection, final HttpPushConfig httpPushConfig,
final ConnectionLogger connectionLogger, final Supplier<SshTunnelState> tunnelConfigSupplier) {

final Uri baseUri = Uri.create(connection.getUri());
final int parallelism = parseParallelism(connection.getSpecificConfig());
final var httpPushSpecificConfig = HttpPushSpecificConfig.fromConnection(connection, httpPushConfig);
final int parallelism = parseParallelism(httpPushSpecificConfig);

final HttpsConnectionContext httpsConnectionContext;
if (HttpPushValidator.isSecureScheme(baseUri.getScheme())) {
Expand Down Expand Up @@ -256,11 +257,8 @@ private static ConnectionPoolSettings disambiguateByConnectionId(final ActorSyst
.withParserSettings(parserSettings.withHeaderValueCacheLimits(disambiguator)));
}

static int parseParallelism(final Map<String, String> specificConfig) {
return Optional.ofNullable(specificConfig.get(HttpPushFactory.PARALLELISM_JSON_KEY))
.map(Integer::valueOf)
.map(DefaultHttpPushFactory::determineNextPowerOfTwo)
.orElse(1);
static int parseParallelism(final HttpPushSpecificConfig specificConfig) {
return determineNextPowerOfTwo(specificConfig.parallelism());
}

private static int determineNextPowerOfTwo(final int parallelism) {
Expand Down
Expand Up @@ -30,7 +30,6 @@
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -129,8 +128,6 @@ final class HttpPublisherActor extends BasePublisherActor<HttpPublishTarget> {
"a) The HTTP endpoint does not consume the messages fast enough.\n" +
"b) The client count and/or the parallelism of this connection is not configured high enough.";

static final String OMIT_REQUEST_BODY_CONFIG_KEY = "omitRequestBody";

private final HttpPushFactory factory;

private final Materializer materializer;
Expand Down Expand Up @@ -702,20 +699,11 @@ private ConnectionFailure toConnectionFailure(@Nullable final Done done, @Nullab
return ConnectionFailure.of(getSelf(), error, "HttpPublisherActor stream terminated");
}

private List<HttpMethod> parseOmitBodyMethods(final Connection connection,
private static List<HttpMethod> parseOmitBodyMethods(final Connection connection,
final HttpPushConfig httpPushConfig) {
return Optional.of(connection.getSpecificConfig())
.map(specificConfig -> specificConfig.get(OMIT_REQUEST_BODY_CONFIG_KEY))
.map(methods -> {
if (methods.isEmpty()) {
return Stream.<String>empty();
} else {
return Arrays.stream(methods.split(","));
}
})
.map(s -> s.flatMap(m -> HttpMethods.lookup(m).stream()).collect(Collectors.toList()))
.orElse(httpPushConfig.getOmitRequestBodyMethods()
.stream().flatMap(m -> HttpMethods.lookup(m).stream()).collect(Collectors.toList()));
final var specificConfig = HttpPushSpecificConfig.fromConnection(connection, httpPushConfig);
return specificConfig.omitRequestBody().stream()
.map(s -> HttpMethods.lookup(s).orElse(null)).collect(Collectors.toList());
}

private enum ReservedHeaders {
Expand Down
Expand Up @@ -38,11 +38,6 @@
*/
public interface HttpPushFactory {

/**
* Specific config name for the amount of concurrent HTTP requests to make.
*/
String PARALLELISM_JSON_KEY = "parallelism";

/**
* Create a request template without headers or payload for an HTTP publish target.
* Published external messages set the headers and payload.
Expand Down
Expand Up @@ -14,6 +14,7 @@

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

Expand All @@ -23,6 +24,7 @@
import org.eclipse.ditto.connectivity.service.config.HttpPushConfig;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigFactory;

/**
Expand All @@ -33,6 +35,10 @@ public final class HttpPushSpecificConfig {

static final String IDLE_TIMEOUT = "idleTimeout";

static final String PARALLELISM = "parallelism";

static final String OMIT_REQUEST_BODY = "omitRequestBody";

private final Config specificConfig;

private HttpPushSpecificConfig(final Config specificConfig) {
Expand All @@ -58,6 +64,8 @@ public static HttpPushSpecificConfig fromConnection(final Connection connection,
private static Map<String, Object> toDefaultConfig(final HttpPushConfig httpConfig) {
final Map<String, Object> defaultMap = new HashMap<>();
defaultMap.put(IDLE_TIMEOUT, httpConfig.getRequestTimeout());
defaultMap.put(PARALLELISM, 1);
defaultMap.put(OMIT_REQUEST_BODY, httpConfig.getOmitRequestBodyMethods());
return defaultMap;
}

Expand All @@ -68,6 +76,24 @@ public Duration idleTimeout() {
return specificConfig.getDuration(IDLE_TIMEOUT);
}

/**
* @return the parallelism applied to HTTP publishing.
*/
public Integer parallelism() {
return specificConfig.getInt(PARALLELISM);
}

/**
* @return for which HTTP methods request bodies should be omitted.
*/
public List<String> omitRequestBody() {
try {
return specificConfig.getStringList(OMIT_REQUEST_BODY);
} catch (final ConfigException.WrongType e) {
return List.of(specificConfig.getString(OMIT_REQUEST_BODY).split(","));
}
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Expand Up @@ -16,7 +16,6 @@
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -87,8 +86,6 @@ public void validate(final Connection connection, final DittoHeaders dittoHeader
validateSourceConfigs(connection, dittoHeaders);
validateTargetConfigs(connection, dittoHeaders);
validatePayloadMappings(connection, actorSystem, connectivityConfig, dittoHeaders);
validateParallelism(connection.getSpecificConfig(), dittoHeaders);
validateOmitBodyMethods(connection.getSpecificConfig(), dittoHeaders);
validateCredentials(connection, dittoHeaders);
validateSpecificConfig(connection, dittoHeaders);
}
Expand Down Expand Up @@ -144,39 +141,6 @@ private static void validateHttpMethod(final String methodName,
}
}

private static void validateParallelism(final Map<String, String> specificConfig, final DittoHeaders dittoHeaders) {
final String parallelismString = specificConfig.get(HttpPushFactory.PARALLELISM_JSON_KEY);
if (parallelismString != null) {
try {
final int parallelism = Integer.parseInt(parallelismString);
if (parallelism <= 0) {
throw parallelismValidationFailed(parallelismString, dittoHeaders);
}
} catch (final NumberFormatException e) {
throw parallelismValidationFailed(parallelismString, dittoHeaders);
}
}
}

private static void validateOmitBodyMethods(final Map<String, String> specificConfig,
final DittoHeaders dittoHeaders) {

final String omitBody = specificConfig.get(HttpPublisherActor.OMIT_REQUEST_BODY_CONFIG_KEY);
if (omitBody != null && !omitBody.isEmpty()) {
final String[] methodsArray = omitBody.split(",");
for (final String method : methodsArray) {
if (HttpMethods.lookup(method).isEmpty()) {
final String errorMessage = String.format("The configured value '%s' of '%s' is invalid. " +
"It contains an invalid HTTP method: %s",
omitBody, HttpPublisherActor.OMIT_REQUEST_BODY_CONFIG_KEY, method);
throw ConnectionConfigurationInvalidException.newBuilder(errorMessage)
.dittoHeaders(dittoHeaders)
.build();
}
}
}
}

private void validateCredentials(final Connection connection, final DittoHeaders dittoHeaders) {
connection.getCredentials().ifPresent(credentials -> {
if (credentials instanceof OAuthClientCredentials) {
Expand All @@ -196,7 +160,12 @@ private void validateCredentials(final Connection connection, final DittoHeaders
private void validateSpecificConfig(final Connection connection, final DittoHeaders dittoHeaders) {
final HttpPushSpecificConfig
httpPushSpecificConfig = HttpPushSpecificConfig.fromConnection(connection, httpPushConfig);
final var idleTimeout = httpPushSpecificConfig.idleTimeout();
validateIdleTimeout(httpPushSpecificConfig.idleTimeout(), dittoHeaders);
validateParallelism(httpPushSpecificConfig.parallelism(), dittoHeaders);
validateOmitBodyMethods(httpPushSpecificConfig.omitRequestBody(), dittoHeaders);
}

private static void validateIdleTimeout(final Duration idleTimeout, final DittoHeaders dittoHeaders) {
if (idleTimeout.isNegative() || idleTimeout.compareTo(MAX_IDLE_TIMEOUT) > 0) {
throw ConnectionConfigurationInvalidException
.newBuilder("Idle timeout '" + idleTimeout.toSeconds() +
Expand All @@ -207,13 +176,40 @@ private void validateSpecificConfig(final Connection connection, final DittoHead
}
}

private static ConnectionConfigurationInvalidException parallelismValidationFailed(final String parallelismString,
private static void validateParallelism(final int parallelism, final DittoHeaders dittoHeaders) {
try {
if (parallelism <= 0) {
throw parallelismValidationFailed(parallelism, dittoHeaders);
}
} catch (final NumberFormatException e) {
throw parallelismValidationFailed(parallelism, dittoHeaders);
}
}

private static void validateOmitBodyMethods(final List<String> omitBodyMethods,
final DittoHeaders dittoHeaders) {

if (!omitBodyMethods.isEmpty()) {
for (final String method : omitBodyMethods) {
if (!method.equals("") && HttpMethods.lookup(method).isEmpty()) {
final String errorMessage = String.format("The configured value '%s' of '%s' is invalid. " +
"It contains an invalid HTTP method: %s",
omitBodyMethods, HttpPushSpecificConfig.OMIT_REQUEST_BODY, method);
throw ConnectionConfigurationInvalidException.newBuilder(errorMessage)
.dittoHeaders(dittoHeaders)
.build();
}
}
}
}

private static ConnectionConfigurationInvalidException parallelismValidationFailed(final int parallelism,
final DittoHeaders headers) {

final String errorMessage = String.format("The configured value '%s' of '%s' is invalid. " +
"It must be a positive integer.",
parallelismString,
HttpPushFactory.PARALLELISM_JSON_KEY);
parallelism,
HttpPushSpecificConfig.PARALLELISM);
return ConnectionConfigurationInvalidException.newBuilder(errorMessage)
.dittoHeaders(headers)
.build();
Expand Down
Expand Up @@ -13,10 +13,10 @@
package org.eclipse.ditto.connectivity.service.messaging.httppush;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import static org.mockito.Mockito.when;

import org.junit.Test;
import org.mockito.Mockito;

/**
* Unit tests for {@link DefaultHttpPushFactory}.
Expand All @@ -25,24 +25,25 @@ public final class DefaultHttpPushFactoryTest {

@Test
public void ensureParsedParallelismIsAlwaysFactorOfTwo() {
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "1")))
.isEqualTo(1);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "2")))
.isEqualTo(2);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "3")))
.isEqualTo(4);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "4")))
.isEqualTo(4);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "5")))
.isEqualTo(8);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "8")))
.isEqualTo(8);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "9")))
.isEqualTo(16);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "10")))
.isEqualTo(16);
assertThat(DefaultHttpPushFactory.parseParallelism(Map.of(HttpPushFactory.PARALLELISM_JSON_KEY, "16")))
.isEqualTo(16);
final var mockConf = Mockito.mock(HttpPushSpecificConfig.class);
when(mockConf.parallelism()).thenReturn(1);
assertThat(DefaultHttpPushFactory.parseParallelism(mockConf)).isEqualTo(1);
when(mockConf.parallelism()).thenReturn(2);
assertThat(DefaultHttpPushFactory.parseParallelism(mockConf)).isEqualTo(2);
when(mockConf.parallelism()).thenReturn(3);
assertThat(DefaultHttpPushFactory.parseParallelism(mockConf)).isEqualTo(4);
when(mockConf.parallelism()).thenReturn(4);
assertThat(DefaultHttpPushFactory.parseParallelism(mockConf)).isEqualTo(4);
when(mockConf.parallelism()).thenReturn(5);
assertThat(DefaultHttpPushFactory.parseParallelism(mockConf)).isEqualTo(8);
when(mockConf.parallelism()).thenReturn(8);
assertThat(DefaultHttpPushFactory.parseParallelism(mockConf)).isEqualTo(8);
when(mockConf.parallelism()).thenReturn(9);
assertThat(DefaultHttpPushFactory.parseParallelism(mockConf)).isEqualTo(16);
when(mockConf.parallelism()).thenReturn(10);
assertThat(DefaultHttpPushFactory.parseParallelism(mockConf)).isEqualTo(16);
when(mockConf.parallelism()).thenReturn(16);
assertThat(DefaultHttpPushFactory.parseParallelism(mockConf)).isEqualTo(16);
}

}
Expand Up @@ -13,7 +13,7 @@
package org.eclipse.ditto.connectivity.service.messaging.httppush;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.ditto.connectivity.service.messaging.httppush.HttpPublisherActor.OMIT_REQUEST_BODY_CONFIG_KEY;
import static org.eclipse.ditto.connectivity.service.messaging.httppush.HttpPushSpecificConfig.OMIT_REQUEST_BODY;
import static org.eclipse.ditto.connectivity.service.messaging.httppush.HttpTestDittoProtocolHelper.signalToJsonString;
import static org.eclipse.ditto.connectivity.service.messaging.httppush.HttpTestDittoProtocolHelper.signalToMultiMapped;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -907,13 +907,13 @@ public void testHttpPathReservedHeaderWithLeadingSlash() throws Exception {
@Test
public void testOmitRequestBody() throws Exception {
testOmitRequestBody(HttpMethods.GET, Map.of(), true);
testOmitRequestBody(HttpMethods.GET, Map.of(OMIT_REQUEST_BODY_CONFIG_KEY, "GET"), true);
testOmitRequestBody(HttpMethods.GET, Map.of(OMIT_REQUEST_BODY_CONFIG_KEY, "POST"), false);
testOmitRequestBody(HttpMethods.GET, Map.of(OMIT_REQUEST_BODY_CONFIG_KEY, ""), false);
testOmitRequestBody(HttpMethods.GET, Map.of(OMIT_REQUEST_BODY, "GET"), true);
testOmitRequestBody(HttpMethods.GET, Map.of(OMIT_REQUEST_BODY, "POST"), false);
testOmitRequestBody(HttpMethods.GET, Map.of(OMIT_REQUEST_BODY, ""), false);
testOmitRequestBody(HttpMethods.DELETE, Map.of(), true);
testOmitRequestBody(HttpMethods.DELETE, Map.of(OMIT_REQUEST_BODY_CONFIG_KEY, "GET,DELETE"), true);
testOmitRequestBody(HttpMethods.DELETE, Map.of(OMIT_REQUEST_BODY_CONFIG_KEY, "GET"), false);
testOmitRequestBody(HttpMethods.DELETE, Map.of(OMIT_REQUEST_BODY_CONFIG_KEY, ""), false);
testOmitRequestBody(HttpMethods.DELETE, Map.of(OMIT_REQUEST_BODY, "GET,DELETE"), true);
testOmitRequestBody(HttpMethods.DELETE, Map.of(OMIT_REQUEST_BODY, "GET"), false);
testOmitRequestBody(HttpMethods.DELETE, Map.of(OMIT_REQUEST_BODY, ""), false);
}

private void testOmitRequestBody(final HttpMethod method, final Map<String, String> specificConfig,
Expand Down
Expand Up @@ -386,6 +386,7 @@ private static Connection createHttpPushConnection(final ServerBinding binding)
ConnectivityStatus.OPEN,
"http://127.0.0.1:" + binding.localAddress().getPort())
.targets(singletonList(AbstractBaseClientActorTest.HTTP_TARGET))
.specificConfig(Map.of())
.build();
}

Expand Down
Expand Up @@ -15,6 +15,7 @@
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.eclipse.ditto.connectivity.service.messaging.TestConstants.Authorization.AUTHORIZATION_CONTEXT;
import static org.eclipse.ditto.connectivity.service.messaging.httppush.HttpPushSpecificConfig.OMIT_REQUEST_BODY;
import static org.mutabilitydetector.unittesting.MutabilityAssert.assertInstancesOf;
import static org.mutabilitydetector.unittesting.MutabilityMatchers.areImmutable;

Expand Down Expand Up @@ -129,7 +130,7 @@ public void testInvalidTargetAddress() {
@Test
public void testInvalidOmitBodyHttpMethod() {
final Connection connection = getConnectionWithTarget("POST:events").toBuilder()
.specificConfig(Map.of(HttpPublisherActor.OMIT_REQUEST_BODY_CONFIG_KEY, "GET,DELET,POST"))
.specificConfig(Map.of(OMIT_REQUEST_BODY, "GET,DELET,POST"))
.build();
verifyConnectionConfigurationInvalidExceptionIsThrown(connection, "It contains an invalid HTTP method");
}
Expand All @@ -155,7 +156,7 @@ public void testNullOmitBodyHttpMethods() {
@Test
public void testEmptyOmitBodyHttpMethods() {
final Connection connection = getConnectionWithTarget("POST:events").toBuilder()
.specificConfig(Map.of(HttpPublisherActor.OMIT_REQUEST_BODY_CONFIG_KEY, ""))
.specificConfig(Map.of(OMIT_REQUEST_BODY, ""))
.build();
underTest.validate(connection, DittoHeaders.empty(), actorSystem, connectivityConfig);
}
Expand Down

0 comments on commit 2fafd1a

Please sign in to comment.