Skip to content

Commit

Permalink
increased coordinated-shutdown-timeout to 65s to give the requests en…
Browse files Browse the repository at this point in the history
…ough time to complete;

removed match for DistributedPubSubMediator.SubscribeAck from GatewayProxyActor and EdgeCommandForwarderActor because no subscribe message is send via pudSubMediator;
wait with stopping the AbstractHttpRequestActor until the request is completed;
add test to HttpRequestActorTest;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Sep 13, 2022
1 parent 5505cff commit acdade1
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ enum HttpConfigValue implements KnownConfigValue {
*/
PORT("port", 8080),

COORDINATED_SHUTDOWN_TIMEOUT("coordinated-shutdown-timeout", Duration.ofSeconds(1));
COORDINATED_SHUTDOWN_TIMEOUT("coordinated-shutdown-timeout", Duration.ofSeconds(65));

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.ReceiveBuilder;

/**
Expand Down Expand Up @@ -144,10 +143,6 @@ public Receive createReceive() {
.match(ThingSearchCommand.class, this::forwardToThingSearch)
.match(ThingSearchSudoCommand.class, this::forwardToThingSearch)
.match(Signal.class, this::handleUnknownSignal)
.match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck ->
log.debug("Successfully subscribed to distributed pub/sub on topic '{}'",
subscribeAck.subscribe().topic())
)
.matchAny(m -> log.warning("Got unknown message: {}", m))
.build();

Expand All @@ -161,8 +156,7 @@ private void forwardToThings(final Signal<?> thingSignal) {
scheduleTask(thingSignal, () -> signalTransformationCs.thenAccept(transformed -> {
log.withCorrelationId(transformed)
.info("Forwarding thing signal with ID <{}> and type <{}> to 'things' shard region",
transformed instanceof WithEntityId withEntityId ? withEntityId.getEntityId() :
null,
transformed instanceof WithEntityId withEntityId ? withEntityId.getEntityId() : null,
transformed.getType());

if (!Signal.isChannelLive(transformed) &&
Expand Down Expand Up @@ -267,9 +261,7 @@ private void forwardToConnectivity(final Command<?> connectivityCommand) {
private void forwardToThingSearch(final Command<?> command) {
// don't use "ask with retry" as the search could take some time and we don't want to stress the search
// by retrying a query several times if it took long
pubSubMediator.tell(
DistPubSubAccess.send(ThingsSearchConstants.SEARCH_ACTOR_PATH, command),
getSender());
pubSubMediator.tell(DistPubSubAccess.send(ThingsSearchConstants.SEARCH_ACTOR_PATH, command), getSender());
}

private void handleUnknownSignal(final Signal<?> signal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@
import org.eclipse.ditto.messages.model.Message;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommandResponse;

import akka.Done;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.CoordinatedShutdown;
import akka.actor.ReceiveTimeout;
import akka.actor.Status;
import akka.http.javadsl.model.ContentTypes;
Expand All @@ -90,6 +92,7 @@
import akka.http.scaladsl.model.EntityStreamSizeException;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.AskTimeoutException;
import akka.pattern.Patterns;
import akka.util.ByteString;
import scala.Option;
import scala.util.Either;
Expand All @@ -101,6 +104,12 @@
*/
public abstract class AbstractHttpRequestActor extends AbstractActor {

/**
* Ask-timeout in shutdown tasks. Its duration should be long enough for solution commands to succeed but
* ultimately does not matter because each shutdown phase has its own timeout.
*/
private static final Duration SHUTDOWN_ASK_TIMEOUT = Duration.ofMinutes(2L);

/**
* Signals the completion of a stream request.
*/
Expand Down Expand Up @@ -180,6 +189,17 @@ private void setReceiveTimeout(final Duration receiveTimeout) {
actorContext.setReceiveTimeout(receiveTimeout);
}

@Override
public void preStart() {
final var coordinatedShutdown = CoordinatedShutdown.get(getContext().getSystem());

final var serviceRequestsDoneTask = "service-requests-done-http-request-actor" ;
coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone(), serviceRequestsDoneTask,
() -> Patterns.ask(getSelf(), Control.SERVICE_REQUESTS_DONE, SHUTDOWN_ASK_TIMEOUT)
.thenApply(reply -> Done.done())
);
}

@Override
public AbstractActor.Receive createReceive() {
return ReceiveBuilder.create()
Expand Down Expand Up @@ -211,6 +231,7 @@ public AbstractActor.Receive createReceive() {
.build());
})
.match(Command.class, this::handleCommand)
.matchEquals(Control.SERVICE_REQUESTS_DONE, this::serviceRequestsDone)
.matchAny(m -> {
logger.warning("Got unknown message, expected a 'Command': {}", m);
completeWithResult(createHttpResponse(HttpStatus.INTERNAL_SERVER_ERROR));
Expand Down Expand Up @@ -398,6 +419,7 @@ private Receive getResponseAwaitingBehavior() {
"Got <Status.Failure> when a command response was expected: <{}>!", cause.getMessage());
completeWithResult(createHttpResponse(HttpStatus.INTERNAL_SERVER_ERROR));
})
.matchEquals(Control.SERVICE_REQUESTS_DONE, this::serviceRequestsDone)
.matchAny(m -> {
logger.error("Got unknown message when a command response was expected: <{}>!", m);
completeWithResult(createHttpResponse(HttpStatus.INTERNAL_SERVER_ERROR));
Expand Down Expand Up @@ -488,19 +510,18 @@ private void handleReceiveTimeout() {

logger.setCorrelationId(WithDittoHeaders.getCorrelationId(receivedCommand).orElse(null));
logger.info("Got <{}> after <{}> before an appropriate response arrived.",
ReceiveTimeout.class.getSimpleName(),
receiveTimeout);
ReceiveTimeout.class.getSimpleName(), receiveTimeout);
actorContext.cancelReceiveTimeout();

if (null != timeoutExceptionSupplier) {
final var timeoutException = timeoutExceptionSupplier.get();
handleDittoRuntimeException(timeoutException);
} else {

// This case is a programming error that should not happen at all.
logger.error("Actor does not have a timeout exception supplier." +
" Thus, no DittoRuntimeException could be handled.");
stop();
}
getContext().cancelReceiveTimeout();
}

private void handleDittoRuntimeException(final DittoRuntimeException exception) {
Expand Down Expand Up @@ -581,6 +602,8 @@ private void completeWithResult(final HttpResponse response) {

private void stop() {
logger.clearMDC();
// complete coordinated shutdown phase - ServiceRequestsDone
getSelf().tell(Done.getInstance(), getSelf());
// destroy ourselves:
getContext().stop(getSelf());
}
Expand Down Expand Up @@ -742,13 +765,11 @@ private static Duration getReceiveTimeout(final Signal<?> originatingSignal, fin
}
}

private static final class HttpAcknowledgementConfig implements AcknowledgementConfig {

private final HttpConfig httpConfig;
private void serviceRequestsDone(final Control serviceRequestsDone) {
logger.info("{}: waiting to complete the request", serviceRequestsDone);
}

private HttpAcknowledgementConfig(final HttpConfig httpConfig) {
this.httpConfig = httpConfig;
}
private record HttpAcknowledgementConfig(HttpConfig httpConfig) implements AcknowledgementConfig {

private static AcknowledgementConfig of(final HttpConfig httpConfig) {
return new HttpAcknowledgementConfig(httpConfig);
Expand Down Expand Up @@ -776,4 +797,8 @@ public int getIssuedMaxBytes() {

}

enum Control {
SERVICE_REQUESTS_DONE
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;

Expand All @@ -56,8 +55,7 @@ public final class GatewayProxyActor extends AbstractActor {
private final ActorRef statisticsActor;

@SuppressWarnings("unused")
private GatewayProxyActor(final ActorRef pubSubMediator,
final ActorSelection devOpsCommandsActor,
private GatewayProxyActor(final ActorRef pubSubMediator, final ActorSelection devOpsCommandsActor,
final ActorRef edgeCommandForwarder) {
this.pubSubMediator = pubSubMediator;
this.devOpsCommandsActor = devOpsCommandsActor;
Expand All @@ -73,10 +71,8 @@ private GatewayProxyActor(final ActorRef pubSubMediator,
* @param edgeCommandForwarder the Actor ref to the {@code EdgeCommandForwarderActor}.
* @return the Akka configuration Props object.
*/
public static Props props(final ActorRef pubSubMediator,
final ActorSelection devOpsCommandsActor,
public static Props props(final ActorRef pubSubMediator, final ActorSelection devOpsCommandsActor,
final ActorRef edgeCommandForwarder) {

return Props.create(GatewayProxyActor.class, pubSubMediator, devOpsCommandsActor, edgeCommandForwarder);
}

Expand Down Expand Up @@ -139,10 +135,6 @@ public Receive createReceive() {
getSender().tell(cause, getSelf());
})
.match(DittoRuntimeException.class, cre -> getSender().tell(cre, getSelf()))
.match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck ->
log.debug("Successfully subscribed to distributed pub/sub on topic '{}'",
subscribeAck.subscribe().topic())
)
.matchAny(m -> log.warning("Got unknown message, expected a 'Command': {}", m));

return receiveBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
package org.eclipse.ditto.gateway.service.starter;

import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

import org.eclipse.ditto.base.model.headers.translator.HeaderTranslator;
import org.eclipse.ditto.base.service.RootChildActorStarter;
import org.eclipse.ditto.base.service.actors.DittoRootActor;
import org.eclipse.ditto.edge.service.dispatching.EdgeCommandForwarderActor;
import org.eclipse.ditto.edge.service.dispatching.ShardRegions;
import org.eclipse.ditto.edge.service.headers.DittoHeadersValidator;
import org.eclipse.ditto.gateway.service.endpoints.directives.auth.DevopsAuthenticationDirective;
import org.eclipse.ditto.gateway.service.endpoints.directives.auth.DevopsAuthenticationDirectiveFactory;
import org.eclipse.ditto.gateway.service.endpoints.directives.auth.GatewayAuthenticationDirectiveFactory;
import org.eclipse.ditto.gateway.service.endpoints.routes.CustomApiRoutesProvider;
Expand All @@ -43,7 +41,6 @@
import org.eclipse.ditto.gateway.service.endpoints.utils.GatewaySignalEnrichmentProvider;
import org.eclipse.ditto.gateway.service.health.DittoStatusAndHealthProviderFactory;
import org.eclipse.ditto.gateway.service.health.GatewayHttpReadinessCheck;
import org.eclipse.ditto.gateway.service.health.StatusAndHealthProvider;
import org.eclipse.ditto.gateway.service.proxy.actors.GatewayProxyActor;
import org.eclipse.ditto.gateway.service.security.authentication.jwt.JwtAuthenticationFactory;
import org.eclipse.ditto.gateway.service.security.authentication.jwt.JwtAuthenticationResultProvider;
Expand All @@ -62,7 +59,6 @@
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.health.DefaultHealthCheckingActorFactory;
import org.eclipse.ditto.internal.utils.health.HealthCheckingActorOptions;
import org.eclipse.ditto.internal.utils.health.cluster.ClusterStatus;
import org.eclipse.ditto.internal.utils.health.routes.StatusRoute;
import org.eclipse.ditto.internal.utils.http.DefaultHttpClientFacade;
import org.eclipse.ditto.internal.utils.http.HttpClientFacade;
Expand All @@ -73,7 +69,6 @@

import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.Cluster;
Expand All @@ -82,7 +77,6 @@
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.server.Route;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.Materializer;
import akka.stream.SystemMaterializer;

/**
Expand Down Expand Up @@ -212,24 +206,24 @@ private static Route createRoute(final ActorSystem actorSystem,
final HeaderTranslator headerTranslator) {

final var dittoExtensionConfig = ScopedConfig.dittoExtension(actorSystem.settings().config());
final AuthenticationConfig authConfig = gatewayConfig.getAuthenticationConfig();
final Materializer materializer = SystemMaterializer.get(actorSystem).materializer();
final var authConfig = gatewayConfig.getAuthenticationConfig();
final var materializer = SystemMaterializer.get(actorSystem).materializer();

final GatewayAuthenticationDirectiveFactory authenticationDirectiveFactory =
final var authenticationDirectiveFactory =
GatewayAuthenticationDirectiveFactory.get(actorSystem, dittoExtensionConfig);

final DevopsAuthenticationDirective devopsAuthenticationDirective =
final var devopsAuthenticationDirective =
devopsAuthenticationDirectiveFactory.devops();
final DevopsAuthenticationDirective statusAuthenticationDirective =
final var statusAuthenticationDirective =
devopsAuthenticationDirectiveFactory.status();

final Supplier<ClusterStatus> clusterStateSupplier = new ClusterStatusSupplier(Cluster.get(actorSystem));
final StatusAndHealthProvider statusAndHealthProvider =
final var clusterStateSupplier = new ClusterStatusSupplier(Cluster.get(actorSystem));
final var statusAndHealthProvider =
DittoStatusAndHealthProviderFactory.of(actorSystem, clusterStateSupplier, healthCheckConfig);

final var dittoHeadersValidator = DittoHeadersValidator.get(actorSystem, dittoExtensionConfig);

final HttpConfig httpConfig = gatewayConfig.getHttpConfig();
final var httpConfig = gatewayConfig.getHttpConfig();

final var streamingConfig = gatewayConfig.getStreamingConfig();
final var signalEnrichmentProvider = GatewaySignalEnrichmentProvider.get(actorSystem, dittoExtensionConfig);
Expand Down Expand Up @@ -283,7 +277,7 @@ private static Route createRoute(final ActorSystem actorSystem,
}

private ActorRef createHealthCheckActor(final HealthCheckConfig healthCheckConfig) {
final HealthCheckingActorOptions healthCheckingActorOptions =
final var healthCheckingActorOptions =
HealthCheckingActorOptions.getBuilder(healthCheckConfig.isEnabled(), healthCheckConfig.getInterval())
.build();

Expand All @@ -293,8 +287,7 @@ private ActorRef createHealthCheckActor(final HealthCheckConfig healthCheckConfi

private ActorRef startProxyActor(final ActorRefFactory actorSystem, final ActorRef pubSubMediator,
final ActorRef edgeCommandForwarder) {

final ActorSelection devOpsCommandsActor =
final var devOpsCommandsActor =
actorSystem.actorSelection(DevOpsRoute.DEVOPS_COMMANDS_ACTOR_SELECTION);

return startChildActor(GatewayProxyActor.ACTOR_NAME,
Expand All @@ -307,10 +300,10 @@ private static DevopsAuthenticationDirectiveFactory getDevopsAuthenticationDirec
final CacheConfig publicKeysConfig,
final DevOpsConfig devOpsConfig,
final ActorSystem actorSystem) {

final OAuthConfig devopsOauthConfig = devOpsConfig.getOAuthConfig();
final JwtAuthenticationFactory devopsJwtAuthenticationFactory =
final var devopsOauthConfig = devOpsConfig.getOAuthConfig();
final var devopsJwtAuthenticationFactory =
JwtAuthenticationFactory.newInstance(devopsOauthConfig, publicKeysConfig, httpClient, actorSystem);

return DevopsAuthenticationDirectiveFactory.newInstance(devopsJwtAuthenticationFactory, devOpsConfig);
}

Expand All @@ -320,6 +313,7 @@ private String getHostname(final HttpConfig httpConfig) {
hostname = LocalHostAddressSupplier.getInstance().get();
log.info("No explicit hostname configured, using HTTP hostname <{}>.", hostname);
}

return hostname;
}

Expand Down
4 changes: 2 additions & 2 deletions gateway/service/src/main/resources/gateway.conf
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ ditto {
port = 8080
port = ${?HTTP_PORT}
port = ${?PORT}
coordinated-shutdown-timeout = 10s
coordinated-shutdown-timeout = 65s
coordinated-shutdown-timeout = ${?COORDINATED_SHUTDOWN_REQUEST_TIMEOUT}

schema-versions = [2]
Expand Down Expand Up @@ -452,7 +452,7 @@ akka {
# default timeout is 5s for the phase - give a longer timeout in order
# to be able to let ongoing HTTP requests take longer:
# must be higher than ${ditto.gateway.http.coordinated-shutdown-timeout} !
timeout = 12s
timeout = 70s
timeout = ${?AKKA_COORDINATED_SHUTDOWN_PHASES_SERVICE_REQUESTS_DONE_TIMEOUT}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import akka.http.javadsl.model.ResponseEntity;
import akka.http.javadsl.model.StatusCodes;
import akka.util.ByteString;
import scala.concurrent.duration.FiniteDuration;

/**
* Unit test for {@link HttpRequestActor}.
Expand Down Expand Up @@ -628,4 +629,32 @@ public void liveCommandWithoutAckRequestWithEventualValidResponseSucceeds()
.isEqualTo(StatusCodes.NO_CONTENT);
}

@Test
public void actorShutsDownAfterServiceRequestDoneMessageWasReceived() {

final var thingId = ThingId.generateRandom();
final var attributeName = "foo";
final var attributePointer = JsonPointer.of(attributeName);
final long timeout = 2L;
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder()
.timeout(Duration.ofSeconds(timeout))
.build();

final var proxyActorProbe = ACTOR_SYSTEM_RESOURCE.newTestProbe();
final var modifyAttribute =
ModifyAttribute.of(thingId, attributePointer, JsonValue.of("bar"), dittoHeaders);
final var responseFuture = new CompletableFuture<HttpResponse>();
final var underTest =
createHttpRequestActor(proxyActorProbe.ref(), getHttpRequest(modifyAttribute), responseFuture);

final var supervisor = ACTOR_SYSTEM_RESOURCE.newTestProbe();
supervisor.watch(underTest);

underTest.tell(modifyAttribute, ActorRef.noSender());
proxyActorProbe.expectMsgClass(ModifyAttribute.class);

underTest.tell(AbstractHttpRequestActor.Control.SERVICE_REQUESTS_DONE, ActorRef.noSender());
supervisor.expectTerminated(underTest, FiniteDuration.apply(timeout + 1L, TimeUnit.SECONDS));
}

}

0 comments on commit acdade1

Please sign in to comment.