Skip to content

Commit

Permalink
Invalidate ResponseReceiver cache after a response is received.
Browse files Browse the repository at this point in the history
In order to restore behaviour, that a message with the same correlation-id as a previous message gets handled wit the same correlation-id (not a suffixed correlation-id, which we only want if 2 messages with the same correlation-id are processed at the same time). Previous the default expiry of 2 minutes leads to changed correlation-ids in the second message, even if the first message was already answered.

Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Feb 28, 2022
1 parent 8abf9d5 commit 2583c7e
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ private CompletionStage<Contextual<WithDittoHeaders>> returnCommandResponseConte
final var receiver = responseReceiverEntry.get();
log().info("Scheduling CommandResponse <{}> to original sender <{}>", liveResponse, receiver);
commandResponseContextual = withMessageToReceiver(liveResponse, receiver);
responseReceiverCache.invalidate(correlationId);
} else {
log().info("Got <{}> with unknown correlation ID: <{}>", liveResponse.getType(), correlationId);
commandResponseContextual = withMessageToReceiver(null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,22 @@ public CompletableFuture<Optional<ActorRef>> get(final CharSequence correlationI
return cache.get(CorrelationIdKey.forCacheRetrieval(correlationIdString));
}

/**
* Invalidates the cached response receiver for the specified correlation ID argument.
*
* @param correlationId the correlation ID to invalidate the cached response receiver for.
* @throws NullPointerException if {@code correlationId} is {@code null}.
* @throws IllegalArgumentException if {@code correlationId} is empty or blank.
*/
public void invalidate(final CharSequence correlationId) {
final var correlationIdString = checkNotNull(correlationId, "correlationId").toString();
ConditionChecker.checkArgument(correlationIdString,
Predicate.not(String::isBlank),
() -> "The correlationId must not be blank.");

cache.invalidate(CorrelationIdKey.forCacheRetrieval(correlationIdString));
}

/**
* Insert a response receiver for a live or message command.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,61 @@ public void retrieveLiveThingCommandAndResponseByPolicy() {
}};
}

@Test
public void correlatiIdSameAfterResponseSuccessfull() {
final PolicyId policyId = PolicyId.of("policy", "id");
final JsonObject thingWithPolicy = newThingWithPolicyId(policyId);
final JsonObject policy = PoliciesModelFactory.newPolicyBuilder(policyId)
.setRevision(1L)
.forLabel("authorize-self")
.setSubject(GOOGLE, SUBJECT_ID)
.setGrantedPermissions(PoliciesResourceType.thingResource("/"),
Permissions.newInstance(Permission.READ, Permission.WRITE))
.setRevokedPermissions(PoliciesResourceType.thingResource("/features/x/properties/key2"),
Permissions.newInstance(Permission.READ))
.build()
.toJson(FieldType.all());
final SudoRetrieveThingResponse sudoRetrieveThingResponse =
SudoRetrieveThingResponse.of(thingWithPolicy, DittoHeaders.empty());
final SudoRetrievePolicyResponse sudoRetrievePolicyResponse =
SudoRetrievePolicyResponse.of(policyId, policy, DittoHeaders.empty());

new TestKit(system) {{
mockEntitiesActorInstance.setReply(TestSetup.THING_SUDO, sudoRetrieveThingResponse);
mockEntitiesActorInstance.setReply(TestSetup.POLICY_SUDO, sudoRetrievePolicyResponse);

final ActorRef underTest = newEnforcerActor(getRef());

final DittoHeaders headers = headers();
final ThingCommand<?> read = getRetrieveThingCommand(headers);

underTest.tell(read, getRef());

final var responseHeaders = headers.toBuilder()
.authorizationContext(AuthorizationContext.newInstance(
DittoAuthorizationContextType.PRE_AUTHENTICATED_CONNECTION,
AuthorizationSubject.newInstance("myIssuer:mySubject")))
.build();

final ThingCommandResponse<?> readResponse = getRetrieveThingResponse(responseHeaders);

// Second message right after the response for the first was sent, should have the same correlation-id (Not suffixed).
underTest.tell(readResponse, getRef());
final RetrieveThingResponse retrieveThingResponse =
TestSetup.fishForMsgClass(this, RetrieveThingResponse.class);
assertThat(retrieveThingResponse.getDittoHeaders().getCorrelationId()).isEqualTo(
read.getDittoHeaders().getCorrelationId());

underTest.tell(read, getRef());

underTest.tell(readResponse, getRef());
final RetrieveThingResponse retrieveThingResponse2 =
TestSetup.fishForMsgClass(this, RetrieveThingResponse.class);
assertThat(retrieveThingResponse2.getDittoHeaders().getCorrelationId()).isEqualTo(
read.getDittoHeaders().getCorrelationId());
}};
}

@Test
public void acceptMessageCommandByPolicy() {
final PolicyId policyId = PolicyId.of("policy:id");
Expand Down Expand Up @@ -421,9 +476,9 @@ private static ThingCommand<?> getModifyFeatureCommand(final DittoHeaders header

private static MessageCommand<?, ?> thingMessageCommand() {
final Message<Object> message = Message.newBuilder(
MessageBuilder.newHeadersBuilder(MessageDirection.TO, TestSetup.THING_ID, "my-subject")
.contentType("text/plain")
.build())
MessageBuilder.newHeadersBuilder(MessageDirection.TO, TestSetup.THING_ID, "my-subject")
.contentType("text/plain")
.build())
.payload("Hello you!")
.build();
return SendThingMessage.of(TestSetup.THING_ID, message, headers());
Expand All @@ -441,10 +496,10 @@ private static ThingEvent<?> liveEventRevoked() {

private static MessageCommand<?, ?> featureMessageCommand() {
final Message<?> message = Message.newBuilder(
MessageBuilder.newHeadersBuilder(MessageDirection.TO, TestSetup.THING_ID, "my-subject")
.contentType("text/plain")
.featureId("foo")
.build())
MessageBuilder.newHeadersBuilder(MessageDirection.TO, TestSetup.THING_ID, "my-subject")
.contentType("text/plain")
.featureId("foo")
.build())
.payload("Hello you!")
.build();
return SendFeatureMessage.of(TestSetup.THING_ID, "foo", message, headers());
Expand Down

0 comments on commit 2583c7e

Please sign in to comment.