schemes;
+
+ private AvroSchema schema;
+ private String contentClass;
+
+ private boolean validate = true;
+
+ public AvroSchemaResolver() {
+ this.schemes = new ConcurrentHashMap<>();
+ }
+
+ public String getSchema() {
+ if (this.schema != null) {
+ return this.schema.getAvroSchema().toString();
+ }
+
+ return null;
+ }
+
+ public void setSchema(String schema) {
+ if (ObjectHelper.isNotEmpty(schema)) {
+ this.schema = new AvroSchema(new Schema.Parser().setValidate(validate).parse(schema));
+ } else {
+ this.schema = null;
+ }
+ }
+
+ public boolean isValidate() {
+ return validate;
+ }
+
+ public void setValidate(boolean validate) {
+ this.validate = validate;
+ }
+
+ public String getContentClass() {
+ return contentClass;
+ }
+
+ public void setContentClass(String contentClass) {
+ if (ObjectHelper.isNotEmpty(contentClass)) {
+ this.contentClass = contentClass;
+ } else {
+ this.contentClass = null;
+ }
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ Object payload = exchange.getMessage().getBody();
+ if (payload == null) {
+ return;
+ }
+
+ AvroSchema answer = computeIfAbsent(exchange);
+
+ if (answer != null) {
+ exchange.setProperty(SchemaHelper.CONTENT_SCHEMA, answer);
+ exchange.setProperty(SchemaHelper.CONTENT_SCHEMA_TYPE, SchemaType.AVRO.type());
+ exchange.setProperty(SchemaHelper.CONTENT_CLASS, SchemaHelper.resolveContentClass(exchange, this.contentClass));
+ }
+ }
+
+ @Override
+ public FormatSchema resolve(Exchange exchange) {
+ AvroSchema answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, AvroSchema.class);
+ if (answer == null) {
+ answer = computeIfAbsent(exchange);
+ }
+
+ return answer;
+ }
+
+ private AvroSchema computeIfAbsent(Exchange exchange) {
+ if (this.schema != null) {
+ return this.schema;
+ }
+
+ AvroSchema answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, AvroSchema.class);
+
+ if (answer == null && exchange.getProperties().containsKey(SchemaHelper.SCHEMA)) {
+ String schemaJson = exchange.getProperty(SchemaHelper.SCHEMA, String.class);
+ Schema raw = new Schema.Parser().setValidate(validate).parse(schemaJson);
+ answer = new AvroSchema(raw);
+ }
+
+ if (answer == null) {
+ String contentClass = SchemaHelper.resolveContentClass(exchange, this.contentClass);
+ if (contentClass != null) {
+ answer = this.schemes.computeIfAbsent(contentClass, t -> {
+ Resource res = PluginHelper.getResourceLoader(exchange.getContext())
+ .resolveResource("classpath:schemas/" + SchemaType.AVRO.type() + "/" + t + "." + SchemaType.AVRO.type());
+
+ try {
+ if (res.exists()) {
+ try (InputStream is = res.getInputStream()) {
+ if (is != null) {
+ return Avro.MAPPER.schemaFrom(is);
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Unable to load Avro schema for type: " + t + ", resource: " + res.getLocation(), e);
+ }
+
+ try {
+ return Avro.MAPPER.schemaFor(Class.forName(contentClass));
+ } catch (JsonMappingException | ClassNotFoundException e) {
+ throw new RuntimeException(
+ "Unable to compute Avro schema for type: " + t, e);
+ }
+ });
+ }
+ }
+
+ if (answer != null) {
+ this.schema = answer;
+ }
+
+ return answer;
+ }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroStructDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroStructDataType.java
new file mode 100644
index 000000000..4186d8c16
--- /dev/null
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroStructDataType.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kamelets.utils.format.converter.avro;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.dataformat.avro.AvroSchema;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.kamelets.utils.format.MimeType;
+import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper;
+import org.apache.camel.spi.DataType;
+import org.apache.camel.spi.DataTypeTransformer;
+import org.apache.camel.spi.Transformer;
+
+/**
+ * Data type uses Avro Jackson data format to unmarshal Exchange body to generic JsonNode.
+ * Uses given Avro schema from the Exchange properties when unmarshalling the payload (usually already resolved via schema
+ * resolver Kamelet action).
+ */
+@DataTypeTransformer(name = "avro-x-struct")
+public class AvroStructDataType extends Transformer {
+
+ @Override
+ public void transform(Message message, DataType fromType, DataType toType) {
+ AvroSchema schema = message.getExchange().getProperty(SchemaHelper.CONTENT_SCHEMA, AvroSchema.class);
+
+ if (schema == null) {
+ throw new CamelExecutionException("Missing proper avro schema for data type processing", message.getExchange());
+ }
+
+ try {
+ Object unmarshalled = Avro.MAPPER.reader().forType(JsonNode.class).with(schema)
+ .readValue(getBodyAsStream(message));
+ message.setBody(unmarshalled);
+
+ message.setHeader(Exchange.CONTENT_TYPE, MimeType.STRUCT.type());
+ } catch (InvalidPayloadException | IOException e) {
+ throw new CamelExecutionException("Failed to apply Avro x-struct data type on exchange", message.getExchange(), e);
+ }
+ }
+
+ private InputStream getBodyAsStream(Message message) throws InvalidPayloadException {
+ InputStream bodyStream = message.getBody(InputStream.class);
+
+ if (bodyStream == null) {
+ bodyStream = new ByteArrayInputStream(message.getMandatoryBody(byte[].class));
+ }
+
+ return bodyStream;
+ }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java
similarity index 58%
rename from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java
rename to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java
index c5098c1c6..b9b20f3fd 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java
@@ -14,22 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.kamelets.utils.transform.aws.ddb;
+package org.apache.camel.kamelets.utils.format.converter.aws2.ddb;
+
+import java.io.InputStream;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeProperty;
-import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Message;
import org.apache.camel.component.aws2.ddb.Ddb2Constants;
import org.apache.camel.component.aws2.ddb.Ddb2Operations;
+import org.apache.camel.component.jackson.JacksonDataFormat;
+import org.apache.camel.kamelets.utils.format.converter.json.Json;
+import org.apache.camel.spi.DataType;
+import org.apache.camel.spi.DataTypeTransformer;
+import org.apache.camel.spi.Transformer;
import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
@@ -40,55 +46,78 @@
*
* Json property names map to attribute keys and Json property values map to attribute values.
*
- * During mapping the Json property types resolve to the respective attribute types ({@code String, StringSet, Boolean, Number, NumberSet, Map, Null}).
- * Primitive typed arrays in Json get mapped to {@code StringSet} or {@code NumberSet} attribute values.
+ * During mapping the Json property types resolve to the respective attribute types
+ * ({@code String, StringSet, Boolean, Number, NumberSet, Map, Null}). Primitive typed arrays in Json get mapped to
+ * {@code StringSet} or {@code NumberSet} attribute values.
+ *
+ * The input type supports the operations: PutItem, UpdateItem, DeleteItem
*
* For PutItem operation the Json body defines all item attributes.
*
* For DeleteItem operation the Json body defines only the primary key attributes that identify the item to delete.
*
- * For UpdateItem operation the Json body defines both key attributes to identify the item to be updated and all item attributes tht get updated on the item.
+ * For UpdateItem operation the Json body defines both key attributes to identify the item to be updated and all item
+ * attributes tht get updated on the item.
+ *
+ * The given Json body can use "operation", "key" and "item" as top level properties. Both define a Json object that
+ * will be mapped to respective attribute value maps:
*
- * The given Json body can use "key" and "item" as top level properties.
- * Both define a Json object that will be mapped to respective attribute value maps:
- * {@code
+ *
+ * {@code
* {
+ * "operation": "PutItem"
* "key": {},
* "item": {}
* }
* }
*
- * The converter will extract the objects and set respective attribute value maps as header entries.
- * This is a comfortable way to define different key and item attribute value maps e.g. on UpdateItem operation.
*
- * In case key and item attribute value maps are identical you can omit the special top level properties completely.
- * The converter will map the whole Json body as is then and use it as source for the attribute value map.
+ * The converter will extract the objects and set respective attribute value maps as header entries. This is a
+ * comfortable way to define different key and item attribute value maps e.g. on UpdateItem operation.
+ *
+ * In case key and item attribute value maps are identical you can omit the special top level properties completely. The
+ * converter will map the whole Json body as is then and use it as source for the attribute value map.
*/
-public class JsonToDdbModelConverter {
+@DataTypeTransformer(name = "aws2-ddb:application-json")
+public class Ddb2JsonInputType extends Transformer {
+
+ private final JacksonDataFormat dataFormat = new JacksonDataFormat(Json.MAPPER, JsonNode.class);
- public String process(@ExchangeProperty("operation") String operation, Exchange exchange) throws InvalidPayloadException {
- if (exchange.getMessage().getHeaders().containsKey(Ddb2Constants.ITEM) ||
- exchange.getMessage().getHeaders().containsKey(Ddb2Constants.KEY)) {
- return "";
+ @Override
+ public void transform(Message message, DataType fromType, DataType toType) {
+ if (message.getHeaders().containsKey(Ddb2Constants.ITEM) ||
+ message.getHeaders().containsKey(Ddb2Constants.KEY)) {
+ return;
}
- ObjectMapper mapper = new ObjectMapper();
+ JsonNode jsonBody = getBodyAsJsonNode(message);
+
+ String operation
+ = Optional.ofNullable(jsonBody.get("operation")).map(JsonNode::asText).orElse(Ddb2Operations.PutItem.name());
+ if (message.getExchange().hasProperties() && message.getExchange().getProperty("operation", String.class) != null) {
+ operation = message.getExchange().getProperty("operation", String.class);
+ }
- JsonNode jsonBody = exchange.getMessage().getMandatoryBody(JsonNode.class);
+ if (message.getHeaders().containsKey(Ddb2Constants.OPERATION)) {
+ operation = message.getHeader(Ddb2Constants.OPERATION, Ddb2Operations.class).name();
+ }
JsonNode key = jsonBody.get("key");
JsonNode item = jsonBody.get("item");
Map keyProps;
if (key != null) {
- keyProps = mapper.convertValue(key, new TypeReference