Skip to content

Commit

Permalink
made coordinated shutdown task in AbstractHttpRequestActor cancellabl…
Browse files Browse the repository at this point in the history
…e and remove task when actor is shutdown;

wait for requests in QueryThingsPerRequestActor;
pass httpConfig to GatewayProxyActor and QueryThingsPerRequestActor;
add test to QueryThingsPerRequestActorTest,

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Sep 15, 2022
1 parent acdade1 commit 7174e8c
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import akka.Done;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.CoordinatedShutdown;
import akka.actor.ReceiveTimeout;
import akka.actor.Status;
Expand Down Expand Up @@ -105,7 +106,7 @@
public abstract class AbstractHttpRequestActor extends AbstractActor {

/**
* Ask-timeout in shutdown tasks. Its duration should be long enough for solution commands to succeed but
* Ask-timeout in shutdown tasks. Its duration should be long enough for http requests to succeed but
* ultimately does not matter because each shutdown phase has its own timeout.
*/
private static final Duration SHUTDOWN_ASK_TIMEOUT = Duration.ofMinutes(2L);
Expand All @@ -128,6 +129,10 @@ public abstract class AbstractHttpRequestActor extends AbstractActor {
private final DittoDiagnosticLoggingAdapter logger;
private Supplier<DittoRuntimeException> timeoutExceptionSupplier;

private Cancellable cancellableShutdownTask;
private boolean inCoordinatedShutdown;
private ActorRef coordinatedShutdownSender;

protected AbstractHttpRequestActor(final ActorRef proxyActor,
final HeaderTranslator headerTranslator,
final HttpRequest request,
Expand Down Expand Up @@ -157,6 +162,7 @@ protected AbstractHttpRequestActor(final ActorRef proxyActor,
responseLocationUri = null;
receivedCommand = null;
timeoutExceptionSupplier = null;
inCoordinatedShutdown = false;
logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
setReceiveTimeout(httpConfig.getRequestTimeout());
}
Expand Down Expand Up @@ -193,13 +199,19 @@ private void setReceiveTimeout(final Duration receiveTimeout) {
public void preStart() {
final var coordinatedShutdown = CoordinatedShutdown.get(getContext().getSystem());

final var serviceRequestsDoneTask = "service-requests-done-http-request-actor" ;
coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone(), serviceRequestsDoneTask,
final var serviceRequestsDoneTask = "service-requests-done-http-request-actor";
cancellableShutdownTask = coordinatedShutdown.addCancellableTask(CoordinatedShutdown.PhaseServiceRequestsDone(),
serviceRequestsDoneTask,
() -> Patterns.ask(getSelf(), Control.SERVICE_REQUESTS_DONE, SHUTDOWN_ASK_TIMEOUT)
.thenApply(reply -> Done.done())
);
}

@Override
public void postStop() {
cancellableShutdownTask.cancel();
}

@Override
public AbstractActor.Receive createReceive() {
return ReceiveBuilder.create()
Expand Down Expand Up @@ -602,15 +614,18 @@ private void completeWithResult(final HttpResponse response) {

private void stop() {
logger.clearMDC();
// complete coordinated shutdown phase - ServiceRequestsDone
getSelf().tell(Done.getInstance(), getSelf());
// destroy ourselves:
if (inCoordinatedShutdown) {
// complete coordinated shutdown phase - ServiceRequestsDone
coordinatedShutdownSender.tell(Done.getInstance(), ActorRef.noSender());
}

// destroy ourselves
getContext().stop(getSelf());
inCoordinatedShutdown = false;
}

private static HttpResponse addEntityAccordingToContentType(final HttpResponse response, final String entityPlain,
final ContentType contentType) {

final ByteString byteString;

if (contentType.isBinary()) {
Expand Down Expand Up @@ -767,6 +782,8 @@ private static Duration getReceiveTimeout(final Signal<?> originatingSignal, fin

private void serviceRequestsDone(final Control serviceRequestsDone) {
logger.info("{}: waiting to complete the request", serviceRequestsDone);
inCoordinatedShutdown = true;
coordinatedShutdownSender = getSender();
}

private record HttpAcknowledgementConfig(HttpConfig httpConfig) implements AcknowledgementConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.gateway.service.util.config.endpoints.HttpConfig;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
Expand Down Expand Up @@ -49,17 +50,19 @@ public final class GatewayProxyActor extends AbstractActor {
private final ActorSelection devOpsCommandsActor;
private final ActorRef edgeCommandForwarder;
private final ActorRef pubSubMediator;
private final HttpConfig httpConfig;

private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

private final ActorRef statisticsActor;

@SuppressWarnings("unused")
private GatewayProxyActor(final ActorRef pubSubMediator, final ActorSelection devOpsCommandsActor,
final ActorRef edgeCommandForwarder) {
final ActorRef edgeCommandForwarder, final HttpConfig httpConfig) {
this.pubSubMediator = pubSubMediator;
this.devOpsCommandsActor = devOpsCommandsActor;
this.edgeCommandForwarder = edgeCommandForwarder;
this.httpConfig = httpConfig;
statisticsActor = getContext().actorOf(StatisticsActor.props(pubSubMediator), StatisticsActor.ACTOR_NAME);
}

Expand All @@ -72,8 +75,9 @@ private GatewayProxyActor(final ActorRef pubSubMediator, final ActorSelection de
* @return the Akka configuration Props object.
*/
public static Props props(final ActorRef pubSubMediator, final ActorSelection devOpsCommandsActor,
final ActorRef edgeCommandForwarder) {
return Props.create(GatewayProxyActor.class, pubSubMediator, devOpsCommandsActor, edgeCommandForwarder);
final ActorRef edgeCommandForwarder, final HttpConfig httpConfig) {
return Props.create(GatewayProxyActor.class, pubSubMediator, devOpsCommandsActor, edgeCommandForwarder,
httpConfig);
}

static boolean isLiveCommandOrEvent(final Signal<?> signal) {
Expand Down Expand Up @@ -117,7 +121,8 @@ public Receive createReceive() {
})
.match(QueryThings.class, qt -> {
final ActorRef responseActor = getContext().actorOf(
QueryThingsPerRequestActor.props(qt, edgeCommandForwarder, getSender(), pubSubMediator)
QueryThingsPerRequestActor.props(qt, edgeCommandForwarder, getSender(), pubSubMediator,
httpConfig)
);
edgeCommandForwarder.tell(qt, responseActor);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.gateway.service.proxy.actors;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand All @@ -20,12 +21,10 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.gateway.service.util.config.endpoints.GatewayHttpConfig;
import org.eclipse.ditto.gateway.service.util.config.endpoints.HttpConfig;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFieldSelector;
Expand All @@ -41,10 +40,14 @@
import org.eclipse.ditto.thingsearch.model.signals.commands.query.QueryThings;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.QueryThingsResponse;

import akka.Done;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.CoordinatedShutdown;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.pattern.Patterns;

/**
* Actor which is started for each {@link QueryThings} command in the gateway handling the response from
Expand All @@ -57,6 +60,12 @@
*/
final class QueryThingsPerRequestActor extends AbstractActor {

/**
* Ask-timeout in shutdown tasks. Its duration should be long enough for http requests to succeed but
* ultimately does not matter because each shutdown phase has its own timeout.
*/
private static final Duration SHUTDOWN_ASK_TIMEOUT = Duration.ofMinutes(2L);

private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

private final QueryThings queryThings;
Expand All @@ -66,22 +75,24 @@ final class QueryThingsPerRequestActor extends AbstractActor {

@Nullable private QueryThingsResponse queryThingsResponse;
@Nullable private List<ThingId> queryThingsResponseThingIds;
private Cancellable cancellableShutdownTask;
private boolean inCoordinatedShutdown;
@Nullable private ActorRef coordinatedShutdownSender;

@SuppressWarnings("unused")
private QueryThingsPerRequestActor(final QueryThings queryThings,
final ActorRef commandForwarderActor,
final ActorRef originatingSender,
final ActorRef pubSubMediator) {
final ActorRef pubSubMediator,
final HttpConfig httpConfig) {

this.queryThings = queryThings;
this.commandForwarderActor = commandForwarderActor;
this.originatingSender = originatingSender;
this.pubSubMediator = pubSubMediator;
queryThingsResponse = null;

final HttpConfig httpConfig = GatewayHttpConfig.of(
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config())
);
inCoordinatedShutdown = false;
coordinatedShutdownSender = null;

getContext().setReceiveTimeout(httpConfig.getRequestTimeout());
}
Expand All @@ -94,10 +105,28 @@ private QueryThingsPerRequestActor(final QueryThings queryThings,
static Props props(final QueryThings queryThings,
final ActorRef commandForwarderActor,
final ActorRef originatingSender,
final ActorRef pubSubMediator) {
final ActorRef pubSubMediator,
final HttpConfig httpConfig) {

return Props.create(QueryThingsPerRequestActor.class, queryThings, commandForwarderActor, originatingSender,
pubSubMediator);
pubSubMediator, httpConfig);
}

@Override
public void preStart() {
final var coordinatedShutdown = CoordinatedShutdown.get(getContext().getSystem());

final var serviceRequestsDoneTask = "service-requests-done-query-things-actor" ;
cancellableShutdownTask = coordinatedShutdown.addCancellableTask(CoordinatedShutdown.PhaseServiceRequestsDone(),
serviceRequestsDoneTask,
() -> Patterns.ask(getSelf(), Control.SERVICE_REQUESTS_DONE, SHUTDOWN_ASK_TIMEOUT)
.thenApply(reply -> Done.done())
);
}

@Override
public void postStop() {
cancellableShutdownTask.cancel();
}

@Override
Expand Down Expand Up @@ -159,6 +188,7 @@ public Receive createReceive() {

stopMyself();
})
.matchEquals(Control.SERVICE_REQUESTS_DONE, this::serviceRequestsDone)
.matchAny(any -> {
// all other messages (e.g. DittoRuntimeExceptions) are directly returned to the sender:
originatingSender.tell(any, getSender());
Expand Down Expand Up @@ -232,8 +262,24 @@ private void notifyOutOfSyncThings(final JsonArray rtrEntity) {
}

private void stopMyself() {
if (inCoordinatedShutdown && coordinatedShutdownSender != null) {
// complete coordinated shutdown phase - ServiceRequestsDone
coordinatedShutdownSender.tell(Done.getInstance(), ActorRef.noSender());
}

getContext().cancelReceiveTimeout();
getContext().stop(getSelf());
inCoordinatedShutdown = false;
}

private void serviceRequestsDone(final Control serviceRequestsDone) {
log.info("{}: waiting to complete the request", serviceRequestsDone);
inCoordinatedShutdown = true;
coordinatedShutdownSender = getSender();
}

enum Control {
SERVICE_REQUESTS_DONE
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private GatewayRootActor(final GatewayConfig gatewayConfig, final ActorRef pubSu
final var dittoExtensionConfig = ScopedConfig.dittoExtension(config);
final var edgeCommandForwarder = startChildActor(EdgeCommandForwarderActor.ACTOR_NAME,
EdgeCommandForwarderActor.props(pubSubMediator, shardRegions));
final var proxyActor = startProxyActor(actorSystem, pubSubMediator, edgeCommandForwarder);
final var proxyActor = startProxyActor(actorSystem, pubSubMediator, edgeCommandForwarder, httpConfig);

pubSubMediator.tell(DistPubSubAccess.put(getSelf()), getSelf());

Expand Down Expand Up @@ -286,12 +286,12 @@ private ActorRef createHealthCheckActor(final HealthCheckConfig healthCheckConfi
}

private ActorRef startProxyActor(final ActorRefFactory actorSystem, final ActorRef pubSubMediator,
final ActorRef edgeCommandForwarder) {
final ActorRef edgeCommandForwarder, final HttpConfig httpConfig) {
final var devOpsCommandsActor =
actorSystem.actorSelection(DevOpsRoute.DEVOPS_COMMANDS_ACTOR_SELECTION);

return startChildActor(GatewayProxyActor.ACTOR_NAME,
GatewayProxyActor.props(pubSubMediator, devOpsCommandsActor, edgeCommandForwarder));
GatewayProxyActor.props(pubSubMediator, devOpsCommandsActor, edgeCommandForwarder, httpConfig));

}

Expand Down
2 changes: 1 addition & 1 deletion gateway/service/src/main/resources/gateway.conf
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ ditto {
request-timeout = ${?REQUEST_TIMEOUT}

# additional media-types which will also be accepted besides JSON, for compatibility/fallback reasons.
# comma seperated list of media types, default: application/octet-stream
# comma separated list of media types, default: application/octet-stream
additional-accepted-media-types = ${?ADDITIONAL_ACCEPTED_MEDIA_TYPES}

query-params-as-headers = [
Expand Down
Loading

0 comments on commit 7174e8c

Please sign in to comment.