From 385825de199418e4e74cef21a942244d14e43618 Mon Sep 17 00:00:00 2001 From: Yufei Cai Date: Thu, 20 Oct 2022 14:08:42 +0200 Subject: [PATCH] Revert "Make acknowledgement forwarder actor notify outbound dispatching actor when acknowledgements are forwarded." This reverts commit 1612e74a9a13ee3c3f6993b99e067ed3556df011. Signed-off-by: Yufei Cai --- .../messaging/OutboundDispatchingActor.java | 27 ++++++++++------- .../AcknowledgementForwarderActor.java | 30 ++++++------------- .../AcknowledgementForwarderActorStarter.java | 19 +++++------- 3 files changed, 32 insertions(+), 44 deletions(-) diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundDispatchingActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundDispatchingActor.java index ddd54006c5..9bcb577a6e 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundDispatchingActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundDispatchingActor.java @@ -12,6 +12,7 @@ */ package org.eclipse.ditto.connectivity.service.messaging; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -43,9 +44,11 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.CoordinatedShutdown; +import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; import akka.japi.pf.ReceiveBuilder; +import akka.pattern.Patterns; /** * This Actor makes the decision whether to dispatch outbound signals and their acknowledgements or to drop them. @@ -87,7 +90,6 @@ public Receive createReceive() { .match(DittoRuntimeException.class, this::forwardWithoutCheck) .match(Signal.class, this::handleSignal) .match(Terminated.class, this::actorTerminated) - .matchEquals(AcknowledgementForwarderActor.Control.ACKNOWLEDGEMENT_FORWARDED, this::ackForwarded) .matchEquals(Control.SHUTDOWN, this::gracefulShutdown) .matchAny(message -> logger.warning("Unknown message: <{}>", message)) .build(); @@ -179,8 +181,8 @@ private Signal adjustSignalAndStartAckForwarder(final Signal signal, final entityId, signal, settings.getAcknowledgementConfig(), - this::isSourceDeclaredOrTargetIssuedAck, - getSelf()); + this::isSourceDeclaredOrTargetIssuedAck + ); acksCounter.register(signalWithAckForwarder, getContext()); return signalWithAckForwarder.first(); } else { @@ -201,14 +203,6 @@ private void actorTerminated(final Terminated event) { } } - private void ackForwarded(final AcknowledgementForwarderActor.Control trigger) { - acksCounter.countDown(getSender()); - if (acksCounter.shouldTerminateOutboundDispatchingActor()) { - logger.debug("All tasks done, shutting down"); - getContext().stop(getSelf()); - } - } - private static boolean isLiveCommandExpectingResponse(final Signal signal) { final var headers = signal.getDittoHeaders(); return Command.isCommand(signal) && @@ -233,6 +227,17 @@ private void handleInboundResponseOrAcknowledgement(final Signal responseOrAc if (ackForwarderOptional.isPresent()) { final var acknowledgementForwarder = ackForwarderOptional.get(); acknowledgementForwarder.forward(responseOrAck, context); + acksCounter.countDown(acknowledgementForwarder); + if (acksCounter.shouldTerminateOutboundDispatchingActor()) { + logger.debug("Terminating after dispatching the last acknowledgement"); + // wait for ack forwarder to process the final response or acknowledgement before stopping self + final Duration shutdownTimeout = Duration.ofMinutes(2); + final var ping = AcknowledgementForwarderActor.Control.PING; + final var poisonPillFuture = + Patterns.ask(acknowledgementForwarder, ping, shutdownTimeout) + .handle((result, error) -> PoisonPill.getInstance()); + Patterns.pipe(poisonPillFuture, getContext().dispatcher()).to(getSelf()); + } } else { final var forwarderActorClassName = AcknowledgementForwarderActor.class.getSimpleName(); final var template = "No {} found. Forwarding signal to proxy actor: <{}>"; diff --git a/edge/service/src/main/java/org/eclipse/ditto/edge/service/acknowledgements/AcknowledgementForwarderActor.java b/edge/service/src/main/java/org/eclipse/ditto/edge/service/acknowledgements/AcknowledgementForwarderActor.java index a4cb495b0b..7d87c57003 100644 --- a/edge/service/src/main/java/org/eclipse/ditto/edge/service/acknowledgements/AcknowledgementForwarderActor.java +++ b/edge/service/src/main/java/org/eclipse/ditto/edge/service/acknowledgements/AcknowledgementForwarderActor.java @@ -62,12 +62,11 @@ public final class AcknowledgementForwarderActor extends AbstractActor { private final ActorSelection commandForwarder; private final String correlationId; @Nullable private final String ackgregatorAddressFallback; - @Nullable private final ActorRef notificationReceiver; private final DittoDiagnosticLoggingAdapter log; @SuppressWarnings("unused") private AcknowledgementForwarderActor(final ActorSelection commandForwarder, final DittoHeaders dittoHeaders, - final Duration defaultTimeout, @Nullable final ActorRef notificationReceiver) { + final Duration defaultTimeout) { this.commandForwarder = commandForwarder; correlationId = dittoHeaders.getCorrelationId() @@ -76,7 +75,6 @@ private AcknowledgementForwarderActor(final ActorSelection commandForwarder, fin getSelf().path().name().replace(ACTOR_NAME_PREFIX, "") ); ackgregatorAddressFallback = dittoHeaders.get(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey()); - this.notificationReceiver = notificationReceiver; log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this); getContext().setReceiveTimeout(dittoHeaders.getTimeout().orElse(defaultTimeout)); @@ -88,13 +86,11 @@ private AcknowledgementForwarderActor(final ActorSelection commandForwarder, fin * @param commandForwarder the actor ref of the edge command forwarder actor. * @param dittoHeaders the DittoHeaders of the Signal which contained the request for Acknowledgements. * @param defaultTimeout the default timeout to apply when {@code dittoHeaders} did not contain a specific timeout. - * @param notificationReceiver receiver of notifications when an acknowledgement is received. * @return the Akka configuration Props object. */ static Props props(final ActorSelection commandForwarder, final DittoHeaders dittoHeaders, - final Duration defaultTimeout, @Nullable final ActorRef notificationReceiver) { - return Props.create(AcknowledgementForwarderActor.class, commandForwarder, dittoHeaders, defaultTimeout, - notificationReceiver); + final Duration defaultTimeout) { + return Props.create(AcknowledgementForwarderActor.class, commandForwarder, dittoHeaders, defaultTimeout); } @Override @@ -102,6 +98,7 @@ public Receive createReceive() { return receiveBuilder() .match(CommandResponse.class, this::forwardCommandResponse) .match(ReceiveTimeout.class, this::handleReceiveTimeout) + .matchEquals(Control.PING, ping -> getSender().tell(Control.PONG, getSelf())) .matchAny(m -> log.warning("Received unexpected message: <{}>", m)) .build(); } @@ -124,7 +121,6 @@ private void forwardCommandResponse(final WithDittoHeaders acknowledgementOrResp acknowledgementOrResponse.getClass().getSimpleName(), dittoHeaders); commandForwarder.tell(acknowledgementOrResponse, ActorRef.noSender()); } - notifyReceiver(); } private void handleReceiveTimeout(final ReceiveTimeout receiveTimeout) { @@ -191,7 +187,7 @@ public static Signal startAcknowledgementForwarder(final ActorRefFactory acto final Predicate isAckLabelAllowed) { return startAcknowledgementForwarderForSignal(actorRefFactory, parent, commandForwarder, entityId, signal, - acknowledgementConfig, isAckLabelAllowed, null).first(); + acknowledgementConfig, isAckLabelAllowed).first(); } /** @@ -208,7 +204,6 @@ public static Signal startAcknowledgementForwarder(final ActorRefFactory acto * @param signal the signal for which acknowledgements are expected. * @param acknowledgementConfig the AcknowledgementConfig to use for looking up config values. * @param isAckLabelAllowed predicate for whether an ack label is allowed for publication at this channel. - * @param notificationReceiver receiver of notifications when acknowledgements are received. * @return the signal for which a suitable ack forwarder has started whenever required, and the actor reference * of the forwarder if it started. * @throws NullPointerException if any argument is {@code null}. @@ -220,13 +215,12 @@ public static Pair, Optional> startAcknowledgementForwarderF final EntityId entityId, final Signal signal, final AcknowledgementConfig acknowledgementConfig, - final Predicate isAckLabelAllowed, - @Nullable final ActorRef notificationReceiver) { + final Predicate isAckLabelAllowed) { final AcknowledgementForwarderActorStarter starter = AcknowledgementForwarderActorStarter.getInstance(actorRefFactory, parent, commandForwarder, entityId, signal, acknowledgementConfig, isAckLabelAllowed); final DittoHeadersBuilder dittoHeadersBuilder = signal.getDittoHeaders().toBuilder(); - final var forwarderOptional = starter.getConflictFreeWithActorRef(notificationReceiver); + final var forwarderOptional = starter.getConflictFreeWithActorRef(); forwarderOptional.map(Pair::first).ifPresent(dittoHeadersBuilder::correlationId); if (!signal.getDittoHeaders().getAcknowledgementRequests().isEmpty()) { dittoHeadersBuilder.acknowledgementRequests(starter.getAllowedAckRequests()); @@ -234,16 +228,10 @@ public static Pair, Optional> startAcknowledgementForwarderF return Pair.create(signal.setDittoHeaders(dittoHeadersBuilder.build()), forwarderOptional.map(Pair::second)); } - private void notifyReceiver() { - if (notificationReceiver != null) { - notificationReceiver.tell(Control.ACKNOWLEDGEMENT_FORWARDED, getSelf()); - } - } - /** - * Notification for when an acknowledgement is forwarded. + * Ping command and response for the acknowledgement forwarder actor. */ public enum Control { - ACKNOWLEDGEMENT_FORWARDED + PING, PONG } } diff --git a/edge/service/src/main/java/org/eclipse/ditto/edge/service/acknowledgements/AcknowledgementForwarderActorStarter.java b/edge/service/src/main/java/org/eclipse/ditto/edge/service/acknowledgements/AcknowledgementForwarderActorStarter.java index f21ad861b6..c41cd5203b 100644 --- a/edge/service/src/main/java/org/eclipse/ditto/edge/service/acknowledgements/AcknowledgementForwarderActorStarter.java +++ b/edge/service/src/main/java/org/eclipse/ditto/edge/service/acknowledgements/AcknowledgementForwarderActorStarter.java @@ -21,8 +21,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import javax.annotation.Nullable; - import org.eclipse.ditto.base.model.acks.AcknowledgementLabel; import org.eclipse.ditto.base.model.acks.AcknowledgementRequest; import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel; @@ -134,7 +132,7 @@ public Optional get() { ActorRef actorRef = null; if (hasEffectiveAckRequests(signal, acknowledgementRequests)) { try { - actorRef = startAckForwarderActor(dittoHeaders, null); + actorRef = startAckForwarderActor(dittoHeaders); } catch (final InvalidActorNameException e) { // In case that the actor with that name already existed, the correlation-id was already used recently: declineAllNonDittoAckRequests(getDuplicateCorrelationIdException(e)); @@ -151,18 +149,17 @@ public Optional get() { * start because no acknowledgement was requested. */ public Optional getConflictFree() { - return getConflictFreeWithActorRef(null).map(Pair::first); + return getConflictFreeWithActorRef().map(Pair::first); } /** * Start an acknowledgement forwarder. * Always succeeds. * - * @param notificationReceiver receiver of notifications when an acknowledgement is received. * @return the new correlation ID togeether with the started forwarder if an ack forwarder started, or an empty * optional if the ack forwarder did not start because no acknowledgement was requested. */ - public Optional> getConflictFreeWithActorRef(@Nullable final ActorRef notificationReceiver) { + public Optional> getConflictFreeWithActorRef() { if (hasEffectiveAckRequests(signal, acknowledgementRequests)) { final DittoHeadersBuilder builder = dittoHeaders.toBuilder() .acknowledgementRequests(acknowledgementRequests); @@ -173,8 +170,7 @@ public Optional> getConflictFreeWithActorRef(@Nullable fi while (true) { try { builder.correlationId(correlationId); - return Optional.of(Pair.create(correlationId, - startAckForwarderActor(builder.build(), notificationReceiver))); + return Optional.of(Pair.create(correlationId, startAckForwarderActor(builder.build()))); } catch (final InvalidActorNameException e) { // generate a new ID correlationId = joinPrefixAndCounter(prefix, ++counter); @@ -210,10 +206,9 @@ private Pair parseCorrelationId(final DittoHeaders dittoHeaders return Pair.create(UUID.randomUUID().toString(), -1); } - private ActorRef startAckForwarderActor(final DittoHeaders dittoHeaders, - @Nullable final ActorRef notificationReceiver) { + private ActorRef startAckForwarderActor(final DittoHeaders dittoHeaders) { final Props props = AcknowledgementForwarderActor.props(commandForwarder, dittoHeaders, - acknowledgementConfig.getForwarderFallbackTimeout(), notificationReceiver); + acknowledgementConfig.getForwarderFallbackTimeout()); return actorRefFactory.actorOf(props, AcknowledgementForwarderActor.determineActorName(dittoHeaders)); } @@ -239,7 +234,7 @@ private void declineAllNonDittoAckRequests(final DittoRuntimeException dittoRunt } else { LOGGER.withCorrelationId(headers) .error("Received DittoRuntimeException <{}> did not contain header of acknowledgement aggregator " + - "address: {}", dittoRuntimeException.getClass().getSimpleName(), headers); + "address: {}", dittoRuntimeException.getClass().getSimpleName(), headers); } }