Skip to content

Commit

Permalink
Add HttpPushSpecificConfig to enable custom idle timeout configuratio…
Browse files Browse the repository at this point in the history
…n per connection

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Feb 3, 2022
1 parent 2e1f6aa commit efd663e
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ private static Uri stripUserInfo(final Uri requestUri) {
private Flow<Pair<HttpRequest, HttpPushContext>, Pair<Try<HttpResponse>, HttpPushContext>, ?>
buildHttpRequestFlow(final HttpPushConfig config) {

final Duration requestTimeout = config.getRequestTimeout();
final Duration requestTimeout = HttpPushSpecificConfig.fromConnection(connection, config).idleTimeout();

final PreparedTimer timer = DittoMetrics.timer("http_publish_request_time")
// Set maximum duration higher than request timeout to avoid race conditions
Expand Down Expand Up @@ -389,10 +389,10 @@ private HttpRequest createRequest(final HttpPublishTarget publishTarget, final E
result = requestWithoutEntity.withEntity(getTextPayload(message));
} else {
result = requestWithoutEntity.withEntity(getBytePayload(message));
}
}

return result;
}
return result;
}

}

Expand Down Expand Up @@ -451,7 +451,8 @@ public void onResponse(final Try<HttpResponse> tryResponse) {
l.debug("Got response <{} {} {}>", response.status(), response.getHeaders(),
response.entity().getContentType());

toCommandResponseOrAcknowledgement(signal, autoAckTarget, response, maxTotalMessageSize, ackSizeQuota,
toCommandResponseOrAcknowledgement(signal, autoAckTarget, response, maxTotalMessageSize,
ackSizeQuota,
targetAuthorizationContext)
.thenAccept(resultFuture::complete)
.exceptionally(e -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.httppush;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.service.config.HttpPushConfig;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

/**
* Class providing access to HTTP push specific configuration.
*/
@Immutable
public final class HttpPushSpecificConfig {

static final String IDLE_TIMEOUT = "idleTimeout";

private final Config specificConfig;

private HttpPushSpecificConfig(final Config specificConfig) {
this.specificConfig = specificConfig;
}

/**
* Creates a new instance of HttpSpecificConfig based on the {@code specificConfig} of the passed
* {@code connection}.
*
* @param connection the Connection to extract the {@code specificConfig} map from.
* @param httpConfig the http config to create the default config from.
* @return the new HttpSpecificConfig instance
*/
public static HttpPushSpecificConfig fromConnection(final Connection connection, final HttpPushConfig httpConfig) {
final Map<String, Object> defaultConfig = toDefaultConfig(httpConfig);
final Config config = ConfigFactory.parseMap(connection.getSpecificConfig())
.withFallback(ConfigFactory.parseMap(defaultConfig));

return new HttpPushSpecificConfig(config);
}

private static Map<String, Object> toDefaultConfig(final HttpPushConfig httpConfig) {
final Map<String, Object> defaultMap = new HashMap<>();
defaultMap.put(IDLE_TIMEOUT, httpConfig.getRequestTimeout());
return defaultMap;
}

/**
* @return the idle timeout applied for HTTP push requests.
*/
public Duration idleTimeout() {
return specificConfig.getDuration(IDLE_TIMEOUT);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final HttpPushSpecificConfig that = (HttpPushSpecificConfig) o;
return Objects.equals(specificConfig, that.specificConfig);
}

@Override
public int hashCode() {
return Objects.hash(specificConfig);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"specificConfig=" + specificConfig +
"]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.httppush;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -53,6 +55,9 @@ public final class HttpPushValidator extends AbstractProtocolValidator {
.map(HttpMethod::name)
.collect(Collectors.joining(", "));

static final Duration MAX_IDLE_TIMEOUT = Duration.of(60, ChronoUnit.SECONDS);

private final HttpPushConfig httpPushConfig;
private final boolean oauth2EnforceHttps;

/**
Expand All @@ -66,7 +71,8 @@ public static HttpPushValidator newInstance(final HttpPushConfig config) {
}

private HttpPushValidator(final HttpPushConfig config) {
oauth2EnforceHttps = config.getOAuth2Config().shouldEnforceHttps();
httpPushConfig = config;
oauth2EnforceHttps = httpPushConfig.getOAuth2Config().shouldEnforceHttps();
}

@Override
Expand All @@ -84,6 +90,7 @@ public void validate(final Connection connection, final DittoHeaders dittoHeader
validateParallelism(connection.getSpecificConfig(), dittoHeaders);
validateOmitBodyMethods(connection.getSpecificConfig(), dittoHeaders);
validateCredentials(connection, dittoHeaders);
validateSpecificConfig(connection, dittoHeaders);
}

@Override
Expand Down Expand Up @@ -186,6 +193,20 @@ private void validateCredentials(final Connection connection, final DittoHeaders
});
}

private void validateSpecificConfig(final Connection connection, final DittoHeaders dittoHeaders) {
final HttpPushSpecificConfig
httpPushSpecificConfig = HttpPushSpecificConfig.fromConnection(connection, httpPushConfig);
final var idleTimeout = httpPushSpecificConfig.idleTimeout();
if (idleTimeout.isNegative() || idleTimeout.compareTo(MAX_IDLE_TIMEOUT) > 0) {
throw ConnectionConfigurationInvalidException
.newBuilder("Idle timeout '" + idleTimeout.toSeconds() +
"' is not within the allowed range of [0, " + MAX_IDLE_TIMEOUT.toSeconds() + "] seconds.")
.description("Please adjust the timeout to be within the allowed range.")
.dittoHeaders(dittoHeaders)
.build();
}
}

private static ConnectionConfigurationInvalidException parallelismValidationFailed(final String parallelismString,
final DittoHeaders headers) {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.httppush;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.service.config.HttpPushConfig;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public final class HttpPushSpecificConfigTest {

private HttpPushConfig httpConfig;
private Connection connection;

@Before
public void setup() {
httpConfig = Mockito.mock(HttpPushConfig.class);
when(httpConfig.getRequestTimeout()).thenReturn(Duration.ofSeconds(2));
connection = Mockito.mock(Connection.class);
}

@Test
public void parseHttpSpecificConfig() {
final Map<String, String> configuredSpecificConfig = new HashMap<>();
configuredSpecificConfig.put(HttpPushSpecificConfig.IDLE_TIMEOUT, "3s");

when(connection.getSpecificConfig()).thenReturn(configuredSpecificConfig);
final var specificConfig = HttpPushSpecificConfig.fromConnection(connection, httpConfig);

assertThat(specificConfig.idleTimeout()).isEqualTo(Duration.ofSeconds(3));
}

@Test
public void defaultConfig() {
when(connection.getSpecificConfig()).thenReturn(Collections.emptyMap());
final HttpPushSpecificConfig specificConfig = HttpPushSpecificConfig.fromConnection(connection, httpConfig);

assertThat(specificConfig.idleTimeout()).isEqualTo(Duration.ofSeconds(2));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,18 @@ public void testInvalidOmitBodyHttpMethod() {
verifyConnectionConfigurationInvalidExceptionIsThrown(connection, "It contains an invalid HTTP method");
}

@Test
public void testInvalidIdleTime() {
final var idleTimeout = "-3s";
final Connection connection = getConnectionWithTarget("POST:events").toBuilder()
.specificConfig(Map.of(HttpPushSpecificConfig.IDLE_TIMEOUT, idleTimeout))
.build();
verifyConnectionConfigurationInvalidExceptionIsThrown(connection,
"Idle timeout '" + idleTimeout.substring(0, idleTimeout.length() - 1) +
"' is not within the allowed range of [0, " + HttpPushValidator.MAX_IDLE_TIMEOUT.toSeconds() +
"] seconds.");
}

@Test
public void testNullOmitBodyHttpMethods() {
final Connection connection = getConnectionWithTarget("POST:events").toBuilder().build();
Expand All @@ -154,7 +166,7 @@ private static Connection getConnectionWithTarget(final String target) {

private static Connection getConnectionWithHostAndTarget(final String host, final String target) {
return ConnectivityModelFactory.newConnectionBuilder(CONNECTION_ID, ConnectionType.HTTP_PUSH,
ConnectivityStatus.OPEN, "http://" + host + ":80")
ConnectivityStatus.OPEN, "http://" + host + ":80")
.targets(singletonList(ConnectivityModelFactory.newTargetBuilder()
.address(target)
.authorizationContext(AUTHORIZATION_CONTEXT)
Expand Down

0 comments on commit efd663e

Please sign in to comment.