Skip to content

Commit

Permalink
Introduce Kamelet input/output data types
Browse files Browse the repository at this point in the history
- Introduce data type converters
- Add data type processor to auto convert exchange message from/to given data type
- Let user choose which data type to use (via Kamelet property)
- Add data type registry and annotation based loader to find data type implementations by component scheme and name

Relates to CAMEL-18698 and apache/camel-k#1980
  • Loading branch information
christophd authored and oscerd committed Dec 2, 2022
1 parent 03c2d24 commit 4595e5f
Show file tree
Hide file tree
Showing 38 changed files with 1,829 additions and 74 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/yaks-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ concurrency:
env:
CAMEL_K_VERSION: 1.10.3
YAKS_VERSION: 0.11.0
YAKS_IMAGE_NAME: "docker.io/yaks/yaks"
YAKS_IMAGE_NAME: "docker.io/citrusframework/yaks"
YAKS_RUN_OPTIONS: "--timeout=15m"

jobs:
Expand Down Expand Up @@ -110,6 +110,7 @@ jobs:
run: |
echo "Running tests"
yaks run test/aws-ddb-sink $YAKS_RUN_OPTIONS
yaks run test/aws-s3 $YAKS_RUN_OPTIONS
yaks run test/extract-field-action $YAKS_RUN_OPTIONS
yaks run test/insert-field-action $YAKS_RUN_OPTIONS
yaks run test/mail-sink $YAKS_RUN_OPTIONS
Expand Down
27 changes: 20 additions & 7 deletions kamelets/aws-ddb-sink.kamelet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ spec:
x-descriptors:
- 'urn:alm:descriptor:com.tectonic.ui:checkbox'
default: false
inputFormat:
title: Input Type
description: Specify the input type for this Kamelet. The Kamelet will automatically apply conversion logic in order to transform message content to this data type.
type: string
default: json
example: json
types:
in:
mediaType: application/json
Expand All @@ -107,17 +113,24 @@ spec:
- "camel:aws2-ddb"
- "camel:kamelet"
template:
beans:
- name: dataTypeRegistry
type: "#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry"
- name: inputTypeProcessor
type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor"
property:
- key: scheme
value: 'aws2-ddb'
- key: format
value: '{{inputFormat}}'
from:
uri: "kamelet:source"
steps:
- set-property:
name: operation
constant: "{{operation}}"
- unmarshal:
json:
library: Jackson
unmarshalType: com.fasterxml.jackson.databind.JsonNode
- bean: "org.apache.camel.kamelets.utils.transform.aws.ddb.JsonToDdbModelConverter"
name: operation
constant: "{{operation}}"
- process:
ref: "{{inputTypeProcessor}}"
- to:
uri: "aws2-ddb:{{table}}"
parameters:
Expand Down
17 changes: 17 additions & 0 deletions kamelets/aws-s3-source.kamelet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,28 @@ spec:
description: The number of milliseconds before the next poll of the selected bucket.
type: integer
default: 500
outputFormat:
title: Output Type
description: Choose the output type for this Kamelet. The Kamelet supports different output types and performs automatic message conversion according to this data type.
type: string
default: binary
example: binary
dependencies:
- "camel:core"
- "camel:aws2-s3"
- "github:apache.camel-kamelets:camel-kamelets-utils:main-SNAPSHOT"
- "camel:kamelet"
template:
beans:
- name: dataTypeRegistry
type: "#class:org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry"
- name: outputTypeProcessor
type: "#class:org.apache.camel.kamelets.utils.format.DataTypeProcessor"
property:
- key: scheme
value: 'aws2-s3'
- key: format
value: '{{outputFormat}}'
- name: renameHeaders
type: "#class:org.apache.camel.kamelets.utils.headers.DuplicateNamingHeaders"
property:
Expand Down Expand Up @@ -143,4 +158,6 @@ spec:
steps:
- process:
ref: "{{renameHeaders}}"
- process:
ref: "{{outputTypeProcessor}}"
- to: "kamelet:sink"
7 changes: 6 additions & 1 deletion library/camel-kamelets-utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,17 @@
<artifactId>camel-kafka</artifactId>
</dependency>

<!-- AWS Dynamo DB camel component -->
<!-- Optional dependencies for data type conversion -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws2-ddb</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-aws2-s3</artifactId>
<scope>provided</scope>
</dependency>

<!-- Test scoped dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kamelets.utils.format;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Set;

import org.apache.camel.TypeConverterLoaderException;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
import org.apache.camel.kamelets.utils.format.spi.DataTypeLoader;
import org.apache.camel.kamelets.utils.format.spi.DataTypeRegistry;
import org.apache.camel.kamelets.utils.format.spi.annotations.DataType;
import org.apache.camel.spi.Injector;
import org.apache.camel.spi.PackageScanClassResolver;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Data type loader scans packages for {@link DataTypeConverter} classes annotated with {@link DataType} annotation.
*/
public class AnnotationDataTypeLoader implements DataTypeLoader {

public static final String META_INF_SERVICES = "META-INF/services/org/apache/camel/DataType";

private static final Logger LOG = LoggerFactory.getLogger(AnnotationDataTypeLoader.class);

protected final PackageScanClassResolver resolver;
protected final Injector injector;

protected Set<Class<?>> visitedClasses = new HashSet<>();
protected Set<String> visitedURIs = new HashSet<>();

public AnnotationDataTypeLoader(Injector injector, PackageScanClassResolver resolver) {
this.injector = injector;
this.resolver = resolver;
}

@Override
public void load(DataTypeRegistry registry) {
Set<String> packages = new HashSet<>();

LOG.trace("Searching for {} services", META_INF_SERVICES);
try {
ClassLoader ccl = Thread.currentThread().getContextClassLoader();
if (ccl != null) {
findPackages(packages, ccl);
}
findPackages(packages, getClass().getClassLoader());
if (packages.isEmpty()) {
LOG.debug("No package names found to be used for classpath scanning for annotated data types.");
return;
}
} catch (Exception e) {
throw new TypeConverterLoaderException(
"Cannot find package names to be used for classpath scanning for annotated data types.", e);
}

// if there is any packages to scan and load @DataType classes, then do it
if (LOG.isTraceEnabled()) {
LOG.trace("Found data type packages to scan: {}", String.join(", ", packages));
}
Set<Class<?>> scannedClasses = resolver.findAnnotated(DataType.class, packages.toArray(new String[]{}));
if (!scannedClasses.isEmpty()) {
LOG.debug("Found {} packages with {} @DataType classes to load", packages.size(), scannedClasses.size());

// load all the found classes into the type data type registry
for (Class<?> type : scannedClasses) {
if (acceptClass(type)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Loading data type annotation: {}", ObjectHelper.name(type));
}
loadDataType(registry, type);
}
}
}

// now clear the maps so we do not hold references
visitedClasses.clear();
visitedURIs.clear();
}

private void loadDataType(DataTypeRegistry registry, Class<?> type) {
if (visitedClasses.contains(type)) {
return;
}
visitedClasses.add(type);

try {
if (DataTypeConverter.class.isAssignableFrom(type) && type.isAnnotationPresent(DataType.class)) {
DataType dt = type.getAnnotation(DataType.class);
DataTypeConverter converter = (DataTypeConverter) injector.newInstance(type);
registry.addDataTypeConverter(dt.scheme(), converter);
}
} catch (NoClassDefFoundError e) {
LOG.debug("Ignoring converter type: {} as a dependent class could not be found: {}",
type.getCanonicalName(), e, e);
}
}

protected boolean acceptClass(Class<?> type) {
return true;
}

protected void findPackages(Set<String> packages, ClassLoader classLoader) throws IOException {
Enumeration<URL> resources = classLoader.getResources(META_INF_SERVICES);
while (resources.hasMoreElements()) {
URL url = resources.nextElement();
String path = url.getPath();
if (!visitedURIs.contains(path)) {
// remember we have visited this uri so we wont read it twice
visitedURIs.add(path);
LOG.debug("Loading file {} to retrieve list of packages, from url: {}", META_INF_SERVICES, url);
try (BufferedReader reader = IOHelper.buffered(new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
while (true) {
String line = reader.readLine();
if (line == null) {
break;
}
line = line.trim();
if (line.startsWith("#") || line.length() == 0) {
continue;
}
packages.add(line);
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kamelets.utils.format;

import org.apache.camel.BeanInject;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

/**
* Processor applies data type conversion based on given format name. Searches for matching data type converter
* with given component scheme and format name.
*/
public class DataTypeProcessor implements Processor, CamelContextAware {

private CamelContext camelContext;

@BeanInject
private DefaultDataTypeRegistry dataTypeRegistry;

private String scheme;
private String format;

@Override
public void process(Exchange exchange) throws Exception {
if (format == null || format.isEmpty()) {
return;
}

dataTypeRegistry.lookup(scheme, format)
.ifPresent(converter -> converter.convert(exchange));
}

public void setFormat(String format) {
this.format = format;
}

public void setScheme(String scheme) {
this.scheme = scheme;
}

@Override
public CamelContext getCamelContext() {
return camelContext;
}

@Override
public void setCamelContext(CamelContext camelContext) {
this.camelContext = camelContext;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kamelets.utils.format;

import org.apache.camel.Exchange;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;

/**
* Default data type converter receives a name and a target type in order to use traditional exchange body conversion
* mechanisms in order to transform the message body to a given type.
*/
public class DefaultDataTypeConverter implements DataTypeConverter {

private final String name;
private final Class<?> type;

public DefaultDataTypeConverter(String name, Class<?> type) {
this.name = name;
this.type = type;
}

@Override
public void convert(Exchange exchange) {
if (type.isInstance(exchange.getMessage().getBody())) {
return;
}

exchange.getMessage().setBody(exchange.getMessage().getBody(type));
}

@Override
public String getName() {
return name;
}

public Class<?> getType() {
return type;
}
}
Loading

0 comments on commit 4595e5f

Please sign in to comment.