Skip to content

Commit

Permalink
Revert "Make acknowledgement forwarder actor notify outbound dispatch…
Browse files Browse the repository at this point in the history
…ing actor when acknowledgements are forwarded."

This reverts commit 1612e74.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 20, 2022
1 parent 1612e74 commit 385825d
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 44 deletions.
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.connectivity.service.messaging;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -43,9 +44,11 @@
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.CoordinatedShutdown;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;

/**
* This Actor makes the decision whether to dispatch outbound signals and their acknowledgements or to drop them.
Expand Down Expand Up @@ -87,7 +90,6 @@ public Receive createReceive() {
.match(DittoRuntimeException.class, this::forwardWithoutCheck)
.match(Signal.class, this::handleSignal)
.match(Terminated.class, this::actorTerminated)
.matchEquals(AcknowledgementForwarderActor.Control.ACKNOWLEDGEMENT_FORWARDED, this::ackForwarded)
.matchEquals(Control.SHUTDOWN, this::gracefulShutdown)
.matchAny(message -> logger.warning("Unknown message: <{}>", message))
.build();
Expand Down Expand Up @@ -179,8 +181,8 @@ private Signal<?> adjustSignalAndStartAckForwarder(final Signal<?> signal, final
entityId,
signal,
settings.getAcknowledgementConfig(),
this::isSourceDeclaredOrTargetIssuedAck,
getSelf());
this::isSourceDeclaredOrTargetIssuedAck
);
acksCounter.register(signalWithAckForwarder, getContext());
return signalWithAckForwarder.first();
} else {
Expand All @@ -201,14 +203,6 @@ private void actorTerminated(final Terminated event) {
}
}

private void ackForwarded(final AcknowledgementForwarderActor.Control trigger) {
acksCounter.countDown(getSender());
if (acksCounter.shouldTerminateOutboundDispatchingActor()) {
logger.debug("All tasks done, shutting down");
getContext().stop(getSelf());
}
}

private static boolean isLiveCommandExpectingResponse(final Signal<?> signal) {
final var headers = signal.getDittoHeaders();
return Command.isCommand(signal) &&
Expand All @@ -233,6 +227,17 @@ private void handleInboundResponseOrAcknowledgement(final Signal<?> responseOrAc
if (ackForwarderOptional.isPresent()) {
final var acknowledgementForwarder = ackForwarderOptional.get();
acknowledgementForwarder.forward(responseOrAck, context);
acksCounter.countDown(acknowledgementForwarder);
if (acksCounter.shouldTerminateOutboundDispatchingActor()) {
logger.debug("Terminating after dispatching the last acknowledgement");
// wait for ack forwarder to process the final response or acknowledgement before stopping self
final Duration shutdownTimeout = Duration.ofMinutes(2);
final var ping = AcknowledgementForwarderActor.Control.PING;
final var poisonPillFuture =
Patterns.ask(acknowledgementForwarder, ping, shutdownTimeout)
.handle((result, error) -> PoisonPill.getInstance());
Patterns.pipe(poisonPillFuture, getContext().dispatcher()).to(getSelf());
}
} else {
final var forwarderActorClassName = AcknowledgementForwarderActor.class.getSimpleName();
final var template = "No {} found. Forwarding signal to proxy actor: <{}>";
Expand Down
Expand Up @@ -62,12 +62,11 @@ public final class AcknowledgementForwarderActor extends AbstractActor {
private final ActorSelection commandForwarder;
private final String correlationId;
@Nullable private final String ackgregatorAddressFallback;
@Nullable private final ActorRef notificationReceiver;
private final DittoDiagnosticLoggingAdapter log;

@SuppressWarnings("unused")
private AcknowledgementForwarderActor(final ActorSelection commandForwarder, final DittoHeaders dittoHeaders,
final Duration defaultTimeout, @Nullable final ActorRef notificationReceiver) {
final Duration defaultTimeout) {

this.commandForwarder = commandForwarder;
correlationId = dittoHeaders.getCorrelationId()
Expand All @@ -76,7 +75,6 @@ private AcknowledgementForwarderActor(final ActorSelection commandForwarder, fin
getSelf().path().name().replace(ACTOR_NAME_PREFIX, "")
);
ackgregatorAddressFallback = dittoHeaders.get(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey());
this.notificationReceiver = notificationReceiver;
log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

getContext().setReceiveTimeout(dittoHeaders.getTimeout().orElse(defaultTimeout));
Expand All @@ -88,20 +86,19 @@ private AcknowledgementForwarderActor(final ActorSelection commandForwarder, fin
* @param commandForwarder the actor ref of the edge command forwarder actor.
* @param dittoHeaders the DittoHeaders of the Signal which contained the request for Acknowledgements.
* @param defaultTimeout the default timeout to apply when {@code dittoHeaders} did not contain a specific timeout.
* @param notificationReceiver receiver of notifications when an acknowledgement is received.
* @return the Akka configuration Props object.
*/
static Props props(final ActorSelection commandForwarder, final DittoHeaders dittoHeaders,
final Duration defaultTimeout, @Nullable final ActorRef notificationReceiver) {
return Props.create(AcknowledgementForwarderActor.class, commandForwarder, dittoHeaders, defaultTimeout,
notificationReceiver);
final Duration defaultTimeout) {
return Props.create(AcknowledgementForwarderActor.class, commandForwarder, dittoHeaders, defaultTimeout);
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(CommandResponse.class, this::forwardCommandResponse)
.match(ReceiveTimeout.class, this::handleReceiveTimeout)
.matchEquals(Control.PING, ping -> getSender().tell(Control.PONG, getSelf()))
.matchAny(m -> log.warning("Received unexpected message: <{}>", m))
.build();
}
Expand All @@ -124,7 +121,6 @@ private void forwardCommandResponse(final WithDittoHeaders acknowledgementOrResp
acknowledgementOrResponse.getClass().getSimpleName(), dittoHeaders);
commandForwarder.tell(acknowledgementOrResponse, ActorRef.noSender());
}
notifyReceiver();
}

private void handleReceiveTimeout(final ReceiveTimeout receiveTimeout) {
Expand Down Expand Up @@ -191,7 +187,7 @@ public static Signal<?> startAcknowledgementForwarder(final ActorRefFactory acto
final Predicate<AcknowledgementLabel> isAckLabelAllowed) {

return startAcknowledgementForwarderForSignal(actorRefFactory, parent, commandForwarder, entityId, signal,
acknowledgementConfig, isAckLabelAllowed, null).first();
acknowledgementConfig, isAckLabelAllowed).first();
}

/**
Expand All @@ -208,7 +204,6 @@ public static Signal<?> startAcknowledgementForwarder(final ActorRefFactory acto
* @param signal the signal for which acknowledgements are expected.
* @param acknowledgementConfig the AcknowledgementConfig to use for looking up config values.
* @param isAckLabelAllowed predicate for whether an ack label is allowed for publication at this channel.
* @param notificationReceiver receiver of notifications when acknowledgements are received.
* @return the signal for which a suitable ack forwarder has started whenever required, and the actor reference
* of the forwarder if it started.
* @throws NullPointerException if any argument is {@code null}.
Expand All @@ -220,30 +215,23 @@ public static Pair<Signal<?>, Optional<ActorRef>> startAcknowledgementForwarderF
final EntityId entityId,
final Signal<?> signal,
final AcknowledgementConfig acknowledgementConfig,
final Predicate<AcknowledgementLabel> isAckLabelAllowed,
@Nullable final ActorRef notificationReceiver) {
final Predicate<AcknowledgementLabel> isAckLabelAllowed) {
final AcknowledgementForwarderActorStarter starter =
AcknowledgementForwarderActorStarter.getInstance(actorRefFactory, parent, commandForwarder, entityId,
signal, acknowledgementConfig, isAckLabelAllowed);
final DittoHeadersBuilder<?, ?> dittoHeadersBuilder = signal.getDittoHeaders().toBuilder();
final var forwarderOptional = starter.getConflictFreeWithActorRef(notificationReceiver);
final var forwarderOptional = starter.getConflictFreeWithActorRef();
forwarderOptional.map(Pair::first).ifPresent(dittoHeadersBuilder::correlationId);
if (!signal.getDittoHeaders().getAcknowledgementRequests().isEmpty()) {
dittoHeadersBuilder.acknowledgementRequests(starter.getAllowedAckRequests());
}
return Pair.create(signal.setDittoHeaders(dittoHeadersBuilder.build()), forwarderOptional.map(Pair::second));
}

private void notifyReceiver() {
if (notificationReceiver != null) {
notificationReceiver.tell(Control.ACKNOWLEDGEMENT_FORWARDED, getSelf());
}
}

/**
* Notification for when an acknowledgement is forwarded.
* Ping command and response for the acknowledgement forwarder actor.
*/
public enum Control {
ACKNOWLEDGEMENT_FORWARDED
PING, PONG
}
}
Expand Up @@ -21,8 +21,6 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
Expand Down Expand Up @@ -134,7 +132,7 @@ public Optional<ActorRef> get() {
ActorRef actorRef = null;
if (hasEffectiveAckRequests(signal, acknowledgementRequests)) {
try {
actorRef = startAckForwarderActor(dittoHeaders, null);
actorRef = startAckForwarderActor(dittoHeaders);
} catch (final InvalidActorNameException e) {
// In case that the actor with that name already existed, the correlation-id was already used recently:
declineAllNonDittoAckRequests(getDuplicateCorrelationIdException(e));
Expand All @@ -151,18 +149,17 @@ public Optional<ActorRef> get() {
* start because no acknowledgement was requested.
*/
public Optional<String> getConflictFree() {
return getConflictFreeWithActorRef(null).map(Pair::first);
return getConflictFreeWithActorRef().map(Pair::first);
}

/**
* Start an acknowledgement forwarder.
* Always succeeds.
*
* @param notificationReceiver receiver of notifications when an acknowledgement is received.
* @return the new correlation ID togeether with the started forwarder if an ack forwarder started, or an empty
* optional if the ack forwarder did not start because no acknowledgement was requested.
*/
public Optional<Pair<String, ActorRef>> getConflictFreeWithActorRef(@Nullable final ActorRef notificationReceiver) {
public Optional<Pair<String, ActorRef>> getConflictFreeWithActorRef() {
if (hasEffectiveAckRequests(signal, acknowledgementRequests)) {
final DittoHeadersBuilder<?, ?> builder = dittoHeaders.toBuilder()
.acknowledgementRequests(acknowledgementRequests);
Expand All @@ -173,8 +170,7 @@ public Optional<Pair<String, ActorRef>> getConflictFreeWithActorRef(@Nullable fi
while (true) {
try {
builder.correlationId(correlationId);
return Optional.of(Pair.create(correlationId,
startAckForwarderActor(builder.build(), notificationReceiver)));
return Optional.of(Pair.create(correlationId, startAckForwarderActor(builder.build())));
} catch (final InvalidActorNameException e) {
// generate a new ID
correlationId = joinPrefixAndCounter(prefix, ++counter);
Expand Down Expand Up @@ -210,10 +206,9 @@ private Pair<String, Integer> parseCorrelationId(final DittoHeaders dittoHeaders
return Pair.create(UUID.randomUUID().toString(), -1);
}

private ActorRef startAckForwarderActor(final DittoHeaders dittoHeaders,
@Nullable final ActorRef notificationReceiver) {
private ActorRef startAckForwarderActor(final DittoHeaders dittoHeaders) {
final Props props = AcknowledgementForwarderActor.props(commandForwarder, dittoHeaders,
acknowledgementConfig.getForwarderFallbackTimeout(), notificationReceiver);
acknowledgementConfig.getForwarderFallbackTimeout());
return actorRefFactory.actorOf(props, AcknowledgementForwarderActor.determineActorName(dittoHeaders));
}

Expand All @@ -239,7 +234,7 @@ private void declineAllNonDittoAckRequests(final DittoRuntimeException dittoRunt
} else {
LOGGER.withCorrelationId(headers)
.error("Received DittoRuntimeException <{}> did not contain header of acknowledgement aggregator " +
"address: {}", dittoRuntimeException.getClass().getSimpleName(), headers);
"address: {}", dittoRuntimeException.getClass().getSimpleName(), headers);
}
}

Expand Down

0 comments on commit 385825d

Please sign in to comment.