Skip to content

Commit

Permalink
Provide possibility to send generic JSON streams as chunked HTTP enti…
Browse files Browse the repository at this point in the history
…ties (WIP).

* Unified DittoDevOpsCommandActor with DevOpsCommandActor as there won't be further sub-classes.
* Added custom serializer in order to send `JsonValue` throughout the Akka cluster.
* Added custom message that wraps a SourceRef of JsonValues.
* Added Akka Jackson serializer in order to serialize custom SourceRef-wrapping message.

Signed-off-by: Juergen Fickel <juergen.fickel@bosch.io>
  • Loading branch information
Juergen Fickel committed Feb 22, 2021
1 parent 696341c commit 749e16a
Show file tree
Hide file tree
Showing 28 changed files with 1,330 additions and 144 deletions.
5 changes: 5 additions & 0 deletions bom/pom.xml
Expand Up @@ -272,6 +272,11 @@
<artifactId>akka-lease-kubernetes_${scala.version}</artifactId>
<version>${akka-management.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-serialization-jackson_${scala.version}</artifactId>
<version>${akka.version}</version>
</dependency>

<dependency>
<groupId>org.mongodb</groupId>
Expand Down
5 changes: 4 additions & 1 deletion json/src/main/java/org/eclipse/ditto/json/CborFactory.java
Expand Up @@ -45,6 +45,7 @@ public interface CborFactory {
*
* @param bytes the bytes to parse with CBOR.
* @return the parsed JsonValue.
* @throws JsonParseException if the content of {@code bytes} cannot be deserialized to a {@link JsonValue}.
*/
JsonValue readFrom(byte[] bytes);

Expand All @@ -54,8 +55,9 @@ public interface CborFactory {
*
* @param bytes the bytes to parse with CBOR.
* @param offset the offset where to start reading from.
* @param length the lenght of how much bytes to read.
* @param length the length of how much bytes to read.
* @return the parsed JsonValue.
* @throws JsonParseException if the content of {@code bytes} cannot be deserialized to a {@link JsonValue}.
*/
JsonValue readFrom(byte[] bytes, int offset, int length);

Expand All @@ -64,6 +66,7 @@ public interface CborFactory {
*
* @param byteBuffer the ByteBuffer to parse with CBOR.
* @return the parsed JsonValue.
* @throws JsonParseException if the content of {@code byteBuffer} cannot be deserialized to a {@link JsonValue}.
*/
JsonValue readFrom(ByteBuffer byteBuffer);

Expand Down
Expand Up @@ -36,7 +36,6 @@
import org.eclipse.ditto.services.utils.config.ScopedConfig;
import org.eclipse.ditto.services.utils.config.raw.RawConfigSupplier;
import org.eclipse.ditto.services.utils.devops.DevOpsCommandsActor;
import org.eclipse.ditto.services.utils.devops.DittoDevOpsCommandsActor;
import org.eclipse.ditto.services.utils.devops.LogbackLoggingFacade;
import org.eclipse.ditto.services.utils.health.status.StatusSupplierActor;
import org.eclipse.ditto.services.utils.metrics.config.MetricsConfig;
Expand Down Expand Up @@ -364,7 +363,7 @@ protected void startStatusSupplierActor(final ActorSystem actorSystem) {
startActor(actorSystem, StatusSupplierActor.props(rootActorName), StatusSupplierActor.ACTOR_NAME);
}

protected ActorRef startActor(final ActorSystem actorSystem, final Props actorProps, final String actorName) {
private ActorRef startActor(final ActorSystem actorSystem, final Props actorProps, final String actorName) {
logStartingActor(actorName);
return actorSystem.actorOf(actorProps, actorName);
}
Expand All @@ -374,13 +373,13 @@ private void logStartingActor(final String actorName) {
}

/**
* Starts the {@link org.eclipse.ditto.services.utils.devops.DittoDevOpsCommandsActor}.
* Starts the {@link org.eclipse.ditto.services.utils.devops.DevOpsCommandsActor}.
* May be overridden to change the way how the actor is started.
*
* @param actorSystem Akka actor system for starting actors.
*/
protected void startDevOpsCommandsActor(final ActorSystem actorSystem) {
startActor(actorSystem, DittoDevOpsCommandsActor.props(LogbackLoggingFacade.newInstance(), serviceName,
startActor(actorSystem, DevOpsCommandsActor.props(LogbackLoggingFacade.newInstance(), serviceName,
InstanceIdentifierSupplier.getInstance().get()), DevOpsCommandsActor.ACTOR_NAME);
}

Expand Down
4 changes: 4 additions & 0 deletions services/connectivity/messaging/src/test/resources/test.conf
Expand Up @@ -232,6 +232,8 @@ akka {
serializers {
json = "org.eclipse.ditto.services.utils.cluster.JsonJsonifiableSerializer"
cbor = "org.eclipse.ditto.services.utils.cluster.CborJsonifiableSerializer"
cbor-json-value = "org.eclipse.ditto.services.utils.cluster.CborJsonValueSerializer"
jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"
}

serialization-bindings {
Expand All @@ -240,6 +242,8 @@ akka {
"org.eclipse.ditto.model.base.json.Jsonifiable" = cbor
"org.eclipse.ditto.model.base.exceptions.DittoRuntimeException" = cbor
"org.eclipse.ditto.signals.commands.devops.DevOpsCommandResponse" = json # to ensure readability
"org.eclipse.ditto.json.JsonValue" = cbor-json-value
"org.eclipse.ditto.services.utils.cluster.AkkaJacksonCborSerializable" = jackson-cbor
}
}

Expand Down
Expand Up @@ -51,6 +51,7 @@
import org.eclipse.ditto.services.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.cluster.JsonValueSourceRef;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
import org.eclipse.ditto.signals.acks.base.Acknowledgements;
import org.eclipse.ditto.signals.base.Signal;
Expand Down Expand Up @@ -178,6 +179,7 @@ private static HttpResponse createHttpResponse(final HttpStatus httpStatus) {

private void handleCommand(final Command<?> command) {
try {
logger.setCorrelationId(command);
incomingCommandHeaders = command.getDittoHeaders();
ackregatorStarter.start(command,
this::onAggregatedResponseOrError,
Expand All @@ -200,7 +202,7 @@ private Void onAggregatedResponseOrError(final Object responseOrError) {
}

private Void handleCommandWithAckregator(final Signal<?> command, final ActorRef aggregator) {
logger.withCorrelationId(command).debug("Got <{}>. Telling the target actor about it.", command);
logger.debug("Got <{}>. Telling the target actor about it.", command);
proxyActor.tell(command, aggregator);
return null;
}
Expand All @@ -220,8 +222,7 @@ private void completeAcknowledgements(final Acknowledgements acks) {
}

private void handleCommandAndAcceptImmediately(final Signal<?> command) {
logger.withCorrelationId(command)
.debug("Received <{}> that doesn't expect a response. Answering with status code 202 ..", command);
logger.debug("Received <{}> that doesn't expect a response. Answering with status code 202 ...", command);
proxyActor.tell(command, getSelf());
completeWithResult(createHttpResponse(HttpStatus.ACCEPTED));
}
Expand Down Expand Up @@ -287,7 +288,7 @@ private static AuthorizationContext getAuthContextWithPrefixedSubjectsFromHeader
}

private void handleCommandWithResponse(final Signal<?> command, final Receive awaitCommandResponseBehavior) {
logger.withCorrelationId(command).debug("Got <{}>. Telling the target actor about it.", command);
logger.debug("Got <{}>. Telling the target actor about it.", command);
proxyActor.tell(command, getSelf());

final ActorContext context = getContext();
Expand Down Expand Up @@ -355,6 +356,7 @@ private Receive getResponseAwaitingBehavior(final Supplier<DittoRuntimeException
" 'WithOptionalEntity': <{}>!", commandResponse);
completeWithResult(createHttpResponse(HttpStatus.INTERNAL_SERVER_ERROR));
})
.match(JsonValueSourceRef.class, this::handleJsonValueSourceRef)
.match(Status.Failure.class, f -> f.cause() instanceof AskTimeoutException, failure -> {
final Throwable cause = failure.cause();
logger.error(cause, "Got <{}> when a command response was expected: <{}>!",
Expand Down Expand Up @@ -641,6 +643,14 @@ private static boolean shallAcceptImmediately(final WithDittoHeaders<?> withDitt
return !dittoHeaders.isResponseRequired() && dittoHeaders.getAcknowledgementRequests().isEmpty();
}

private void handleJsonValueSourceRef(final JsonValueSourceRef jsonValueSourceRef) {
logger.debug("Received <{}> from <{}>.", jsonValueSourceRef.getClass().getSimpleName(), getSender());
final var jsonValueSourceToHttpResponse = JsonValueSourceToHttpResponse.getInstance();
final var httpResponse = jsonValueSourceToHttpResponse.apply(jsonValueSourceRef.getSource());
enhanceResponseWithExternalDittoHeaders(httpResponse, incomingCommandHeaders);
completeWithResult(httpResponse);
}

private static final class HttpAcknowledgementConfig implements AcknowledgementConfig {

private final HttpConfig httpConfig;
Expand Down
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.gateway.endpoints.actors;

import java.util.function.Function;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.common.HttpStatus;

import akka.NotUsed;
import akka.http.javadsl.model.ContentType;
import akka.http.javadsl.model.ContentTypes;
import akka.http.javadsl.model.HttpEntities;
import akka.http.javadsl.model.HttpEntity;
import akka.http.javadsl.model.HttpResponse;
import akka.stream.Attributes;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

/**
* Transforms a {@link Source} of {@link JsonValue}s into a {@link HttpResponse}.
* The response's entity is a chunked stream of newline delimited JSON values (NDJSON).
*
* @see <a href="https://github.com/ndjson/ndjson-spec">NDJSON</a>
*/
@Immutable
final class JsonValueSourceToHttpResponse implements Function<Source<JsonValue, NotUsed>, HttpResponse> {

/**
* The content type of the HttpResponse this function returns.
*/
static final ContentType CONTENT_TYPE_NDJSON = ContentTypes.parse("application/x-ndjson");

private JsonValueSourceToHttpResponse() {
super();
}

/**
* Returns an instance of {@code JsonValueSourceToHttpResponse}.
*
* @return the instance.
*/
static JsonValueSourceToHttpResponse getInstance() {
return new JsonValueSourceToHttpResponse();
}

@Override
public HttpResponse apply(final Source<JsonValue, NotUsed> source) {
ConditionChecker.checkNotNull(source, "source");
return HttpResponse.create()
.withEntity(getChunkedHttpEntity(getRenderedCompactJsonArraySource(source)))
.withStatus(HttpStatus.OK.getCode());
}

private static Source<ByteString, NotUsed> getRenderedCompactJsonArraySource(final Source<JsonValue, NotUsed> source) {
return source.map(JsonValue::toString)
.via(intersperseWithNewlineDelimiter())
.map(ByteString::fromString)
.withAttributes(Attributes.logLevels(Attributes.logLevelDebug(), Attributes.logLevelDebug(),
Attributes.logLevelError()))
.log(JsonValueSourceToHttpResponse.class.getSimpleName());
}

private static Flow<String, String, NotUsed> intersperseWithNewlineDelimiter() {
return Flow.of(String.class).intersperse("\n");
}

private static HttpEntity.Chunked getChunkedHttpEntity(final Source<ByteString, NotUsed> source) {
return HttpEntities.createChunked(CONTENT_TYPE_NDJSON, source);
}

}
Expand Up @@ -33,7 +33,7 @@
import org.eclipse.ditto.services.gateway.endpoints.routes.QueryParametersToHeadersMap;
import org.eclipse.ditto.services.gateway.util.config.endpoints.CommandConfig;
import org.eclipse.ditto.services.gateway.util.config.endpoints.HttpConfig;
import org.eclipse.ditto.services.utils.devops.DittoDevOpsCommandsActor;
import org.eclipse.ditto.services.utils.devops.DevOpsCommandsActor;
import org.eclipse.ditto.signals.commands.common.RetrieveConfig;
import org.eclipse.ditto.signals.commands.devops.ChangeLogLevel;
import org.eclipse.ditto.signals.commands.devops.DevOpsCommand;
Expand Down Expand Up @@ -269,7 +269,7 @@ private Route routeConfig(final RequestContext ctx,
final DittoHeaders dittoHeaders) {

final DittoHeaders headersWithAggregate = dittoHeaders.toBuilder()
.putHeader(DittoDevOpsCommandsActor.AGGREGATE_HEADER,
.putHeader(DevOpsCommandsActor.AGGREGATE_HEADER,
String.valueOf(serviceName == null || instance == null))
.build();

Expand Down

0 comments on commit 749e16a

Please sign in to comment.