Skip to content

Commit

Permalink
fix preserving DittoHeaders when encountering a JsonParseException
Browse files Browse the repository at this point in the history
* e.g. containing correlation-id, trace-context, etc.
  • Loading branch information
thjaeckle committed Nov 3, 2023
1 parent e6f1636 commit 8be61d5
Show file tree
Hide file tree
Showing 18 changed files with 121 additions and 88 deletions.
Expand Up @@ -21,12 +21,6 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand All @@ -36,6 +30,12 @@
import org.eclipse.ditto.base.model.signals.commands.AbstractCommandResponse;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.WithEntity;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;

/**
* Response to a {@link DevOpsCommand} which wraps the exception thrown while processing the command.
Expand Down Expand Up @@ -97,7 +97,8 @@ public static DevOpsErrorResponse of(@Nullable final String serviceName,
*/
public static DevOpsErrorResponse fromJson(final String jsonString, final DittoHeaders dittoHeaders) {
final JsonObject jsonObject =
DittoJsonException.wrapJsonRuntimeException(() -> JsonFactory.newObject(jsonString));
DittoJsonException.wrapJsonRuntimeException(jsonString, dittoHeaders,
(object, headers) -> JsonFactory.newObject(object));
return fromJson(jsonObject, dittoHeaders);
}

Expand Down
Expand Up @@ -20,12 +20,6 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonMissingFieldException;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
Expand All @@ -34,6 +28,12 @@
import org.eclipse.ditto.base.model.signals.GlobalErrorRegistry;
import org.eclipse.ditto.base.model.signals.commands.AbstractErrorResponse;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonMissingFieldException;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonValue;

/**
* Response to a {@link ConnectivityCommand} which wraps the exception thrown when processing the command.
Expand Down Expand Up @@ -94,7 +94,8 @@ public static ConnectivityErrorResponse of(final DittoRuntimeException dittoRunt
*/
public static ConnectivityErrorResponse fromJson(final String jsonString, final DittoHeaders dittoHeaders) {
final JsonObject jsonObject =
DittoJsonException.wrapJsonRuntimeException(() -> JsonFactory.newObject(jsonString));
DittoJsonException.wrapJsonRuntimeException(jsonString, dittoHeaders,
(object, headers) -> JsonFactory.newObject(object));
return fromJson(jsonObject, dittoHeaders);
}

Expand Down
Expand Up @@ -172,7 +172,8 @@ public static RetrieveConnectionsResponse of(final List<Connection> connections,
* format.
*/
public static RetrieveConnectionsResponse fromJson(final String jsonString, final DittoHeaders dittoHeaders) {
final JsonObject jsonObject = DittoJsonException.wrapJsonRuntimeException(() -> JsonObject.of(jsonString));
final JsonObject jsonObject = DittoJsonException.wrapJsonRuntimeException(jsonString, dittoHeaders,
(object, headers) -> JsonObject.of(object));
return fromJson(jsonObject, dittoHeaders);
}

Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import java.util.UUID;

import org.apache.pekko.actor.ActorSystem;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
Expand All @@ -37,8 +38,6 @@

import com.typesafe.config.Config;

import org.apache.pekko.actor.ActorSystem;

/**
* A message mapper implementation for the Mapping incoming CloudEvents to Ditto Protocol.
*/
Expand Down Expand Up @@ -101,8 +100,9 @@ public List<Adaptable> map(final ExternalMessage message) {
final String contentType = message.findContentType().orElse("");
if (contentType.equals(DITTO_PROTOCOL_CONTENT_TYPE)) {
if (isBinaryCloudEvent(message)) {
final JsonifiableAdaptable binaryAdaptable = DittoJsonException.wrapJsonRuntimeException(
() -> ProtocolFactory.jsonifiableAdaptableFromJson(newObject(payload)));
final JsonifiableAdaptable binaryAdaptable = DittoJsonException.wrapJsonRuntimeException(payload,
message.getInternalHeaders(), (thePayload, headers) ->
ProtocolFactory.jsonifiableAdaptableFromJson(newObject(thePayload)));
final DittoHeaders headers = binaryAdaptable.getDittoHeaders()
.toBuilder()
.correlationId(message.getHeaders().get(CE_ID))
Expand Down
Expand Up @@ -16,6 +16,7 @@

import java.util.List;

import org.apache.pekko.actor.ActorSystem;
import org.eclipse.ditto.base.model.common.DittoConstants;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
Expand All @@ -31,8 +32,6 @@

import com.typesafe.config.Config;

import org.apache.pekko.actor.ActorSystem;

/**
* A message mapper implementation for the Ditto Protocol.
* Expect messages to contain a JSON serialized Ditto Protocol message.
Expand Down Expand Up @@ -93,8 +92,9 @@ public MessageMapper createNewMapperInstance() {
@Override
public List<Adaptable> map(final ExternalMessage message) {
final String payload = extractPayloadAsString(message);
final JsonifiableAdaptable jsonifiableAdaptable = DittoJsonException.wrapJsonRuntimeException(() ->
ProtocolFactory.jsonifiableAdaptableFromJson(JsonFactory.newObject(payload))
final JsonifiableAdaptable jsonifiableAdaptable = DittoJsonException.wrapJsonRuntimeException(payload,
message.getInternalHeaders(), (thePayload, headers) ->
ProtocolFactory.jsonifiableAdaptableFromJson(JsonFactory.newObject(thePayload))
);

final DittoHeaders mergedHeaders = jsonifiableAdaptable.getDittoHeaders();
Expand Down
Expand Up @@ -22,6 +22,7 @@

import javax.annotation.Nullable;

import org.apache.pekko.actor.ActorSystem;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand Down Expand Up @@ -57,8 +58,6 @@

import com.typesafe.config.Config;

import org.apache.pekko.actor.ActorSystem;

/**
* This mapper creates a {@link MergeThing} command when a {@link ThingQueryCommandResponse} was received via the
* {@link TopicPath.Channel#LIVE live channel} patching exactly the retrieved "live" data into the twin.
Expand Down Expand Up @@ -142,8 +141,9 @@ public List<Adaptable> map(final ExternalMessage message) {
final JsonifiableAdaptable adaptable;
try {
final String payload = extractPayloadAsString(message);
adaptable = DittoJsonException.wrapJsonRuntimeException(() ->
ProtocolFactory.jsonifiableAdaptableFromJson(JsonFactory.newObject(payload))
adaptable = DittoJsonException.wrapJsonRuntimeException(payload,
message.getInternalHeaders(), (thePayload, headers) ->
ProtocolFactory.jsonifiableAdaptableFromJson(JsonFactory.newObject(thePayload))
);
} catch (final DittoRuntimeException e) {
LOGGER.withCorrelationId(message.getInternalHeaders())
Expand Down
Expand Up @@ -17,12 +17,12 @@
import java.util.List;
import java.util.Optional;

import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.eclipse.ditto.connectivity.api.ExternalMessage;

/**
* The default mapping for incoming messages that maps messages from Ditto protocol format.
Expand All @@ -40,10 +40,12 @@ static DefaultIncomingMapping get() {

@Override
public List<Adaptable> apply(final ExternalMessage message) {
return DittoJsonException.wrapJsonRuntimeException(() -> getPlainStringPayload(message)
.map(JsonFactory::readFrom)
.map(JsonValue::asObject)
.map(ProtocolFactory::jsonifiableAdaptableFromJson))
return DittoJsonException.wrapJsonRuntimeException(message, message.getInternalHeaders(), (msg, headers) ->
getPlainStringPayload(msg)
.map(JsonFactory::readFrom)
.map(JsonValue::asObject)
.map(ProtocolFactory::jsonifiableAdaptableFromJson)
)
.map(Adaptable.class::cast)
.map(Collections::singletonList)
.orElse(Collections.emptyList());
Expand Down
Expand Up @@ -19,6 +19,11 @@

import javax.annotation.Nullable;

import org.apache.pekko.http.javadsl.model.ContentTypes;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.http.javadsl.server.PathMatchers;
import org.apache.pekko.http.javadsl.server.RequestContext;
import org.apache.pekko.http.javadsl.server.Route;
import org.eclipse.ditto.base.api.common.RetrieveConfig;
import org.eclipse.ditto.base.api.devops.ImmutableLoggerConfig;
import org.eclipse.ditto.base.api.devops.signals.commands.ChangeLogLevel;
Expand All @@ -41,12 +46,6 @@
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;

import org.apache.pekko.http.javadsl.model.ContentTypes;
import org.apache.pekko.http.javadsl.model.HttpResponse;
import org.apache.pekko.http.javadsl.server.PathMatchers;
import org.apache.pekko.http.javadsl.server.RequestContext;
import org.apache.pekko.http.javadsl.server.Route;

/**
* Builder for creating Pekko HTTP routes for {@code /devops}.
*/
Expand Down Expand Up @@ -222,8 +221,9 @@ private Route routePiggyback(final RequestContext ctx,
extractDataBytes(payloadSource ->
handlePerRequest(ctx, dittoHeaders, payloadSource,
piggybackCommandJson -> {
JsonObject parsedJson = DittoJsonException.wrapJsonRuntimeException(() ->
JsonFactory.readFrom(piggybackCommandJson).asObject());
JsonObject parsedJson = DittoJsonException.wrapJsonRuntimeException(
piggybackCommandJson, dittoHeaders, (json, headers) ->
JsonFactory.readFrom(json).asObject());
parsedJson = parsedJson.set(Command.JsonFields.TYPE, ExecutePiggybackCommand.TYPE);

// serviceName and instance from URL are preferred
Expand Down
Expand Up @@ -12,6 +12,9 @@
*/
package org.eclipse.ditto.gateway.service.endpoints.routes.things;

import org.apache.pekko.http.javadsl.server.PathMatchers;
import org.apache.pekko.http.javadsl.server.RequestContext;
import org.apache.pekko.http.javadsl.server.Route;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.service.UriEncoding;
Expand Down Expand Up @@ -44,10 +47,6 @@
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveFeatureProperty;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveFeatures;

import org.apache.pekko.http.javadsl.server.PathMatchers;
import org.apache.pekko.http.javadsl.server.RequestContext;
import org.apache.pekko.http.javadsl.server.Route;

/**
* Builder for creating Pekko HTTP routes for {@code /features}.
*/
Expand Down Expand Up @@ -295,10 +294,14 @@ private Route featuresEntryPropertiesEntry(final RequestContext ctx, final Ditto
put(() -> ensureMediaTypeJsonWithFallbacksThenExtractDataBytes(ctx,
dittoHeaders,
payloadSource -> handlePerRequest(ctx, dittoHeaders, payloadSource,
propertyJson -> ModifyFeatureProperty.of(thingId, featureId,
propertyJson -> ModifyFeatureProperty.of(thingId,
featureId,
JsonFactory.newPointer(jsonPointerString),
DittoJsonException.wrapJsonRuntimeException(
() -> JsonFactory.readFrom(propertyJson)),
propertyJson,
dittoHeaders,
(json, headers) -> JsonFactory.readFrom(json)
),
dittoHeaders))
)
),
Expand All @@ -307,9 +310,13 @@ private Route featuresEntryPropertiesEntry(final RequestContext ctx, final Ditto
dittoHeaders,
payloadSource -> handlePerRequest(ctx, dittoHeaders, payloadSource,
propertyJson -> MergeThing.withFeatureProperty(thingId,
featureId, JsonFactory.newPointer(jsonPointerString),
featureId,
JsonFactory.newPointer(jsonPointerString),
DittoJsonException.wrapJsonRuntimeException(
() -> JsonFactory.readFrom(propertyJson)),
propertyJson,
dittoHeaders,
(json, headers) -> JsonFactory.readFrom(json)
),
dittoHeaders))
)
),
Expand Down Expand Up @@ -402,7 +409,10 @@ private Route featuresEntryDesiredPropertiesEntry(final RequestContext ctx, fina
featureId,
JsonFactory.newPointer(jsonPointerString),
DittoJsonException.wrapJsonRuntimeException(
() -> JsonFactory.readFrom(propertyJson)),
propertyJson,
dittoHeaders,
(json, headers) -> JsonFactory.readFrom(json)
),
dittoHeaders))
)
),
Expand All @@ -411,9 +421,13 @@ private Route featuresEntryDesiredPropertiesEntry(final RequestContext ctx, fina
dittoHeaders,
payloadSource -> handlePerRequest(ctx, dittoHeaders, payloadSource,
propertyJson -> MergeThing.withFeatureDesiredProperty(thingId,
featureId, JsonFactory.newPointer(jsonPointerString),
featureId,
JsonFactory.newPointer(jsonPointerString),
DittoJsonException.wrapJsonRuntimeException(
() -> JsonFactory.readFrom(propertyJson)),
propertyJson,
dittoHeaders,
(json, headers) -> JsonFactory.readFrom(json)
),
dittoHeaders))
)
),
Expand Down

0 comments on commit 8be61d5

Please sign in to comment.