Skip to content

Commit

Permalink
fixed error handling for live messages + improved thingId calculation
Browse files Browse the repository at this point in the history
for messages ThingErrorResponses

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch-si.com>
  • Loading branch information
thjaeckle committed Aug 29, 2018
1 parent 10af690 commit c16578f
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 10 deletions.
Expand Up @@ -13,10 +13,14 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import javax.annotation.Nullable;

import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonMissingFieldException;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
Expand All @@ -30,6 +34,8 @@
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.commands.messages.MessageCommandResponse;
import org.eclipse.ditto.signals.commands.messages.MessageErrorRegistry;
import org.eclipse.ditto.signals.commands.policies.exceptions.PolicyErrorRegistry;
import org.eclipse.ditto.signals.commands.things.ThingCommandResponse;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
import org.eclipse.ditto.signals.commands.things.exceptions.ThingErrorRegistry;
Expand Down Expand Up @@ -329,17 +335,19 @@ private Signal<?> fromTwinAdaptable(final Adaptable adaptable) {
}
}

@Nullable
private Signal<?> signalFromAdaptable(final Adaptable adaptable, final TopicPath topicPath) {
if (TopicPath.Criterion.COMMANDS.equals(topicPath.getCriterion())) {
final boolean isResponse = adaptable.getPayload().getStatus().isPresent();

if (TopicPath.Action.RETRIEVE.equals(topicPath.getAction().orElse(null))) {
return isResponse ? thingQueryCommandResponseAdapter.fromAdaptable(adaptable) :
thingQueryCommandAdapter.fromAdaptable(adaptable);
if (adaptable.getPayload().getStatus().isPresent()) {
// this was a command response:
return processCommandResponseSignalFromAdaptable(adaptable, topicPath);
} else if (TopicPath.Action.RETRIEVE.equals(topicPath.getAction().orElse(null))) {
return thingQueryCommandAdapter.fromAdaptable(adaptable);
} else {
return isResponse ? thingModifyCommandResponseAdapter.fromAdaptable(adaptable) :
thingModifyCommandAdapter.fromAdaptable(adaptable);
return thingModifyCommandAdapter.fromAdaptable(adaptable);
}

} else if (TopicPath.Criterion.EVENTS.equals(topicPath.getCriterion())) {
return thingEventAdapter.fromAdaptable(adaptable);
} else if (TopicPath.Criterion.ERRORS.equals(topicPath.getCriterion())) {
Expand All @@ -348,6 +356,20 @@ private Signal<?> signalFromAdaptable(final Adaptable adaptable, final TopicPath
return null;
}

private Signal<?> processCommandResponseSignalFromAdaptable(final Adaptable adaptable, final TopicPath topicPath) {
final Optional<HttpStatusCode> status = adaptable.getPayload().getStatus();
final boolean isErrorResponse =
status.isPresent() && status.get().toInt() >= HttpStatusCode.BAD_REQUEST.toInt();

if (TopicPath.Action.RETRIEVE.equals(topicPath.getAction().orElse(null))) {
return isErrorResponse ? thingErrorResponseFromAdaptable(adaptable) :
thingQueryCommandResponseAdapter.fromAdaptable(adaptable);
} else {
return isErrorResponse ? thingErrorResponseFromAdaptable(adaptable) :
thingModifyCommandResponseAdapter.fromAdaptable(adaptable);
}
}

/**
* Creates a {@code ThingErrorResponse} from the given {@code adaptable}.
*
Expand Down Expand Up @@ -396,6 +418,12 @@ public static ProtocolAdapterErrorRegistry newInstance() {
final ThingErrorRegistry thingErrorRegistry = ThingErrorRegistry.newInstance();
thingErrorRegistry.getTypes()
.forEach(type -> parseStrategies.put(type, thingErrorRegistry));
final PolicyErrorRegistry policyErrorRegistry = PolicyErrorRegistry.newInstance();
policyErrorRegistry.getTypes()
.forEach(type -> parseStrategies.put(type, policyErrorRegistry));
final MessageErrorRegistry messageErrorRegistry = MessageErrorRegistry.newInstance();
messageErrorRegistry.getTypes()
.forEach(type -> parseStrategies.put(type, messageErrorRegistry));

// Protocol Adapter exceptions:
parseStrategies.put(UnknownSignalException.ERROR_CODE, UnknownSignalException::fromJson);
Expand Down
Expand Up @@ -19,5 +19,4 @@
* state of that object but a new object with the altered state is returned instead. This is the same behavior like it
* is shown by java.lang.String for example.
*/
@org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault
package org.eclipse.ditto.protocoladapter;
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
import org.eclipse.ditto.model.base.json.Jsonifiable;
import org.eclipse.ditto.model.messages.MessageHeaders;
import org.eclipse.ditto.protocoladapter.Adaptable;
import org.eclipse.ditto.protocoladapter.JsonifiableAdaptable;
import org.eclipse.ditto.protocoladapter.ProtocolAdapter;
Expand Down Expand Up @@ -296,7 +297,14 @@ private static Function<Jsonifiable.WithPredicate<JsonObject, JsonField>, String
}

final Adaptable adaptable;
if (jsonifiable instanceof Signal && isLiveSignal((Signal<?>) jsonifiable)) {
if (jsonifiable instanceof WithDittoHeaders
&& ((WithDittoHeaders) jsonifiable).getDittoHeaders().getChannel().isPresent()) {
// if channel was present in headers, use that one:
final TopicPath.Channel channel =
TopicPath.Channel.forName(((WithDittoHeaders) jsonifiable).getDittoHeaders().getChannel().get())
.orElse(TopicPath.Channel.TWIN);
adaptable = jsonifiableToAdaptable(jsonifiable, channel, adapter);
} else if (jsonifiable instanceof Signal && isLiveSignal((Signal<?>) jsonifiable)) {
adaptable = jsonifiableToAdaptable(jsonifiable, TopicPath.Channel.LIVE, adapter);
} else {
adaptable = jsonifiableToAdaptable(jsonifiable, TopicPath.Channel.TWIN, adapter);
Expand Down Expand Up @@ -347,8 +355,14 @@ private static Adaptable jsonifiableToAdaptable(final Jsonifiable.WithPredicate<
final DittoHeaders enhancedHeaders = ((DittoRuntimeException) jsonifiable).getDittoHeaders().toBuilder()
.channel(channel.getName())
.build();
final ThingErrorResponse errorResponse =
ThingErrorResponse.of((DittoRuntimeException) jsonifiable, enhancedHeaders);
ThingErrorResponse errorResponse;
try {
errorResponse = ThingErrorResponse.of(MessageHeaders.of(enhancedHeaders).getThingId(),
(DittoRuntimeException) jsonifiable, enhancedHeaders);
} catch (final IllegalStateException e) {
// thrown if headers did not contain the thing ID:
errorResponse = ThingErrorResponse.of((DittoRuntimeException) jsonifiable, enhancedHeaders);
}
adaptable = adapter.toAdaptable(errorResponse, channel);
} else {
throw new IllegalArgumentException("Jsonifiable was neither Command nor CommandResponse nor"
Expand Down

0 comments on commit c16578f

Please sign in to comment.