Skip to content

Commit

Permalink
removed ackReceiverActor from AcknowledgementForwarderActor
Browse files Browse the repository at this point in the history
* also use DittoHeader with "ditto-ackregator-address" for determining where to send Acks to in AcknowledgementForwarderActor
* fix policy registry tests as "messages-model" is no dependency any longer

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jun 15, 2022
1 parent 8931ef4 commit 6915eef
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 91 deletions.
Expand Up @@ -152,7 +152,6 @@ private Signal<?> adjustSignalAndStartAckForwarder(final Signal<?> signal, final
// start ackregator for source declared acks
return AcknowledgementForwarderActor.startAcknowledgementForwarder(getContext(),
self(),
sender(),
entityId,
signal,
settings.getAcknowledgementConfig(),
Expand Down
Expand Up @@ -475,7 +475,6 @@ private Signal<?> startAckForwarder(final Signal<?> signal) {
final var entityIdWithType = entityIdOptional.get();
return AcknowledgementForwarderActor.startAcknowledgementForwarder(getContext(),
self(),
sender(),
entityIdWithType,
signal,
acknowledgementConfig,
Expand Down
Expand Up @@ -22,6 +22,7 @@

import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
Expand All @@ -36,6 +37,7 @@
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;

Expand All @@ -54,15 +56,12 @@ public final class AcknowledgementForwarderActor extends AbstractActor {
*/
static final String ACTOR_NAME_PREFIX = "ackForwarder-";

private final ActorRef acknowledgementRequester;
private final String correlationId;
private final DittoDiagnosticLoggingAdapter log;

@SuppressWarnings("unused")
private AcknowledgementForwarderActor(final ActorRef acknowledgementRequester, final DittoHeaders dittoHeaders,
final Duration defaultTimeout) {
private AcknowledgementForwarderActor(final DittoHeaders dittoHeaders, final Duration defaultTimeout) {

this.acknowledgementRequester = acknowledgementRequester;
correlationId = dittoHeaders.getCorrelationId()
.orElseGet(() ->
// fall back using the actor name which also contains the correlation-id
Expand All @@ -76,16 +75,12 @@ private AcknowledgementForwarderActor(final ActorRef acknowledgementRequester, f
/**
* Creates Akka configuration object Props for this AcknowledgementForwarderActor.
*
* @param acknowledgementRequester the ActorRef of the original sender who requested the Acknowledgements.
* @param dittoHeaders the DittoHeaders of the Signal which contained the request for Acknowledgements.
* @param defaultTimeout the default timeout to apply when {@code dittoHeaders} did not contain a specific timeout.
* @return the Akka configuration Props object.
*/
static Props props(final ActorRef acknowledgementRequester, final DittoHeaders dittoHeaders,
final Duration defaultTimeout) {

return Props.create(AcknowledgementForwarderActor.class, acknowledgementRequester, dittoHeaders,
defaultTimeout);
static Props props(final DittoHeaders dittoHeaders, final Duration defaultTimeout) {
return Props.create(AcknowledgementForwarderActor.class, dittoHeaders, defaultTimeout);
}

@Override
Expand All @@ -98,10 +93,20 @@ public Receive createReceive() {
}

private void forwardCommandResponse(final WithDittoHeaders acknowledgementOrResponse) {
log.withCorrelationId(acknowledgementOrResponse)
.debug("Received Acknowledgement / live CommandResponse, forwarding to original requester <{}>: " +
"<{}>", acknowledgementRequester, acknowledgementOrResponse);
acknowledgementRequester.tell(acknowledgementOrResponse, getSender());
final DittoHeaders dittoHeaders = acknowledgementOrResponse.getDittoHeaders();
final String ackregatorAddress = dittoHeaders.get(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey());
if (null != ackregatorAddress) {
final ActorSelection acknowledgementRequester = getContext().actorSelection(ackregatorAddress);
log.withCorrelationId(acknowledgementOrResponse)
.debug("Received Acknowledgement / live CommandResponse, forwarding to original requester <{}>: " +
"<{}>", acknowledgementRequester, acknowledgementOrResponse);
acknowledgementRequester.tell(acknowledgementOrResponse, getSender());
} else {
log.withCorrelationId(acknowledgementOrResponse)
.error("Received Acknowledgement / live CommandResponse <{}> did not contain header of " +
"Ackgregator address: {}", acknowledgementOrResponse.getClass().getSimpleName(),
dittoHeaders);
}
}

private void handleReceiveTimeout(final ReceiveTimeout receiveTimeout) {
Expand Down Expand Up @@ -131,13 +136,12 @@ public static String determineActorName(final DittoHeaders dittoHeaders) {

static Optional<ActorRef> startAcknowledgementForwarderForTest(final ActorRefFactory actorRefFactory,
final ActorRef parent,
final ActorRef ackRequester,
final EntityId entityId,
final Signal<?> signal,
final AcknowledgementConfig acknowledgementConfig) {

final AcknowledgementForwarderActorStarter starter = AcknowledgementForwarderActorStarter
.getInstance(actorRefFactory, parent, ackRequester, entityId, signal, acknowledgementConfig,
.getInstance(actorRefFactory, parent, entityId, signal, acknowledgementConfig,
label -> true);
return starter.get();
}
Expand All @@ -151,7 +155,6 @@ static Optional<ActorRef> startAcknowledgementForwarderForTest(final ActorRefFac
*
* @param actorRefFactory the factory to start the forwarder actor in.
* @param parent the parent of the forwarder actor.
* @param ackRequester the actor which should receive the forwarded acknowledgements.
* @param entityId the entityId of the {@code Signal} which requested the Acknowledgements.
* @param signal the signal for which acknowledgements are expected.
* @param acknowledgementConfig the AcknowledgementConfig to use for looking up config values.
Expand All @@ -161,13 +164,12 @@ static Optional<ActorRef> startAcknowledgementForwarderForTest(final ActorRefFac
*/
public static Signal<?> startAcknowledgementForwarder(final ActorRefFactory actorRefFactory,
final ActorRef parent,
final ActorRef ackRequester, // TODO TJ the ackRequester can probably be removed and instead the ackRequster is determined by the Event's DittoHeader
final EntityId entityId,
final Signal<?> signal,
final AcknowledgementConfig acknowledgementConfig,
final Predicate<AcknowledgementLabel> isAckLabelAllowed) {
final AcknowledgementForwarderActorStarter starter =
AcknowledgementForwarderActorStarter.getInstance(actorRefFactory, parent, ackRequester, entityId,
AcknowledgementForwarderActorStarter.getInstance(actorRefFactory, parent, entityId,
signal, acknowledgementConfig, isAckLabelAllowed);
final DittoHeadersBuilder<?, ?> dittoHeadersBuilder = signal.getDittoHeaders().toBuilder();
starter.getConflictFree().ifPresent(dittoHeadersBuilder::correlationId);
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.signals.Signal;
Expand All @@ -35,9 +36,12 @@
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;

import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSelection;
import akka.actor.InvalidActorNameException;
import akka.actor.Props;
import akka.japi.Pair;
Expand All @@ -50,13 +54,14 @@
*/
final class AcknowledgementForwarderActorStarter implements Supplier<Optional<ActorRef>> {

private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(AcknowledgementForwarderActorStarter.class);

private static final String PREFIX_COUNTER_SEPARATOR = "#";

private static final String LIVE_CHANNEL = "live";

private final ActorRefFactory actorRefFactory;
private final ActorRef parent;
private final ActorRef ackRequester;
private final EntityId entityId;
private final Signal<?> signal;
private final DittoHeaders dittoHeaders;
Expand All @@ -65,15 +70,13 @@ final class AcknowledgementForwarderActorStarter implements Supplier<Optional<Ac

private AcknowledgementForwarderActorStarter(final ActorRefFactory actorRefFactory,
final ActorRef parent,
final ActorRef ackRequester,
final EntityId entityId,
final Signal<?> signal,
final AcknowledgementConfig acknowledgementConfig,
final Predicate<AcknowledgementLabel> isAckLabelAllowed) {

this.actorRefFactory = checkNotNull(actorRefFactory, "actorRefFactory");
this.parent = parent;
this.ackRequester = ackRequester;
this.entityId = checkNotNull(entityId, "entityId");
this.signal = checkNotNull(signal, "signal");
dittoHeaders = signal.getDittoHeaders();
Expand All @@ -89,7 +92,6 @@ private AcknowledgementForwarderActorStarter(final ActorRefFactory actorRefFacto
*
* @param actorRefFactory the factory to start the forwarder actor in.
* @param parent the parent of the forwarder actor.
* @param ackRequester the actor which should receive the forwarded acknowledgements.
* @param entityId is used for the NACKs if the forwarder actor cannot be started.
* @param signal the signal for which the forwarder actor is to start.
* @param acknowledgementConfig provides configuration setting regarding acknowledgement handling.
Expand All @@ -100,13 +102,12 @@ private AcknowledgementForwarderActorStarter(final ActorRefFactory actorRefFacto
*/
static AcknowledgementForwarderActorStarter getInstance(final ActorRefFactory actorRefFactory,
final ActorRef parent,
final ActorRef ackRequester, // TODO TJ the ackRequester can probably be removed
final EntityId entityId,
final Signal<?> signal,
final AcknowledgementConfig acknowledgementConfig,
final Predicate<AcknowledgementLabel> isAckLabelAllowed) {

return new AcknowledgementForwarderActorStarter(actorRefFactory, parent, ackRequester, entityId, signal,
return new AcknowledgementForwarderActorStarter(actorRefFactory, parent, entityId, signal,
acknowledgementConfig,
// live-response is always allowed
isAckLabelAllowed.or(DittoAcknowledgementLabel.LIVE_RESPONSE::equals));
Expand Down Expand Up @@ -191,7 +192,7 @@ private Pair<String, Integer> parseCorrelationId(final DittoHeaders dittoHeaders
}

private ActorRef startAckForwarderActor(final DittoHeaders dittoHeaders) {
final Props props = AcknowledgementForwarderActor.props(ackRequester, dittoHeaders,
final Props props = AcknowledgementForwarderActor.props(dittoHeaders,
acknowledgementConfig.getForwarderFallbackTimeout());
return actorRefFactory.actorOf(props, AcknowledgementForwarderActor.determineActorName(dittoHeaders));
}
Expand All @@ -205,12 +206,21 @@ private DittoRuntimeException getDuplicateCorrelationIdException(final Throwable
}

private void declineAllNonDittoAckRequests(final DittoRuntimeException dittoRuntimeException) {
// answer NACKs for all AcknowledgementRequests with labels which were not Ditto-defined
acknowledgementRequests.stream()
.map(AcknowledgementRequest::getLabel)
.filter(Predicate.not(DittoAcknowledgementLabel::contains))
.map(label -> getNack(label, dittoRuntimeException))
.forEach(nack -> ackRequester.tell(nack, parent));
final DittoHeaders headers = dittoRuntimeException.getDittoHeaders();
final String ackregatorAddress = headers.get(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey());
if (null != ackregatorAddress) {
final ActorSelection acknowledgementRequester = actorRefFactory.actorSelection(ackregatorAddress);
// answer NACKs for all AcknowledgementRequests with labels which were not Ditto-defined
acknowledgementRequests.stream()
.map(AcknowledgementRequest::getLabel)
.filter(Predicate.not(DittoAcknowledgementLabel::contains))
.map(label -> getNack(label, dittoRuntimeException))
.forEach(nack -> acknowledgementRequester.tell(nack, parent));
} else {
LOGGER.withCorrelationId(headers)
.error("Received DittoRuntimeException <{}> did not contain header of Ackgregator address: {}",
dittoRuntimeException.getClass().getSimpleName(), headers);
}
}

private Acknowledgement getNack(final AcknowledgementLabel label,
Expand Down
Expand Up @@ -12,10 +12,6 @@
*/
package org.eclipse.ditto.internal.models.acks;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;

import java.time.Instant;
import java.util.concurrent.TimeUnit;

Expand All @@ -24,6 +20,7 @@
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.AcknowledgementRequestDuplicateCorrelationIdException;
Expand All @@ -38,17 +35,11 @@
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

import com.typesafe.config.ConfigFactory;

import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.TestProbe;
import akka.testkit.javadsl.TestKit;
import scala.concurrent.duration.FiniteDuration;
Expand All @@ -70,9 +61,6 @@ public final class AcknowledgementForwarderActorStarterTest {
@Rule
public final JUnitSoftAssertions softly = new JUnitSoftAssertions();

@Mock
private ActorContext actorContext;

private TestProbe testProbe;

@BeforeClass
Expand All @@ -89,10 +77,6 @@ public static void tearDown() {
@Before
public void setUp() {
testProbe = new TestProbe(actorSystem);

when(actorContext.actorOf(any(Props.class), anyString()))
.thenAnswer((Answer<ActorRef>) invocationOnMock -> actorSystem.actorOf(invocationOnMock.getArgument(0),
invocationOnMock.getArgument(1)));
}

@Test
Expand All @@ -102,7 +86,6 @@ public void getEmptyOptionalIfNoAcknowledgementsRequested() {
final AcknowledgementForwarderActorStarter underTest = getActorStarter(dittoHeaders);

softly.assertThat(underTest.get()).isNotPresent();
Mockito.verifyNoInteractions(actorContext);
}

@Test
Expand All @@ -126,6 +109,7 @@ public void startForwarderActorWithDuplicateCorrelationId() {
.correlationId(correlationId)
.acknowledgementRequest(AcknowledgementRequest.of(DittoAcknowledgementLabel.TWIN_PERSISTED),
AcknowledgementRequest.of(customAckLabel))
.putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), testProbe.ref().path().toSerializationFormat())
.build();
final AcknowledgementRequestDuplicateCorrelationIdException expectedException =
AcknowledgementRequestDuplicateCorrelationIdException.newBuilder(correlationId)
Expand Down Expand Up @@ -154,8 +138,7 @@ public void startForwarderActorWithDuplicateCorrelationId() {
}

private AcknowledgementForwarderActorStarter getActorStarter(final DittoHeaders dittoHeaders) {
return AcknowledgementForwarderActorStarter.getInstance(actorContext, TestProbe.apply(actorSystem).ref(),
testProbe.ref(),
return AcknowledgementForwarderActorStarter.getInstance(actorSystem, TestProbe.apply(actorSystem).ref(),
KNOWN_ENTITY_ID,
ThingDeleted.of(KNOWN_ENTITY_ID, 1L, Instant.EPOCH, dittoHeaders, null),
acknowledgementConfig, label -> true);
Expand Down

0 comments on commit 6915eef

Please sign in to comment.