Skip to content

Commit

Permalink
extracted an abstract class AbstractActorWithShutdownBehavior which h…
Browse files Browse the repository at this point in the history
…andles the shutdown behaviour of Actors;

refactored several Actors to use the new abstract class;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Sep 27, 2022
1 parent b964079 commit f635d09
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.WithEntity;
import org.eclipse.ditto.internal.utils.akka.actors.AbstractActorWithShutdownBehavior;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
Expand All @@ -53,7 +54,6 @@

import akka.Done;
import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.cluster.pubsub.DistributedPubSubMediator;
Expand All @@ -71,7 +71,7 @@
* {@link Thing}s one after one in a stream. That ensures that the cluster messages size must not be increased when
* streaming a larger amount of Things in the cluster.
*/
public final class ThingsAggregatorProxyActor extends AbstractActor {
public final class ThingsAggregatorProxyActor extends AbstractActorWithShutdownBehavior {

/**
* The name of this Actor in the ActorSystem.
Expand Down Expand Up @@ -108,12 +108,11 @@ public static Props props(final ActorRef pubSubMediator) {
}

@Override
public Receive createReceive() {
public Receive handleMessage() {
return ReceiveBuilder.create()
.match(RetrieveThings.class, rt -> handleRetrieveThings(rt, rt))
.match(SudoRetrieveThings.class, srt -> handleSudoRetrieveThings(srt, srt))
.matchEquals(Control.OP_COMPLETE, this::opComplete)
.matchEquals(Control.SERVICE_REQUESTS_DONE, this::serviceRequestsDone)
.matchAny(m -> {
log.warning("Got unknown message: {}", m);
unhandled(m);
Expand Down Expand Up @@ -315,7 +314,13 @@ private void opComplete(final Control opComplete) {
}
}

private void serviceRequestsDone(final Control serviceRequestsDone) {
@Override
protected void serviceUnbind(final Control serviceUnbind) {
// nothing to do
}

@Override
public void serviceRequestsDone(final Control serviceRequestsDone) {
if (ongoingRequests == 0) {
log.info("{}: no ongoing requests", serviceRequestsDone);
getSender().tell(Done.getInstance(), getSelf());
Expand Down Expand Up @@ -386,8 +391,4 @@ public String toString() {

}

enum Control {
OP_COMPLETE,
SERVICE_REQUESTS_DONE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.eclipse.ditto.gateway.service.util.config.endpoints.CommandConfig;
import org.eclipse.ditto.gateway.service.util.config.endpoints.HttpConfig;
import org.eclipse.ditto.internal.models.signal.correlation.MatchingValidationResult;
import org.eclipse.ditto.internal.utils.akka.actors.AbstractActorWithShutdownBehavior;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.JsonValueSourceRef;
Expand Down Expand Up @@ -103,13 +104,7 @@
* it should fulfill. When it receives a command response, exception, status or timeout message, it renders the message
* into an HTTP response and stops itself. Its behavior can be modified by overriding the protected instance methods.
*/
public abstract class AbstractHttpRequestActor extends AbstractActor {

/**
* Ask-timeout in shutdown tasks. Its duration should be long enough for http requests to succeed but
* ultimately does not matter because each shutdown phase has its own timeout.
*/
private static final Duration SHUTDOWN_ASK_TIMEOUT = Duration.ofMinutes(2L);
public abstract class AbstractHttpRequestActor extends AbstractActorWithShutdownBehavior {

/**
* Signals the completion of a stream request.
Expand Down Expand Up @@ -212,8 +207,15 @@ public void postStop() {
cancellableShutdownTask.cancel();
}

private static HttpResponse createHttpResponse(final HttpStatus httpStatus) {
final var statusCode = StatusCodes.lookup(httpStatus.getCode())
.orElse(StatusCodes.custom(httpStatus.getCode(), "custom", "custom"));

return HttpResponse.create().withStatus(statusCode);
}

@Override
public AbstractActor.Receive createReceive() {
public AbstractActor.Receive handleMessage() {
return ReceiveBuilder.create()
.match(Status.Failure.class, failure -> {
Throwable cause = failure.cause();
Expand Down Expand Up @@ -251,11 +253,18 @@ public AbstractActor.Receive createReceive() {
.build();
}

private static HttpResponse createHttpResponse(final HttpStatus httpStatus) {
final var statusCode = StatusCodes.lookup(httpStatus.getCode())
.orElse(StatusCodes.custom(httpStatus.getCode(), "custom", "custom"));
return HttpResponse.create().withStatus(statusCode);
@Override
public void serviceUnbind(final Control serviceUnbind) {
// nothing to do
}

@Override
public void serviceRequestsDone(final Control serviceRequestsDone) {
logger.info("{}: waiting to complete the request", serviceRequestsDone);
inCoordinatedShutdown = true;
coordinatedShutdownSender = getSender();
}

private void handleCommand(final Command<?> command) {
try {
logger.setCorrelationId(command);
Expand Down Expand Up @@ -779,12 +788,6 @@ private static Duration getReceiveTimeout(final Signal<?> originatingSignal, fin
}
}

private void serviceRequestsDone(final Control serviceRequestsDone) {
logger.info("{}: waiting to complete the request", serviceRequestsDone);
inCoordinatedShutdown = true;
coordinatedShutdownSender = getSender();
}

private record HttpAcknowledgementConfig(HttpConfig httpConfig) implements AcknowledgementConfig {

private static AcknowledgementConfig of(final HttpConfig httpConfig) {
Expand Down Expand Up @@ -813,8 +816,4 @@ public int getIssuedMaxBytes() {

}

enum Control {
SERVICE_REQUESTS_DONE
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package org.eclipse.ditto.gateway.service.proxy.actors;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand All @@ -22,6 +21,7 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.gateway.service.util.config.endpoints.HttpConfig;
import org.eclipse.ditto.internal.utils.akka.actors.AbstractActorWithShutdownBehavior;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
Expand All @@ -41,7 +41,6 @@
import org.eclipse.ditto.thingsearch.model.signals.commands.query.QueryThingsResponse;

import akka.Done;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.CoordinatedShutdown;
Expand All @@ -58,13 +57,7 @@
* respond to searches with max. 200 search results.
* </p>
*/
final class QueryThingsPerRequestActor extends AbstractActor {

/**
* Ask-timeout in shutdown tasks. Its duration should be long enough for http requests to succeed but
* ultimately does not matter because each shutdown phase has its own timeout.
*/
private static final Duration SHUTDOWN_ASK_TIMEOUT = Duration.ofMinutes(2L);
final class QueryThingsPerRequestActor extends AbstractActorWithShutdownBehavior {

private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

Expand Down Expand Up @@ -133,7 +126,7 @@ public void postStop() {
}

@Override
public Receive createReceive() {
public Receive handleMessage() {
return receiveBuilder()
.match(ReceiveTimeout.class, receiveTimeout -> {
log.debug("Got ReceiveTimeout");
Expand Down Expand Up @@ -191,7 +184,6 @@ public Receive createReceive() {

stopMyself();
})
.matchEquals(Control.SERVICE_REQUESTS_DONE, this::serviceRequestsDone)
.matchAny(any -> {
// all other messages (e.g. DittoRuntimeExceptions) are directly returned to the sender:
originatingSender.tell(any, getSender());
Expand All @@ -200,6 +192,18 @@ public Receive createReceive() {
.build();
}

@Override
public void serviceUnbind(final Control serviceUnbind) {
// nothing to do
}

@Override
public void serviceRequestsDone(final Control serviceRequestsDone) {
log.info("{}: waiting to complete the request", serviceRequestsDone);
inCoordinatedShutdown = true;
coordinatedShutdownSender = getSender();
}

private boolean queryThingsOnlyContainsThingIdSelector() {
final Optional<JsonFieldSelector> fields = queryThings.getFields();
return fields.isPresent() && fields.get().getPointers()
Expand Down Expand Up @@ -275,14 +279,4 @@ private void stopMyself() {
inCoordinatedShutdown = false;
}

private void serviceRequestsDone(final Control serviceRequestsDone) {
log.info("{}: waiting to complete the request", serviceRequestsDone);
inCoordinatedShutdown = true;
coordinatedShutdownSender = getSender();
}

enum Control {
SERVICE_REQUESTS_DONE
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.utils.akka.actors;

import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.time.Duration;

import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import scala.PartialFunction;

/**
* Actor that handles commands during graceful shutdown.
*/
public abstract class AbstractActorWithShutdownBehavior extends AbstractActor {

/**
* Ask-timeout in shutdown tasks. Its duration should be long enough but ultimately does not
* matter because each shutdown phase has its own timeout.
*/
public static final Duration SHUTDOWN_ASK_TIMEOUT = Duration.ofMinutes(2L);

protected AbstractActorWithShutdownBehavior() {}

/**
* @return Actor's usual message handler. It will always be invoked in the actor's thread.
*/
protected abstract Receive handleMessage();

/**
* Handles the service unbind .
*/
protected abstract void serviceUnbind(final Control serviceUnbind);

/**
* Handles waiting for ongoing requests.
*/
protected abstract void serviceRequestsDone(final Control serviceRequestsDone);

/**
* Switches the actor's message handler.
* <p>
* <em>DO NOT call {@code getContext().become()} directly; otherwise the actor loses the ability to lock
* itself.</em>
* </p>
*
* @param receive the new message handler.
*/
protected void become(final Receive receive) {
getContext().become(shutdownBehavior(receive));
}

private Receive shutdownBehavior(final Receive receive) {
checkNotNull(receive, "actor's message handler");
return ReceiveBuilder.create()
.matchEquals(Control.SERVICE_UNBIND, this::serviceUnbind)
.matchEquals(Control.SERVICE_REQUESTS_DONE, this::serviceRequestsDone)
.matchAny(message -> {
final PartialFunction<Object, ?> handler = receive.onMessage();
if (handler.isDefinedAt(message)) {
handler.apply(message);
} else {
unhandled(message);
}
})
.build();
}

/**
* Switches the actor's message handler.
* <p>
* <em>DO NOT call {@code getContext().become()} directly; otherwise the actor loses the ability to lock
* itself.</em>
* </p>
*
* @param receive the new message handler.
* @param discardOld whether the old handler should be discarded.
*/
protected void become(final Receive receive, final boolean discardOld) {
getContext().become(shutdownBehavior(receive), discardOld);
}

@Override
public final Receive createReceive() {
return shutdownBehavior(handleMessage());
}

public enum Control {
SERVICE_UNBIND,

SERVICE_REQUESTS_DONE,

OP_COMPLETE
}

}
Loading

0 comments on commit f635d09

Please sign in to comment.