getComponentDataTypeConverters(String scheme) {
+ if (!dataTypeConverters.containsKey(scheme)) {
+ dataTypeConverters.put(scheme, new ArrayList<>());
+ }
+
+ return dataTypeConverters.get(scheme);
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+}
diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java
similarity index 69%
rename from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java
rename to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java
index c5098c1c6..a15ff3a08 100644
--- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java
+++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java
@@ -14,22 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.kamelets.utils.transform.aws.ddb;
+package org.apache.camel.kamelets.utils.format.converter.aws2.ddb;
+
+import java.io.InputStream;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeProperty;
-import org.apache.camel.InvalidPayloadException;
import org.apache.camel.component.aws2.ddb.Ddb2Constants;
import org.apache.camel.component.aws2.ddb.Ddb2Operations;
+import org.apache.camel.component.jackson.JacksonDataFormat;
+import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
+import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
@@ -40,55 +45,78 @@
*
* Json property names map to attribute keys and Json property values map to attribute values.
*
- * During mapping the Json property types resolve to the respective attribute types ({@code String, StringSet, Boolean, Number, NumberSet, Map, Null}).
- * Primitive typed arrays in Json get mapped to {@code StringSet} or {@code NumberSet} attribute values.
+ * During mapping the Json property types resolve to the respective attribute types
+ * ({@code String, StringSet, Boolean, Number, NumberSet, Map, Null}). Primitive typed arrays in Json get mapped to
+ * {@code StringSet} or {@code NumberSet} attribute values.
+ *
+ * The input type supports the operations: PutItem, UpdateItem, DeleteItem
*
* For PutItem operation the Json body defines all item attributes.
*
* For DeleteItem operation the Json body defines only the primary key attributes that identify the item to delete.
*
- * For UpdateItem operation the Json body defines both key attributes to identify the item to be updated and all item attributes tht get updated on the item.
+ * For UpdateItem operation the Json body defines both key attributes to identify the item to be updated and all item
+ * attributes tht get updated on the item.
+ *
+ * The given Json body can use "operation", "key" and "item" as top level properties. Both define a Json object that
+ * will be mapped to respective attribute value maps:
*
- * The given Json body can use "key" and "item" as top level properties.
- * Both define a Json object that will be mapped to respective attribute value maps:
- * {@code
+ *
+ * {@code
* {
+ * "operation": "PutItem"
* "key": {},
* "item": {}
* }
* }
*
- * The converter will extract the objects and set respective attribute value maps as header entries.
- * This is a comfortable way to define different key and item attribute value maps e.g. on UpdateItem operation.
*
- * In case key and item attribute value maps are identical you can omit the special top level properties completely.
- * The converter will map the whole Json body as is then and use it as source for the attribute value map.
+ * The converter will extract the objects and set respective attribute value maps as header entries. This is a
+ * comfortable way to define different key and item attribute value maps e.g. on UpdateItem operation.
+ *
+ * In case key and item attribute value maps are identical you can omit the special top level properties completely. The
+ * converter will map the whole Json body as is then and use it as source for the attribute value map.
*/
-public class JsonToDdbModelConverter {
+@DataType(scheme = "aws2-ddb", name = "json")
+public class Ddb2JsonInputType implements DataTypeConverter {
+
+ private final JacksonDataFormat dataFormat = new JacksonDataFormat(new ObjectMapper(), JsonNode.class);
- public String process(@ExchangeProperty("operation") String operation, Exchange exchange) throws InvalidPayloadException {
+ @Override
+ public void convert(Exchange exchange) {
if (exchange.getMessage().getHeaders().containsKey(Ddb2Constants.ITEM) ||
exchange.getMessage().getHeaders().containsKey(Ddb2Constants.KEY)) {
- return "";
+ return;
}
- ObjectMapper mapper = new ObjectMapper();
+ JsonNode jsonBody = getBodyAsJsonNode(exchange);
+
+ String operation
+ = Optional.ofNullable(jsonBody.get("operation")).map(JsonNode::asText).orElse(Ddb2Operations.PutItem.name());
+ if (exchange.hasProperties() && exchange.getProperty("operation", String.class) != null) {
+ operation = exchange.getProperty("operation", String.class);
+ }
- JsonNode jsonBody = exchange.getMessage().getMandatoryBody(JsonNode.class);
+ if (exchange.getIn().getHeaders().containsKey(Ddb2Constants.OPERATION)) {
+ operation = exchange.getIn().getHeader(Ddb2Constants.OPERATION, Ddb2Operations.class).name();
+ }
JsonNode key = jsonBody.get("key");
JsonNode item = jsonBody.get("item");
Map keyProps;
if (key != null) {
- keyProps = mapper.convertValue(key, new TypeReference