Skip to content

Commit

Permalink
fixed test in HttpPublisherActorTest;
Browse files Browse the repository at this point in the history
adapt optional chaining in CommandAndCommandResponseMatchingValidator to fix the tests;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Oct 21, 2021
1 parent b626f0c commit 6d0e231
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ private void sendMultiMappedOutboundSignal(final OutboundSignal.MultiMapped mult
.exceptionally(e -> {
logger.withCorrelationId(multiMapped.getSource())
.error(e, "Message sending failed unexpectedly: <{}>", multiMapped);

return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Stream;

import javax.annotation.concurrent.NotThreadSafe;

Expand Down Expand Up @@ -68,9 +69,13 @@ static CommandAndCommandResponseMatchingValidator newInstance(final ConnectionLo

@Override
public void accept(final SignalWithEntityId<?> sentCommand, final CommandResponse<?> commandResponse) {
final var validationError = validateCorrelationIdsMatch(sentCommand, commandResponse)
.flatMap(aVoid -> validateTypesMatch(sentCommand, commandResponse))
.flatMap(aVoid -> validateEntityIdsMatch(sentCommand, commandResponse));
final Optional<MessageSendingFailedException> validationError =
Stream.of(validateCorrelationIdsMatch(sentCommand, commandResponse),
validateTypesMatch(sentCommand, commandResponse),
validateEntityIdsMatch(sentCommand, commandResponse))
.filter(Optional::isPresent)
.map(Optional::get)
.findFirst();

if (validationError.isPresent()) {
final var sendingFailedException = validationError.get();
Expand Down Expand Up @@ -107,6 +112,7 @@ private static Optional<MessageSendingFailedException> validateCorrelationIdsMat
detailMessage = MessageFormat.format(pattern, commandResponseCorrelationIdOptional.get());

}

return Optional.ofNullable(detailMessage).map(toMessageSendingFailedException(commandDittoHeaders));
}

Expand All @@ -123,7 +129,7 @@ private static Function<String, MessageSendingFailedException> toMessageSendingF
private static Optional<MessageSendingFailedException> validateTypesMatch(final SignalWithEntityId<?> command,
final CommandResponse<?> commandResponse) {

if (isAcknowledgement(commandResponse)) {
if (isAcknowledgement(commandResponse) || ResponseType.ERROR == commandResponse.getResponseType()) {
return Optional.empty();
}

Expand All @@ -135,10 +141,7 @@ private static Optional<MessageSendingFailedException> validateTypesMatch(final
final var pattern = "Type of live response <{0}> is not related to type of command <{1}>.";
detailMessage = MessageFormat.format(pattern, commandResponseType, command.getType());
}
if (ResponseType.ERROR == commandResponse.getResponseType()) {
return Optional.empty();
}
if (isMessagesSignalDomain(semanticCommandResponseType)) {
else if (isMessagesSignalDomain(semanticCommandResponseType)) {
if (!areCorrespondingMessageSignals(command.getName(), commandResponse.getName())) {
final var pattern =
"Type of live message response <{0}> is not related to type of message command <{1}>.";
Expand All @@ -148,6 +151,7 @@ private static Optional<MessageSendingFailedException> validateTypesMatch(final
final var pattern = "Type of live response <{0}> is not related to type of command <{1}>.";
detailMessage = MessageFormat.format(pattern, commandResponseType, command.getType());
}

return Optional.ofNullable(detailMessage).map(toMessageSendingFailedException(command.getDittoHeaders()));
}

Expand All @@ -167,6 +171,7 @@ private static boolean isMessagesSignalDomain(final SemanticSignalType semanticC

private static boolean areCorrespondingMessageSignals(final String commandName, final String commandResponseName) {
final var indexOfResponseMessageSuffix = commandResponseName.indexOf("ResponseMessage");

return commandName.startsWith(commandResponseName.substring(0, indexOfResponseMessageSuffix));
}

Expand All @@ -190,6 +195,7 @@ private static Optional<MessageSendingFailedException> validateEntityIdsMatch(fi
final var pattern = "Live response has no entity ID while command has entity ID <{0}>";
detailMessage = MessageFormat.format(pattern, command.getEntityId());
}

return Optional.ofNullable(detailMessage).map(toMessageSendingFailedException(command.getDittoHeaders()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ protected CompletionStage<SendResult> publishMessage(final Signal<?> signal,
newContext(signal, autoAckTarget, request, message, maxTotalMessageSize, ackSizeQuota, resultFuture);
sourceQueue.offer(Pair.create(request, context))
.handle(handleQueueOfferResult(message, resultFuture));

return resultFuture;
}

Expand Down Expand Up @@ -355,6 +356,7 @@ private HttpRequest newRequestWithoutEntity(final HttpPublishTarget publishTarge

final var request = factory.newRequest(publishTarget).addHeaders(headers);
final var messageHeaders = message.getHeaders();

return request.withUri(setPathAndQuery(request.getUri(),
messageHeaders.get(ReservedHeaders.HTTP_PATH.name),
messageHeaders.get(ReservedHeaders.HTTP_QUERY.name)));
Expand All @@ -377,6 +379,7 @@ private BiFunction<QueueOfferResult, Throwable, Void> handleQueueOfferResult(fin
.dittoHeaders(message.getInternalHeaders())
.build());
}

return null;
};
}
Expand Down Expand Up @@ -419,8 +422,8 @@ private HttpPushContext newContext(final Signal<?> signal,
error.getMessage());
}
resultFuture.completeExceptionally(error);
escalate(error,
MessageFormat.format("Failed to send HTTP request to <{0}>.", stripUserInfo(request.getUri())));
escalate(error, MessageFormat.format("Failed to send HTTP request to <{0}>.",
stripUserInfo(request.getUri())));
}
};
}
Expand Down Expand Up @@ -520,6 +523,7 @@ private CompletionStage<SendResult> toCommandResponseOrAcknowledgement(final Sig
} else {
sendFailure = null;
}

return new SendResult(result, sendFailure, mergedDittoHeaders);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,14 @@ public abstract class AbstractPublisherActorTest {
protected static final Config CONFIG = ConfigFactory.load("test");
protected static final ThingId THING_ID = ThingId.of("thing", "id");
protected ActorSystem actorSystem;
protected TestProbe proxyActorTestProbe;
protected ActorRef proxyActor;

@Before
public void setUp() {
actorSystem = ActorSystem.create("AkkaTestSystem", CONFIG);
proxyActor = TestProbe.apply("proxyActor", actorSystem).ref();
proxyActorTestProbe = TestProbe.apply("proxyActor", actorSystem);
proxyActor = proxyActorTestProbe.ref();
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ public void testLiveCommandHttpPushCreatesLiveCommandResponseFromProtocolMessage
publisherActor.tell(signalToMultiMapped(command, target, testKit.getRef()), testKit.getRef());

// Assert
final var responseSignal = testKit.expectMsgClass(Signal.class);
final var responseSignal = proxyActorTestProbe.expectMsgClass(Signal.class);
assertThat(responseSignal).isInstanceOfSatisfying(RetrieveThingResponse.class, retrieveThingResponse -> {
assertThat((CharSequence) retrieveThingResponse.getEntityId()).isEqualTo(thingId);
assertThat(retrieveThingResponse.getHttpStatus()).isEqualTo(retrieveThingMockResponse.getHttpStatus());
Expand Down Expand Up @@ -535,7 +535,7 @@ public void sendingLiveResponseWithWrongCorrelationIdDoesNotWork() {
OutboundSignalFactory.newMultiMappedOutboundSignal(Collections.singletonList(mapped), getRef()),
getRef());

final var acknowledgements = expectMsgClass(Acknowledgements.class);
final var acknowledgements = expectMsgClass(Duration.ofSeconds(5), Acknowledgements.class);
assertThat((CharSequence) acknowledgements.getEntityId()).isEqualTo(TestConstants.Things.THING_ID);
assertThat(acknowledgements.getHttpStatus()).isEqualTo(HttpStatus.BAD_REQUEST);
assertThat(acknowledgements.getDittoHeaders().getCorrelationId())
Expand Down Expand Up @@ -936,6 +936,7 @@ private HttpRequest publishMessageWithHeaders(final Map<String, String> headers)
// THEN: reserved headers do not appear as HTTP headers
published.setValue(received.take());
}};

return published.getValue();
}

Expand All @@ -953,13 +954,10 @@ private T getValue() {

}

private OutboundSignal.MultiMapped newMultiMappedWithContentType(final Target target,
final ActorRef sender) {
private OutboundSignal.MultiMapped newMultiMappedWithContentType(final Target target, final ActorRef sender) {
return OutboundSignalFactory.newMultiMappedOutboundSignal(
List.of(getMockOutboundSignal(target, "requested-acks",
JsonArray.of(JsonValue.of("please-verify")).toString())),
sender
);
JsonArray.of(JsonValue.of("please-verify")).toString())), sender);
}

private HttpPushFactory mockHttpPushFactory(final String contentType, final HttpStatus httpStatus,
Expand Down Expand Up @@ -992,13 +990,15 @@ public HttpRequest newRequest(final HttpPublishTarget httpPublishTarget) {
final var separator = httpPublishTarget.getPathWithQuery().startsWith("/") ? "" : "/";
final var uri =
Uri.create("http://" + hostname + ":12345" + separator + httpPublishTarget.getPathWithQuery());

return HttpRequest.create().withMethod(httpPublishTarget.getMethod()).withUri(uri);
}

@Override
public <T> Flow<Pair<HttpRequest, T>, Pair<Try<HttpResponse>, T>, ?> createFlow(final ActorSystem system,
final LoggingAdapter log, final Duration requestTimeout, @Nullable final PreparedTimer timer,
@Nullable final Consumer<Duration> consumer) {

return Flow.<Pair<HttpRequest, T>>create()
.map(pair -> Pair.create(Try.apply(() -> mapper.apply(pair.first())), pair.second()));
}
Expand Down

0 comments on commit 6d0e231

Please sign in to comment.