Skip to content

Commit

Permalink
handle ErrorResponses correctly in AckForwarder
Browse files Browse the repository at this point in the history
* don't invalidate correaltion-id of "live" responses to live commands immediately in ResponseReceiverCache, but only after their initial timeout in order to support handling one potentially false response followed by a correct one

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jul 14, 2022
1 parent 5f49779 commit 4896fe0
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 25 deletions.
Expand Up @@ -32,6 +32,7 @@
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.AcknowledgementCorrelationIdMissingException;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.ErrorResponse;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
Expand Down Expand Up @@ -105,12 +106,14 @@ private void forwardCommandResponse(final WithDittoHeaders acknowledgementOrResp
final DittoHeaders dittoHeaders = acknowledgementOrResponse.getDittoHeaders();
final String ackregatorAddress = dittoHeaders.getOrDefault(
DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(), ackgregatorAddressFallback);
if (null != ackregatorAddress && acknowledgementOrResponse instanceof Acknowledgement acknowledgement) {
if (null != ackregatorAddress &&
(acknowledgementOrResponse instanceof Acknowledgement ||
acknowledgementOrResponse instanceof ErrorResponse)) {
final ActorSelection acknowledgementRequester = getContext().actorSelection(ackregatorAddress);
log.withCorrelationId(acknowledgement)
.debug("Received Acknowledgement / live CommandResponse, forwarding to acknowledgement " +
"aggregator <{}>: " + "<{}>", acknowledgementRequester, acknowledgement);
acknowledgementRequester.tell(acknowledgement, getSender());
log.withCorrelationId(acknowledgementOrResponse)
.debug("Received Acknowledgement / ErrorResponse, forwarding to acknowledgement " +
"aggregator <{}>: " + "<{}>", acknowledgementRequester, acknowledgementOrResponse);
acknowledgementRequester.tell(acknowledgementOrResponse, getSender());
} else {
log.withCorrelationId(acknowledgementOrResponse)
.debug("Received live CommandResponse <{}>, forwarding to command forwarder: {}",
Expand Down
Expand Up @@ -64,11 +64,14 @@ public final class ResponseReceiverCache implements Extension {
private static final ExtensionId EXTENSION_ID = new ExtensionId();
private static final Duration DEFAULT_ENTRY_EXPIRY = Duration.ofMinutes(2L);

private final ActorSystem actorSystem;
private final Duration fallBackEntryExpiry;
private final Cache<CorrelationIdKey, ResponseReceiverCacheEntry> cache;

private ResponseReceiverCache(final Duration fallBackEntryExpiry,
private ResponseReceiverCache(final ActorSystem actorSystem,
final Duration fallBackEntryExpiry,
final Cache<CorrelationIdKey, ResponseReceiverCacheEntry> cache) {
this.actorSystem = actorSystem;
this.fallBackEntryExpiry = fallBackEntryExpiry;
this.cache = cache;
}
Expand All @@ -87,26 +90,28 @@ public static ResponseReceiverCache lookup(final ActorSystem actorSystem) {
/**
* Returns a new instance of {@code ResponseReceiverCache} with a hard-coded fall-back entry expiry.
*
* @param actorSystem the ActorSystem.
* @return the instance.
*/
static ResponseReceiverCache newInstance() {
return newInstance(DEFAULT_ENTRY_EXPIRY);
static ResponseReceiverCache newInstance(final ActorSystem actorSystem) {
return newInstance(actorSystem, DEFAULT_ENTRY_EXPIRY);
}

/**
* Returns a new instance of {@code ResponseReceiverCache} with the specified fall-back entry expiry.
*
* @param actorSystem the ActorSystem.
* @param fallBackEntryExpiry the expiry to be used for cache entries of commands without a timeout.
* @return the instance.
* @throws NullPointerException if {@code fallBackEntryExpiry} is {@code null}.
* @throws IllegalArgumentException if {@code fallBackEntryExpiry} is not positive.
*/
static ResponseReceiverCache newInstance(final Duration fallBackEntryExpiry) {
static ResponseReceiverCache newInstance(final ActorSystem actorSystem, final Duration fallBackEntryExpiry) {
ConditionChecker.checkArgument(checkNotNull(fallBackEntryExpiry, "fallBackEntryExpiry"),
Predicate.not(Duration::isZero).and(Predicate.not(Duration::isNegative)),
() -> "The fallBackEntryExpiry must be positive.");

return new ResponseReceiverCache(fallBackEntryExpiry, createCache(fallBackEntryExpiry));
return new ResponseReceiverCache(actorSystem, fallBackEntryExpiry, createCache(fallBackEntryExpiry));
}

private static Cache<CorrelationIdKey, ResponseReceiverCacheEntry> createCache(final Duration fallBackEntryExpiry) {
Expand Down Expand Up @@ -169,7 +174,7 @@ public CompletableFuture<Optional<ResponseReceiverCacheEntry>> get(final CharSeq
* @throws NullPointerException if {@code correlationId} is {@code null}.
* @throws IllegalArgumentException if {@code correlationId} is empty or blank.
*/
public void invalidate(final CharSequence correlationId) {
private void invalidate(final CharSequence correlationId) {
final var correlationIdString = checkNotNull(correlationId, "correlationId").toString();
ConditionChecker.checkArgument(correlationIdString,
Predicate.not(String::isBlank),
Expand All @@ -191,6 +196,13 @@ public <S extends Signal<?>, T> CompletionStage<T> insertResponseReceiverConflic
final Function<S, ActorRef> receiverCreator,
final BiFunction<S, ActorRef, T> responseHandler) {

signal.getDittoHeaders().getCorrelationId().ifPresent(correlationId ->
actorSystem.getScheduler().scheduleOnce(
signal.getDittoHeaders().getTimeout().orElse(fallBackEntryExpiry),
() -> invalidate(correlationId),
actorSystem.dispatcher()
)
);
return insertResponseReceiverConflictFreeWithFuture(signal,
receiverCreator,
responseHandler.andThen(CompletableFuture::completedStage));
Expand Down Expand Up @@ -301,7 +313,7 @@ static final class ExtensionId extends AbstractExtensionId<ResponseReceiverCache

@Override
public ResponseReceiverCache createExtension(final ExtendedActorSystem system) {
return newInstance();
return newInstance(system);
}

}
Expand Down
Expand Up @@ -248,7 +248,6 @@ private CompletionStage<TargetActorWithMessage> returnCommandResponseContext(
Duration.ZERO, // ZERO duration means that no "ask" is used, but "tell" - not expecting an answer
Function.identity()
);
responseReceiverCache.invalidate(correlationId);
} else {
log.withCorrelationId(liveResponse)
.warning("Got <{}> with unknown correlation ID: <{}>", liveResponse.getType(),
Expand Down
Expand Up @@ -34,11 +34,16 @@
import org.eclipse.ditto.base.model.correlationid.TestNameCorrelationId;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;

/**
* Unit test for {@link ResponseReceiverCache}.
Expand All @@ -47,39 +52,48 @@ public final class ResponseReceiverCacheTest {

public static final AuthorizationContext AUTHORIZATION_CONTEXT = AuthorizationContext.newInstance(
DittoAuthorizationContextType.UNSPECIFIED, AuthorizationSubject.newInstance("foo:bar"));

private static ActorSystem actorSystem;

@Rule
public final TestNameCorrelationId testNameCorrelationId = TestNameCorrelationId.newInstance();

@Rule
public final JUnitSoftAssertions softly = new JUnitSoftAssertions();

@BeforeClass
public static void setupTest() {
final Config config = ConfigFactory.load("test");
actorSystem = ActorSystem.create("AkkaTestSystem", config);
}

@Test
public void newInstanceWithNullDurationThrowsException() {
assertThatNullPointerException()
.isThrownBy(() -> ResponseReceiverCache.newInstance(null))
.isThrownBy(() -> ResponseReceiverCache.newInstance(actorSystem, null))
.withMessage("The fallBackEntryExpiry must not be null!")
.withNoCause();
}

@Test
public void newInstanceWithNegativeDurationThrowsException() {
assertThatIllegalArgumentException()
.isThrownBy(() -> ResponseReceiverCache.newInstance(Duration.ofSeconds(-1)))
.isThrownBy(() -> ResponseReceiverCache.newInstance(actorSystem, Duration.ofSeconds(-1)))
.withMessage("The fallBackEntryExpiry must be positive.")
.withNoCause();
}

@Test
public void newInstanceWithZeroDurationThrowsException() {
assertThatIllegalArgumentException()
.isThrownBy(() -> ResponseReceiverCache.newInstance(Duration.ZERO))
.isThrownBy(() -> ResponseReceiverCache.newInstance(actorSystem, Duration.ZERO))
.withMessage("The fallBackEntryExpiry must be positive.")
.withNoCause();
}

@Test
public void cacheNullSignalThrowsException() {
final var underTest = ResponseReceiverCache.newInstance();
final var underTest = ResponseReceiverCache.newInstance(actorSystem);

assertThatNullPointerException()
.isThrownBy(() -> underTest.cacheSignalResponseReceiver(null, null))
Expand All @@ -89,7 +103,7 @@ public void cacheNullSignalThrowsException() {

@Test
public void cacheNullResponseReceiverThrowsException() {
final var underTest = ResponseReceiverCache.newInstance();
final var underTest = ResponseReceiverCache.newInstance(actorSystem);
final var command = Mockito.mock(Command.class);
Mockito.when(command.getDittoHeaders())
.thenReturn(DittoHeaders.newBuilder().correlationId(testNameCorrelationId.getCorrelationId()).build());
Expand All @@ -102,7 +116,7 @@ public void cacheNullResponseReceiverThrowsException() {

@Test
public void getWithNullCorrelationIdThrowsException() {
final var underTest = ResponseReceiverCache.newInstance();
final var underTest = ResponseReceiverCache.newInstance(actorSystem);

assertThatNullPointerException()
.isThrownBy(() -> underTest.get(null))
Expand All @@ -112,7 +126,7 @@ public void getWithNullCorrelationIdThrowsException() {

@Test
public void getWithEmptyCorrelationIdThrowsException() {
final var underTest = ResponseReceiverCache.newInstance();
final var underTest = ResponseReceiverCache.newInstance(actorSystem);

assertThatIllegalArgumentException()
.isThrownBy(() -> underTest.get(""))
Expand All @@ -122,7 +136,7 @@ public void getWithEmptyCorrelationIdThrowsException() {

@Test
public void getWithBlankCorrelationIdThrowsException() {
final var underTest = ResponseReceiverCache.newInstance();
final var underTest = ResponseReceiverCache.newInstance(actorSystem);

assertThatIllegalArgumentException()
.isThrownBy(() -> underTest.get(" "))
Expand All @@ -137,7 +151,7 @@ public void getExistingEntryWithinExpiryReturnsResponseReceiver() {
final var correlationId = testNameCorrelationId.getCorrelationId();
Mockito.when(command.getDittoHeaders())
.thenReturn(getDittoHeadersWithCorrelationIdAndTimeout(correlationId, expiry));
final var underTest = ResponseReceiverCache.newInstance();
final var underTest = ResponseReceiverCache.newInstance(actorSystem);

final var mockReceiver = Mockito.mock(ActorRef.class);
underTest.cacheSignalResponseReceiver(command, mockReceiver);
Expand All @@ -155,7 +169,7 @@ public void getPreviouslyPutEntryAfterExpiryReturnsEmptyOptional() throws Execut
final var correlationId = testNameCorrelationId.getCorrelationId();
Mockito.when(command.getDittoHeaders())
.thenReturn(getDittoHeadersWithCorrelationIdAndTimeout(correlationId, expiry));
final var underTest = ResponseReceiverCache.newInstance();
final var underTest = ResponseReceiverCache.newInstance(actorSystem);

underTest.cacheSignalResponseReceiver(command, Mockito.mock(ActorRef.class));

Expand All @@ -170,7 +184,7 @@ public void getPreviouslyPutEntryAfterExpiryReturnsEmptyOptional() throws Execut
public void getEntryFromEmptyCacheReturnsEmptyOptional()
throws ExecutionException, InterruptedException, TimeoutException {

final var underTest = ResponseReceiverCache.newInstance();
final var underTest = ResponseReceiverCache.newInstance(actorSystem);

final var cacheEntryFuture = underTest.get(testNameCorrelationId.getCorrelationId());

Expand Down Expand Up @@ -202,7 +216,7 @@ public void getEntriesWithDifferentExpiryReturnsExpected() {
Mockito.mock(ActorRef.class), AUTHORIZATION_CONTEXT))
.collect(Collectors.toList());

final var underTest = ResponseReceiverCache.newInstance();
final var underTest = ResponseReceiverCache.newInstance(actorSystem);
IntStream.range(0, expirySequence.size())
.forEach(index -> underTest.cacheSignalResponseReceiver(commands.get(index), responseReceivers.get(index).sender()));

Expand Down

0 comments on commit 4896fe0

Please sign in to comment.