Skip to content

Commit

Permalink
WebsocketRoute: instead of cancelling stream on first error, report them
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Yufei (INST/ECS1) <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Dec 14, 2018
1 parent dfc0555 commit 7c816ca
Showing 1 changed file with 186 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandNotSupportedException;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
import org.eclipse.ditto.signals.events.base.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.EventStream;
import akka.event.Logging;
import akka.http.javadsl.model.HttpRequest;
Expand All @@ -74,7 +76,15 @@
import akka.http.javadsl.server.Route;
import akka.japi.function.Function;
import akka.stream.Attributes;
import akka.stream.FlowShape;
import akka.stream.Graph;
import akka.stream.UniformFanInShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Merge;
import akka.stream.javadsl.Partition;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

Expand Down Expand Up @@ -187,18 +197,22 @@ private HttpResponse createWebsocket(final UpgradeToWebSocket upgradeToWebSocket
LogUtil.logWithCorrelationId(LOGGER, connectionCorrelationId, logger ->
logger.info("Creating WebSocket for connection authContext: <{}>", authContext));

// build Sink and Source in order to support rpc style patterns as well as server push:
return upgradeToWebSocket.handleMessagesWith(
createSink(version, connectionCorrelationId, authContext, additionalHeaders, adapter, request),
createSource(connectionCorrelationId, adapter, request));
final Flow<Message, DittoRuntimeException, NotUsed> incoming =
createIncoming(version, connectionCorrelationId, authContext, additionalHeaders, adapter, request);
final Flow<DittoRuntimeException, Message, NotUsed> outgoing =
createOutgoing(connectionCorrelationId, adapter, request);

return upgradeToWebSocket.handleMessagesWith(incoming.via(outgoing));
}

private Sink<Message, NotUsed> createSink(final Integer version,
private Flow<Message, DittoRuntimeException, NotUsed> createIncoming(final Integer version,
final String connectionCorrelationId,
final AuthorizationContext connectionAuthContext, final DittoHeaders additionalHeaders,
final ProtocolAdapter adapter,
final HttpRequest request) {
return incomingMessageSniffer.toAsyncFlow(request)

final Flow<Message, String, NotUsed> extractStringFromMessage = Flow.<Message>create()
.via(incomingMessageSniffer.toAsyncFlow(request))
.filter(Message::isText)
.map(Message::asTextMessage)
.map(textMsg -> {
Expand All @@ -216,12 +230,18 @@ private Sink<Message, NotUsed> createSink(final Integer version,
}))
.withAttributes(Attributes.createLogLevels(Logging.DebugLevel(), Logging.DebugLevel(),
Logging.WarningLevel()))
.filter(strictText -> processProtocolMessage(connectionAuthContext, connectionCorrelationId,
strictText))
.map(buildSignal(version, connectionCorrelationId, connectionAuthContext, additionalHeaders, adapter))
.to(Sink.actorSubscriber(
CommandSubscriber.props(streamingActor, subscriberBackpressureQueueSize, eventStream)));
.filter(strictText ->
processProtocolMessage(connectionAuthContext, connectionCorrelationId, strictText));

final Props commandSubscriberProps =
CommandSubscriber.props(streamingActor, subscriberBackpressureQueueSize, eventStream);
final Sink<Signal, ActorRef> commandSubscriber = Sink.actorSubscriber(commandSubscriberProps);

final Flow<String, DittoRuntimeException, NotUsed> signalErrorFlow =
buildSignalErrorFlow(commandSubscriber, version, connectionCorrelationId, connectionAuthContext,
additionalHeaders, adapter);

return extractStringFromMessage.via(signalErrorFlow);
}

private boolean processProtocolMessage(final AuthorizationContext authContext, final String connectionCorrelationId,
Expand Down Expand Up @@ -318,24 +338,53 @@ private static String urlDecode(final String value) {
}
}

private Source<Message, NotUsed> createSource(final String connectionCorrelationId,
private Flow<DittoRuntimeException, Message, NotUsed> createOutgoing(final String connectionCorrelationId,
final ProtocolAdapter adapter,
final HttpRequest request) {
return Source.<Jsonifiable.WithPredicate<JsonObject, JsonField>>actorPublisher(
EventAndResponsePublisher.props(publisherBackpressureBufferSize))
.mapMaterializedValue(actorRef -> {
streamingActor.tell(new Connect(actorRef, connectionCorrelationId, STREAMING_TYPE_WS), null);
return NotUsed.getInstance();
})
.map(this::publishResponsePublishedEvent)
.map(jsonifiableToString(adapter))
.via(Flow.fromFunction(result -> {
LogUtil.logWithCorrelationId(LOGGER, connectionCorrelationId, logger ->
logger.debug("Sending outgoing WebSocket message: {}", result));
return result;
}))
.<Message>map(TextMessage::create)
.via(outgoingMessageSniffer.toAsyncFlow(request));

final Source<Jsonifiable.WithPredicate<JsonObject, JsonField>, NotUsed> eventAndResponseSource =
Source.<Jsonifiable.WithPredicate<JsonObject, JsonField>>actorPublisher(
EventAndResponsePublisher.props(publisherBackpressureBufferSize))
.mapMaterializedValue(actorRef -> {
streamingActor.tell(new Connect(actorRef, connectionCorrelationId, STREAMING_TYPE_WS),
null);
return NotUsed.getInstance();
})
.map(this::publishResponsePublishedEvent);

final Flow<DittoRuntimeException, Jsonifiable.WithPredicate<JsonObject, JsonField>, NotUsed> errorFlow =
Flow.fromFunction(x -> x);

final Flow<Jsonifiable.WithPredicate<JsonObject, JsonField>, Message, NotUsed> messageFlow =
Flow.fromFunction(jsonifiableToString(adapter))
.via(Flow.fromFunction(result -> {
LogUtil.logWithCorrelationId(LOGGER, connectionCorrelationId, logger ->
logger.debug("Sending outgoing WebSocket message: {}", result));
return result;
}))
.<Message>map(TextMessage::create)
.via(outgoingMessageSniffer.toAsyncFlow(request));

return joinOutgoingFlows(eventAndResponseSource, errorFlow, messageFlow);
}

@SuppressWarnings("unchecked")
private static <T> Flow<DittoRuntimeException, Message, NotUsed> joinOutgoingFlows(
final Source<T, NotUsed> eventAndResponseSource,
final Flow<DittoRuntimeException, T, NotUsed> errorFlow,
final Flow<T, Message, NotUsed> messageFlow) {

return Flow.fromGraph(GraphDSL.create3(eventAndResponseSource, errorFlow, messageFlow,
(notUsed1, notUsed2, notUsed3) -> notUsed1,
(builder, eventsAndResponses, errors, messages) -> {
final UniformFanInShape<T, T> merge = builder.add(Merge.<T>create(2));

builder.from(eventsAndResponses).toFanIn(merge);
builder.from(errors).toFanIn(merge);
builder.from(merge.out()).toInlet(messages.in());

return FlowShape.of(errors.in(), messages.out());
}));
}

private Jsonifiable.WithPredicate<JsonObject, JsonField> publishResponsePublishedEvent(
Expand All @@ -349,63 +398,122 @@ private Jsonifiable.WithPredicate<JsonObject, JsonField> publishResponsePublishe
return jsonifiable;
}

private Function<String, Signal> buildSignal(final Integer version, final String connectionCorrelationId,
final AuthorizationContext connectionAuthContext, final DittoHeaders additionalHeaders,
private Flow<String, DittoRuntimeException, NotUsed> buildSignalErrorFlow(
final Sink<Signal, ActorRef> commandSubscriber,
final Integer version,
final String connectionCorrelationId,
final AuthorizationContext connectionAuthContext,
final DittoHeaders additionalHeaders,
final ProtocolAdapter adapter) {
return cmdString -> {
final JsonSchemaVersion jsonSchemaVersion = JsonSchemaVersion.forInt(version)
.orElseThrow(() -> CommandNotSupportedException.newBuilder(version).build());

// initial internal header values
final DittoHeaders initialInternalHeaders = DittoHeaders.newBuilder()
.schemaVersion(jsonSchemaVersion)
.authorizationContext(connectionAuthContext)
.correlationId(connectionCorrelationId) // for logging
.origin(connectionCorrelationId)
.build();

if (cmdString.isEmpty()) {
final RuntimeException cause = new IllegalArgumentException("Empty json.");
throw new DittoJsonException(cause, initialInternalHeaders);
}
final Flow<String, Object, NotUsed> resultOrErrorFlow =
Flow.fromFunction(cmdString -> {
try {
return buildSignal(cmdString, version, connectionCorrelationId, connectionAuthContext,
additionalHeaders, adapter);
} catch (final Throwable throwable) {
// This is a client error usually; log at level INFO without stack trace.
LOGGER.info("Error building signal from <{}>: <{}:{}>", cmdString,
throwable.getClass().getCanonicalName(), throwable.getMessage());
return throwable;
}
});

final JsonifiableAdaptable jsonifiableAdaptable = wrapJsonRuntimeException(cmdString,
DittoHeaders.empty(), // unused
(s, unused) -> ProtocolFactory.jsonifiableAdaptableFromJson(JsonFactory.newObject(s)));
final Graph<UniformFanOutShape<Object, Object>, NotUsed> signalFilterGraph =
Partition.create(2, message -> message instanceof Signal ? 0 : 1);

final Signal<? extends Signal> signal;
try {
signal = adapter.fromAdaptable(jsonifiableAdaptable);
} catch (final DittoRuntimeException e) {
throw e.setDittoHeaders(e.getDittoHeaders().toBuilder().origin(connectionCorrelationId).build());
final Flow<Object, DittoRuntimeException, NotUsed> castErrorFlow =
Flow.fromFunction(x -> x instanceof DittoRuntimeException
? (DittoRuntimeException) x
: x instanceof Throwable
? GatewayInternalErrorException.newBuilder().cause((Throwable) x).build()
: GatewayInternalErrorException.newBuilder().build());

final Sink<Object, NotUsed> signalSinkGraph =
Flow.fromFunction(Signal.class::cast).toMat(commandSubscriber, Keep.none());

return connectSignalErrorFlow(resultOrErrorFlow, signalFilterGraph, signalSinkGraph, castErrorFlow);
}

@SuppressWarnings("unchecked")
private static Flow<String, DittoRuntimeException, NotUsed> connectSignalErrorFlow(
final Flow<String, Object, NotUsed> resultOrErrorFlow,
final Graph<UniformFanOutShape<Object, Object>, NotUsed> signalFilterGraph,
final Sink<Object, NotUsed> signalSinkGraph,
final Flow<Object, DittoRuntimeException, NotUsed> castErrorFlow) {

return Flow.fromGraph(GraphDSL.create(signalSinkGraph, (builder, signalSink) -> {
final FlowShape<String, Object> resultOrError = builder.add(resultOrErrorFlow);
final UniformFanOutShape<Object, Object> signalFilter = builder.add(signalFilterGraph);
final FlowShape<Object, DittoRuntimeException> castError = builder.add(castErrorFlow);

builder.from(resultOrError.out()).toInlet(signalFilter.in());
builder.from(signalFilter.out(0)).to(signalSink);
builder.from(signalFilter.out(1)).toInlet(castError.in());

return FlowShape.of(resultOrError.in(), castError.out());
}));
}

private Signal buildSignal(final String cmdString,
final Integer version,
final String connectionCorrelationId,
final AuthorizationContext connectionAuthContext,
final DittoHeaders additionalHeaders,
final ProtocolAdapter adapter) {

final JsonSchemaVersion jsonSchemaVersion = JsonSchemaVersion.forInt(version)
.orElseThrow(() -> CommandNotSupportedException.newBuilder(version).build());

// initial internal header values
final DittoHeaders initialInternalHeaders = DittoHeaders.newBuilder()
.schemaVersion(jsonSchemaVersion)
.authorizationContext(connectionAuthContext)
.correlationId(connectionCorrelationId) // for logging
.origin(connectionCorrelationId)
.build();

if (cmdString.isEmpty()) {
final RuntimeException cause = new IllegalArgumentException("Empty json.");
throw new DittoJsonException(cause, initialInternalHeaders);
}

final JsonifiableAdaptable jsonifiableAdaptable = wrapJsonRuntimeException(cmdString,
DittoHeaders.empty(), // unused
(s, unused) -> ProtocolFactory.jsonifiableAdaptableFromJson(JsonFactory.newObject(s)));

final Signal<? extends Signal> signal;
try {
signal = adapter.fromAdaptable(jsonifiableAdaptable);
} catch (final DittoRuntimeException e) {
throw e.setDittoHeaders(e.getDittoHeaders().toBuilder().origin(connectionCorrelationId).build());
}

final DittoHeadersBuilder internalHeadersBuilder = DittoHeaders.newBuilder();

LogUtil.logWithCorrelationId(LOGGER, connectionCorrelationId, logger -> {
logger.debug("WebSocket message has been converted to signal <{}>.", signal);
final DittoHeaders signalHeaders = signal.getDittoHeaders();

// add initial internal header values
logger.trace("Adding initialInternalHeaders: <{}>.", initialInternalHeaders);
internalHeadersBuilder.putHeaders(initialInternalHeaders);
// add headers given by parent route first so that protocol message may override them
logger.trace("Adding additionalHeaders: <{}>.", additionalHeaders);
internalHeadersBuilder.putHeaders(additionalHeaders);
// add any headers from protocol adapter to internal headers
logger.trace("Adding signalHeaders: <{}>.", signalHeaders);
internalHeadersBuilder.putHeaders(signalHeaders);
// generate correlation ID if it is not set in protocol message
if (!signalHeaders.getCorrelationId().isPresent()) {
final String correlationId = UUID.randomUUID().toString();
logger.trace("Adding generated correlationId: <{}>.", correlationId);
internalHeadersBuilder.correlationId(correlationId);
}
logger.debug("Generated internalHeaders are: <{}>.", internalHeadersBuilder);
});

final DittoHeadersBuilder internalHeadersBuilder = DittoHeaders.newBuilder();

LogUtil.logWithCorrelationId(LOGGER, connectionCorrelationId, logger -> {
logger.debug("WebSocket message has been converted to signal <{}>.", signal);
final DittoHeaders signalHeaders = signal.getDittoHeaders();

// add initial internal header values
logger.trace("Adding initialInternalHeaders: <{}>.", initialInternalHeaders);
internalHeadersBuilder.putHeaders(initialInternalHeaders);
// add headers given by parent route first so that protocol message may override them
logger.trace("Adding additionalHeaders: <{}>.", additionalHeaders);
internalHeadersBuilder.putHeaders(additionalHeaders);
// add any headers from protocol adapter to internal headers
logger.trace("Adding signalHeaders: <{}>.", signalHeaders);
internalHeadersBuilder.putHeaders(signalHeaders);
// generate correlation ID if it is not set in protocol message
if (!signalHeaders.getCorrelationId().isPresent()) {
final String correlationId = UUID.randomUUID().toString();
logger.trace("Adding generated correlationId: <{}>.", correlationId);
internalHeadersBuilder.correlationId(correlationId);
}
logger.debug("Generated internalHeaders are: <{}>.", internalHeadersBuilder);
});

return signal.setDittoHeaders(internalHeadersBuilder.build());
};
return signal.setDittoHeaders(internalHeadersBuilder.build());
}

private static Function<Jsonifiable.WithPredicate<JsonObject, JsonField>, String> jsonifiableToString(
Expand Down

0 comments on commit 7c816ca

Please sign in to comment.