From fbbd34b138fbb22dfef1fd547c661879546cead7 Mon Sep 17 00:00:00 2001 From: Christoph Deppisch Date: Wed, 9 Mar 2022 16:13:39 +0100 Subject: [PATCH] chore: Add Kamelet aws-ddb-sink - Add sink Kamelet for AWS DynamoDB - Use YAKS version 0.9.0-202203140033 - Add integration test for aws-ddb-sink Kamelet - Introduce unit tests on camel-kamelets-utils module --- .github/workflows/yaks-tests.yaml | 6 +- kamelets/aws-ddb-sink.kamelet.yaml | 135 ++++++++++++ kamelets/aws-ddb-streams-source.kamelet.yaml | 4 +- kamelets/aws-s3-source.kamelet.yaml | 6 +- library/camel-kamelets-utils/pom.xml | 42 ++++ .../aws/ddb/JsonToDdbModelConverter.java | 201 ++++++++++++++++++ .../aws/ddb/JsonToDdbModelConverterTest.java | 194 +++++++++++++++++ .../kamelets/aws-ddb-sink.kamelet.yaml | 135 ++++++++++++ .../aws-ddb-streams-source.kamelet.yaml | 4 +- .../kamelets/aws-s3-source.kamelet.yaml | 6 +- test/aws-ddb-sink/amazonDDBClient.groovy | 53 +++++ test/aws-ddb-sink/aws-ddb-sink-binding.yaml | 50 +++++ .../aws-ddb-sink-deleteItem.feature | 48 +++++ .../aws-ddb-sink/aws-ddb-sink-putItem.feature | 41 ++++ .../aws-ddb-sink-updateItem.feature | 51 +++++ test/aws-ddb-sink/putItem.groovy | 30 +++ test/aws-ddb-sink/verifyItems.groovy | 18 ++ test/aws-ddb-sink/yaks-config.yaml | 71 +++++++ .../earthquake-source.feature | 2 +- .../insert-field-action.feature | 4 +- test/mail-sink/mail-sink.feature | 8 +- test/mail-sink/yaks-config.yaml | 2 + test/timer-source/timer-source.feature | 2 +- 23 files changed, 1092 insertions(+), 21 deletions(-) create mode 100644 kamelets/aws-ddb-sink.kamelet.yaml create mode 100644 library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java create mode 100644 library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java create mode 100644 library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml create mode 100644 test/aws-ddb-sink/amazonDDBClient.groovy create mode 100644 test/aws-ddb-sink/aws-ddb-sink-binding.yaml create mode 100644 test/aws-ddb-sink/aws-ddb-sink-deleteItem.feature create mode 100644 test/aws-ddb-sink/aws-ddb-sink-putItem.feature create mode 100644 test/aws-ddb-sink/aws-ddb-sink-updateItem.feature create mode 100644 test/aws-ddb-sink/putItem.groovy create mode 100644 test/aws-ddb-sink/verifyItems.groovy create mode 100644 test/aws-ddb-sink/yaks-config.yaml diff --git a/.github/workflows/yaks-tests.yaml b/.github/workflows/yaks-tests.yaml index fcede74d3..11cf6ab79 100644 --- a/.github/workflows/yaks-tests.yaml +++ b/.github/workflows/yaks-tests.yaml @@ -42,8 +42,8 @@ concurrency: env: CAMEL_K_VERSION: 1.7.0 - YAKS_VERSION: 0.8.0 - YAKS_IMAGE_NAME: "docker.io/citrusframework/yaks" + YAKS_VERSION: 0.9.0-202203140033 + YAKS_IMAGE_NAME: "docker.io/yaks/yaks" jobs: test: @@ -63,7 +63,7 @@ jobs: rm -r _kamel - name: Get YAKS CLI run: | - curl --fail -L --silent https://github.com/citrusframework/yaks/releases/download/v${YAKS_VERSION}/yaks-${YAKS_VERSION}-linux-64bit.tar.gz -o yaks.tar.gz + curl --fail -L --silent https://github.com/citrusframework/yaks/releases/download/${YAKS_VERSION}/yaks-${YAKS_VERSION}-linux-64bit.tar.gz -o yaks.tar.gz mkdir -p _yaks tar -zxf yaks.tar.gz --directory ./_yaks sudo mv ./_yaks/yaks /usr/local/bin/ diff --git a/kamelets/aws-ddb-sink.kamelet.yaml b/kamelets/aws-ddb-sink.kamelet.yaml new file mode 100644 index 000000000..2080bd289 --- /dev/null +++ b/kamelets/aws-ddb-sink.kamelet.yaml @@ -0,0 +1,135 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: Kamelet +metadata: + name: aws-ddb-sink + annotations: + camel.apache.org/kamelet.support.level: "Preview" + camel.apache.org/catalog.version: "main-SNAPSHOT" + camel.apache.org/kamelet.icon: "" + camel.apache.org/provider: "Apache Software Foundation" + camel.apache.org/kamelet.group: "AWS DynamoDB Streams" + labels: + camel.apache.org/kamelet.type: "sink" +spec: + definition: + title: "AWS DynamoDB Sink" + description: |- + Send data to AWS DynamoDB service. The sent data will insert/update/delete an item on the given AWS DynamoDB table. + + Access Key/Secret Key are the basic method for authenticating to the AWS DynamoDB service. These parameters are optional, because the Kamelet provide also the 'useDefaultCredentialsProvider'. + + When using a default Credentials Provider the AWS DynamoDB client will load the credentials through this provider and won't use the static credential. This is reason for not having the access key and secret key as mandatory parameter for this Kamelet. + + This Kamelet expects a JSON as body. The mapping between the JSON fields and table attribute values is done by key, so if you have the input: + + '{"username":"oscerd", "city":"Rome"}' + + The Kamelet will insert/update an item in the given AWS DynamoDB table and set the attributes 'username' and 'city' respectively. Please note that the JSON object must include the primary key values that define the item. + required: + - table + - region + type: object + properties: + table: + title: Table + description: Name of the DynamoDB table to look at + type: string + accessKey: + title: Access Key + description: The access key obtained from AWS + type: string + format: password + x-descriptors: + - urn:alm:descriptor:com.tectonic.ui:password + - urn:camel:group:credentials + secretKey: + title: Secret Key + description: The secret key obtained from AWS + type: string + format: password + x-descriptors: + - urn:alm:descriptor:com.tectonic.ui:password + - urn:camel:group:credentials + region: + title: AWS Region + description: The AWS region to connect to + type: string + example: eu-west-1 + operation: + title: Operation + description: The operation to perform (one of PutItem, UpdateItem, DeleteItem) + type: string + default: PutItem + example: PutItem + writeCapacity: + title: Write Capacity + description: The provisioned throughput to reserved for writing resources to your table + type: integer + default: 1 + useDefaultCredentialsProvider: + title: Default Credentials Provider + description: Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in. + type: boolean + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + default: false + uriEndpointOverride: + title: Overwritte Endpoint URI + description: Set the overriding endpoint URI. This option needs to be used in combination with overrideEndpoint option. + type: string + overrideEndpoint: + title: Endpoint Overwrite + description: Set the need for overiding the endpoint URI. This option needs to be used in combination with uriEndpointOverride setting. + type: boolean + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + default: false + types: + in: + mediaType: application/json + dependencies: + - github:apache.camel-kamelets:camel-kamelets-utils:main-SNAPSHOT + - "camel:core" + - "camel:jackson" + - "camel:aws2-ddb" + - "camel:kamelet" + template: + from: + uri: "kamelet:source" + steps: + - set-property: + name: operation + constant: "{{operation}}" + - unmarshal: + json: + library: Jackson + unmarshalType: com.fasterxml.jackson.databind.JsonNode + - bean: "org.apache.camel.kamelets.utils.transform.aws.ddb.JsonToDdbModelConverter" + - to: + uri: "aws2-ddb:{{table}}" + parameters: + secretKey: "{{?secretKey}}" + accessKey: "{{?accessKey}}" + region: "{{region}}" + operation: "{{operation}}" + writeCapacity: "{{?writeCapacity}}" + useDefaultCredentialsProvider: "{{useDefaultCredentialsProvider}}" + uriEndpointOverride: "{{?uriEndpointOverride}}" + overrideEndpoint: "{{overrideEndpoint}}" diff --git a/kamelets/aws-ddb-streams-source.kamelet.yaml b/kamelets/aws-ddb-streams-source.kamelet.yaml index 512e13987..31e62ef6e 100644 --- a/kamelets/aws-ddb-streams-source.kamelet.yaml +++ b/kamelets/aws-ddb-streams-source.kamelet.yaml @@ -34,7 +34,7 @@ spec: Receive events from AWS DynamoDB Streams. Access Key/Secret Key are the basic method for authenticating to the AWS DynamoDB Streams Service. These parameters are optional, because the Kamelet provide also the 'useDefaultCredentialsProvider'. - + When using a default Credentials Provider the AWS DynamoDB Streams client will load the credentials through this provider and won't use the static credential. This is reason for not having the access key and secret key as mandatory parameter for this Kamelet. required: - table @@ -103,6 +103,6 @@ spec: useDefaultCredentialsProvider: "{{useDefaultCredentialsProvider}}" steps: - marshal: - json: + json: library: Gson - to: "kamelet:sink" diff --git a/kamelets/aws-s3-source.kamelet.yaml b/kamelets/aws-s3-source.kamelet.yaml index 439777d92..ce9439ae8 100644 --- a/kamelets/aws-s3-source.kamelet.yaml +++ b/kamelets/aws-s3-source.kamelet.yaml @@ -17,7 +17,7 @@ spec: Receive data from AWS S3 Bucket. Access Key/Secret Key are the basic method for authenticating to the AWS S3 Service. These parameters are optional, because the Kamelet provide also the 'useDefaultCredentialsProvider'. - + When using a default Credentials Provider the S3 client will load the credentials through this provider and won't use the static credential. This is reason for not having the access key and secret key as mandatory parameter for this Kamelet. required: - bucketNameOrArn @@ -58,14 +58,14 @@ spec: example: eu-west-1 autoCreateBucket: title: Autocreate Bucket - description: Setting the autocreation of the S3 bucket bucketName. + description: Setting the autocreation of the S3 bucket bucketName. type: boolean x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' default: false includeBody: title: Include Body - description: If it is true, the exchange will be consumed and put into the body and closed. If false the S3Object stream will be put raw into the body and the headers will be set with the S3 object metadata. + description: If it is true, the exchange will be consumed and put into the body and closed. If false the S3Object stream will be put raw into the body and the headers will be set with the S3 object metadata. type: boolean x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' diff --git a/library/camel-kamelets-utils/pom.xml b/library/camel-kamelets-utils/pom.xml index cb71d0705..4f848d36c 100644 --- a/library/camel-kamelets-utils/pom.xml +++ b/library/camel-kamelets-utils/pom.xml @@ -71,6 +71,48 @@ camel-kafka + + + org.apache.camel + camel-aws2-ddb + provided + + + + + org.junit.jupiter + junit-jupiter-api + ${junit.jupiter.version} + test + + + + org.junit.jupiter + junit-jupiter-engine + ${junit.jupiter.version} + test + + + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + test + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + test + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + test + + diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java new file mode 100644 index 000000000..c5098c1c6 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kamelets.utils.transform.aws.ddb; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeProperty; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.component.aws2.ddb.Ddb2Constants; +import org.apache.camel.component.aws2.ddb.Ddb2Operations; +import software.amazon.awssdk.services.dynamodb.model.AttributeAction; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate; +import software.amazon.awssdk.services.dynamodb.model.ReturnValue; + +/** + * Maps Json body to DynamoDB attribute value map and sets the attribute map as Camel DynamoDB header entries. + * + * Json property names map to attribute keys and Json property values map to attribute values. + * + * During mapping the Json property types resolve to the respective attribute types ({@code String, StringSet, Boolean, Number, NumberSet, Map, Null}). + * Primitive typed arrays in Json get mapped to {@code StringSet} or {@code NumberSet} attribute values. + * + * For PutItem operation the Json body defines all item attributes. + * + * For DeleteItem operation the Json body defines only the primary key attributes that identify the item to delete. + * + * For UpdateItem operation the Json body defines both key attributes to identify the item to be updated and all item attributes tht get updated on the item. + * + * The given Json body can use "key" and "item" as top level properties. + * Both define a Json object that will be mapped to respective attribute value maps: + *
{@code
+ * {
+ *   "key": {},
+ *   "item": {}
+ * }
+ * }
+ * 
+ * The converter will extract the objects and set respective attribute value maps as header entries. + * This is a comfortable way to define different key and item attribute value maps e.g. on UpdateItem operation. + * + * In case key and item attribute value maps are identical you can omit the special top level properties completely. + * The converter will map the whole Json body as is then and use it as source for the attribute value map. + */ +public class JsonToDdbModelConverter { + + public String process(@ExchangeProperty("operation") String operation, Exchange exchange) throws InvalidPayloadException { + if (exchange.getMessage().getHeaders().containsKey(Ddb2Constants.ITEM) || + exchange.getMessage().getHeaders().containsKey(Ddb2Constants.KEY)) { + return ""; + } + + ObjectMapper mapper = new ObjectMapper(); + + JsonNode jsonBody = exchange.getMessage().getMandatoryBody(JsonNode.class); + + JsonNode key = jsonBody.get("key"); + JsonNode item = jsonBody.get("item"); + + Map keyProps; + if (key != null) { + keyProps = mapper.convertValue(key, new TypeReference>(){}); + } else { + keyProps = mapper.convertValue(jsonBody, new TypeReference>(){}); + } + + Map itemProps; + if (item != null) { + itemProps = mapper.convertValue(item, new TypeReference>(){}); + } else { + itemProps = keyProps; + } + + final Map keyMap = getAttributeValueMap(keyProps); + + switch (Ddb2Operations.valueOf(operation)) { + case PutItem: + exchange.getMessage().setHeader(Ddb2Constants.OPERATION, Ddb2Operations.PutItem); + exchange.getMessage().setHeader(Ddb2Constants.ITEM, getAttributeValueMap(itemProps)); + setHeaderIfNotPresent(Ddb2Constants.RETURN_VALUES, ReturnValue.ALL_OLD.toString(), exchange); + break; + case UpdateItem: + exchange.getMessage().setHeader(Ddb2Constants.OPERATION, Ddb2Operations.UpdateItem); + exchange.getMessage().setHeader(Ddb2Constants.KEY, keyMap); + exchange.getMessage().setHeader(Ddb2Constants.UPDATE_VALUES, getAttributeValueUpdateMap(itemProps)); + setHeaderIfNotPresent(Ddb2Constants.RETURN_VALUES, ReturnValue.ALL_NEW.toString(), exchange); + break; + case DeleteItem: + exchange.getMessage().setHeader(Ddb2Constants.OPERATION, Ddb2Operations.DeleteItem); + exchange.getMessage().setHeader(Ddb2Constants.KEY, keyMap); + setHeaderIfNotPresent(Ddb2Constants.RETURN_VALUES, ReturnValue.ALL_OLD.toString(), exchange); + break; + default: + throw new UnsupportedOperationException(String.format("Unsupported operation '%s'", operation)); + } + + return ""; + } + + private void setHeaderIfNotPresent(String headerName, Object value, Exchange exchange) { + exchange.getMessage().setHeader(headerName, value); + } + + private Map getAttributeValueMap(Map body) { + final Map attributeValueMap = new LinkedHashMap<>(); + + for (Map.Entry attribute : body.entrySet()) { + attributeValueMap.put(attribute.getKey(), getAttributeValue(attribute.getValue())); + } + + return attributeValueMap; + } + + private Map getAttributeValueUpdateMap(Map body) { + final Map attributeValueMap = new LinkedHashMap<>(); + + for (Map.Entry attribute : body.entrySet()) { + attributeValueMap.put(attribute.getKey(), getAttributeValueUpdate(attribute.getValue())); + } + + return attributeValueMap; + } + + private static AttributeValue getAttributeValue(Object value) { + if (value == null) { + return AttributeValue.builder().nul(true).build(); + } + + if (value instanceof String) { + return AttributeValue.builder().s(value.toString()).build(); + } + + if (value instanceof Integer) { + return AttributeValue.builder().n(value.toString()).build(); + } + + if (value instanceof Boolean) { + return AttributeValue.builder().bool((Boolean) value).build(); + } + + if (value instanceof String[]) { + return AttributeValue.builder().ss((String[]) value).build(); + } + + if (value instanceof int[]) { + return AttributeValue.builder().ns(Stream.of((int[]) value).map(Object::toString).collect(Collectors.toList())).build(); + } + + if (value instanceof List) { + List values = ((List) value); + + if (values.isEmpty()) { + return AttributeValue.builder().ss().build(); + } else if (values.get(0) instanceof Integer) { + return AttributeValue.builder().ns(values.stream().map(Object::toString).collect(Collectors.toList())).build(); + } else { + return AttributeValue.builder().ss(values.stream().map(Object::toString).collect(Collectors.toList())).build(); + } + } + + if (value instanceof Map) { + Map nestedAttributes = new LinkedHashMap<>(); + + for (Map.Entry nested : ((Map) value).entrySet()) { + nestedAttributes.put(nested.getKey().toString(), getAttributeValue(nested.getValue())); + } + + return AttributeValue.builder().m(nestedAttributes).build(); + } + + return AttributeValue.builder().s(value.toString()).build(); + } + + private static AttributeValueUpdate getAttributeValueUpdate(Object value) { + return AttributeValueUpdate.builder() + .action(AttributeAction.PUT) + .value(getAttributeValue(value)).build(); + } +} diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java new file mode 100644 index 000000000..9f3053ac8 --- /dev/null +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kamelets.utils.transform.aws.ddb; + +import java.util.Map; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.component.aws2.ddb.Ddb2Constants; +import org.apache.camel.component.aws2.ddb.Ddb2Operations; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.dynamodb.model.AttributeAction; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate; +import software.amazon.awssdk.services.dynamodb.model.ReturnValue; + +class JsonToDdbModelConverterTest { + + private DefaultCamelContext camelContext; + + private final ObjectMapper mapper = new ObjectMapper(); + + private final JsonToDdbModelConverter processor = new JsonToDdbModelConverter(); + + private final String keyJson = "{" + + "\"name\": \"Rajesh Koothrappali\"" + + "}"; + + private final String itemJson = "{" + + "\"name\": \"Rajesh Koothrappali\"," + + "\"age\": 29," + + "\"super-heroes\": [\"batman\", \"spiderman\", \"wonderwoman\"]," + + "\"issues\": [5, 3, 9, 1]," + + "\"girlfriend\": null," + + "\"doctorate\": true" + + "}"; + + @BeforeEach + void setup() { + this.camelContext = new DefaultCamelContext(); + } + + @Test + @SuppressWarnings("unchecked") + void shouldMapPutItemHeaders() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree(itemJson)); + + processor.process(Ddb2Operations.PutItem.name(), exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals(Ddb2Operations.PutItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION)); + Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES)); + + assertAttributeValueMap(exchange.getMessage().getHeader(Ddb2Constants.ITEM, Map.class)); + } + + @Test + @SuppressWarnings("unchecked") + void shouldMapUpdateItemHeaders() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree("{\"key\": " + keyJson + ", \"item\": " + itemJson + "}")); + + processor.process(Ddb2Operations.UpdateItem.name(), exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals(Ddb2Operations.UpdateItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION)); + Assertions.assertEquals(ReturnValue.ALL_NEW.toString(), exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES)); + + Map attributeValueMap = exchange.getMessage().getHeader(Ddb2Constants.KEY, Map.class); + Assertions.assertEquals(1L, attributeValueMap.size()); + Assertions.assertEquals(AttributeValue.builder().s("Rajesh Koothrappali").build(), attributeValueMap.get("name")); + + assertAttributeValueUpdateMap(exchange.getMessage().getHeader(Ddb2Constants.UPDATE_VALUES, Map.class)); + } + + @Test + @SuppressWarnings("unchecked") + void shouldMapDeleteItemHeaders() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree("{\"key\": " + keyJson + "}")); + + processor.process(Ddb2Operations.DeleteItem.name(), exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals(Ddb2Operations.DeleteItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION)); + Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES)); + + Map attributeValueMap = exchange.getMessage().getHeader(Ddb2Constants.KEY, Map.class); + Assertions.assertEquals(1L, attributeValueMap.size()); + Assertions.assertEquals(AttributeValue.builder().s("Rajesh Koothrappali").build(), attributeValueMap.get("name")); + } + + @Test + @SuppressWarnings("unchecked") + void shouldMapNestedObjects() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree("{\"user\":" + itemJson + "}")); + + processor.process(Ddb2Operations.PutItem.name(), exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals(Ddb2Operations.PutItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION)); + Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES)); + + Map attributeValueMap = exchange.getMessage().getHeader(Ddb2Constants.ITEM, Map.class); + Assertions.assertEquals(1L, attributeValueMap.size()); + + Assertions.assertEquals("AttributeValue(M={name=AttributeValue(S=Rajesh Koothrappali), " + + "age=AttributeValue(N=29), " + + "super-heroes=AttributeValue(SS=[batman, spiderman, wonderwoman]), " + + "issues=AttributeValue(NS=[5, 3, 9, 1]), " + + "girlfriend=AttributeValue(NUL=true), " + + "doctorate=AttributeValue(BOOL=true)})", attributeValueMap.get("user").toString()); + } + + @Test + @SuppressWarnings("unchecked") + void shouldMapEmptyJson() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree("{}")); + + processor.process(Ddb2Operations.PutItem.name(), exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + Assertions.assertEquals(Ddb2Operations.PutItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION)); + Assertions.assertEquals(ReturnValue.ALL_OLD.toString(), exchange.getMessage().getHeader(Ddb2Constants.RETURN_VALUES)); + + Map attributeValueMap = exchange.getMessage().getHeader(Ddb2Constants.ITEM, Map.class); + Assertions.assertEquals(0L, attributeValueMap.size()); + } + + @Test + void shouldFailForWrongBodyType() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody("{}"); + + Assertions.assertThrows(InvalidPayloadException.class, () -> processor.process(Ddb2Operations.PutItem.name(), exchange)); + } + + @Test() + void shouldFailForUnsupportedOperation() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody(mapper.readTree("{}")); + + Assertions.assertThrows(UnsupportedOperationException.class, () -> processor.process(Ddb2Operations.BatchGetItems.name(), exchange)); + } + + private void assertAttributeValueMap(Map attributeValueMap) { + Assertions.assertEquals(6L, attributeValueMap.size()); + Assertions.assertEquals(AttributeValue.builder().s("Rajesh Koothrappali").build(), attributeValueMap.get("name")); + Assertions.assertEquals(AttributeValue.builder().n("29").build(), attributeValueMap.get("age")); + Assertions.assertEquals(AttributeValue.builder().ss("batman", "spiderman", "wonderwoman").build(), attributeValueMap.get("super-heroes")); + Assertions.assertEquals(AttributeValue.builder().ns("5", "3", "9", "1").build(), attributeValueMap.get("issues")); + Assertions.assertEquals(AttributeValue.builder().nul(true).build(), attributeValueMap.get("girlfriend")); + Assertions.assertEquals(AttributeValue.builder().bool(true).build(), attributeValueMap.get("doctorate")); + } + + private void assertAttributeValueUpdateMap(Map attributeValueMap) { + Assertions.assertEquals(6L, attributeValueMap.size()); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().s("Rajesh Koothrappali").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("name")); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().n("29").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("age")); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().ss("batman", "spiderman", "wonderwoman").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("super-heroes")); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().ns("5", "3", "9", "1").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("issues")); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().nul(true).build()).action(AttributeAction.PUT).build(), attributeValueMap.get("girlfriend")); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().bool(true).build()).action(AttributeAction.PUT).build(), attributeValueMap.get("doctorate")); + } +} diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml new file mode 100644 index 000000000..2080bd289 --- /dev/null +++ b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml @@ -0,0 +1,135 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: Kamelet +metadata: + name: aws-ddb-sink + annotations: + camel.apache.org/kamelet.support.level: "Preview" + camel.apache.org/catalog.version: "main-SNAPSHOT" + camel.apache.org/kamelet.icon: "" + camel.apache.org/provider: "Apache Software Foundation" + camel.apache.org/kamelet.group: "AWS DynamoDB Streams" + labels: + camel.apache.org/kamelet.type: "sink" +spec: + definition: + title: "AWS DynamoDB Sink" + description: |- + Send data to AWS DynamoDB service. The sent data will insert/update/delete an item on the given AWS DynamoDB table. + + Access Key/Secret Key are the basic method for authenticating to the AWS DynamoDB service. These parameters are optional, because the Kamelet provide also the 'useDefaultCredentialsProvider'. + + When using a default Credentials Provider the AWS DynamoDB client will load the credentials through this provider and won't use the static credential. This is reason for not having the access key and secret key as mandatory parameter for this Kamelet. + + This Kamelet expects a JSON as body. The mapping between the JSON fields and table attribute values is done by key, so if you have the input: + + '{"username":"oscerd", "city":"Rome"}' + + The Kamelet will insert/update an item in the given AWS DynamoDB table and set the attributes 'username' and 'city' respectively. Please note that the JSON object must include the primary key values that define the item. + required: + - table + - region + type: object + properties: + table: + title: Table + description: Name of the DynamoDB table to look at + type: string + accessKey: + title: Access Key + description: The access key obtained from AWS + type: string + format: password + x-descriptors: + - urn:alm:descriptor:com.tectonic.ui:password + - urn:camel:group:credentials + secretKey: + title: Secret Key + description: The secret key obtained from AWS + type: string + format: password + x-descriptors: + - urn:alm:descriptor:com.tectonic.ui:password + - urn:camel:group:credentials + region: + title: AWS Region + description: The AWS region to connect to + type: string + example: eu-west-1 + operation: + title: Operation + description: The operation to perform (one of PutItem, UpdateItem, DeleteItem) + type: string + default: PutItem + example: PutItem + writeCapacity: + title: Write Capacity + description: The provisioned throughput to reserved for writing resources to your table + type: integer + default: 1 + useDefaultCredentialsProvider: + title: Default Credentials Provider + description: Set whether the S3 client should expect to load credentials through a default credentials provider or to expect static credentials to be passed in. + type: boolean + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + default: false + uriEndpointOverride: + title: Overwritte Endpoint URI + description: Set the overriding endpoint URI. This option needs to be used in combination with overrideEndpoint option. + type: string + overrideEndpoint: + title: Endpoint Overwrite + description: Set the need for overiding the endpoint URI. This option needs to be used in combination with uriEndpointOverride setting. + type: boolean + x-descriptors: + - 'urn:alm:descriptor:com.tectonic.ui:checkbox' + default: false + types: + in: + mediaType: application/json + dependencies: + - github:apache.camel-kamelets:camel-kamelets-utils:main-SNAPSHOT + - "camel:core" + - "camel:jackson" + - "camel:aws2-ddb" + - "camel:kamelet" + template: + from: + uri: "kamelet:source" + steps: + - set-property: + name: operation + constant: "{{operation}}" + - unmarshal: + json: + library: Jackson + unmarshalType: com.fasterxml.jackson.databind.JsonNode + - bean: "org.apache.camel.kamelets.utils.transform.aws.ddb.JsonToDdbModelConverter" + - to: + uri: "aws2-ddb:{{table}}" + parameters: + secretKey: "{{?secretKey}}" + accessKey: "{{?accessKey}}" + region: "{{region}}" + operation: "{{operation}}" + writeCapacity: "{{?writeCapacity}}" + useDefaultCredentialsProvider: "{{useDefaultCredentialsProvider}}" + uriEndpointOverride: "{{?uriEndpointOverride}}" + overrideEndpoint: "{{overrideEndpoint}}" diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml index 512e13987..31e62ef6e 100644 --- a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml +++ b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-streams-source.kamelet.yaml @@ -34,7 +34,7 @@ spec: Receive events from AWS DynamoDB Streams. Access Key/Secret Key are the basic method for authenticating to the AWS DynamoDB Streams Service. These parameters are optional, because the Kamelet provide also the 'useDefaultCredentialsProvider'. - + When using a default Credentials Provider the AWS DynamoDB Streams client will load the credentials through this provider and won't use the static credential. This is reason for not having the access key and secret key as mandatory parameter for this Kamelet. required: - table @@ -103,6 +103,6 @@ spec: useDefaultCredentialsProvider: "{{useDefaultCredentialsProvider}}" steps: - marshal: - json: + json: library: Gson - to: "kamelet:sink" diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml index 439777d92..ce9439ae8 100644 --- a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml +++ b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml @@ -17,7 +17,7 @@ spec: Receive data from AWS S3 Bucket. Access Key/Secret Key are the basic method for authenticating to the AWS S3 Service. These parameters are optional, because the Kamelet provide also the 'useDefaultCredentialsProvider'. - + When using a default Credentials Provider the S3 client will load the credentials through this provider and won't use the static credential. This is reason for not having the access key and secret key as mandatory parameter for this Kamelet. required: - bucketNameOrArn @@ -58,14 +58,14 @@ spec: example: eu-west-1 autoCreateBucket: title: Autocreate Bucket - description: Setting the autocreation of the S3 bucket bucketName. + description: Setting the autocreation of the S3 bucket bucketName. type: boolean x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' default: false includeBody: title: Include Body - description: If it is true, the exchange will be consumed and put into the body and closed. If false the S3Object stream will be put raw into the body and the headers will be set with the S3 object metadata. + description: If it is true, the exchange will be consumed and put into the body and closed. If false the S3Object stream will be put raw into the body and the headers will be set with the S3 object metadata. type: boolean x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' diff --git a/test/aws-ddb-sink/amazonDDBClient.groovy b/test/aws-ddb-sink/amazonDDBClient.groovy new file mode 100644 index 000000000..dc0b2a8bc --- /dev/null +++ b/test/aws-ddb-sink/amazonDDBClient.groovy @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.dynamodb.DynamoDbClient +import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition +import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement +import software.amazon.awssdk.services.dynamodb.model.KeyType +import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType + +DynamoDbClient amazonDDBClient = DynamoDbClient + .builder() + .endpointOverride(URI.create("${YAKS_TESTCONTAINERS_LOCALSTACK_DYNAMODB_URL}")) + .credentialsProvider(StaticCredentialsProvider.create( + AwsBasicCredentials.create( + "${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY}", + "${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY}") + )) + .region(Region.of("${YAKS_TESTCONTAINERS_LOCALSTACK_REGION}")) + .build() + +amazonDDBClient.createTable(b -> { + b.tableName("${aws.ddb.tableName}") + b.keySchema( + KeySchemaElement.builder().attributeName("id").keyType(KeyType.HASH).build(), + ) + b.attributeDefinitions( + AttributeDefinition.builder().attributeName("id").attributeType(ScalarAttributeType.N).build(), + ) + b.provisionedThroughput( + ProvisionedThroughput.builder() + .readCapacityUnits(1L) + .writeCapacityUnits(1L).build()) +}) + +return amazonDDBClient diff --git a/test/aws-ddb-sink/aws-ddb-sink-binding.yaml b/test/aws-ddb-sink/aws-ddb-sink-binding.yaml new file mode 100644 index 000000000..dd0485607 --- /dev/null +++ b/test/aws-ddb-sink/aws-ddb-sink-binding.yaml @@ -0,0 +1,50 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: aws-ddb-sink-binding +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: timer-source + properties: + period: ${timer.source.period} + message: '${aws.ddb.json.data}' + steps: + - ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: log-sink + propeties: + showHeaders: true + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: aws-ddb-sink + properties: + table: ${aws.ddb.tableName} + operation: ${aws.ddb.operation} + overrideEndpoint: true + uriEndpointOverride: ${YAKS_TESTCONTAINERS_LOCALSTACK_DYNAMODB_URL} + accessKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY} + secretKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY} + region: ${YAKS_TESTCONTAINERS_LOCALSTACK_REGION} diff --git a/test/aws-ddb-sink/aws-ddb-sink-deleteItem.feature b/test/aws-ddb-sink/aws-ddb-sink-deleteItem.feature new file mode 100644 index 000000000..48f0a8197 --- /dev/null +++ b/test/aws-ddb-sink/aws-ddb-sink-deleteItem.feature @@ -0,0 +1,48 @@ +Feature: AWS DDB Sink - DeleteItem + + Background: + Given Kamelet aws-ddb-sink is available + Given Camel K resource polling configuration + | maxAttempts | 200 | + | delayBetweenAttempts | 2000 | + Given variables + | timer.source.period | 10000 | + | aws.ddb.operation | DeleteItem | + | aws.ddb.tableName | movies | + | aws.ddb.item.id | 1 | + | aws.ddb.item.year | 1985 | + | aws.ddb.item.title | Back to the future | + | aws.ddb.json.data | {"id": ${aws.ddb.item.id}} | + + Scenario: Start LocalStack container + Given Enable service DYNAMODB + Given start LocalStack container + And log 'Started LocalStack container: ${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}' + + Scenario: Create AWS-DDB client + Given New global Camel context + Given load to Camel registry amazonDDBClient.groovy + + Scenario: Create item on AWS-DDB + Given run script putItem.groovy + Given variables + | aws.ddb.items | [{year=AttributeValue(N=${aws.ddb.item.year}), id=AttributeValue(N=${aws.ddb.item.id}), title=AttributeValue(S=${aws.ddb.item.title})}] | + Then run script verifyItems.groovy + + Scenario: Create AWS-DDB Kamelet sink binding + When load KameletBinding aws-ddb-sink-binding.yaml + And KameletBinding aws-ddb-sink-binding is available + And Camel K integration aws-ddb-sink-binding is running + And Camel K integration aws-ddb-sink-binding should print Routes startup + Then sleep 10sec + + Scenario: Verify Kamelet sink + Given variables + | aws.ddb.items | [] | + Then run script verifyItems.groovy + + Scenario: Remove Camel K resources + Given delete KameletBinding aws-ddb-sink-binding + + Scenario: Stop container + Given stop LocalStack container diff --git a/test/aws-ddb-sink/aws-ddb-sink-putItem.feature b/test/aws-ddb-sink/aws-ddb-sink-putItem.feature new file mode 100644 index 000000000..d01d5ce7c --- /dev/null +++ b/test/aws-ddb-sink/aws-ddb-sink-putItem.feature @@ -0,0 +1,41 @@ +Feature: AWS DDB Sink - PutItem + + Background: + Given Kamelet aws-ddb-sink is available + Given Camel K resource polling configuration + | maxAttempts | 200 | + | delayBetweenAttempts | 2000 | + Given variables + | timer.source.period | 10000 | + | aws.ddb.operation | PutItem | + | aws.ddb.tableName | movies | + | aws.ddb.item.id | 1 | + | aws.ddb.item.year | 1977 | + | aws.ddb.item.title | Star Wars IV | + | aws.ddb.json.data | { "id":${aws.ddb.item.id}, "year":${aws.ddb.item.year}, "title":"${aws.ddb.item.title}" } | + | aws.ddb.items | [{year=AttributeValue(N=${aws.ddb.item.year}), id=AttributeValue(N=${aws.ddb.item.id}), title=AttributeValue(S=${aws.ddb.item.title})}] | + + Scenario: Start LocalStack container + Given Enable service DYNAMODB + Given start LocalStack container + And log 'Started LocalStack container: ${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}' + + Scenario: Create AWS-DDB client + Given New global Camel context + Given load to Camel registry amazonDDBClient.groovy + + Scenario: Create AWS-DDB Kamelet sink binding + When load KameletBinding aws-ddb-sink-binding.yaml + And KameletBinding aws-ddb-sink-binding is available + And Camel K integration aws-ddb-sink-binding is running + And Camel K integration aws-ddb-sink-binding should print Routes startup + Then sleep 10sec + + Scenario: Verify Kamelet sink + Then run script verifyItems.groovy + + Scenario: Remove Camel K resources + Given delete KameletBinding aws-ddb-sink-binding + + Scenario: Stop container + Given stop LocalStack container diff --git a/test/aws-ddb-sink/aws-ddb-sink-updateItem.feature b/test/aws-ddb-sink/aws-ddb-sink-updateItem.feature new file mode 100644 index 000000000..87fe078ed --- /dev/null +++ b/test/aws-ddb-sink/aws-ddb-sink-updateItem.feature @@ -0,0 +1,51 @@ +Feature: AWS DDB Sink - UpdateItem + + Background: + Given Kamelet aws-ddb-sink is available + Given Camel K resource polling configuration + | maxAttempts | 200 | + | delayBetweenAttempts | 2000 | + Given variables + | timer.source.period | 10000 | + | aws.ddb.operation | UpdateItem | + | aws.ddb.tableName | movies | + | aws.ddb.item.id | 1 | + | aws.ddb.item.year | 1933 | + | aws.ddb.item.title | King Kong | + | aws.ddb.item.title.new | King Kong - Historical | + | aws.ddb.item.directors | ["Merian C. Cooper", "Ernest B. Schoedsack"] | + | aws.ddb.json.data | { "key": {"id": ${aws.ddb.item.id}}, "item": {"title": "${aws.ddb.item.title.new}", "year": ${aws.ddb.item.year}, "directors": ${aws.ddb.item.directors}} } | + + Scenario: Start LocalStack container + Given Enable service DYNAMODB + Given start LocalStack container + And log 'Started LocalStack container: ${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}' + + Scenario: Create AWS-DDB client + Given New global Camel context + Given load to Camel registry amazonDDBClient.groovy + + Scenario: Create item on AWS-DDB + Given run script putItem.groovy + Given variables + | aws.ddb.items | [{year=AttributeValue(N=${aws.ddb.item.year}), id=AttributeValue(N=${aws.ddb.item.id}), title=AttributeValue(S=${aws.ddb.item.title})}] | + Then run script verifyItems.groovy + + Scenario: Create AWS-DDB Kamelet sink binding + When load KameletBinding aws-ddb-sink-binding.yaml + And KameletBinding aws-ddb-sink-binding is available + And Camel K integration aws-ddb-sink-binding is running + And Camel K integration aws-ddb-sink-binding should print Routes startup + Then sleep 10sec + + Scenario: Verify Kamelet sink + Given variables + | aws.ddb.item.directors | [Ernest B. Schoedsack, Merian C. Cooper] | + | aws.ddb.items | [{year=AttributeValue(N=${aws.ddb.item.year}), directors=AttributeValue(SS=${aws.ddb.item.directors}), id=AttributeValue(N=${aws.ddb.item.id}), title=AttributeValue(S=${aws.ddb.item.title.new})}] | + Then run script verifyItems.groovy + + Scenario: Remove Camel K resources + Given delete KameletBinding aws-ddb-sink-binding + + Scenario: Stop container + Given stop LocalStack container diff --git a/test/aws-ddb-sink/putItem.groovy b/test/aws-ddb-sink/putItem.groovy new file mode 100644 index 000000000..fd482f903 --- /dev/null +++ b/test/aws-ddb-sink/putItem.groovy @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import software.amazon.awssdk.services.dynamodb.model.AttributeValue +import software.amazon.awssdk.services.dynamodb.model.ReturnValue + +Map item = new HashMap<>() +item.put("id", AttributeValue.builder().n("${aws.ddb.item.id}").build()) +item.put("year", AttributeValue.builder().n("${aws.ddb.item.year}").build()) +item.put("title", AttributeValue.builder().s("${aws.ddb.item.title}").build()) + +amazonDDBClient.putItem(b -> { + b.tableName("${aws.ddb.tableName}") + b.item(item) + b.returnValues(ReturnValue.ALL_OLD) +}) diff --git a/test/aws-ddb-sink/verifyItems.groovy b/test/aws-ddb-sink/verifyItems.groovy new file mode 100644 index 000000000..b6e9d27ce --- /dev/null +++ b/test/aws-ddb-sink/verifyItems.groovy @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +assert "${aws.ddb.items}".equals(amazonDDBClient.scan(b -> b.tableName("${aws.ddb.tableName}"))?.items()?.toString()) diff --git a/test/aws-ddb-sink/yaks-config.yaml b/test/aws-ddb-sink/yaks-config.yaml new file mode 100644 index 000000000..6118b7b89 --- /dev/null +++ b/test/aws-ddb-sink/yaks-config.yaml @@ -0,0 +1,71 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +config: + namespace: + temporary: false + runtime: + testcontainers: + enabled: true + env: + - name: YAKS_CAMEL_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_KAMELETS_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_TESTCONTAINERS_AUTO_REMOVE_RESOURCES + value: false + - name: CITRUS_TYPE_CONVERTER + value: camel + resources: + - putItem.groovy + - verifyItems.groovy + - amazonDDBClient.groovy + - aws-ddb-sink-binding.yaml + cucumber: + tags: + - "not @ignored" + settings: + dependencies: + - groupId: com.amazonaws + artifactId: aws-java-sdk-dynamodb + version: "@aws-java-sdk.version@" + - groupId: org.apache.camel + artifactId: camel-aws2-ddb + version: "@camel.version@" + - groupId: org.apache.camel + artifactId: camel-jackson + version: "@camel.version@" +pre: + - name: installation + if: env:CI=true + run: | + # Install required Kamelets (these steps may be done globally in future versions) + + kamel install -n $YAKS_NAMESPACE -w + kubectl delete kamelet -n $YAKS_NAMESPACE --all + kubectl apply -f ../../kamelets/timer-source.kamelet.yaml -n $YAKS_NAMESPACE + kubectl apply -f ../../kamelets/log-sink.kamelet.yaml -n $YAKS_NAMESPACE + kubectl apply -f ../../kamelets/aws-ddb-sink.kamelet.yaml -n $YAKS_NAMESPACE +post: + - name: dump + if: env:CI=true + run: | + kamel dump -n $YAKS_NAMESPACE $(basename `pwd`)-dump.log + mkdir -p /tmp/dumps + cp *-dump.log /tmp/dumps diff --git a/test/earthquake-source/earthquake-source.feature b/test/earthquake-source/earthquake-source.feature index e18a2cf14..3a7c1805a 100644 --- a/test/earthquake-source/earthquake-source.feature +++ b/test/earthquake-source/earthquake-source.feature @@ -3,7 +3,7 @@ Feature: Kamelet earthquake-source works Background: Given Disable auto removal of Kamelet resources Given Disable auto removal of Kubernetes resources - Given Camel-K resource polling configuration + Given Camel K resource polling configuration | maxAttempts | 60 | | delayBetweenAttempts | 3000 | diff --git a/test/insert-field-action/insert-field-action.feature b/test/insert-field-action/insert-field-action.feature index b0f0ca515..850903e2b 100644 --- a/test/insert-field-action/insert-field-action.feature +++ b/test/insert-field-action/insert-field-action.feature @@ -3,13 +3,13 @@ Feature: Timer Source Kamelet Background: Given Disable auto removal of Kamelet resources Given Disable auto removal of Kubernetes resources - Given Camel-K resource polling configuration + Given Camel K resource polling configuration | maxAttempts | 60 | | delayBetweenAttempts | 3000 | Scenario: Wait for binding to start Given create Kubernetes service probe-service with target port 8080 - Then Camel-K integration insert-field-action-binding should be running + Then Camel K integration insert-field-action-binding should be running Scenario: Verify binding Given HTTP server "probe-service" diff --git a/test/mail-sink/mail-sink.feature b/test/mail-sink/mail-sink.feature index 489246a26..bb889d675 100644 --- a/test/mail-sink/mail-sink.feature +++ b/test/mail-sink/mail-sink.feature @@ -1,7 +1,7 @@ Feature: Mail Sink Background: - Given Camel-K resource polling configuration + Given Camel K resource polling configuration | maxAttempts | 200 | | delayBetweenAttempts | 2000 | Given variables @@ -17,11 +17,11 @@ Feature: Mail Sink Given load endpoint mail-server.groovy Given create Kubernetes service mail-server with port mapping 25:22222 - Scenario: Create Camel-K resources + Scenario: Create Camel K resources Given Kamelet mail-sink is available Given Kamelet timer-source is available Given load KameletBinding timer-to-mail.yaml - And Camel-K integration timer-to-mail should be running + And Camel K integration timer-to-mail should be running Scenario: Verify mail message sent Then endpoint mail-server should receive body @@ -41,6 +41,6 @@ Feature: Mail Sink } """ - Scenario: Remove Camel-K resources + Scenario: Remove Camel K resources Given delete KameletBinding timer-to-mail And delete Kubernetes service mail-server diff --git a/test/mail-sink/yaks-config.yaml b/test/mail-sink/yaks-config.yaml index c2e8d9570..910c83a29 100644 --- a/test/mail-sink/yaks-config.yaml +++ b/test/mail-sink/yaks-config.yaml @@ -16,6 +16,8 @@ # --------------------------------------------------------------------------- config: + namespace: + temporary: true runtime: env: - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES diff --git a/test/timer-source/timer-source.feature b/test/timer-source/timer-source.feature index 42847258c..6917806af 100644 --- a/test/timer-source/timer-source.feature +++ b/test/timer-source/timer-source.feature @@ -3,7 +3,7 @@ Feature: Timer Source Kamelet Background: Given Disable auto removal of Kamelet resources Given Disable auto removal of Kubernetes resources - Given Camel-K resource polling configuration + Given Camel K resource polling configuration | maxAttempts | 20 | | delayBetweenAttempts | 1000 |