diff --git a/internal/utils/cluster/src/main/java/org/eclipse/ditto/internal/utils/cluster/AbstractJsonifiableWithDittoHeadersSerializer.java b/internal/utils/cluster/src/main/java/org/eclipse/ditto/internal/utils/cluster/AbstractJsonifiableWithDittoHeadersSerializer.java index 7d10d3075d..110a59fcbf 100755 --- a/internal/utils/cluster/src/main/java/org/eclipse/ditto/internal/utils/cluster/AbstractJsonifiableWithDittoHeadersSerializer.java +++ b/internal/utils/cluster/src/main/java/org/eclipse/ditto/internal/utils/cluster/AbstractJsonifiableWithDittoHeadersSerializer.java @@ -38,13 +38,14 @@ import org.eclipse.ditto.base.model.signals.commands.Command; import org.eclipse.ditto.internal.utils.metrics.DittoMetrics; import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter; +import org.eclipse.ditto.internal.utils.metrics.instruments.tag.Tag; import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartInstant; import org.eclipse.ditto.internal.utils.tracing.DittoTracing; import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName; +import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan; import org.eclipse.ditto.json.JsonFactory; import org.eclipse.ditto.json.JsonFieldDefinition; import org.eclipse.ditto.json.JsonObject; -import org.eclipse.ditto.json.JsonObjectBuilder; import org.eclipse.ditto.json.JsonParseException; import org.eclipse.ditto.json.JsonRuntimeException; import org.eclipse.ditto.json.JsonValue; @@ -56,7 +57,6 @@ import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; -import akka.actor.ActorSystem; import akka.actor.ExtendedActorSystem; import akka.io.BufferPool; import akka.io.DirectByteBufferPool; @@ -109,19 +109,22 @@ public abstract class AbstractJsonifiableWithDittoHeadersSerializer extends Seri * @param serializerName a name to be used for this serializer when reporting metrics, in the log and in error * messages. */ - protected AbstractJsonifiableWithDittoHeadersSerializer(final int identifier, final ExtendedActorSystem actorSystem, - final Function manifestProvider, final String serializerName) { - + protected AbstractJsonifiableWithDittoHeadersSerializer( + final int identifier, + final ExtendedActorSystem actorSystem, + final Function manifestProvider, + final String serializerName + ) { this.identifier = identifier; this.serializerName = serializerName; mappingStrategies = MappingStrategies.loadMappingStrategies(actorSystem); this.manifestProvider = checkNotNull(manifestProvider, "manifestProvider"); - final ActorSystem.Settings settings = actorSystem.settings(); - final Config config = settings.config(); + final var settings = actorSystem.settings(); + final var config = settings.config(); defaultBufferSize = config.withFallback(FALLBACK_CONF).getBytes(CONFIG_DIRECT_BUFFER_SIZE); - final int maxPoolEntries = config.withFallback(FALLBACK_CONF).getInt(CONFIG_DIRECT_BUFFER_POOL_LIMIT); + final var maxPoolEntries = config.withFallback(FALLBACK_CONF).getInt(CONFIG_DIRECT_BUFFER_POOL_LIMIT); byteBufferPool = new DirectByteBufferPool(defaultBufferSize.intValue(), maxPoolEntries); inCounter = DittoMetrics.counter(serializerName.toLowerCase() + METRIC_NAME_SUFFIX) @@ -142,45 +145,31 @@ public String manifest(final Object o) { @Override public void toBinary(final Object object, final ByteBuffer buf) { - if (object instanceof Jsonifiable) { - final JsonObjectBuilder jsonObjectBuilder = JsonObject.newBuilder(); - final DittoHeaders dittoHeaders = getDittoHeadersOrEmpty(object); - final var beforeSerializeInstant = StartInstant.now(); - - final var startedSpan = DittoTracing.newPreparedSpan(dittoHeaders, SpanOperationName.of("serialize")) - .startAt(beforeSerializeInstant); - dittoHeaders.getCorrelationId().ifPresent(startedSpan::correlationId); - final var dittoHeadersWithTraceContext = DittoHeaders.of(startedSpan.propagateContext(dittoHeaders)); - - jsonObjectBuilder.set(JSON_DITTO_HEADERS, dittoHeadersWithTraceContext.toJson()); - - final JsonValue jsonValue; - - if (object instanceof Jsonifiable.WithPredicate withPredicate) { - final JsonSchemaVersion schemaVersion = - dittoHeaders.getSchemaVersion().orElse(JsonSchemaVersion.LATEST); - - jsonValue = withPredicate.toJson(schemaVersion, FieldType.regularOrSpecial()); - } else { - jsonValue = ((Jsonifiable) object).toJson(); - } - - jsonObjectBuilder.set(JSON_PAYLOAD, jsonValue); - final JsonObject jsonObject = jsonObjectBuilder.build(); + if (object instanceof Jsonifiable jsonifiable) { + final var dittoHeaders = getDittoHeadersOrEmpty(object); + final var startedSpan = startTracingSpanForSerialization(dittoHeaders, object.getClass()); + final var jsonObject = JsonObject.newBuilder() + .set(JSON_DITTO_HEADERS, getDittoHeadersWithSpanContextAsJson(dittoHeaders, startedSpan)) + .set(JSON_PAYLOAD, getAsJsonPayload(jsonifiable, dittoHeaders)) + .build(); try { serializeIntoByteBuffer(jsonObject, buf); LOG.trace("toBinary jsonStr about to send 'out': {}", jsonObject); outCounter.increment(); } catch (final BufferOverflowException e) { - final String errorMessage = MessageFormat.format( - "Could not put bytes of JSON string <{0}> into ByteBuffer due to BufferOverflow", jsonObject); + final var errorMessage = MessageFormat.format( + "Could not put bytes of JSON string <{0}> into ByteBuffer due to BufferOverflow", + jsonObject + ); LOG.error(errorMessage, e); startedSpan.fail(e); throw new IllegalArgumentException(errorMessage, e); } catch (final IOException e) { - final String errorMessage = MessageFormat.format( + final var errorMessage = MessageFormat.format( "Serialization failed with {} on Jsonifiable with string representation <{}>", - e.getClass().getName(), jsonObject); + e.getClass().getName(), + jsonObject + ); LOG.warn(errorMessage, e); startedSpan.fail(e); throw new RuntimeException(errorMessage, e); @@ -188,13 +177,52 @@ public void toBinary(final Object object, final ByteBuffer buf) { startedSpan.finish(); } } else { - LOG.error("Could not serialize class <{}> as it does not implement <{}>!", object.getClass(), + LOG.error("Could not serialize class <{}> as it does not implement <{}>!", + object.getClass(), Jsonifiable.WithPredicate.class); - final String error = new NotSerializableException(object.getClass().getName()).getMessage(); + final var error = new NotSerializableException(object.getClass().getName()).getMessage(); buf.put(CHARSET.encode(error)); } } + private static StartedSpan startTracingSpanForSerialization( + final DittoHeaders dittoHeaders, + final Class typeToSerialize + ) { + final var startInstant = StartInstant.now(); + return DittoTracing.newPreparedSpan( + dittoHeaders, + SpanOperationName.of("serialize " + typeToSerialize.getSimpleName()) + ) + .correlationId(dittoHeaders.getCorrelationId().orElse(null)) + .startAt(startInstant); + } + + private static JsonObject getDittoHeadersWithSpanContextAsJson( + final DittoHeaders dittoHeaders, + final StartedSpan startedSpan + ) { + final var dittoHeadersWithSpanContext = DittoHeaders.of(startedSpan.propagateContext(dittoHeaders)); + return dittoHeadersWithSpanContext.toJson(); + } + + @SuppressWarnings("java:S3740") + private static JsonValue getAsJsonPayload( + final Jsonifiable jsonifiable, + final DittoHeaders dittoHeaders + ) { + final JsonValue result; + if (jsonifiable instanceof Jsonifiable.WithPredicate withPredicate) { + result = withPredicate.toJson( + dittoHeaders.getSchemaVersion().orElse(JsonSchemaVersion.LATEST), + FieldType.regularOrSpecial() + ); + } else { + result = jsonifiable.toJson(); + } + return result; + } + /** * Serializes the passed {@code jsonObject} into the passed {@code byteBuffer}. * @@ -206,28 +234,30 @@ public void toBinary(final Object object, final ByteBuffer buf) { @Override public byte[] toBinary(final Object object) { - final ByteBuffer buf = byteBufferPool.acquire(); + final var byteBuffer = byteBufferPool.acquire(); try { - toBinary(object, buf); - buf.flip(); - final byte[] bytes = new byte[buf.remaining()]; - buf.get(bytes); + toBinary(object, byteBuffer); + byteBuffer.flip(); + final var bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); return bytes; } catch (final BufferOverflowException e) { - final String errorMessage = - MessageFormat.format("BufferOverflow when serializing object <{0}>, max buffer size was: <{1}>", - object, defaultBufferSize); + final var errorMessage = MessageFormat.format( + "BufferOverflow when serializing object <{0}>, max buffer size was: <{1}>", + object, + defaultBufferSize + ); LOG.error(errorMessage, e); throw new IllegalArgumentException(errorMessage, e); } finally { - byteBufferPool.release(buf); + byteBufferPool.release(byteBuffer); } } private static DittoHeaders getDittoHeadersOrEmpty(final Object object) { if (object instanceof WithDittoHeaders withDittoHeaders) { - @Nullable final DittoHeaders dittoHeaders = withDittoHeaders.getDittoHeaders(); + @Nullable final var dittoHeaders = withDittoHeaders.getDittoHeaders(); if (null != dittoHeaders) { return dittoHeaders; } @@ -240,9 +270,10 @@ private static DittoHeaders getDittoHeadersOrEmpty(final Object object) { @Override public Object fromBinary(final ByteBuffer buf, final String manifest) { try { - final Jsonifiable jsonifiable = tryToCreateKnownJsonifiableFrom(manifest, buf); + final var jsonifiable = tryToCreateKnownJsonifiableFrom(manifest, buf); if (LOG.isTraceEnabled()) { - LOG.trace("fromBinary {} which got 'in': {}", serializerName, + LOG.trace("fromBinary {} which got 'in': {}", + serializerName, BinaryToHexConverter.createDebugMessageByTryingToConvertToHexString(buf)); } inCounter.increment(); @@ -257,8 +288,10 @@ public Object fromBinary(final byte[] bytes, final String manifest) { return fromBinary(ByteBuffer.wrap(bytes), manifest); } - private Jsonifiable tryToCreateKnownJsonifiableFrom(final String manifest, final ByteBuffer byteBuffer) - throws NotSerializableException { + private Jsonifiable tryToCreateKnownJsonifiableFrom( + final String manifest, + final ByteBuffer byteBuffer + ) throws NotSerializableException { try { return createJsonifiableFrom(manifest, byteBuffer); } catch (final DittoRuntimeException | JsonRuntimeException e) { @@ -268,82 +301,92 @@ private Jsonifiable tryToCreateKnownJsonifiableFrom(final String manifest, fi manifest, serializerName, BinaryToHexConverter.createDebugMessageByTryingToConvertToHexString(byteBuffer), - e); + e + ); throw new NotSerializableException(manifest); } } - private Jsonifiable createJsonifiableFrom(final String manifest, final ByteBuffer bytebuffer) - throws NotSerializableException { - + private Jsonifiable createJsonifiableFrom( + final String manifest, + final ByteBuffer byteBuffer + ) throws NotSerializableException { final var beforeDeserializeInstant = StartInstant.now(); - final JsonValue jsonValue = deserializeFromByteBuffer(bytebuffer); - - final JsonObject jsonObject; - if (jsonValue.isObject()) { - jsonObject = jsonValue.asObject(); - } else if (jsonValue.isNull()) { - jsonObject = JsonFactory.nullObject(); - } else { - LOG.warn("Expected object but received value <{}> with manifest <{}> via {}", jsonValue, manifest, - serializerName); - final String errorMessage = MessageFormat.format("<{}> is not a valid {} object! (It''s a value.)", - BinaryToHexConverter.createDebugMessageByTryingToConvertToHexString(bytebuffer), serializerName); - throw JsonParseException.newBuilder().message(errorMessage).build(); - } - - final JsonObject payload = getPayload(jsonObject); - - final DittoHeadersBuilder dittoHeadersBuilder = jsonObject.getValue(JSON_DITTO_HEADERS) - .map(DittoHeaders::newBuilder) - .orElseGet(DittoHeaders::newBuilder); - - final DittoHeaders dittoHeaders = dittoHeadersBuilder.build(); - final var startedSpan = DittoTracing.newPreparedSpan(dittoHeaders, SpanOperationName.of("deserialize")) - .startAt(beforeDeserializeInstant); + final var jsonObject = deserializeByteBufferAsJsonObjectOrThrow(byteBuffer, manifest); + final var dittoHeaders = deserializeDittoHeaders(jsonObject); + final var payload = deserializePayloadAsJsonObject(jsonObject, dittoHeaders); + final var signalTypeOrErrorCodeOptional = getSignalTypeOrErrorCodeIfPresent(payload); + final var startedSpan = startTracingSpanForDeserialization( + dittoHeaders, + signalTypeOrErrorCodeOptional.orElse(""), + beforeDeserializeInstant + ); + final var result = + deserializeJson(payload, manifest, DittoHeaders.of(startedSpan.propagateContext(dittoHeaders))); try { - return deserializeJson(payload, manifest, DittoHeaders.of(startedSpan.propagateContext(dittoHeaders))); + return result; } finally { + if (signalTypeOrErrorCodeOptional.isEmpty()) { + startedSpan.tag(Tag.of("type", result.getClass().getSimpleName())); + } startedSpan.finish(); } } - - private Jsonifiable deserializeJson(final JsonObject jsonPayload, final String manifest, - final DittoHeaders dittoHeaders) - throws NotSerializableException { - final JsonParsable> mappingStrategy = mappingStrategies.getMappingStrategy(manifest) - .orElseThrow(() -> { - LOG.warn("No strategy found to map manifest <{}> to a Jsonifiable.WithPredicate!", manifest); - return new NotSerializableException(manifest); - }); - return mappingStrategy.parse(jsonPayload, dittoHeaders, innerJson -> - deserializeJson(innerJson, getDefaultManifest(innerJson), dittoHeaders)); - } - - private String getDefaultManifest(final JsonObject jsonObject) throws NotSerializableException { - return jsonObject.getValue(Command.JsonFields.TYPE) - .orElseThrow(() -> new NotSerializableException("No type found for inner JSON!")); + + private JsonObject deserializeByteBufferAsJsonObjectOrThrow(final ByteBuffer byteBuffer, final String manifest) { + final JsonObject result; + final var jsonValue = deserializeFromByteBuffer(byteBuffer); + if (jsonValue.isObject()) { + result = jsonValue.asObject(); + } else if (jsonValue.isNull()) { + result = JsonFactory.nullObject(); + } else { + LOG.warn("Expected object but received value <{}> with manifest <{}> via {}", + jsonValue, + manifest, + serializerName); + throw JsonParseException.newBuilder() + .message(MessageFormat.format("<{}> is not a valid {} object! (It''s a value.)", + BinaryToHexConverter.createDebugMessageByTryingToConvertToHexString(byteBuffer), + serializerName)) + .build(); + } + return result; } /** * Deserializes the passed {@code byteBuffer} into a JsonValue. * - * @param byteBuffer the ByteBuffer to derserialize. + * @param byteBuffer the ByteBuffer to deserialize. * @return the deserialized JsonValue. */ protected abstract JsonValue deserializeFromByteBuffer(ByteBuffer byteBuffer); - private static JsonObject getPayload(final JsonObject sourceJsonObject) { - final JsonObject result; + private static DittoHeaders deserializeDittoHeaders(final JsonObject jsonObject) { + return jsonObject.getValue(JSON_DITTO_HEADERS) + .map(DittoHeaders::newBuilder) + .map(DittoHeadersBuilder::build) + .orElseGet(DittoHeaders::empty); + } - final Optional payloadJsonOptional = sourceJsonObject.getValue(JSON_PAYLOAD); + private static JsonObject deserializePayloadAsJsonObject( + final JsonObject sourceJsonObject, + final DittoHeaders dittoHeaders + ) { + final JsonObject result; + final var payloadJsonOptional = sourceJsonObject.getValue(JSON_PAYLOAD); if (payloadJsonOptional.isPresent()) { - final JsonValue payloadJson = payloadJsonOptional.get(); + final var payloadJson = payloadJsonOptional.get(); if (!payloadJson.isObject()) { - final String msgPattern = "Value <{0}> for <{1}> was not of type <{2}>!"; - final String simpleName = JSON_PAYLOAD.getValueType().getSimpleName(); - final String msg = MessageFormat.format(msgPattern, payloadJson, JSON_PAYLOAD.getPointer(), simpleName); - throw new DittoJsonException(new IllegalArgumentException(msg)); + throw new DittoJsonException( + JsonParseException.newBuilder() + .message(MessageFormat.format("Value <{0}> for <{1}> was not of type <{2}>!", + payloadJson, + JSON_PAYLOAD.getPointer(), + JSON_PAYLOAD.getValueType().getSimpleName())) + .build(), + dittoHeaders + ); } else { result = payloadJson.asObject(); } @@ -354,4 +397,49 @@ private static JsonObject getPayload(final JsonObject sourceJsonObject) { return result; } + private static Optional getSignalTypeOrErrorCodeIfPresent(final JsonObject jsonObject) { + return jsonObject.getValue("type") + .or(() -> jsonObject.getValue("error")) + .filter(JsonValue::isString) + .map(JsonValue::asString); + } + + private static StartedSpan startTracingSpanForDeserialization( + final DittoHeaders dittoHeaders, + final String signalTypeOrErrorCode, + final StartInstant startInstant + ) { + return DittoTracing.newPreparedSpan(dittoHeaders, SpanOperationName.of("deserialize " + signalTypeOrErrorCode)) + .correlationId(dittoHeaders.getCorrelationId().orElse(null)) + .startAt(startInstant); + } + + private Jsonifiable deserializeJson( + final JsonObject jsonPayload, + final String manifest, + final DittoHeaders dittoHeaders + ) throws NotSerializableException { + final var mappingStrategy = getMappingStrategyOrThrow(manifest); + return mappingStrategy.parse( + jsonPayload, + dittoHeaders, + innerJson -> deserializeJson(innerJson, getDefaultManifestOrThrow(innerJson), dittoHeaders) + ); + } + + private JsonParsable> getMappingStrategyOrThrow( + final String manifest + ) throws NotSerializableException { + return mappingStrategies.getMappingStrategy(manifest) + .orElseThrow(() -> { + LOG.warn("No strategy found to map manifest <{}> to a Jsonifiable.WithPredicate!", manifest); + return new NotSerializableException(manifest); + }); + } + + private static String getDefaultManifestOrThrow(final JsonObject jsonObject) throws NotSerializableException { + return jsonObject.getValue(Command.JsonFields.TYPE) + .orElseThrow(() -> new NotSerializableException("No type found for inner JSON!")); + } + }