Skip to content

Commit

Permalink
Make acknowledgement forwarder actor notify outbound dispatching acto…
Browse files Browse the repository at this point in the history
…r when acknowledgements are forwarded.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 20, 2022
1 parent 9438959 commit 1612e74
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package org.eclipse.ditto.connectivity.service.messaging;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -44,11 +43,9 @@
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.CoordinatedShutdown;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;

/**
* This Actor makes the decision whether to dispatch outbound signals and their acknowledgements or to drop them.
Expand Down Expand Up @@ -90,6 +87,7 @@ public Receive createReceive() {
.match(DittoRuntimeException.class, this::forwardWithoutCheck)
.match(Signal.class, this::handleSignal)
.match(Terminated.class, this::actorTerminated)
.matchEquals(AcknowledgementForwarderActor.Control.ACKNOWLEDGEMENT_FORWARDED, this::ackForwarded)
.matchEquals(Control.SHUTDOWN, this::gracefulShutdown)
.matchAny(message -> logger.warning("Unknown message: <{}>", message))
.build();
Expand Down Expand Up @@ -181,8 +179,8 @@ private Signal<?> adjustSignalAndStartAckForwarder(final Signal<?> signal, final
entityId,
signal,
settings.getAcknowledgementConfig(),
this::isSourceDeclaredOrTargetIssuedAck
);
this::isSourceDeclaredOrTargetIssuedAck,
getSelf());
acksCounter.register(signalWithAckForwarder, getContext());
return signalWithAckForwarder.first();
} else {
Expand All @@ -203,6 +201,14 @@ private void actorTerminated(final Terminated event) {
}
}

private void ackForwarded(final AcknowledgementForwarderActor.Control trigger) {
acksCounter.countDown(getSender());
if (acksCounter.shouldTerminateOutboundDispatchingActor()) {
logger.debug("All tasks done, shutting down");
getContext().stop(getSelf());
}
}

private static boolean isLiveCommandExpectingResponse(final Signal<?> signal) {
final var headers = signal.getDittoHeaders();
return Command.isCommand(signal) &&
Expand All @@ -227,17 +233,6 @@ private void handleInboundResponseOrAcknowledgement(final Signal<?> responseOrAc
if (ackForwarderOptional.isPresent()) {
final var acknowledgementForwarder = ackForwarderOptional.get();
acknowledgementForwarder.forward(responseOrAck, context);
acksCounter.countDown(acknowledgementForwarder);
if (acksCounter.shouldTerminateOutboundDispatchingActor()) {
logger.debug("Terminating after dispatching the last acknowledgement");
// wait for ack forwarder to process the final response or acknowledgement before stopping self
final Duration shutdownTimeout = Duration.ofMinutes(2);
final var ping = AcknowledgementForwarderActor.Control.PING;
final var poisonPillFuture =
Patterns.ask(acknowledgementForwarder, ping, shutdownTimeout)
.handle((result, error) -> PoisonPill.getInstance());
Patterns.pipe(poisonPillFuture, getContext().dispatcher()).to(getSelf());
}
} else {
final var forwarderActorClassName = AcknowledgementForwarderActor.class.getSimpleName();
final var template = "No {} found. Forwarding signal to proxy actor: <{}>";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ public final class AcknowledgementForwarderActor extends AbstractActor {
private final ActorSelection commandForwarder;
private final String correlationId;
@Nullable private final String ackgregatorAddressFallback;
@Nullable private final ActorRef notificationReceiver;
private final DittoDiagnosticLoggingAdapter log;

@SuppressWarnings("unused")
private AcknowledgementForwarderActor(final ActorSelection commandForwarder, final DittoHeaders dittoHeaders,
final Duration defaultTimeout) {
final Duration defaultTimeout, @Nullable final ActorRef notificationReceiver) {

this.commandForwarder = commandForwarder;
correlationId = dittoHeaders.getCorrelationId()
Expand All @@ -75,6 +76,7 @@ private AcknowledgementForwarderActor(final ActorSelection commandForwarder, fin
getSelf().path().name().replace(ACTOR_NAME_PREFIX, "")
);
ackgregatorAddressFallback = dittoHeaders.get(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey());
this.notificationReceiver = notificationReceiver;
log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

getContext().setReceiveTimeout(dittoHeaders.getTimeout().orElse(defaultTimeout));
Expand All @@ -86,19 +88,20 @@ private AcknowledgementForwarderActor(final ActorSelection commandForwarder, fin
* @param commandForwarder the actor ref of the edge command forwarder actor.
* @param dittoHeaders the DittoHeaders of the Signal which contained the request for Acknowledgements.
* @param defaultTimeout the default timeout to apply when {@code dittoHeaders} did not contain a specific timeout.
* @param notificationReceiver receiver of notifications when an acknowledgement is received.
* @return the Akka configuration Props object.
*/
static Props props(final ActorSelection commandForwarder, final DittoHeaders dittoHeaders,
final Duration defaultTimeout) {
return Props.create(AcknowledgementForwarderActor.class, commandForwarder, dittoHeaders, defaultTimeout);
final Duration defaultTimeout, @Nullable final ActorRef notificationReceiver) {
return Props.create(AcknowledgementForwarderActor.class, commandForwarder, dittoHeaders, defaultTimeout,
notificationReceiver);
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(CommandResponse.class, this::forwardCommandResponse)
.match(ReceiveTimeout.class, this::handleReceiveTimeout)
.matchEquals(Control.PING, ping -> getSender().tell(Control.PONG, getSelf()))
.matchAny(m -> log.warning("Received unexpected message: <{}>", m))
.build();
}
Expand All @@ -121,6 +124,7 @@ private void forwardCommandResponse(final WithDittoHeaders acknowledgementOrResp
acknowledgementOrResponse.getClass().getSimpleName(), dittoHeaders);
commandForwarder.tell(acknowledgementOrResponse, ActorRef.noSender());
}
notifyReceiver();
}

private void handleReceiveTimeout(final ReceiveTimeout receiveTimeout) {
Expand Down Expand Up @@ -187,7 +191,7 @@ public static Signal<?> startAcknowledgementForwarder(final ActorRefFactory acto
final Predicate<AcknowledgementLabel> isAckLabelAllowed) {

return startAcknowledgementForwarderForSignal(actorRefFactory, parent, commandForwarder, entityId, signal,
acknowledgementConfig, isAckLabelAllowed).first();
acknowledgementConfig, isAckLabelAllowed, null).first();
}

/**
Expand All @@ -204,6 +208,7 @@ public static Signal<?> startAcknowledgementForwarder(final ActorRefFactory acto
* @param signal the signal for which acknowledgements are expected.
* @param acknowledgementConfig the AcknowledgementConfig to use for looking up config values.
* @param isAckLabelAllowed predicate for whether an ack label is allowed for publication at this channel.
* @param notificationReceiver receiver of notifications when acknowledgements are received.
* @return the signal for which a suitable ack forwarder has started whenever required, and the actor reference
* of the forwarder if it started.
* @throws NullPointerException if any argument is {@code null}.
Expand All @@ -215,23 +220,30 @@ public static Pair<Signal<?>, Optional<ActorRef>> startAcknowledgementForwarderF
final EntityId entityId,
final Signal<?> signal,
final AcknowledgementConfig acknowledgementConfig,
final Predicate<AcknowledgementLabel> isAckLabelAllowed) {
final Predicate<AcknowledgementLabel> isAckLabelAllowed,
@Nullable final ActorRef notificationReceiver) {
final AcknowledgementForwarderActorStarter starter =
AcknowledgementForwarderActorStarter.getInstance(actorRefFactory, parent, commandForwarder, entityId,
signal, acknowledgementConfig, isAckLabelAllowed);
final DittoHeadersBuilder<?, ?> dittoHeadersBuilder = signal.getDittoHeaders().toBuilder();
final var forwarderOptional = starter.getConflictFreeWithActorRef();
final var forwarderOptional = starter.getConflictFreeWithActorRef(notificationReceiver);
forwarderOptional.map(Pair::first).ifPresent(dittoHeadersBuilder::correlationId);
if (!signal.getDittoHeaders().getAcknowledgementRequests().isEmpty()) {
dittoHeadersBuilder.acknowledgementRequests(starter.getAllowedAckRequests());
}
return Pair.create(signal.setDittoHeaders(dittoHeadersBuilder.build()), forwarderOptional.map(Pair::second));
}

private void notifyReceiver() {
if (notificationReceiver != null) {
notificationReceiver.tell(Control.ACKNOWLEDGEMENT_FORWARDED, getSelf());
}
}

/**
* Ping command and response for the acknowledgement forwarder actor.
* Notification for when an acknowledgement is forwarded.
*/
public enum Control {
PING, PONG
ACKNOWLEDGEMENT_FORWARDED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
Expand Down Expand Up @@ -132,7 +134,7 @@ public Optional<ActorRef> get() {
ActorRef actorRef = null;
if (hasEffectiveAckRequests(signal, acknowledgementRequests)) {
try {
actorRef = startAckForwarderActor(dittoHeaders);
actorRef = startAckForwarderActor(dittoHeaders, null);
} catch (final InvalidActorNameException e) {
// In case that the actor with that name already existed, the correlation-id was already used recently:
declineAllNonDittoAckRequests(getDuplicateCorrelationIdException(e));
Expand All @@ -149,17 +151,18 @@ public Optional<ActorRef> get() {
* start because no acknowledgement was requested.
*/
public Optional<String> getConflictFree() {
return getConflictFreeWithActorRef().map(Pair::first);
return getConflictFreeWithActorRef(null).map(Pair::first);
}

/**
* Start an acknowledgement forwarder.
* Always succeeds.
*
* @param notificationReceiver receiver of notifications when an acknowledgement is received.
* @return the new correlation ID togeether with the started forwarder if an ack forwarder started, or an empty
* optional if the ack forwarder did not start because no acknowledgement was requested.
*/
public Optional<Pair<String, ActorRef>> getConflictFreeWithActorRef() {
public Optional<Pair<String, ActorRef>> getConflictFreeWithActorRef(@Nullable final ActorRef notificationReceiver) {
if (hasEffectiveAckRequests(signal, acknowledgementRequests)) {
final DittoHeadersBuilder<?, ?> builder = dittoHeaders.toBuilder()
.acknowledgementRequests(acknowledgementRequests);
Expand All @@ -170,7 +173,8 @@ public Optional<Pair<String, ActorRef>> getConflictFreeWithActorRef() {
while (true) {
try {
builder.correlationId(correlationId);
return Optional.of(Pair.create(correlationId, startAckForwarderActor(builder.build())));
return Optional.of(Pair.create(correlationId,
startAckForwarderActor(builder.build(), notificationReceiver)));
} catch (final InvalidActorNameException e) {
// generate a new ID
correlationId = joinPrefixAndCounter(prefix, ++counter);
Expand Down Expand Up @@ -206,9 +210,10 @@ private Pair<String, Integer> parseCorrelationId(final DittoHeaders dittoHeaders
return Pair.create(UUID.randomUUID().toString(), -1);
}

private ActorRef startAckForwarderActor(final DittoHeaders dittoHeaders) {
private ActorRef startAckForwarderActor(final DittoHeaders dittoHeaders,
@Nullable final ActorRef notificationReceiver) {
final Props props = AcknowledgementForwarderActor.props(commandForwarder, dittoHeaders,
acknowledgementConfig.getForwarderFallbackTimeout());
acknowledgementConfig.getForwarderFallbackTimeout(), notificationReceiver);
return actorRefFactory.actorOf(props, AcknowledgementForwarderActor.determineActorName(dittoHeaders));
}

Expand All @@ -234,7 +239,7 @@ private void declineAllNonDittoAckRequests(final DittoRuntimeException dittoRunt
} else {
LOGGER.withCorrelationId(headers)
.error("Received DittoRuntimeException <{}> did not contain header of acknowledgement aggregator " +
"address: {}", dittoRuntimeException.getClass().getSimpleName(), headers);
"address: {}", dittoRuntimeException.getClass().getSimpleName(), headers);
}
}

Expand Down

0 comments on commit 1612e74

Please sign in to comment.