Skip to content

Commit

Permalink
Issue #106: Do not send error responses via WebSocket for invalid liv…
Browse files Browse the repository at this point in the history
…e command responses.

If for example a device sends an invalid live response it probably is not able to handle an error response from Ditto. However, if it explicitly requests a response it will receive sucha an error response.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Dec 15, 2021
1 parent 7c645b5 commit 73a333b
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 86 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.gateway.service.endpoints.routes.websocket;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.gateway.service.streaming.StreamControlMessage;

/**
* This is a {@code StreamControlMessage} to explicitly express, that the WebSocket stream should remain as it is.
*/
@Immutable
final class NoOp implements StreamControlMessage {

@Nullable private static NoOp instance;

private NoOp() {
super();
}

/**
* Returns an instance of {@code NoOp}.
*
* @return the instance.
*/
static NoOp getInstance() {
var result = instance;
if (null == result) {
result = new NoOp();
instance = result;
}
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -41,6 +42,7 @@
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.base.model.signals.Signal;
Expand Down Expand Up @@ -86,6 +88,7 @@
import org.eclipse.ditto.protocol.JsonifiableAdaptable;
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.protocol.mappingstrategies.IllegalAdaptableException;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.ThingErrorResponse;
import org.eclipse.ditto.thingsearch.model.ThingSearchException;
Expand Down Expand Up @@ -390,8 +393,15 @@ private Flow<Message, DittoRuntimeException, NotUsed> createIncoming(final JsonS
.thenApply(result -> Source.repeat((ActorRef) result))
);

final var noOpStreamControlMessage = NoOp.getInstance();
return setAckRequestThenMergeLeftAndRight.zipWith(sessionActorSource, Pair::create)
.to(Sink.foreach(pair -> pair.second().tell(pair.first(), ActorRef.noSender())));
.to(Sink.foreach(pair -> {
final var actorRef = pair.second();
final var message = pair.first();
if (!noOpStreamControlMessage.equals(message)) {
actorRef.tell(message, ActorRef.noSender());
}
}));
}


Expand Down Expand Up @@ -441,46 +451,76 @@ private Flow<Message, String, NotUsed> getStrictifyFlow(final HttpRequest reques
if (streamControlMessage.isPresent()) {
result = Right.apply(Left.apply(streamControlMessage.get()));
} else {
final var initialInternalHeaders =
getInitialInternalHeaders(version, connectionAuthContext, connectionCorrelationId);
try {
final Signal<?> signal = buildSignal(cmdString, version, connectionCorrelationId,
connectionAuthContext, additionalHeaders, adapter, headerTranslator, logger);
final Signal<?> signal = buildSignal(connectionCorrelationId,
initialInternalHeaders,
getJsonifiableAdaptableOrThrow(cmdString, initialInternalHeaders),
additionalHeaders,
adapter,
headerTranslator,
logger);
final StartedTrace trace = DittoTracing.trace(signal, "gw.streaming.in.signal")
.tag(TracingTags.SIGNAL_TYPE, signal.getType())
.start();
final Signal<?> tracedSignal = DittoTracing.propagateContext(trace.getContext(), signal);
result = Right.apply(Right.apply(tracedSignal));
trace.finish();
} catch (final DittoRuntimeException dre) {
} catch (final IllegalAdaptableException e) {
logSignalBuildingFailure(logger.withCorrelationId(e)::info, e, cmdString);
final var failure = e.setDittoHeaders(DittoHeaders.newBuilder(e.getDittoHeaders())
.origin(connectionCorrelationId)
.build());
final var tracedFailure = traceSignalBuildingFailure(failure);
if (isResponseRequired(e.getAdaptable())) {
result = Left.apply(tracedFailure);
} else {
result = Right.apply(Left.apply(NoOp.getInstance()));
}
} catch (final DittoRuntimeException e) {

// This is a client error usually; log at level DEBUG without stack trace.
logger.withCorrelationId(dre)
.debug("DittoRuntimeException building signal from <{}>: <{}>", cmdString, dre);
final StartedTrace trace = DittoTracing.trace(dre, "gw.streaming.in.error")
.start();
trace.fail(dre);
final DittoRuntimeException tracedDre = DittoTracing.propagateContext(trace.getContext(),
dre);
result = Left.apply(tracedDre);
trace.finish();
} catch (final Exception throwable) {
logger.warn("Error building signal from <{}>: {}: <{}>", cmdString,
throwable.getClass().getSimpleName(), throwable.getMessage());
final DittoRuntimeException dittoRuntimeException =
GatewayInternalErrorException.newBuilder()
.cause(throwable)
.build();
final StartedTrace trace = DittoTracing.trace(dittoRuntimeException, "gw.streaming.in.error")
.start();
trace.fail(throwable);
final DittoRuntimeException tracedDre = DittoTracing.propagateContext(trace.getContext(),
dittoRuntimeException);
result = Left.apply(tracedDre);
trace.finish();
logSignalBuildingFailure(logger.withCorrelationId(e)::debug, e, cmdString);
result = Left.apply(traceSignalBuildingFailure(e));
} catch (final Exception e) {
logSignalBuildingFailure(logger::warn, e, cmdString);
result = Left.apply(traceSignalBuildingFailure(GatewayInternalErrorException.newBuilder()
.message(e.getMessage())
.cause(e)
.build()));
}
}
return result;
});
}

private static void logSignalBuildingFailure(final BiConsumer<String, Object[]> logStatement,
final Exception failure,
final String signalJsonString) {

logStatement.accept("Failed to build a Signal from <{}>; {}: {}", new Object[]{
signalJsonString,
failure.getClass().getSimpleName(),
failure.getMessage()
});
}

private static DittoRuntimeException traceSignalBuildingFailure(final DittoRuntimeException failure) {
final var trace = startTrace(failure, "gw.streaming.in.error");
trace.fail(failure);
try {
return DittoTracing.propagateContext(trace.getContext(), failure);
} finally {
trace.finish();
}
}

private static StartedTrace startTrace(final WithDittoHeaders withDittoHeaders, final String name) {
final var trace = DittoTracing.trace(withDittoHeaders, name);
return trace.start();
}

private Pair<Connect, Flow<DittoRuntimeException, Message, NotUsed>> createOutgoing(
final JsonSchemaVersion version,
final CharSequence connectionCorrelationId,
Expand Down Expand Up @@ -595,47 +635,54 @@ private static <T> Flow<DittoRuntimeException, Message, NotUsed> joinOutgoingFlo
});
}

private static Signal<?> buildSignal(final String cmdString,
final JsonSchemaVersion version,
final CharSequence connectionCorrelationId,
private static DittoHeaders getInitialInternalHeaders(final JsonSchemaVersion jsonSchemaVersion,
final AuthorizationContext connectionAuthContext,
final DittoHeaders additionalHeaders,
final ProtocolAdapter adapter,
final HeaderTranslator headerTranslator,
final ThreadSafeDittoLogger logger) {
final CharSequence connectionCorrelationId) {

// initial internal header values
final DittoHeaders initialInternalHeaders = DittoHeaders.newBuilder()
.schemaVersion(version)
return DittoHeaders.newBuilder()
.schemaVersion(jsonSchemaVersion)
.authorizationContext(connectionAuthContext)
.correlationId(connectionCorrelationId) // for logging
.origin(connectionCorrelationId)
.build();
}

if (cmdString.isEmpty()) {
private static JsonifiableAdaptable getJsonifiableAdaptableOrThrow(final String messageJsonString,
final DittoHeaders initialInternalHeaders) {

if (messageJsonString.isEmpty()) {
final RuntimeException cause = new IllegalArgumentException("Empty json.");
throw new DittoJsonException(cause, initialInternalHeaders);
}

final JsonifiableAdaptable jsonifiableAdaptable = wrapJsonRuntimeException(cmdString,
return wrapJsonRuntimeException(messageJsonString,
DittoHeaders.empty(), // unused
(s, unused) -> ProtocolFactory.jsonifiableAdaptableFromJson(JsonFactory.newObject(s)));
}

private static Signal<?> buildSignal(final CharSequence connectionCorrelationId,
final DittoHeaders initialInternalHeaders,
final Adaptable adaptable,
final DittoHeaders additionalHeaders,
final ProtocolAdapter adapter,
final HeaderTranslator headerTranslator,
final ThreadSafeDittoLogger logger) {

final Signal<?> signal;
try {
signal = adapter.fromAdaptable(jsonifiableAdaptable);
signal = adapter.fromAdaptable(adaptable);
} catch (final DittoRuntimeException e) {
throw e.setDittoHeaders(e.getDittoHeaders().toBuilder().origin(connectionCorrelationId).build());
}

final DittoHeadersBuilder<?, ?> internalHeadersBuilder = DittoHeaders.newBuilder();

logger.debug("WebSocket message has been converted to signal <{}>.", signal);
final DittoHeaders signalHeaders = signal.getDittoHeaders();

// add initial internal header values
logger.trace("Adding initialInternalHeaders: <{}>.", initialInternalHeaders);
internalHeadersBuilder.putHeaders(initialInternalHeaders);

final DittoHeadersBuilder<?, ?> internalHeadersBuilder = DittoHeaders.newBuilder(initialInternalHeaders);

// add headers given by parent route first so that protocol message may override them
final Map<String, String> wellKnownAdditionalHeaders =
headerTranslator.retainKnownHeaders(additionalHeaders);
Expand All @@ -655,6 +702,11 @@ private static Signal<?> buildSignal(final String cmdString,
return signal.setDittoHeaders(internalHeadersBuilder.build());
}

private static boolean isResponseRequired(final WithDittoHeaders withDittoHeaders) {
final var dittoHeaders = withDittoHeaders.getDittoHeaders();
return dittoHeaders.isResponseRequired();
}

private Function<SessionedJsonifiable, CompletionStage<Collection<String>>> postprocess(
final ProtocolAdapter adapter, @Nullable final SignalEnrichmentFacade facade,
final ThreadSafeDittoLogger logger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ private T tryToMapAdaptableToSignal(final Adaptable adaptable) {
throw new IllegalAdaptableException(getDetailMessage(adaptable, e),
e.getDescription().orElse(null),
e,
e.getDittoHeaders());
adaptable);
} catch (final Exception e) {
throw new IllegalAdaptableException(getDetailMessage(adaptable, e), null, e, adaptable.getDittoHeaders());
throw new IllegalAdaptableException(getDetailMessage(adaptable, e), null, e, adaptable);
}
}

Expand Down

0 comments on commit 73a333b

Please sign in to comment.