From a9994f4982a9987b1113cee3dcbc8320705faf57 Mon Sep 17 00:00:00 2001 From: Christoph Deppisch Date: Mon, 14 Nov 2022 13:15:00 +0100 Subject: [PATCH] chore: Introduce Kamelet input/output data types - Introduce data type converters - Add data type processor to auto convert exchange message from/to given data type - Let user choose which data type to use (via Kamelet property) - Add data type registry and annotation based loader to find data type implementations by component scheme and name --- .github/workflows/yaks-tests.yaml | 7 +- kamelets/aws-ddb-sink.kamelet.yaml | 27 ++- kamelets/aws-s3-source.kamelet.yaml | 17 ++ library/camel-kamelets-utils/pom.xml | 7 +- .../format/AnnotationDataTypeLoader.java | 152 +++++++++++++++++ .../utils/format/DataTypeProcessor.java | 67 ++++++++ .../format/DefaultDataTypeConverter.java | 54 ++++++ .../utils/format/DefaultDataTypeRegistry.java | 154 ++++++++++++++++++ .../aws2/ddb/Ddb2JsonInputType.java} | 87 +++++++--- .../aws2/s3/AWS2S3BinaryOutputType.java | 55 +++++++ .../aws2/s3/AWS2S3JsonOutputType.java | 63 +++++++ .../converter/standard/JsonModelDataType.java | 66 ++++++++ .../utils/format/spi/DataTypeConverter.java | 39 +++++ .../utils/format/spi/DataTypeLoader.java | 31 ++++ .../utils/format/spi/DataTypeRegistry.java | 60 +++++++ .../format/spi/annotations/DataType.java | 51 ++++++ .../services/org/apache/camel/DataType | 20 +++ .../format/DefaultDataTypeRegistryTest.java | 57 +++++++ .../aws2/ddb/Ddb2JsonInputTypeTest.java} | 104 ++++++++---- .../aws2/s3/AWS2S3JsonOutputTypeTest.java | 98 +++++++++++ .../standard/JsonModelDataTypeTest.java | 84 ++++++++++ .../src/test/resources/log4j2-test.xml | 32 ++++ .../kamelets/aws-ddb-sink.kamelet.yaml | 27 ++- .../kamelets/aws-s3-source.kamelet.yaml | 17 ++ test/aws-s3/README.md | 76 +++++++++ test/aws-s3/amazonS3Client.groovy | 36 ++++ test/aws-s3/aws-s3-credentials.properties | 7 + test/aws-s3/aws-s3-inmem-binding.feature | 49 ++++++ .../aws-s3-source-property-conf.feature | 37 +++++ test/aws-s3/aws-s3-source-secret-conf.feature | 39 +++++ test/aws-s3/aws-s3-source-uri-conf.feature | 32 ++++ test/aws-s3/aws-s3-to-inmem.yaml | 39 +++++ test/aws-s3/aws-s3-to-log-secret-based.groovy | 21 +++ test/aws-s3/aws-s3-to-log-uri-based.groovy | 29 ++++ test/aws-s3/aws-s3-uri-binding.feature | 35 ++++ test/aws-s3/aws-s3-uri-binding.yaml | 37 +++++ test/aws-s3/yaks-config.yaml | 65 ++++++++ test/utils/inmem-to-log.yaml | 29 ++++ 38 files changed, 1831 insertions(+), 76 deletions(-) create mode 100644 library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/AnnotationDataTypeLoader.java create mode 100644 library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java create mode 100644 library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverter.java create mode 100644 library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java rename library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/{transform/aws/ddb/JsonToDdbModelConverter.java => format/converter/aws2/ddb/Ddb2JsonInputType.java} (69%) create mode 100644 library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3BinaryOutputType.java create mode 100644 library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3JsonOutputType.java create mode 100644 
library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java create mode 100644 library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeConverter.java create mode 100644 library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeLoader.java create mode 100644 library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeRegistry.java create mode 100644 library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/annotations/DataType.java create mode 100644 library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType create mode 100644 library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java rename library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/{transform/aws/ddb/JsonToDdbModelConverterTest.java => format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java} (65%) create mode 100644 library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3JsonOutputTypeTest.java create mode 100644 library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java create mode 100644 library/camel-kamelets-utils/src/test/resources/log4j2-test.xml create mode 100644 test/aws-s3/README.md create mode 100644 test/aws-s3/amazonS3Client.groovy create mode 100644 test/aws-s3/aws-s3-credentials.properties create mode 100644 test/aws-s3/aws-s3-inmem-binding.feature create mode 100644 test/aws-s3/aws-s3-source-property-conf.feature create mode 100644 test/aws-s3/aws-s3-source-secret-conf.feature create mode 100644 test/aws-s3/aws-s3-source-uri-conf.feature create mode 100644 test/aws-s3/aws-s3-to-inmem.yaml create mode 100644 test/aws-s3/aws-s3-to-log-secret-based.groovy create mode 100644 test/aws-s3/aws-s3-to-log-uri-based.groovy create mode 100644 test/aws-s3/aws-s3-uri-binding.feature create mode 100644 test/aws-s3/aws-s3-uri-binding.yaml create mode 100644 test/aws-s3/yaks-config.yaml create mode 100644 test/utils/inmem-to-log.yaml diff --git a/.github/workflows/yaks-tests.yaml b/.github/workflows/yaks-tests.yaml index 8dd7007d8..42a9a3f29 100644 --- a/.github/workflows/yaks-tests.yaml +++ b/.github/workflows/yaks-tests.yaml @@ -42,8 +42,8 @@ concurrency: env: CAMEL_K_VERSION: 1.8.2 - YAKS_VERSION: 0.9.0-202203140033 - YAKS_IMAGE_NAME: "docker.io/yaks/yaks" + YAKS_VERSION: 0.11.0 + YAKS_IMAGE_NAME: "docker.io/citrusframework/yaks" YAKS_RUN_OPTIONS: "--timeout=15m" jobs: @@ -75,7 +75,7 @@ jobs: rm -r _kamel - name: Get YAKS CLI run: | - curl --fail -L --silent https://github.com/citrusframework/yaks/releases/download/${YAKS_VERSION}/yaks-${YAKS_VERSION}-linux-64bit.tar.gz -o yaks.tar.gz + curl --fail -L --silent https://github.com/citrusframework/yaks/releases/download/v${YAKS_VERSION}/yaks-${YAKS_VERSION}-linux-64bit.tar.gz -o yaks.tar.gz mkdir -p _yaks tar -zxf yaks.tar.gz --directory ./_yaks sudo mv ./_yaks/yaks /usr/local/bin/ @@ -110,6 +110,7 @@ jobs: run: | echo "Running tests" yaks run test/aws-ddb-sink $YAKS_RUN_OPTIONS + yaks run test/aws-s3 $YAKS_RUN_OPTIONS yaks run test/extract-field-action $YAKS_RUN_OPTIONS yaks run test/insert-field-action $YAKS_RUN_OPTIONS yaks run test/mail-sink $YAKS_RUN_OPTIONS diff --git a/kamelets/aws-ddb-sink.kamelet.yaml b/kamelets/aws-ddb-sink.kamelet.yaml index 
5b603abfc..0a77faf15 100644 --- a/kamelets/aws-ddb-sink.kamelet.yaml +++ b/kamelets/aws-ddb-sink.kamelet.yaml @@ -97,6 +97,12 @@ spec: x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' default: false + inputFormat: + title: Input type + description: Specify the input type for this Kamelet. The Kamelet will automatically apply conversion logic in order to transform message content to this data type. + type: string + default: json + example: json types: in: mediaType: application/json @@ -107,17 +113,24 @@ spec: - "camel:aws2-ddb" - "camel:kamelet" template: + beans: + - name: dataTypeRegistry + type: "#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry" + - name: inputTypeProcessor + type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor" + property: + - key: scheme + value: 'aws2-ddb' + - key: format + value: '{{inputFormat}}' from: uri: "kamelet:source" steps: - set-property: - name: operation - constant: "{{operation}}" - - unmarshal: - json: - library: Jackson - unmarshalType: com.fasterxml.jackson.databind.JsonNode - - bean: "org.apache.camel.kamelets.utils.transform.aws.ddb.JsonToDdbModelConverter" + name: operation + constant: "{{operation}}" + - process: + ref: "{{inputTypeProcessor}}" - to: uri: "aws2-ddb:{{table}}" parameters: diff --git a/kamelets/aws-s3-source.kamelet.yaml b/kamelets/aws-s3-source.kamelet.yaml index 6ab2bca41..858bae82a 100644 --- a/kamelets/aws-s3-source.kamelet.yaml +++ b/kamelets/aws-s3-source.kamelet.yaml @@ -107,6 +107,12 @@ spec: description: The number of milliseconds before the next poll of the selected bucket. type: integer default: 500 + outputFormat: + title: Output type + description: Choose the output type for this Kamelet. The Kamelet supports different output types and performs automatic message conversion according to this data type. + type: string + default: binary + example: binary dependencies: - "camel:core" - "camel:aws2-s3" @@ -114,6 +120,15 @@ spec: - "camel:kamelet" template: beans: + - name: dataTypeRegistry + type: "#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry" + - name: outputTypeProcessor + type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor" + property: + - key: scheme + value: 'aws2-s3' + - key: format + value: '{{outputFormat}}' - name: renameHeaders type: "#class:org.apache.camel.kamelets.utils.headers.DuplicateNamingHeaders" property: @@ -143,4 +158,6 @@ spec: steps: - process: ref: "{{renameHeaders}}" + - process: + ref: "{{outputTypeProcessor}}" - to: "kamelet:sink" diff --git a/library/camel-kamelets-utils/pom.xml b/library/camel-kamelets-utils/pom.xml index 4f848d36c..5b1441f31 100644 --- a/library/camel-kamelets-utils/pom.xml +++ b/library/camel-kamelets-utils/pom.xml @@ -71,12 +71,17 @@ camel-kafka - + org.apache.camel camel-aws2-ddb provided + + org.apache.camel + camel-aws2-s3 + provided + diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/AnnotationDataTypeLoader.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/AnnotationDataTypeLoader.java new file mode 100644 index 000000000..96ca50eb9 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/AnnotationDataTypeLoader.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.Set; + +import org.apache.camel.TypeConverterLoaderException; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; +import org.apache.camel.kamelets.utils.format.spi.DataTypeLoader; +import org.apache.camel.kamelets.utils.format.spi.DataTypeRegistry; +import org.apache.camel.kamelets.utils.format.spi.annotations.DataType; +import org.apache.camel.spi.Injector; +import org.apache.camel.spi.PackageScanClassResolver; +import org.apache.camel.util.IOHelper; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Data type loader scans packages for {@link DataTypeConverter} classes annotated with {@link DataType} annotation. + */ +public class AnnotationDataTypeLoader implements DataTypeLoader { + + public static final String META_INF_SERVICES = "META-INF/services/org/apache/camel/DataType"; + + private static final Logger LOG = LoggerFactory.getLogger(AnnotationDataTypeLoader.class); + + protected final PackageScanClassResolver resolver; + protected final Injector injector; + + protected Set> visitedClasses = new HashSet<>(); + protected Set visitedURIs = new HashSet<>(); + + public AnnotationDataTypeLoader(Injector injector, PackageScanClassResolver resolver) { + this.injector = injector; + this.resolver = resolver; + } + + @Override + public void load(DataTypeRegistry registry) { + Set packages = new HashSet<>(); + + LOG.trace("Searching for {} services", META_INF_SERVICES); + try { + ClassLoader ccl = Thread.currentThread().getContextClassLoader(); + if (ccl != null) { + findPackages(packages, ccl); + } + findPackages(packages, getClass().getClassLoader()); + if (packages.isEmpty()) { + LOG.debug("No package names found to be used for classpath scanning for annotated data types."); + return; + } + } catch (Exception e) { + throw new TypeConverterLoaderException( + "Cannot find package names to be used for classpath scanning for annotated data types.", e); + } + + // if there is any packages to scan and load @DataType classes, then do it + if (LOG.isTraceEnabled()) { + LOG.trace("Found data type packages to scan: {}", String.join(", ", packages)); + } + Set> scannedClasses = resolver.findAnnotated(DataType.class, packages.toArray(new String[]{})); + if (!scannedClasses.isEmpty()) { + LOG.debug("Found {} packages with {} @DataType classes to load", packages.size(), scannedClasses.size()); + + // load all the found classes into the type data type registry + for (Class type : scannedClasses) { + if (acceptClass(type)) { + if (LOG.isTraceEnabled()) { 
+ LOG.trace("Loading data type annotation: {}", ObjectHelper.name(type)); + } + loadDataType(registry, type); + } + } + } + + // now clear the maps so we do not hold references + visitedClasses.clear(); + visitedURIs.clear(); + } + + private void loadDataType(DataTypeRegistry registry, Class type) { + if (visitedClasses.contains(type)) { + return; + } + visitedClasses.add(type); + + try { + if (DataTypeConverter.class.isAssignableFrom(type) && type.isAnnotationPresent(DataType.class)) { + DataType dt = type.getAnnotation(DataType.class); + DataTypeConverter converter = (DataTypeConverter) injector.newInstance(type); + registry.addDataTypeConverter(dt.scheme(), converter); + } + } catch (NoClassDefFoundError e) { + LOG.debug("Ignoring converter type: {} as a dependent class could not be found: {}", + type.getCanonicalName(), e, e); + } + } + + protected boolean acceptClass(Class type) { + return true; + } + + protected void findPackages(Set packages, ClassLoader classLoader) throws IOException { + Enumeration resources = classLoader.getResources(META_INF_SERVICES); + while (resources.hasMoreElements()) { + URL url = resources.nextElement(); + String path = url.getPath(); + if (!visitedURIs.contains(path)) { + // remember we have visited this uri so we wont read it twice + visitedURIs.add(path); + LOG.debug("Loading file {} to retrieve list of packages, from url: {}", META_INF_SERVICES, url); + try (BufferedReader reader = IOHelper.buffered(new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) { + while (true) { + String line = reader.readLine(); + if (line == null) { + break; + } + line = line.trim(); + if (line.startsWith("#") || line.length() == 0) { + continue; + } + packages.add(line); + } + } + } + } + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java new file mode 100644 index 000000000..859269fe4 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DataTypeProcessor.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format; + +import org.apache.camel.BeanInject; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; + +/** + * Processor applies data type conversion based on given format name. Searches for matching data type converter + * with given component scheme and format name. 
+ */ +public class DataTypeProcessor implements Processor, CamelContextAware { + + private CamelContext camelContext; + + @BeanInject + private DefaultDataTypeRegistry dataTypeRegistry; + + private String scheme; + private String format; + + @Override + public void process(Exchange exchange) throws Exception { + if (format == null || format.isEmpty()) { + return; + } + + dataTypeRegistry.lookup(scheme, format) + .ifPresent(converter -> converter.convert(exchange)); + } + + public void setFormat(String format) { + this.format = format; + } + + public void setScheme(String scheme) { + this.scheme = scheme; + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverter.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverter.java new file mode 100644 index 000000000..11680b50b --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format; + +import org.apache.camel.Exchange; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; + +/** + * Default data type converter receives a name and a target type in order to use traditional exchange body conversion + * mechanisms in order to transform the message body to a given type. + */ +public class DefaultDataTypeConverter implements DataTypeConverter { + + private final String name; + private final Class type; + + public DefaultDataTypeConverter(String name, Class type) { + this.name = name; + this.type = type; + } + + @Override + public void convert(Exchange exchange) { + if (type.isInstance(exchange.getMessage().getBody())) { + return; + } + + exchange.getMessage().setBody(exchange.getMessage().getBody(type)); + } + + @Override + public String getName() { + return name; + } + + public Class getType() { + return type; + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java new file mode 100644 index 000000000..e7c6e3e87 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.impl.engine.DefaultInjector; +import org.apache.camel.impl.engine.DefaultPackageScanClassResolver; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; +import org.apache.camel.kamelets.utils.format.spi.DataTypeLoader; +import org.apache.camel.kamelets.utils.format.spi.DataTypeRegistry; +import org.apache.camel.spi.PackageScanClassResolver; +import org.apache.camel.support.service.ServiceSupport; + +/** + * Default data type registry able to resolve data types converters in the project. Data types may be defined at the component level + * via {@link org.apache.camel.kamelets.utils.format.spi.annotations.DataType} annotations. Also, users can add data types directly + * to the Camel context or manually to the registry. + * + * The registry is able to retrieve converters for a given data type based on the component scheme and the given data type name. 
+ */ +public class DefaultDataTypeRegistry extends ServiceSupport implements DataTypeRegistry, CamelContextAware { + + private CamelContext camelContext; + + private PackageScanClassResolver resolver; + + protected final List dataTypeLoaders = new ArrayList<>(); + + private final Map> dataTypeConverters = new HashMap<>(); + + @Override + public void addDataTypeConverter(String scheme, DataTypeConverter converter) { + this.getComponentDataTypeConverters(scheme).add(converter); + } + + @Override + public Optional lookup(String scheme, String name) { + if (dataTypeLoaders.isEmpty()) { + try { + doInit(); + } catch (Exception e) { + throw new RuntimeCamelException("Failed to initialize data type registry", e); + } + } + + if (name == null) { + return Optional.empty(); + } + + Optional componentDataTypeConverter = getComponentDataTypeConverters(scheme).stream() + .filter(dtc -> name.equals(dtc.getName())) + .findFirst(); + + if (componentDataTypeConverter.isPresent()) { + return componentDataTypeConverter; + } + + return getDefaultDataTypeConverter(name); + } + + @Override + protected void doInit() throws Exception { + super.doInit(); + + if (resolver == null) { + if (camelContext != null) { + resolver = camelContext.adapt(ExtendedCamelContext.class).getPackageScanClassResolver(); + } else { + resolver = new DefaultPackageScanClassResolver(); + } + } + + dataTypeLoaders.add(new AnnotationDataTypeLoader(new DefaultInjector(camelContext), resolver)); + + addDataTypeConverter(new DefaultDataTypeConverter("string", String.class)); + addDataTypeConverter(new DefaultDataTypeConverter("binary", byte[].class)); + + for (DataTypeLoader loader : dataTypeLoaders) { + CamelContextAware.trySetCamelContext(loader, getCamelContext()); + loader.load(this); + } + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + + this.dataTypeConverters.clear(); + } + + /** + * Retrieve default data output type from Camel context for given format name. + * @param name + * @return + */ + private Optional getDefaultDataTypeConverter(String name) { + Optional dataTypeConverter = getComponentDataTypeConverters("camel").stream() + .filter(dtc -> name.equals(dtc.getName())) + .findFirst(); + + if (dataTypeConverter.isPresent()) { + return dataTypeConverter; + } + + return Optional.ofNullable(camelContext.getRegistry().lookupByNameAndType(name, DataTypeConverter.class)); + } + + /** + * Retrieve list of data types defined on the component level for given scheme. 
+ * @param scheme + * @return + */ + private List getComponentDataTypeConverters(String scheme) { + if (!dataTypeConverters.containsKey(scheme)) { + dataTypeConverters.put(scheme, new ArrayList<>()); + } + + return dataTypeConverters.get(scheme); + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java similarity index 69% rename from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java rename to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java index c5098c1c6..a15ff3a08 100644 --- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverter.java +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java @@ -14,22 +14,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.kamelets.utils.transform.aws.ddb; +package org.apache.camel.kamelets.utils.format.converter.aws2.ddb; + +import java.io.InputStream; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.CamelExecutionException; import org.apache.camel.Exchange; -import org.apache.camel.ExchangeProperty; -import org.apache.camel.InvalidPayloadException; import org.apache.camel.component.aws2.ddb.Ddb2Constants; import org.apache.camel.component.aws2.ddb.Ddb2Operations; +import org.apache.camel.component.jackson.JacksonDataFormat; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; +import org.apache.camel.kamelets.utils.format.spi.annotations.DataType; import software.amazon.awssdk.services.dynamodb.model.AttributeAction; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate; @@ -40,55 +45,78 @@ * * Json property names map to attribute keys and Json property values map to attribute values. * - * During mapping the Json property types resolve to the respective attribute types ({@code String, StringSet, Boolean, Number, NumberSet, Map, Null}). - * Primitive typed arrays in Json get mapped to {@code StringSet} or {@code NumberSet} attribute values. + * During mapping the Json property types resolve to the respective attribute types + * ({@code String, StringSet, Boolean, Number, NumberSet, Map, Null}). Primitive typed arrays in Json get mapped to + * {@code StringSet} or {@code NumberSet} attribute values. + * + * The input type supports the operations: PutItem, UpdateItem, DeleteItem * * For PutItem operation the Json body defines all item attributes. * * For DeleteItem operation the Json body defines only the primary key attributes that identify the item to delete. 
* - * For UpdateItem operation the Json body defines both key attributes to identify the item to be updated and all item attributes tht get updated on the item. + * For UpdateItem operation the Json body defines both key attributes to identify the item to be updated and all item + * attributes that get updated on the item. + * + * The given Json body can use "operation", "key" and "item" as top level properties. The "key" and "item" properties + * each define a Json object that will be mapped to the respective attribute value map: * - * The given Json body can use "key" and "item" as top level properties. - * Both define a Json object that will be mapped to respective attribute value maps: *
{@code
+ * 
+ * {@code
  * {
+ *   "operation": "PutItem"
  *   "key": {},
  *   "item": {}
  * }
  * }
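+ *
+ * For example, an UpdateItem request body could look like this (illustrative values only, not taken from this change):
+ * {@code
+ * {
+ *   "operation": "UpdateItem",
+ *   "key": { "name": "Sheldon" },
+ *   "item": { "age": 30 }
+ * }
+ * }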
  * 
- * The converter will extract the objects and set respective attribute value maps as header entries. - * This is a comfortable way to define different key and item attribute value maps e.g. on UpdateItem operation. * - * In case key and item attribute value maps are identical you can omit the special top level properties completely. - * The converter will map the whole Json body as is then and use it as source for the attribute value map. + * The converter will extract the objects and set respective attribute value maps as header entries. This is a + * comfortable way to define different key and item attribute value maps e.g. on UpdateItem operation. + * + * In case key and item attribute value maps are identical you can omit the special top level properties completely. The + * converter will map the whole Json body as is then and use it as source for the attribute value map. */ -public class JsonToDdbModelConverter { +@DataType(scheme = "aws2-ddb", name = "json") +public class Ddb2JsonInputType implements DataTypeConverter { + + private final JacksonDataFormat dataFormat = new JacksonDataFormat(new ObjectMapper(), JsonNode.class); - public String process(@ExchangeProperty("operation") String operation, Exchange exchange) throws InvalidPayloadException { + @Override + public void convert(Exchange exchange) { if (exchange.getMessage().getHeaders().containsKey(Ddb2Constants.ITEM) || exchange.getMessage().getHeaders().containsKey(Ddb2Constants.KEY)) { - return ""; + return; } - ObjectMapper mapper = new ObjectMapper(); + JsonNode jsonBody = getBodyAsJsonNode(exchange); + + String operation + = Optional.ofNullable(jsonBody.get("operation")).map(JsonNode::asText).orElse(Ddb2Operations.PutItem.name()); + if (exchange.hasProperties() && exchange.getProperty("operation", String.class) != null) { + operation = exchange.getProperty("operation", String.class); + } - JsonNode jsonBody = exchange.getMessage().getMandatoryBody(JsonNode.class); + if (exchange.getIn().getHeaders().containsKey(Ddb2Constants.OPERATION)) { + operation = exchange.getIn().getHeader(Ddb2Constants.OPERATION, Ddb2Operations.class).name(); + } JsonNode key = jsonBody.get("key"); JsonNode item = jsonBody.get("item"); Map keyProps; if (key != null) { - keyProps = mapper.convertValue(key, new TypeReference>(){}); + keyProps = dataFormat.getObjectMapper().convertValue(key, new TypeReference>() { + }); } else { - keyProps = mapper.convertValue(jsonBody, new TypeReference>(){}); + keyProps = dataFormat.getObjectMapper().convertValue(jsonBody, new TypeReference>() { + }); } Map itemProps; if (item != null) { - itemProps = mapper.convertValue(item, new TypeReference>(){}); + itemProps = dataFormat.getObjectMapper().convertValue(item, new TypeReference>() { + }); } else { itemProps = keyProps; } @@ -115,8 +143,18 @@ public String process(@ExchangeProperty("operation") String operation, Exchange default: throw new UnsupportedOperationException(String.format("Unsupported operation '%s'", operation)); } + } - return ""; + private JsonNode getBodyAsJsonNode(Exchange exchange) { + try { + if (exchange.getMessage().getBody() instanceof JsonNode) { + return exchange.getMessage().getMandatoryBody(JsonNode.class); + } + + return (JsonNode) dataFormat.unmarshal(exchange, exchange.getMessage().getMandatoryBody(InputStream.class)); + } catch (Exception e) { + throw new CamelExecutionException("Failed to get mandatory Json node from message body", exchange, e); + } } private void setHeaderIfNotPresent(String headerName, Object value, Exchange 
exchange) { @@ -165,11 +203,12 @@ private static AttributeValue getAttributeValue(Object value) { } if (value instanceof int[]) { - return AttributeValue.builder().ns(Stream.of((int[]) value).map(Object::toString).collect(Collectors.toList())).build(); + return AttributeValue.builder().ns(Stream.of((int[]) value).map(Object::toString).collect(Collectors.toList())) + .build(); } if (value instanceof List) { - List values = ((List) value); + List values = (List) value; if (values.isEmpty()) { return AttributeValue.builder().ss().build(); diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3BinaryOutputType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3BinaryOutputType.java new file mode 100644 index 000000000..6065ebd10 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3BinaryOutputType.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.converter.aws2.s3; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; +import org.apache.camel.kamelets.utils.format.spi.annotations.DataType; +import software.amazon.awssdk.utils.IoUtils; + +/** + * Binary output type. 
+ */ +@DataType(scheme = "aws2-s3", name = "binary") +public class AWS2S3BinaryOutputType implements DataTypeConverter { + + @Override + public void convert(Exchange exchange) { + if (exchange.getMessage().getBody() instanceof byte[]) { + return; + } + + try { + InputStream is = exchange.getMessage().getBody(InputStream.class); + if (is != null) { + exchange.getMessage().setBody(IoUtils.toByteArray(is)); + return; + } + + // Use default Camel converter utils to convert body to byte[] + exchange.getMessage().setBody(exchange.getMessage().getMandatoryBody(byte[].class)); + } catch (IOException | InvalidPayloadException e) { + throw new CamelExecutionException("Failed to convert AWS S3 body to byte[]", exchange, e); + } + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3JsonOutputType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3JsonOutputType.java new file mode 100644 index 000000000..74736d675 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3JsonOutputType.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.converter.aws2.s3; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Exchange; +import org.apache.camel.component.aws2.s3.AWS2S3Constants; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; +import org.apache.camel.kamelets.utils.format.spi.annotations.DataType; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.utils.IoUtils; + +/** + * Json output data type represents file name as key and file content as Json structure. + *

+ * Example Json structure: { "key": "myFile.txt", "content": "Hello", } + */ +@DataType(scheme = "aws2-s3", name = "json") +public class AWS2S3JsonOutputType implements DataTypeConverter { + + private static final String TEMPLATE = "{" + + "\"key\": \"%s\", " + + "\"content\": \"%s\"" + + "}"; + + @Override + public void convert(Exchange exchange) { + String key = exchange.getMessage().getHeader(AWS2S3Constants.KEY, String.class); + + ResponseInputStream bodyInputStream = exchange.getMessage().getBody(ResponseInputStream.class); + if (bodyInputStream != null) { + try { + exchange.getMessage().setBody(String.format(TEMPLATE, key, IoUtils.toUtf8String(bodyInputStream))); + return; + } catch (IOException e) { + throw new CamelExecutionException("Failed to convert AWS S3 body to Json", exchange, e); + } + } + + byte[] bodyContent = exchange.getMessage().getBody(byte[].class); + if (bodyContent != null) { + exchange.getMessage().setBody(String.format(TEMPLATE, key, new String(bodyContent, StandardCharsets.UTF_8))); + } + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java new file mode 100644 index 000000000..047e6dd51 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.converter.standard; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.component.jackson.JacksonDataFormat; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; +import org.apache.camel.kamelets.utils.format.spi.annotations.DataType; + +/** + * Data type converter able to unmarshal to given unmarshalType using jackson data format. + *

+ * Unmarshal type should be given as a fully qualified class name in the exchange properties. + */ +@DataType(name = "jsonObject") +public class JsonModelDataType implements DataTypeConverter { + + public static final String JSON_DATA_TYPE_KEY = "CamelJsonModelDataType"; + + @Override + public void convert(Exchange exchange) { + if (!exchange.hasProperties() || !exchange.getProperties().containsKey(JSON_DATA_TYPE_KEY)) { + return; + } + + String type = exchange.getProperty(JSON_DATA_TYPE_KEY, String.class); + try (JacksonDataFormat dataFormat = new JacksonDataFormat(new ObjectMapper(), Class.forName(type))) { + Object unmarshalled = dataFormat.unmarshal(exchange, getBodyAsStream(exchange)); + exchange.getMessage().setBody(unmarshalled); + } catch (Exception e) { + throw new CamelExecutionException( + String.format("Failed to load Json unmarshalling type '%s'", type), exchange, e); + } + } + + private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException { + InputStream bodyStream = exchange.getMessage().getBody(InputStream.class); + + if (bodyStream == null) { + bodyStream = new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class)); + } + + return bodyStream; + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeConverter.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeConverter.java new file mode 100644 index 000000000..d39d30f80 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeConverter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.spi; + +import org.apache.camel.Exchange; +import org.apache.camel.kamelets.utils.format.spi.annotations.DataType; + +@FunctionalInterface +public interface DataTypeConverter { + + void convert(Exchange exchange); + + /** + * Gets the data type converter name. Automatically derives the name from given type annotation. 
+ * @return + */ + default String getName() { + if (this.getClass().isAnnotationPresent(DataType.class)) { + return this.getClass().getAnnotation(DataType.class).name(); + } + + throw new UnsupportedOperationException("Missing data type converter name"); + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeLoader.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeLoader.java new file mode 100644 index 000000000..73f87c696 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeLoader.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.spi; + +/** + * A pluggable strategy to load data types into a {@link DataTypeRegistry}. + */ +public interface DataTypeLoader { + + /** + * A pluggable strategy to load data types into a registry. + * + * @param registry the registry to load the data types into + */ + void load(DataTypeRegistry registry); +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeRegistry.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeRegistry.java new file mode 100644 index 000000000..cb2bedc91 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/DataTypeRegistry.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.spi; + +import java.util.Optional; + +/** + * Registry for data types. Data type loaders should be used to add types to the registry. + *

+ * The registry is able to perform a lookup of a specific data type. + */ +public interface DataTypeRegistry { + + /** + * Registers a new default data type converter. + * @param scheme + * @param converter + */ + void addDataTypeConverter(String scheme, DataTypeConverter converter); + + /** + * Registers a new default data type converter. + * @param converter + */ + default void addDataTypeConverter(DataTypeConverter converter) { + addDataTypeConverter("camel", converter); + } + + /** + * Find data type for given component scheme and data type name. + * @param scheme + * @param name + * @return + */ + Optional lookup(String scheme, String name); + + /** + * Find data type for given data type name. + * @param name + * @return + */ + default Optional lookup(String name) { + return lookup("camel", name); + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/annotations/DataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/annotations/DataType.java new file mode 100644 index 000000000..b1d4f5a9c --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/spi/annotations/DataType.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.spi.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Data type annotation defines a type with its component scheme, a name and input/output types. + */ +@Retention(RetentionPolicy.RUNTIME) +@Documented +@Target({ ElementType.TYPE }) +public @interface DataType { + + /** + * Camel component scheme. + * @return + */ + String scheme() default "camel"; + + /** + * Data type name. + * @return + */ + String name(); + + /** + * The media type associated with this data type. + * @return + */ + String mediaType() default ""; +} diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType new file mode 100644 index 000000000..b51d34040 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataType @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.camel.kamelets.utils.format.converter.standard +org.apache.camel.kamelets.utils.format.converter.aws2.ddb +org.apache.camel.kamelets.utils.format.converter.aws2.s3 \ No newline at end of file diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java new file mode 100644 index 000000000..2ee4113e3 --- /dev/null +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.kamelets.utils.format; + +import java.util.Optional; + +import org.apache.camel.CamelContextAware; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class DefaultDataTypeRegistryTest { + + private DefaultCamelContext camelContext; + + private DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry(); + + @BeforeEach + void setup() { + this.camelContext = new DefaultCamelContext(); + CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext); + } + + @Test + public void shouldLookupDefaultDataTypeConverters() throws Exception { + Optional converter = dataTypeRegistry.lookup( "jsonObject"); + Assertions.assertTrue(converter.isPresent()); + Assertions.assertEquals(JsonModelDataType.class, converter.get().getClass()); + converter = dataTypeRegistry.lookup( "string"); + Assertions.assertTrue(converter.isPresent()); + Assertions.assertEquals(DefaultDataTypeConverter.class, converter.get().getClass()); + Assertions.assertEquals(String.class, ((DefaultDataTypeConverter) converter.get()).getType()); + converter = dataTypeRegistry.lookup( "binary"); + Assertions.assertTrue(converter.isPresent()); + Assertions.assertEquals(DefaultDataTypeConverter.class, converter.get().getClass()); + Assertions.assertEquals(byte[].class, ((DefaultDataTypeConverter) converter.get()).getType()); + } + +} \ No newline at end of file diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java similarity index 65% rename from library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java rename to library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java index 33d27bfe4..7f1f9e9fc 100644 --- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/transform/aws/ddb/JsonToDdbModelConverterTest.java +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java @@ -14,16 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.camel.kamelets.utils.transform.aws.ddb; + +package org.apache.camel.kamelets.utils.format.converter.aws2.ddb; import java.util.Map; +import java.util.Optional; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.CamelContextAware; +import org.apache.camel.CamelExecutionException; import org.apache.camel.Exchange; -import org.apache.camel.InvalidPayloadException; import org.apache.camel.component.aws2.ddb.Ddb2Constants; import org.apache.camel.component.aws2.ddb.Ddb2Operations; import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; import org.apache.camel.support.DefaultExchange; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -33,25 +38,25 @@ import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate; import software.amazon.awssdk.services.dynamodb.model.ReturnValue; -class JsonToDdbModelConverterTest { +public class Ddb2JsonInputTypeTest { private DefaultCamelContext camelContext; private final ObjectMapper mapper = new ObjectMapper(); - private final JsonToDdbModelConverter processor = new JsonToDdbModelConverter(); + private final Ddb2JsonInputType inputType = new Ddb2JsonInputType(); private final String keyJson = "{" + - "\"name\": \"Rajesh Koothrappali\"" + + "\"name\": \"Rajesh Koothrappali\"" + "}"; private final String itemJson = "{" + - "\"name\": \"Rajesh Koothrappali\"," + - "\"age\": 29," + - "\"super-heroes\": [\"batman\", \"spiderman\", \"wonderwoman\"]," + - "\"issues\": [5, 3, 9, 1]," + - "\"girlfriend\": null," + - "\"doctorate\": true" + + "\"name\": \"Rajesh Koothrappali\"," + + "\"age\": 29," + + "\"super-heroes\": [\"batman\", \"spiderman\", \"wonderwoman\"]," + + "\"issues\": [5, 3, 9, 1]," + + "\"girlfriend\": null," + + "\"doctorate\": true" + "}"; @BeforeEach @@ -65,8 +70,8 @@ void shouldMapPutItemHeaders() throws Exception { Exchange exchange = new DefaultExchange(camelContext); exchange.getMessage().setBody(mapper.readTree(itemJson)); - - processor.process(Ddb2Operations.PutItem.name(), exchange); + exchange.setProperty("operation", Ddb2Operations.PutItem.name()); + inputType.convert(exchange); Assertions.assertTrue(exchange.getMessage().hasHeaders()); Assertions.assertEquals(Ddb2Operations.PutItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION)); @@ -80,9 +85,10 @@ void shouldMapPutItemHeaders() throws Exception { void shouldMapUpdateItemHeaders() throws Exception { Exchange exchange = new DefaultExchange(camelContext); - exchange.getMessage().setBody(mapper.readTree("{\"key\": " + keyJson + ", \"item\": " + itemJson + "}")); + exchange.getMessage().setBody(mapper.readTree("{\"operation\": \"" + Ddb2Operations.UpdateItem.name() + "\", \"key\": " + + keyJson + ", \"item\": " + itemJson + "}")); - processor.process(Ddb2Operations.UpdateItem.name(), exchange); + inputType.convert(exchange); Assertions.assertTrue(exchange.getMessage().hasHeaders()); Assertions.assertEquals(Ddb2Operations.UpdateItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION)); @@ -101,8 +107,9 @@ void shouldMapDeleteItemHeaders() throws Exception { Exchange exchange = new DefaultExchange(camelContext); exchange.getMessage().setBody(mapper.readTree("{\"key\": " + keyJson + "}")); + exchange.setProperty("operation", Ddb2Operations.DeleteItem.name()); - processor.process(Ddb2Operations.DeleteItem.name(), exchange); + 
inputType.convert(exchange); Assertions.assertTrue(exchange.getMessage().hasHeaders()); Assertions.assertEquals(Ddb2Operations.DeleteItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION)); @@ -119,8 +126,8 @@ void shouldMapNestedObjects() throws Exception { Exchange exchange = new DefaultExchange(camelContext); exchange.getMessage().setBody(mapper.readTree("{\"user\":" + itemJson + "}")); - - processor.process(Ddb2Operations.PutItem.name(), exchange); + exchange.setProperty("operation", Ddb2Operations.PutItem.name()); + inputType.convert(exchange); Assertions.assertTrue(exchange.getMessage().hasHeaders()); Assertions.assertEquals(Ddb2Operations.PutItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION)); @@ -130,11 +137,12 @@ void shouldMapNestedObjects() throws Exception { Assertions.assertEquals(1L, attributeValueMap.size()); Assertions.assertEquals("AttributeValue(M={name=AttributeValue(S=Rajesh Koothrappali), " + - "age=AttributeValue(N=29), " + - "super-heroes=AttributeValue(SS=[batman, spiderman, wonderwoman]), " + - "issues=AttributeValue(NS=[5, 3, 9, 1]), " + - "girlfriend=AttributeValue(NUL=true), " + - "doctorate=AttributeValue(BOOL=true)})", attributeValueMap.get("user").toString()); + "age=AttributeValue(N=29), " + + "super-heroes=AttributeValue(SS=[batman, spiderman, wonderwoman]), " + + "issues=AttributeValue(NS=[5, 3, 9, 1]), " + + "girlfriend=AttributeValue(NUL=true), " + + "doctorate=AttributeValue(BOOL=true)})", + attributeValueMap.get("user").toString()); } @Test @@ -142,9 +150,10 @@ void shouldMapNestedObjects() throws Exception { void shouldMapEmptyJson() throws Exception { Exchange exchange = new DefaultExchange(camelContext); - exchange.getMessage().setBody(mapper.readTree("{}")); + exchange.getMessage().setBody("{}"); + exchange.getMessage().setHeader(Ddb2Constants.OPERATION, Ddb2Operations.PutItem.name()); - processor.process(Ddb2Operations.PutItem.name(), exchange); + inputType.convert(exchange); Assertions.assertTrue(exchange.getMessage().hasHeaders()); Assertions.assertEquals(Ddb2Operations.PutItem, exchange.getMessage().getHeader(Ddb2Constants.OPERATION)); @@ -154,20 +163,39 @@ void shouldMapEmptyJson() throws Exception { Assertions.assertEquals(0L, attributeValueMap.size()); } - @Test() + @Test + void shouldFailForWrongBodyType() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setBody("Hello"); + + Assertions.assertThrows(CamelExecutionException.class, () -> inputType.convert(exchange)); + } + + @Test void shouldFailForUnsupportedOperation() throws Exception { Exchange exchange = new DefaultExchange(camelContext); exchange.getMessage().setBody(mapper.readTree("{}")); + exchange.setProperty("operation", Ddb2Operations.BatchGetItems.name()); - Assertions.assertThrows(UnsupportedOperationException.class, () -> processor.process(Ddb2Operations.BatchGetItems.name(), exchange)); + Assertions.assertThrows(UnsupportedOperationException.class, () -> inputType.convert(exchange)); + } + + @Test + public void shouldLookupDataType() throws Exception { + DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry(); + CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext); + Optional converter = dataTypeRegistry.lookup("aws2-ddb", "json"); + Assertions.assertTrue(converter.isPresent()); } private void assertAttributeValueMap(Map attributeValueMap) { Assertions.assertEquals(6L, attributeValueMap.size()); Assertions.assertEquals(AttributeValue.builder().s("Rajesh 
Koothrappali").build(), attributeValueMap.get("name")); Assertions.assertEquals(AttributeValue.builder().n("29").build(), attributeValueMap.get("age")); - Assertions.assertEquals(AttributeValue.builder().ss("batman", "spiderman", "wonderwoman").build(), attributeValueMap.get("super-heroes")); + Assertions.assertEquals(AttributeValue.builder().ss("batman", "spiderman", "wonderwoman").build(), + attributeValueMap.get("super-heroes")); Assertions.assertEquals(AttributeValue.builder().ns("5", "3", "9", "1").build(), attributeValueMap.get("issues")); Assertions.assertEquals(AttributeValue.builder().nul(true).build(), attributeValueMap.get("girlfriend")); Assertions.assertEquals(AttributeValue.builder().bool(true).build(), attributeValueMap.get("doctorate")); @@ -175,11 +203,19 @@ private void assertAttributeValueMap(Map attributeValueM private void assertAttributeValueUpdateMap(Map attributeValueMap) { Assertions.assertEquals(6L, attributeValueMap.size()); - Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().s("Rajesh Koothrappali").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("name")); - Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().n("29").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("age")); - Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().ss("batman", "spiderman", "wonderwoman").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("super-heroes")); - Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().ns("5", "3", "9", "1").build()).action(AttributeAction.PUT).build(), attributeValueMap.get("issues")); - Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().nul(true).build()).action(AttributeAction.PUT).build(), attributeValueMap.get("girlfriend")); - Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().bool(true).build()).action(AttributeAction.PUT).build(), attributeValueMap.get("doctorate")); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().s("Rajesh Koothrappali").build()) + .action(AttributeAction.PUT).build(), attributeValueMap.get("name")); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().n("29").build()) + .action(AttributeAction.PUT).build(), attributeValueMap.get("age")); + Assertions.assertEquals( + AttributeValueUpdate.builder().value(AttributeValue.builder().ss("batman", "spiderman", "wonderwoman").build()) + .action(AttributeAction.PUT).build(), + attributeValueMap.get("super-heroes")); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().ns("5", "3", "9", "1").build()) + .action(AttributeAction.PUT).build(), attributeValueMap.get("issues")); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().nul(true).build()) + .action(AttributeAction.PUT).build(), attributeValueMap.get("girlfriend")); + Assertions.assertEquals(AttributeValueUpdate.builder().value(AttributeValue.builder().bool(true).build()) + .action(AttributeAction.PUT).build(), attributeValueMap.get("doctorate")); } } diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3JsonOutputTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3JsonOutputTypeTest.java new file mode 
100644 index 000000000..53357adde --- /dev/null +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3JsonOutputTypeTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kamelets.utils.format.converter.aws2.s3; + +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.component.aws2.s3.AWS2S3Constants; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.http.AbortableInputStream; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class AWS2S3JsonOutputTypeTest { + + private final DefaultCamelContext camelContext = new DefaultCamelContext(); + + private final AWS2S3JsonOutputType outputType = new AWS2S3JsonOutputType(); + + @Test + void shouldMapFromStringToJsonModel() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setHeader(AWS2S3Constants.KEY, "test1.txt"); + exchange.getMessage().setBody("Test1"); + outputType.convert(exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + assertEquals("test1.txt", exchange.getMessage().getHeader(AWS2S3Constants.KEY)); + + assertJsonModelBody(exchange, "test1.txt", "Test1"); + } + + @Test + void shouldMapFromBytesToJsonModel() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setHeader(AWS2S3Constants.KEY, "test2.txt"); + exchange.getMessage().setBody("Test2".getBytes(StandardCharsets.UTF_8)); + outputType.convert(exchange); + + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + assertEquals("test2.txt", exchange.getMessage().getHeader(AWS2S3Constants.KEY)); + + assertJsonModelBody(exchange, "test2.txt", "Test2"); + } + + @Test + void shouldMapFromInputStreamToJsonModel() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.getMessage().setHeader(AWS2S3Constants.KEY, "test3.txt"); + exchange.getMessage().setBody(new ResponseInputStream<>(GetObjectRequest.builder().bucket("myBucket").key("test3.txt").build(), + AbortableInputStream.create(new ByteArrayInputStream("Test3".getBytes(StandardCharsets.UTF_8))))); + outputType.convert(exchange); 
+ + Assertions.assertTrue(exchange.getMessage().hasHeaders()); + assertEquals("test3.txt", exchange.getMessage().getHeader(AWS2S3Constants.KEY)); + + assertJsonModelBody(exchange, "test3.txt", "Test3"); + } + + @Test + public void shouldLookupDataType() throws Exception { + DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry(); + CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext); + Optional converter = dataTypeRegistry.lookup("aws2-s3", "json"); + Assertions.assertTrue(converter.isPresent()); + } + + private static void assertJsonModelBody(Exchange exchange, String key, String content) { + assertEquals(String.format("{\"key\": \"%s\", \"content\": \"%s\"}", key, content), exchange.getMessage().getBody()); + } +} diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java new file mode 100644 index 000000000..c175cc6d9 --- /dev/null +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.kamelets.utils.format.converter.standard; + +import java.util.Optional; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; +import org.apache.camel.support.DefaultExchange; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class JsonModelDataTypeTest { + + private final DefaultCamelContext camelContext = new DefaultCamelContext(); + + private final JsonModelDataType dataType = new JsonModelDataType(); + + @Test + void shouldMapFromStringToJsonModel() throws Exception { + Exchange exchange = new DefaultExchange(camelContext); + + exchange.setProperty(JsonModelDataType.JSON_DATA_TYPE_KEY, Person.class.getName()); + exchange.getMessage().setBody("{ \"name\": \"Sheldon\", \"age\": 29}"); + dataType.convert(exchange); + + assertEquals(Person.class, exchange.getMessage().getBody().getClass()); + assertEquals("Sheldon", exchange.getMessage().getBody(Person.class).getName()); + } + + @Test + public void shouldLookupDataType() throws Exception { + DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry(); + CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext); + Optional converter = dataTypeRegistry.lookup("jsonObject"); + Assertions.assertTrue(converter.isPresent()); + } + + public static class Person { + @JsonProperty + private String name; + + @JsonProperty + private Long age; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Long getAge() { + return age; + } + + public void setAge(Long age) { + this.age = age; + } + } + +} \ No newline at end of file diff --git a/library/camel-kamelets-utils/src/test/resources/log4j2-test.xml b/library/camel-kamelets-utils/src/test/resources/log4j2-test.xml new file mode 100644 index 000000000..1d6d8f383 --- /dev/null +++ b/library/camel-kamelets-utils/src/test/resources/log4j2-test.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + + + + diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml index 5b603abfc..0a77faf15 100644 --- a/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml +++ b/library/camel-kamelets/src/main/resources/kamelets/aws-ddb-sink.kamelet.yaml @@ -97,6 +97,12 @@ spec: x-descriptors: - 'urn:alm:descriptor:com.tectonic.ui:checkbox' default: false + inputFormat: + title: Input type + description: Specify the input type for this Kamelet. The Kamelet will automatically apply conversion logic in order to transform message content to this data type. 
+ type: string + default: json + example: json types: in: mediaType: application/json @@ -107,17 +113,24 @@ spec: - "camel:aws2-ddb" - "camel:kamelet" template: + beans: + - name: dataTypeRegistry + type: "#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry" + - name: inputTypeProcessor + type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor" + property: + - key: scheme + value: 'aws2-ddb' + - key: format + value: '{{inputFormat}}' from: uri: "kamelet:source" steps: - set-property: - name: operation - constant: "{{operation}}" - - unmarshal: - json: - library: Jackson - unmarshalType: com.fasterxml.jackson.databind.JsonNode - - bean: "org.apache.camel.kamelets.utils.transform.aws.ddb.JsonToDdbModelConverter" + name: operation + constant: "{{operation}}" + - process: + ref: "{{inputTypeProcessor}}" - to: uri: "aws2-ddb:{{table}}" parameters: diff --git a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml index 6ab2bca41..858bae82a 100644 --- a/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml +++ b/library/camel-kamelets/src/main/resources/kamelets/aws-s3-source.kamelet.yaml @@ -107,6 +107,12 @@ spec: description: The number of milliseconds before the next poll of the selected bucket. type: integer default: 500 + outputFormat: + title: Output type + description: Choose the output type for this Kamelet. The Kamelet supports different output types and performs automatic message conversion according to this data type. + type: string + default: binary + example: binary dependencies: - "camel:core" - "camel:aws2-s3" @@ -114,6 +120,15 @@ spec: - "camel:kamelet" template: beans: + - name: dataTypeRegistry + type: "#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry" + - name: outputTypeProcessor + type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor" + property: + - key: scheme + value: 'aws2-s3' + - key: format + value: '{{outputFormat}}' - name: renameHeaders type: "#class:org.apache.camel.kamelets.utils.headers.DuplicateNamingHeaders" property: @@ -143,4 +158,6 @@ spec: steps: - process: ref: "{{renameHeaders}}" + - process: + ref: "{{outputTypeProcessor}}" - to: "kamelet:sink" diff --git a/test/aws-s3/README.md b/test/aws-s3/README.md new file mode 100644 index 000000000..6e7d7315f --- /dev/null +++ b/test/aws-s3/README.md @@ -0,0 +1,76 @@ +# AWS S3 Kamelet test + +This test verifies the AWS S3 Kamelet source defined in [aws-s3-source.kamelet.yaml](aws-s3-source.kamelet.yaml). + +## Objectives + +The test verifies the AWS S3 Kamelet source by creating a Camel K integration that uses the Kamelet and listens for messages on the +AWS S3 bucket. + +The test uses a [LocalStack Testcontainers](https://www.testcontainers.org/modules/localstack/) instance to start a local AWS S3 service for mocking purposes. +The Kamelet and the test interact with the local AWS S3 service to validate the functionality.
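The Kamelet templates above register a `DataTypeProcessor` bean that carries a component `scheme` and a `format` key and resolves the matching converter through the `DefaultDataTypeRegistry`. As a rough sketch of what a converter behind such a lookup could look like, assuming only the `convert(Exchange)` call exercised in the tests above; the `uppercase` type and the exact `@DataType` attribute names are illustrative assumptions, not part of this change:

```java
import org.apache.camel.Exchange;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;

// Hypothetical converter sketch: the "uppercase" data type and the exact @DataType
// attribute names are assumed for illustration and are not part of this patch.
@DataType(scheme = "aws2-s3", name = "uppercase")
public class AWS2S3UppercaseOutputType implements DataTypeConverter {

    @Override
    public void convert(Exchange exchange) {
        // Replace the current message body with its upper-case String representation.
        String body = exchange.getMessage().getBody(String.class);
        exchange.getMessage().setBody(body != null ? body.toUpperCase() : null);
    }
}
```

If such a converter is registered with the data type registry, a user could select it on the source Kamelet simply by setting `outputFormat: uppercase`.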
+ +### Test Kamelet source + +The test performs the following high-level steps for each configuration variant (URI, secret, and property based): + +*Preparation* +- Start the AWS S3 service as a LocalStack container +- Overwrite the Kamelet with the latest source +- Prepare the Camel AWS S3 client + +*Scenario* +- Create the Kamelet in the current namespace in the cluster +- Create the Camel K integration that uses the Kamelet source to consume data from the AWS S3 service +- Wait for the Camel K integration to start and listen for AWS S3 messages +- Create a new message in the AWS S3 bucket +- Verify that the integration has received the message event + +*Cleanup* +- Stop the LocalStack container +- Delete the Camel K integration +- Delete the secret from the current namespace + +## Installation + +The test assumes that you have access to a Kubernetes cluster and that the Camel K and YAKS operators are installed +and running. + +You can review the installation steps for the operators in the documentation: + +- [Install Camel K operator](https://camel.apache.org/camel-k/latest/installation/installation.html) +- [Install YAKS operator](https://github.com/citrusframework/yaks#installation) + +## Run the tests + +To run tests with URI-based configuration: + +```shell script +$ yaks test aws-s3-source-uri-conf.feature +``` + +To run tests with secret-based configuration: + +```shell script +$ yaks test aws-s3-source-secret-conf.feature +``` + +To run tests with property-based configuration: + +```shell script +$ yaks test aws-s3-source-property-conf.feature +``` + +To run tests with URI binding: + +```shell script +$ yaks test aws-s3-uri-binding.feature +``` + +To run tests with binding to a Knative channel: + +```shell script +$ yaks test aws-s3-inmem-binding.feature +``` + +You will be provided with the test log output and the test results. diff --git a/test/aws-s3/amazonS3Client.groovy b/test/aws-s3/amazonS3Client.groovy new file mode 100644 index 000000000..5c3ff8a01 --- /dev/null +++ b/test/aws-s3/amazonS3Client.groovy @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.s3.S3Client + +S3Client s3 = S3Client + .builder() + .endpointOverride(URI.create("${YAKS_TESTCONTAINERS_LOCALSTACK_S3_URL}")) + .credentialsProvider(StaticCredentialsProvider.create( + AwsBasicCredentials.create( + "${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY}", + "${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY}") + )) + .region(Region.of("${YAKS_TESTCONTAINERS_LOCALSTACK_REGION}")) + .build() + +s3.createBucket(b -> b.bucket("${aws.s3.bucketNameOrArn}")) + +return s3 diff --git a/test/aws-s3/aws-s3-credentials.properties b/test/aws-s3/aws-s3-credentials.properties new file mode 100644 index 000000000..f9dd1e10b --- /dev/null +++ b/test/aws-s3/aws-s3-credentials.properties @@ -0,0 +1,7 @@ +# Please add your AWS S3 account credentials +camel.kamelet.aws-s3-source.aws-s3-credentials.bucketNameOrArn=${aws.s3.bucketNameOrArn} +camel.kamelet.aws-s3-source.aws-s3-credentials.overrideEndpoint=true +camel.kamelet.aws-s3-source.aws-s3-credentials.uriEndpointOverride=${YAKS_TESTCONTAINERS_LOCALSTACK_S3_URL} +camel.kamelet.aws-s3-source.aws-s3-credentials.secretKey=${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY} +camel.kamelet.aws-s3-source.aws-s3-credentials.accessKey=${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY} +camel.kamelet.aws-s3-source.aws-s3-credentials.region=${YAKS_TESTCONTAINERS_LOCALSTACK_REGION} diff --git a/test/aws-s3/aws-s3-inmem-binding.feature b/test/aws-s3/aws-s3-inmem-binding.feature new file mode 100644 index 000000000..d67e77984 --- /dev/null +++ b/test/aws-s3/aws-s3-inmem-binding.feature @@ -0,0 +1,49 @@ +@knative +Feature: AWS S3 Kamelet - binding to InMemoryChannel + + Background: + Given Kamelet aws-s3-source is available + Given variables + | aws.s3.bucketNameOrArn | mybucket | + | aws.s3.message | Hello from S3 Kamelet | + | aws.s3.key | hello.txt | + + Scenario: Start LocalStack container + Given Enable service S3 + Given start LocalStack container + And log 'Started LocalStack container: ${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}' + + Scenario: Create AWS-S3 client + Given New global Camel context + Given load to Camel registry amazonS3Client.groovy + + Scenario: Create Knative broker and channel + Given create Knative broker default + And Knative broker default is running + Given create Knative channel messages + + Scenario: Create AWS-S3 Kamelet to InMemoryChannel binding + Given variable loginfo is "Installed features" + Given load KameletBinding aws-s3-to-inmem.yaml + Given load KameletBinding inmem-to-log.yaml + Then KameletBinding aws-s3-to-inmem should be available + And KameletBinding inmem-to-log should be available + And Camel K integration aws-s3-to-inmem is running + And Camel K integration inmem-to-log is running + And Camel K integration aws-s3-to-inmem should print ${loginfo} + And Camel K integration inmem-to-log should print ${loginfo} + Then sleep 10000 ms + + Scenario: Verify Kamelet source + Given Camel exchange message header CamelAwsS3Key="${aws.s3.key}" + Given send Camel exchange to("aws2-s3://${aws.s3.bucketNameOrArn}?amazonS3Client=#amazonS3Client") with body: ${aws.s3.message} + Then Camel K integration inmem-to-log should print ${aws.s3.message} + + Scenario: Remove resources + Given delete KameletBinding aws-s3-to-inmem + Given delete KameletBinding inmem-to-log + Given delete Knative broker default + Given 
delete Knative channel messages + + Scenario: Stop container + Given stop LocalStack container diff --git a/test/aws-s3/aws-s3-source-property-conf.feature b/test/aws-s3/aws-s3-source-property-conf.feature new file mode 100644 index 000000000..93a2d3539 --- /dev/null +++ b/test/aws-s3/aws-s3-source-property-conf.feature @@ -0,0 +1,37 @@ +Feature: AWS S3 Kamelet - property based config + + Background: + Given Kamelet aws-s3-source is available + Given variables + | aws.s3.bucketNameOrArn | mybucket | + | aws.s3.message | Hello from S3 Kamelet | + | aws.s3.key | hello.txt | + + Scenario: Start LocalStack container + Given Enable service S3 + Given start LocalStack container + And log 'Started LocalStack container: ${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}' + + Scenario: Create AWS-S3 client + Given New global Camel context + Given load to Camel registry amazonS3Client.groovy + + Scenario: Create AWS-S3 Kamelet to log binding + Given Camel K integration property file aws-s3-credentials.properties + Given create Camel K integration aws-s3-to-log-prop-based.groovy + """ + from("kamelet:aws-s3-source/aws-s3-credentials") + .to("log:info") + """ + Then Camel K integration aws-s3-to-log-prop-based should be running + + Scenario: Verify Kamelet source + Given Camel exchange message header CamelAwsS3Key="${aws.s3.key}" + Given send Camel exchange to("aws2-s3://${aws.s3.bucketNameOrArn}?amazonS3Client=#amazonS3Client") with body: ${aws.s3.message} + Then Camel K integration aws-s3-to-log-prop-based should print ${aws.s3.message} + + Scenario: Remove Camel K resources + Given delete Camel K integration aws-s3-to-log-prop-based + + Scenario: Stop container + Given stop LocalStack container diff --git a/test/aws-s3/aws-s3-source-secret-conf.feature b/test/aws-s3/aws-s3-source-secret-conf.feature new file mode 100644 index 000000000..78ee9be56 --- /dev/null +++ b/test/aws-s3/aws-s3-source-secret-conf.feature @@ -0,0 +1,39 @@ +@ignored +Feature: AWS S3 Kamelet - secret based config + + Background: + Given Kamelet aws-s3-source is available + Given variables + | aws.s3.bucketNameOrArn | mybucket | + | aws.s3.message | Hello from S3 Kamelet | + | aws.s3.key | hello.txt | + + Scenario: Start LocalStack container + Given Enable service S3 + Given start LocalStack container + And log 'Started LocalStack container: ${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}' + + Scenario: Create AWS-S3 client + Given New global Camel context + Given load to Camel registry amazonS3Client.groovy + + Scenario: Create AWS-S3 Kamelet to log binding + Given create Kubernetes secret aws-s3-source-credentials + | aws-s3-credentials.properties | citrus:encodeBase64(citrus:readFile(aws-s3-credentials.properties)) | + Given create labels on Kubernetes secret aws-s3-source-credentials + | camel.apache.org/kamelet | aws-s3-source | + | camel.apache.org/kamelet.configuration | aws-s3-credentials | + Given load Camel K integration aws-s3-to-log-secret-based.groovy + Then Camel K integration aws-s3-to-log-secret-based should be running + + Scenario: Verify Kamelet source + Given Camel exchange message header CamelAwsS3Key="${aws.s3.key}" + Given send Camel exchange to("aws2-s3://${aws.s3.bucketNameOrArn}?amazonS3Client=#amazonS3Client") with body: ${aws.s3.message} + Then Camel K integration aws-s3-to-log-secret-based should print ${aws.s3.message} + + Scenario: Remove resources + Given delete Camel K integration aws-s3-to-log-secret-based + Given delete Kubernetes secret aws-s3-source-credentials + + Scenario: Stop container + 
Given stop LocalStack container diff --git a/test/aws-s3/aws-s3-source-uri-conf.feature b/test/aws-s3/aws-s3-source-uri-conf.feature new file mode 100644 index 000000000..ca65ba7dd --- /dev/null +++ b/test/aws-s3/aws-s3-source-uri-conf.feature @@ -0,0 +1,32 @@ +Feature: AWS S3 Kamelet - URI based config + + Background: + Given Kamelet aws-s3-source is available + Given variables + | aws.s3.bucketNameOrArn | mybucket | + | aws.s3.message | Hello from S3 Kamelet | + | aws.s3.key | hello.txt | + + Scenario: Start LocalStack container + Given Enable service S3 + Given start LocalStack container + And log 'Started LocalStack container: ${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}' + + Scenario: Create S3 client + Given New global Camel context + Given load to Camel registry amazonS3Client.groovy + + Scenario: Create AWS-S3 Kamelet to log binding + Given load Camel K integration aws-s3-to-log-uri-based.groovy + Then Camel K integration aws-s3-to-log-uri-based should be running + + Scenario: Verify Kamelet source + Given Camel exchange message header CamelAwsS3Key="${aws.s3.key}" + Given send Camel exchange to("aws2-s3://${aws.s3.bucketNameOrArn}?amazonS3Client=#amazonS3Client") with body: ${aws.s3.message} + Then Camel K integration aws-s3-to-log-uri-based should print ${aws.s3.message} + + Scenario: Remove Camel K resources + Given delete Camel K integration aws-s3-to-log-uri-based + + Scenario: Stop container + Given stop LocalStack container diff --git a/test/aws-s3/aws-s3-to-inmem.yaml b/test/aws-s3/aws-s3-to-inmem.yaml new file mode 100644 index 000000000..ce880028d --- /dev/null +++ b/test/aws-s3/aws-s3-to-inmem.yaml @@ -0,0 +1,39 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: aws-s3-to-inmem +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: aws-s3-source + properties: + bucketNameOrArn: ${aws.s3.bucketNameOrArn} + overrideEndpoint: true + uriEndpointOverride: ${YAKS_TESTCONTAINERS_LOCALSTACK_S3_URL} + accessKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY} + secretKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY} + region: ${YAKS_TESTCONTAINERS_LOCALSTACK_REGION} + sink: + ref: + kind: InMemoryChannel + apiVersion: messaging.knative.dev/v1 + name: messages diff --git a/test/aws-s3/aws-s3-to-log-secret-based.groovy b/test/aws-s3/aws-s3-to-log-secret-based.groovy new file mode 100644 index 000000000..02fb1c58c --- /dev/null +++ b/test/aws-s3/aws-s3-to-log-secret-based.groovy @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// camel-k: language=groovy + +from("kamelet:aws-s3-source/aws-s3-credentials") + .to("log:info") diff --git a/test/aws-s3/aws-s3-to-log-uri-based.groovy b/test/aws-s3/aws-s3-to-log-uri-based.groovy new file mode 100644 index 000000000..145b5510e --- /dev/null +++ b/test/aws-s3/aws-s3-to-log-uri-based.groovy @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +// camel-k: language=groovy + +def parameters = 'bucketNameOrArn=${aws.s3.bucketNameOrArn}&'+ + 'overrideEndpoint=true&' + + 'uriEndpointOverride=${YAKS_TESTCONTAINERS_LOCALSTACK_S3_URL}&' + + 'accessKey=${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY}&' + + 'secretKey=${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY}&'+ + 'region=${YAKS_TESTCONTAINERS_LOCALSTACK_REGION}&'+ + 'deleteAfterRead=true' + +from("kamelet:aws-s3-source?$parameters") + .to("log:info") diff --git a/test/aws-s3/aws-s3-uri-binding.feature b/test/aws-s3/aws-s3-uri-binding.feature new file mode 100644 index 000000000..ace191779 --- /dev/null +++ b/test/aws-s3/aws-s3-uri-binding.feature @@ -0,0 +1,35 @@ +Feature: AWS S3 Kamelet - binding to URI + + Background: + Given Kamelet aws-s3-source is available + Given variables + | aws.s3.bucketNameOrArn | mybucket | + | aws.s3.message | Hello from S3 Kamelet | + | aws.s3.key | hello.txt | + + Scenario: Start LocalStack container + Given Enable service S3 + Given start LocalStack container + And log 'Started LocalStack container: ${YAKS_TESTCONTAINERS_LOCALSTACK_CONTAINER_NAME}' + + Scenario: Create AWS-S3 client + Given New global Camel context + Given load to Camel registry amazonS3Client.groovy + + Scenario: Create AWS-S3 Kamelet to log binding + Given variable loginfo is "Installed features" + When load KameletBinding aws-s3-uri-binding.yaml + And KameletBinding aws-s3-uri-binding is available + And Camel K integration aws-s3-uri-binding is running + Then Camel K integration aws-s3-uri-binding should print ${loginfo} + + Scenario: Verify Kamelet source + Given Camel exchange message header CamelAwsS3Key="${aws.s3.key}" + Given send Camel exchange to("aws2-s3://${aws.s3.bucketNameOrArn}?amazonS3Client=#amazonS3Client") with body: ${aws.s3.message} + Then Camel K integration aws-s3-uri-binding should print ${aws.s3.message} + + Scenario: Remove Camel K resources + Given delete KameletBinding aws-s3-uri-binding + + Scenario: Stop container + Given stop LocalStack container diff --git a/test/aws-s3/aws-s3-uri-binding.yaml b/test/aws-s3/aws-s3-uri-binding.yaml new file mode 100644 index 000000000..505228185 --- /dev/null +++ b/test/aws-s3/aws-s3-uri-binding.yaml @@ -0,0 +1,37 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: aws-s3-uri-binding +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: aws-s3-source + properties: + bucketNameOrArn: ${aws.s3.bucketNameOrArn} + overrideEndpoint: true + outputFormat: json + uriEndpointOverride: ${YAKS_TESTCONTAINERS_LOCALSTACK_S3_URL} + accessKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_ACCESS_KEY} + secretKey: ${YAKS_TESTCONTAINERS_LOCALSTACK_SECRET_KEY} + region: ${YAKS_TESTCONTAINERS_LOCALSTACK_REGION} + sink: + uri: log:info diff --git a/test/aws-s3/yaks-config.yaml b/test/aws-s3/yaks-config.yaml new file mode 100644 index 000000000..f36d136cd --- /dev/null +++ b/test/aws-s3/yaks-config.yaml @@ -0,0 +1,65 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +config: + namespace: + temporary: false + runtime: + testcontainers: + enabled: true + env: + - name: YAKS_CAMEL_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_KAMELETS_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_KNATIVE_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_TESTCONTAINERS_AUTO_REMOVE_RESOURCES + value: false + - name: CITRUS_TYPE_CONVERTER + value: camel + resources: + - amazonS3Client.groovy + - aws-s3-credentials.properties + - aws-s3-to-log-uri-based.groovy + - aws-s3-to-log-secret-based.groovy + - aws-s3-uri-binding.yaml + - aws-s3-to-inmem.yaml + - ../utils/inmem-to-log.yaml + cucumber: + tags: + - "not @ignored" + settings: + dependencies: + - groupId: com.amazonaws + artifactId: aws-java-sdk-kinesis + version: "@aws-java-sdk.version@" + - groupId: org.apache.camel + artifactId: camel-aws2-s3 + version: "@camel.version@" + - groupId: org.apache.camel + artifactId: camel-jackson + version: "@camel.version@" + dump: + enabled: true + failedOnly: true + includes: + - app=camel-k diff --git a/test/utils/inmem-to-log.yaml b/test/utils/inmem-to-log.yaml new file mode 100644 index 000000000..8b5dc51e7 --- /dev/null +++ b/test/utils/inmem-to-log.yaml @@ -0,0 +1,29 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: inmem-to-log +spec: + source: + ref: + kind: InMemoryChannel + apiVersion: messaging.knative.dev/v1 + name: messages + sink: + uri: log:info
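The `aws-s3-uri-binding.yaml` above selects `outputFormat: json`, so the `log:info` sink receives the JSON model asserted in `AWS2S3JsonOutputTypeTest`: a body of the form `{"key": "<object key>", "content": "<object content>"}`. A minimal sketch of how a downstream consumer might read that model; the class and method names are illustrative only and not part of this change:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative reader for the JSON model produced by the aws2-s3 "json" output type.
// The {"key": ..., "content": ...} layout mirrors the assertions in AWS2S3JsonOutputTypeTest.
public class S3JsonModelReader {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Returns "<object key> -> <object content>" for a JSON model body.
    public static String describe(String body) throws Exception {
        JsonNode node = MAPPER.readTree(body);
        return node.path("key").asText() + " -> " + node.path("content").asText();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describe("{\"key\": \"hello.txt\", \"content\": \"Hello from S3 Kamelet\"}"));
    }
}
```

Running the example prints `hello.txt -> Hello from S3 Kamelet`, which matches the object key and message used throughout the feature files above.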