Skip to content

Commit

Permalink
fixed that CommandAndCommandResponseMatchingValidator.validateCorrela…
Browse files Browse the repository at this point in the history
…tionIdsMatch also match when the correlation-id of a commandResponse starts with the correlation-id of the command

* in addition, adjust concierge's  ResponseReceiverCache to not generate a random new UUID in case of a collision, but append a new UUID to the collided correlation-id instead

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jan 28, 2022
1 parent 73da18e commit e457776
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -202,24 +203,34 @@ private <S extends Signal<?>> CompletionStage<S> setUniqueCorrelationIdForGlobal
final boolean refreshCorrelationId) {

final String correlationId;
final AtomicBoolean correlationIdRefreshed = new AtomicBoolean(false);
if (refreshCorrelationId) {
correlationId = UUID.randomUUID().toString();
final String newId = UUID.randomUUID().toString();
correlationId = SignalInformationPoint.getCorrelationId(signal)
.map(existingId -> existingId + "_" + newId)
.orElse(newId);
correlationIdRefreshed.set(true);
} else {
correlationId = SignalInformationPoint.getCorrelationId(signal)
.orElseGet(() -> UUID.randomUUID().toString());
.orElseGet(() -> {
correlationIdRefreshed.set(true);
return UUID.randomUUID().toString();
});
}

return get(correlationId)
.thenCompose(entry -> {
final CompletionStage<S> result;
if (entry.isPresent()) {
result = setUniqueCorrelationIdForGlobalDispatching(signal, true);
} else {
} else if (correlationIdRefreshed.get()) {
result = CompletableFuture.completedFuture(
(S) signal.setDittoHeaders(DittoHeaders.newBuilder(signal.getDittoHeaders())
.correlationId(correlationId)
.build())
);
} else {
result = CompletableFuture.completedFuture(signal);
}
return result;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,24 +95,42 @@ private static MatchingValidationResult validateCorrelationIdsMatch(final Comman
final CommandResponse<?> commandResponse) {

final MatchingValidationResult result;

final var commandCorrelationId = getCorrelationId(command);
final var commandResponseCorrelationId = getCorrelationId(commandResponse);
if (commandCorrelationId.equals(commandResponseCorrelationId)) {
if (doCommandAndResponseCorrelationIdsMatch(command, commandResponse)) {
result = MatchingValidationResult.success();
} else {
final var pattern = "Correlation ID of live response <{0}> differs from" +
" correlation ID of command <{1}>.";
result = MatchingValidationResult.failure(command,
commandResponse,
MessageFormat.format(pattern,
commandResponseCorrelationId.orElse(null),
commandCorrelationId.orElse(null)));
getCorrelationId(commandResponse).orElse(null),
getCorrelationId(command).orElse(null)));
}

return result;
}

/**
* Checks whether the {@code command} and {@code commandResponse} correlation-ids are either completely equal or
* if the response's correlation-id "starts with" the command's correlation-id.
* That could be the case for correlation-id collisions detected in concierge's {@code ResponseReceiverCache} in
* which case a newly created UUID is appended to the collided previous correlation-id.
*
* @param command the command to extract the correlation-id to check.
* @param commandResponse the commandResponse to extract the correlation-id to check.
* @return {@code true} if the IDs match.
*/
private static boolean doCommandAndResponseCorrelationIdsMatch(final Command<?> command,
final CommandResponse<?> commandResponse) {

final var commandCorrelationId = getCorrelationId(command);
final var commandResponseCorrelationId = getCorrelationId(commandResponse);
return commandCorrelationId.equals(commandResponseCorrelationId) ||
commandResponseCorrelationId.filter(responseId ->
commandCorrelationId.filter(responseId::startsWith).isPresent()
).isPresent();
}

private static Optional<String> getCorrelationId(final Signal<?> signal) {
return SignalInformationPoint.getCorrelationId(signal);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.util.UUID;

import org.assertj.core.api.JUnitSoftAssertions;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.common.HttpStatus;
Expand Down Expand Up @@ -132,6 +134,24 @@ public void applyCommandWithCorrelationIdAndCommandResponseWithoutCorrelationId(
correlationIdCommand));
}

@Test
public void applyCommandWithCorrelationIdAndCommandResponseWithSuffixedSameCorrelationId() {
final var correlationIdCommand = testNameCorrelationId.getCorrelationId();
final var dittoHeaders = DittoHeaders.newBuilder().correlationId(correlationIdCommand).build();
final var command = RetrieveThing.of(THING_ID, dittoHeaders);
final var correlationIdCommandResponse = correlationIdCommand.withSuffix("_" +
UUID.randomUUID());
final var responseDittoHeaders = DittoHeaders.newBuilder()
.correlationId(correlationIdCommandResponse)
.build();
final var commandResponse = RetrieveThingResponse.of(THING_ID, JsonObject.empty(), responseDittoHeaders);
final var underTest = CommandAndCommandResponseMatchingValidator.getInstance();

final var validationResult = underTest.apply(command, commandResponse);

assertThat(validationResult.isSuccess()).isTrue();
}

@Test
public void applyCommandWithoutCorrelationIdAndCommandResponseWithCorrelationId() {
final var command = RetrieveThing.of(THING_ID, DittoHeaders.empty());
Expand Down

0 comments on commit e457776

Please sign in to comment.