Skip to content

Commit

Permalink
Issue #ditto/757: always throw IllegalStateException on missing expec…
Browse files Browse the repository at this point in the history
…ted acknowledgement label; throw IllegalArgumentException when told to send signals with missing acknowledgement requests.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Aug 21, 2020
1 parent b598297 commit d2457f8
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 15 deletions.
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.client.ack.internal;

import java.util.Set;

import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.acks.AcknowledgementRequest;

/**
* Validate an acknowledgement request from the client.
*/
public final class AcknowledgementRequestsValidator {

private AcknowledgementRequestsValidator() {}

/**
* Validate acknowledgement requests from the client.
*
* @param acknowledgementRequests the acknowledgement requests of a signal sent by the client.
* @param mandatoryLabel the mandatory acknowledgement label for the channel.
*/
public static void validate(final Set<AcknowledgementRequest> acknowledgementRequests,
final AcknowledgementLabel mandatoryLabel) {

if (!acknowledgementRequests.isEmpty() &&
!acknowledgementRequests.contains(AcknowledgementRequest.of(mandatoryLabel))) {
throw new IllegalArgumentException("Expected acknowledgement request for label '" +
mandatoryLabel +
"' not found. Please make sure to always request the '" +
mandatoryLabel +
"' Acknowledgement if you need to process the response in the client.");
}
}

/**
* Create an {@code IllegalStateException} when receiving acknowledgements from the back-end not containing
* an expected acknowledgement label.
*
* @param mandatoryLabel the mandatory acknowledgement label.
* @return the {@code IllegalStateException}.
*/
public static IllegalStateException didNotReceiveAcknowledgement(final AcknowledgementLabel mandatoryLabel) {
return new IllegalStateException("Didn't receive an Acknowledgement for label '" +
mandatoryLabel + "'. Please make sure to always request the '" +
mandatoryLabel + "' Acknowledgement if you need to process the " +
"response in the client.");
}
}
Expand Up @@ -25,6 +25,7 @@

import javax.annotation.Nonnull;

import org.eclipse.ditto.client.ack.internal.AcknowledgementRequestsValidator;
import org.eclipse.ditto.client.internal.bus.Classification;
import org.eclipse.ditto.client.management.AcknowledgementsFailedException;
import org.eclipse.ditto.client.messaging.MessagingProvider;
Expand Down Expand Up @@ -163,7 +164,7 @@ protected <T extends ThingCommand<T>, S extends CommandResponse<?>, R> Completio
final T command,
final Class<S> expectedResponse,
final Function<S, R> onSuccess) {
final ThingCommand<?> commandWithChannel = setChannel(command, channel);
final ThingCommand<?> commandWithChannel = validateAckRequests(setChannel(command, channel));
return sendSignalAndExpectResponse(commandWithChannel, expectedResponse, onSuccess, ErrorResponse.class,
ErrorResponse::getDittoRuntimeException);
}
Expand Down Expand Up @@ -258,19 +259,23 @@ protected static Signal<?> adjustHeadersForLiveSignal(final Signal<?> signal) {
return adjustHeadersForLive((Signal) signal);
}

private ThingCommand<?> validateAckRequests(final ThingCommand<?> thingCommand) {
AcknowledgementRequestsValidator.validate(thingCommand.getDittoHeaders().getAcknowledgementRequests(),
getThingResponseAcknowledgementLabel());
return thingCommand;
}

private CommandResponse<?> extractCommandResponseFromAcknowledgements(final Signal<?> signal,
final Acknowledgements acknowledgements) {
if (areFailedAcknowledgements(acknowledgements.getStatusCode())) {
throw AcknowledgementsFailedException.of(acknowledgements);
} else {
final AcknowledgementLabel expectedLabel = getThingResponseAcknowledgementLabel();
return acknowledgements.stream()
.filter(ack -> ack.getLabel().equals(getThingResponseAcknowledgementLabel()))
.filter(ack -> ack.getLabel().equals(expectedLabel))
.findFirst()
.map(ack -> createThingModifyCommandResponseFromAcknowledgement(signal, ack))
.orElseThrow(() -> new IllegalStateException("Didn't receive an Acknowledgement for label '" +
getThingResponseAcknowledgementLabel() + "'. Please make sure to always request the '" +
getThingResponseAcknowledgementLabel() + "' Acknowledgement if you need to process the " +
"response in the client."));
.orElseThrow(() -> AcknowledgementRequestsValidator.didNotReceiveAcknowledgement(expectedLabel));
}
}

Expand Down
Expand Up @@ -25,9 +25,11 @@
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.client.ack.ResponseConsumer;
import org.eclipse.ditto.client.ack.internal.AcknowledgementRequestsValidator;
import org.eclipse.ditto.client.live.messages.MessageSender;
import org.eclipse.ditto.client.management.AcknowledgementsFailedException;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
Expand All @@ -38,13 +40,12 @@
import org.eclipse.ditto.model.messages.MessageHeadersBuilder;
import org.eclipse.ditto.model.messages.MessagesModelFactory;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
import org.eclipse.ditto.signals.acks.base.Acknowledgements;
import org.eclipse.ditto.signals.base.WithOptionalEntity;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.ErrorResponse;
import org.eclipse.ditto.signals.commands.messages.MessageCommandResponse;
import org.eclipse.ditto.signals.commands.messages.MessageDeserializer;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -178,25 +179,24 @@ private static <T> ResponseConsumer<?> createCommandResponseConsumer(final Class
message = null;
errorToPublish = AcknowledgementsFailedException.of((Acknowledgements) response);
} else {
final Optional<Message<Object>> messageOptional =
((Acknowledgements) response).getAcknowledgement(DittoAcknowledgementLabel.LIVE_RESPONSE)
.flatMap(WithOptionalEntity::getEntity)
.map(JsonValue::asObject)
.map(MessageDeserializer::deserializeMessageFromJson);
final AcknowledgementLabel expectedLabel = DittoAcknowledgementLabel.LIVE_RESPONSE;
final Optional<Message<?>> messageOptional =
((Acknowledgements) response).getAcknowledgement(expectedLabel)
.flatMap(ImmutableMessageSender::getMessageResponseInAcknowledgement);
if (messageOptional.isPresent()) {
message = messageOptional.get();
errorToPublish = null;
} else {
message = null;
errorToPublish = AcknowledgementsFailedException.of((Acknowledgements) response);
errorToPublish = AcknowledgementRequestsValidator.didNotReceiveAcknowledgement(expectedLabel);
}
}
} else if (response instanceof MessageCommandResponse) {
message = ((MessageCommandResponse<?, ?>) response).getMessage();
errorToPublish = null;
} else if (response instanceof ErrorResponse) {
message = null;
errorToPublish = ((ThingErrorResponse) response).getDittoRuntimeException();
errorToPublish = ((ErrorResponse<?>) response).getDittoRuntimeException();
} else if (response == null) {
message = null;
errorToPublish = error;
Expand All @@ -213,6 +213,21 @@ private static <T> ResponseConsumer<?> createCommandResponseConsumer(final Class
});
}

private static Optional<Message<?>> getMessageResponseInAcknowledgement(final Acknowledgement ack) {
final Optional<Message<?>> deserializedMessage =
ack.getEntity().map(JsonValue::asObject).map(MessageDeserializer::deserializeMessageFromJson);

return deserializedMessage.map(message ->
MessagesModelFactory.newMessageBuilder(message.getHeaders().toBuilder()
.statusCode(ack.getStatusCode())
.build())
.payload(message.getPayload().orElse(null))
.rawPayload(message.getRawPayload().orElse(null))
.extra(message.getExtra().orElse(null))
.build()
);
}

private static <T> void checkPayloadTypeAndAccept(final Class<T> clazz,
final BiConsumer<Message<T>, Throwable> responseMessageHandler,
final Message<?> message,
Expand Down Expand Up @@ -310,6 +325,8 @@ public SetPayloadOrSend<T> statusCode(final HttpStatusCode statusCode) {

@Override
public SetPayloadOrSend<T> headers(final DittoHeaders additionalHeaders) {
AcknowledgementRequestsValidator.validate(additionalHeaders.getAcknowledgementRequests(),
DittoAcknowledgementLabel.LIVE_RESPONSE);
messageAdditionalHeaders = additionalHeaders;
return this;
}
Expand Down
131 changes: 131 additions & 0 deletions java/src/test/java/org/eclipse/ditto/client/DittoClientLiveTest.java
Expand Up @@ -13,11 +13,14 @@
package org.eclipse.ditto.client;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.eclipse.ditto.client.TestConstants.Policy.POLICY_ID;
import static org.eclipse.ditto.client.TestConstants.Thing.THING_ID;
import static org.eclipse.ditto.model.base.acks.AcknowledgementRequest.parseAcknowledgementRequest;
import static org.eclipse.ditto.model.base.acks.DittoAcknowledgementLabel.LIVE_RESPONSE;

import java.util.AbstractMap;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
Expand All @@ -30,16 +33,20 @@
import org.eclipse.ditto.client.live.messages.MessageRegistration;
import org.eclipse.ditto.client.live.messages.MessageSender;
import org.eclipse.ditto.client.live.messages.RepliableMessage;
import org.eclipse.ditto.client.management.AcknowledgementsFailedException;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.messages.MessageDirection;
import org.eclipse.ditto.model.messages.MessagePayloadSizeTooLargeException;
import org.eclipse.ditto.model.things.Feature;
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.model.things.ThingsModelFactory;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
import org.eclipse.ditto.signals.acks.base.Acknowledgements;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.live.modify.CreateThingLiveCommand;
import org.eclipse.ditto.signals.commands.live.modify.CreateThingLiveCommandAnswerBuilder;
Expand Down Expand Up @@ -138,6 +145,130 @@ public void sendMessageAndGetErrorResponse() {
}
}

@Test
public void sendThingMessageAndGetSuccessAcknowledgements() {
final CompletableFuture<Void> future = new CompletableFuture<>();
client.live().message().from(THING_ID)
.subject("request")
.headers(DittoHeaders.newBuilder()
.putHeader("my-header", "my-header-value")
.acknowledgementRequest(
parseAcknowledgementRequest("live-response"),
parseAcknowledgementRequest("custom")
)
.build())
.payload("payload")
.contentType("text/plain")
.send(String.class, (message, error) -> {
try {
if (error != null) {
future.completeExceptionally(error);
} else {
assertThat(message.getStatusCode()).contains(HttpStatusCode.ALREADY_REPORTED);
assertThat(message.getHeaders())
.contains(new AbstractMap.SimpleEntry<>("my-header", "my-header-value"));
assertThat(message.getHeaders().getDirection()).isEqualTo(MessageDirection.FROM);
assertThat(message.getSubject()).isEqualTo("request");
assertThat(message.getPayload()).contains("payload");
future.complete(null);
}
} catch (final Throwable e) {
future.completeExceptionally(e);
}
});

final SendThingMessage<?> command = expectMsgClass(SendThingMessage.class);
final SendThingMessageResponse<?> response =
SendThingMessageResponse.of(command.getEntityId(), command.getMessage(),
HttpStatusCode.ALREADY_REPORTED, command.getDittoHeaders());
final Acknowledgement liveResponseAck = Acknowledgement.of(
LIVE_RESPONSE,
response.getEntityId(),
response.getStatusCode(),
response.getDittoHeaders(),
response.toJson().getValueOrThrow(MessageCommand.JsonFields.JSON_MESSAGE)
);
final Acknowledgement customAck = Acknowledgement.of(
AcknowledgementLabel.of("custom"),
command.getEntityId(),
HttpStatusCode.NO_CONTENT,
command.getDittoHeaders()
);
reply(Acknowledgements.of(Arrays.asList(liveResponseAck, customAck), command.getDittoHeaders()));
try {
future.get(TIMEOUT, TIME_UNIT);
} catch (final Exception e) {
throw new AssertionError(e);
}
}

@Test
public void sendThingMessageAndGetFailedAcknowledgements() {
final CompletableFuture<Void> future = new CompletableFuture<>();
client.live().message().from(THING_ID)
.subject("request")
.headers(DittoHeaders.newBuilder()
.putHeader("my-header", "my-header-value")
.acknowledgementRequest(
parseAcknowledgementRequest("live-response"),
parseAcknowledgementRequest("custom")
)
.build())
.payload("payload")
.contentType("text/plain")
.send(String.class, (message, error) -> {
try {
assertThat(error).isInstanceOf(AcknowledgementsFailedException.class);
final Acknowledgements acks = ((AcknowledgementsFailedException) error).getAcknowledgements();
assertThat(acks.getStatusCode()).isEqualTo(HttpStatusCode.FAILED_DEPENDENCY);
assertThat(acks.getAcknowledgement(LIVE_RESPONSE).map(Acknowledgement::getStatusCode))
.contains(HttpStatusCode.IM_A_TEAPOT);
assertThat(acks.getAcknowledgement(AcknowledgementLabel.of("custom"))
.map(Acknowledgement::getStatusCode))
.contains(HttpStatusCode.NO_CONTENT);
future.complete(null);
} catch (final Throwable e) {
future.completeExceptionally(e);
}
});

final SendThingMessage<?> command = expectMsgClass(SendThingMessage.class);
final SendThingMessageResponse<?> response =
SendThingMessageResponse.of(command.getEntityId(), command.getMessage(),
HttpStatusCode.IM_A_TEAPOT, command.getDittoHeaders());
final Acknowledgement liveResponseAck = Acknowledgement.of(
LIVE_RESPONSE,
response.getEntityId(),
response.getStatusCode(),
response.getDittoHeaders(),
response.toJson().getValueOrThrow(MessageCommand.JsonFields.JSON_MESSAGE)
);
final Acknowledgement customAck = Acknowledgement.of(
AcknowledgementLabel.of("custom"),
command.getEntityId(),
HttpStatusCode.NO_CONTENT,
command.getDittoHeaders()
);
reply(Acknowledgements.of(Arrays.asList(liveResponseAck, customAck), command.getDittoHeaders()));
try {
future.get(TIMEOUT, TIME_UNIT);
} catch (final Exception e) {
throw new AssertionError(e);
}
}

@Test
public void sendThingMessageWithoutLiveResponseAckRequest() {
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> client.live().message().from(THING_ID)
.subject("request")
.headers(DittoHeaders.newBuilder()
.putHeader("my-header", "my-header-value")
.acknowledgementRequest(parseAcknowledgementRequest("custom"))
.build())
);
}

@Test
public void emitEvent() {
client.live().emitEvent(factory -> factory.featureDeleted(THING_ID, FEATURE_ID));
Expand Down
Expand Up @@ -98,6 +98,17 @@ public void testCreateThing() {
expectMsgClass(CreateThing.class).getDittoHeaders()));
}

@Test
public void testCreateThingWithCustomAcknowledgementsOnly() {
final AcknowledgementLabel label1 = AcknowledgementLabel.of("custom-ack-1");
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> getManagement()
.create(THING_ID, Options.headers(DittoHeaders.newBuilder()
.acknowledgementRequest(AcknowledgementRequest.of(label1))
.build()))
);
}

@Test
public void testCreateThingWith2Acknowledgements() {
final AcknowledgementLabel label1 = AcknowledgementLabel.of("custom-ack-1");
Expand Down

0 comments on commit d2457f8

Please sign in to comment.