Skip to content

Commit

Permalink
Avoid passing the context in AcknowledgementForwarderActorStarter
Browse files Browse the repository at this point in the history
* The sender could be some different when actually starting the forwarder
  actor. Therefore directly pass the sender at the time we know it

Signed-off-by: Yannic Klem <Yannic.Klem@bosch.io>
  • Loading branch information
Yannic92 committed Jan 11, 2022
1 parent 0f1aaf7 commit acc5075
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 32 deletions.
Expand Up @@ -152,6 +152,8 @@ private Signal<?> adjustSignalAndStartAckForwarder(final Signal<?> signal, final
if (hasSourceDeclaredAcks || liveCommandExpectingResponse) {
// start ackregator for source declared acks
return AcknowledgementForwarderActor.startAcknowledgementForwarder(getContext(),
self(),
sender(),
entityId,
signal,
settings.getAcknowledgementConfig(),
Expand Down
Expand Up @@ -325,7 +325,10 @@ private void invalidateLoggers(final ConnectionId connectionId) {
mapsKeysToDelete.forEach(loggerKey -> {
// flush logs before removing from loggers:
try {
LOGGERS.get(loggerKey).close();
final ConnectionLogger connectionLogger = LOGGERS.get(loggerKey);
if (connectionLogger != null) {
connectionLogger.close();
}
} catch (final IOException e) {
LOGGER.warn("Exception during closing logger <{}>: <{}>: {}", loggerKey, e.getClass().getSimpleName(),
e.getMessage());
Expand Down
Expand Up @@ -461,6 +461,8 @@ private Signal<?> startAckForwarder(final Signal<?> signal) {
if (entityIdOptional.isPresent()) {
final var entityIdWithType = entityIdOptional.get();
return AcknowledgementForwarderActor.startAcknowledgementForwarder(getContext(),
self(),
sender(),
entityIdWithType,
signal,
acknowledgementConfig,
Expand Down
Expand Up @@ -35,6 +35,7 @@

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;

Expand Down Expand Up @@ -127,13 +128,15 @@ public static String determineActorName(final DittoHeaders dittoHeaders) {
return ACTOR_NAME_PREFIX + URLEncoder.encode(correlationId, Charset.defaultCharset());
}

static Optional<ActorRef> startAcknowledgementForwarderForTest(final akka.actor.ActorContext context,
static Optional<ActorRef> startAcknowledgementForwarderForTest(final ActorRefFactory actorRefFactory,
final ActorRef parent,
final ActorRef ackRequester,
final EntityId entityId,
final Signal<?> signal,
final AcknowledgementConfig acknowledgementConfig) {

final AcknowledgementForwarderActorStarter starter =
AcknowledgementForwarderActorStarter.getInstance(context, entityId, signal, acknowledgementConfig,
final AcknowledgementForwarderActorStarter starter = AcknowledgementForwarderActorStarter
.getInstance(actorRefFactory, parent, ackRequester, entityId, signal, acknowledgementConfig,
label -> true);
return starter.get();
}
Expand All @@ -145,22 +148,26 @@ static Optional<ActorRef> startAcknowledgementForwarderForTest(final akka.actor.
* in case that an Actor with this name already exists, a new correlation ID is generated and the process repeated.
* If the signal does not require acknowledgements, no forwarder starts and the signal itself is returned.
*
* @param context the context ({@code getContext()} of the Actor to start the AcknowledgementForwarderActor in.
* @param actorRefFactory the factory to start the forwarder actor in.
* @param parent the parent of the forwarder actor.
* @param ackRequester the actor which should receive the forwarded acknowledgements.
* @param entityId the entityId of the {@code Signal} which requested the Acknowledgements.
* @param signal the signal for which acknowledgements are expected.
* @param acknowledgementConfig the AcknowledgementConfig to use for looking up config values.
* @param isAckLabelAllowed predicate for whether an ack label is allowed for publication at this channel.
* @return the signal for which a suitable ack forwarder has started whenever required.
* @throws NullPointerException if any argument is {@code null}.
*/
public static Signal<?> startAcknowledgementForwarder(final akka.actor.ActorContext context,
public static Signal<?> startAcknowledgementForwarder(final ActorRefFactory actorRefFactory,
final ActorRef parent,
final ActorRef ackRequester,
final EntityId entityId,
final Signal<?> signal,
final AcknowledgementConfig acknowledgementConfig,
final Predicate<AcknowledgementLabel> isAckLabelAllowed) {
final AcknowledgementForwarderActorStarter starter =
AcknowledgementForwarderActorStarter.getInstance(context, entityId, signal, acknowledgementConfig,
isAckLabelAllowed);
AcknowledgementForwarderActorStarter.getInstance(actorRefFactory, parent, ackRequester, entityId,
signal, acknowledgementConfig, isAckLabelAllowed);
final DittoHeadersBuilder<?, ?> dittoHeadersBuilder = signal.getDittoHeaders().toBuilder();
starter.getConflictFree().ifPresent(dittoHeadersBuilder::correlationId);
if (!signal.getDittoHeaders().getAcknowledgementRequests().isEmpty()) {
Expand Down
Expand Up @@ -28,18 +28,18 @@
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.policies.model.signals.announcements.PolicyAnnouncement;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.AcknowledgementRequestDuplicateCorrelationIdException;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.policies.model.signals.announcements.PolicyAnnouncement;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;

import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.InvalidActorNameException;
import akka.actor.Props;
import akka.japi.Pair;
Expand All @@ -54,20 +54,26 @@ final class AcknowledgementForwarderActorStarter implements Supplier<Optional<Ac

private static final String PREFIX_COUNTER_SEPARATOR = "#";

private final ActorContext actorContext;
private final ActorRefFactory actorRefFactory;
private final ActorRef parent;
private final ActorRef ackRequester;
private final EntityId entityId;
private final Signal<?> signal;
private final DittoHeaders dittoHeaders;
private final AcknowledgementConfig acknowledgementConfig;
private final Set<AcknowledgementRequest> acknowledgementRequests;

private AcknowledgementForwarderActorStarter(final ActorContext context,
private AcknowledgementForwarderActorStarter(final ActorRefFactory actorRefFactory,
final ActorRef parent,
final ActorRef ackRequester,
final EntityId entityId,
final Signal<?> signal,
final AcknowledgementConfig acknowledgementConfig,
final Predicate<AcknowledgementLabel> isAckLabelAllowed) {

actorContext = checkNotNull(context, "context");
this.actorRefFactory = checkNotNull(actorRefFactory, "actorRefFactory");
this.parent = parent;
this.ackRequester = ackRequester;
this.entityId = checkNotNull(entityId, "entityId");
this.signal = checkNotNull(signal, "signal");
dittoHeaders = signal.getDittoHeaders();
Expand All @@ -81,8 +87,9 @@ private AcknowledgementForwarderActorStarter(final ActorContext context,
/**
* Returns an instance of {@code ActorStarter}.
*
* @param context the context to start the forwarder actor in. Furthermore provides the sender and self
* reference for forwarding.
* @param actorRefFactory the factory to start the forwarder actor in.
* @param parent the parent of the forwarder actor.
* @param ackRequester the actor which should receive the forwarded acknowledgements.
* @param entityId is used for the NACKs if the forwarder actor cannot be started.
* @param signal the signal for which the forwarder actor is to start.
* @param acknowledgementConfig provides configuration setting regarding acknowledgement handling.
Expand All @@ -91,13 +98,16 @@ private AcknowledgementForwarderActorStarter(final ActorContext context,
* @return a means to start an acknowledgement forwarder actor.
* @throws NullPointerException if any argument is {@code null}.
*/
static AcknowledgementForwarderActorStarter getInstance(final ActorContext context,
static AcknowledgementForwarderActorStarter getInstance(final ActorRefFactory actorRefFactory,
final ActorRef parent,
final ActorRef ackRequester,
final EntityId entityId,
final Signal<?> signal,
final AcknowledgementConfig acknowledgementConfig,
final Predicate<AcknowledgementLabel> isAckLabelAllowed) {

return new AcknowledgementForwarderActorStarter(context, entityId, signal, acknowledgementConfig,
return new AcknowledgementForwarderActorStarter(actorRefFactory, parent, ackRequester, entityId, signal,
acknowledgementConfig,
// live-response is always allowed
isAckLabelAllowed.or(DittoAcknowledgementLabel.LIVE_RESPONSE::equals));
}
Expand Down Expand Up @@ -181,9 +191,9 @@ private Pair<String, Integer> parseCorrelationId(final DittoHeaders dittoHeaders
}

private ActorRef startAckForwarderActor(final DittoHeaders dittoHeaders) {
final Props props = AcknowledgementForwarderActor.props(actorContext.sender(), dittoHeaders,
final Props props = AcknowledgementForwarderActor.props(ackRequester, dittoHeaders,
acknowledgementConfig.getForwarderFallbackTimeout());
return actorContext.actorOf(props, AcknowledgementForwarderActor.determineActorName(dittoHeaders));
return actorRefFactory.actorOf(props, AcknowledgementForwarderActor.determineActorName(dittoHeaders));
}

private DittoRuntimeException getDuplicateCorrelationIdException(final Throwable cause) {
Expand All @@ -195,15 +205,12 @@ private DittoRuntimeException getDuplicateCorrelationIdException(final Throwable
}

private void declineAllNonDittoAckRequests(final DittoRuntimeException dittoRuntimeException) {
final ActorRef sender = actorContext.sender();
final ActorRef self = actorContext.self();

// answer NACKs for all AcknowledgementRequests with labels which were not Ditto-defined
acknowledgementRequests.stream()
.map(AcknowledgementRequest::getLabel)
.filter(Predicate.not(DittoAcknowledgementLabel::contains))
.map(label -> getNack(label, dittoRuntimeException))
.forEach(nack -> sender.tell(nack, self));
.forEach(nack -> ackRequester.tell(nack, parent));
}

private Acknowledgement getNack(final AcknowledgementLabel label,
Expand Down
Expand Up @@ -25,11 +25,11 @@
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.AcknowledgementRequestDuplicateCorrelationIdException;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.models.acks.config.DefaultAcknowledgementConfig;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.AcknowledgementRequestDuplicateCorrelationIdException;
import org.eclipse.ditto.things.model.signals.events.ThingDeleted;
import org.junit.AfterClass;
import org.junit.Before;
Expand Down Expand Up @@ -93,7 +93,6 @@ public void setUp() {
when(actorContext.actorOf(any(Props.class), anyString()))
.thenAnswer((Answer<ActorRef>) invocationOnMock -> actorSystem.actorOf(invocationOnMock.getArgument(0),
invocationOnMock.getArgument(1)));
when(actorContext.sender()).thenReturn(testProbe.ref());
}

@Test
Expand Down Expand Up @@ -155,7 +154,9 @@ public void startForwarderActorWithDuplicateCorrelationId() {
}

private AcknowledgementForwarderActorStarter getActorStarter(final DittoHeaders dittoHeaders) {
return AcknowledgementForwarderActorStarter.getInstance(actorContext, KNOWN_ENTITY_ID,
return AcknowledgementForwarderActorStarter.getInstance(actorContext, TestProbe.apply(actorSystem).ref(),
testProbe.ref(),
KNOWN_ENTITY_ID,
ThingDeleted.of(KNOWN_ENTITY_ID, 1L, Instant.EPOCH, dittoHeaders, null),
acknowledgementConfig, label -> true);
}
Expand Down
Expand Up @@ -53,6 +53,7 @@
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;

/**
Expand Down Expand Up @@ -130,10 +131,10 @@ public void createAcknowledgementForwarderAndThreadAcknowledgementThrough()
Acknowledgement.of(acknowledgementLabel, entityId, HttpStatus.ACCEPTED, dittoHeaders);

new TestKit(actorSystem) {{
when(actorContext.sender()).thenReturn(getRef());

final Optional<ActorRef> underTest =
AcknowledgementForwarderActor.startAcknowledgementForwarderForTest(actorContext, entityId, signal,
AcknowledgementForwarderActor.startAcknowledgementForwarderForTest(actorContext,
TestProbe.apply(actorSystem).ref(), getRef(), entityId, signal,
acknowledgementConfig);

softly.assertThat(underTest).isPresent();
Expand Down

0 comments on commit acc5075

Please sign in to comment.