diff --git a/.github/workflows/yaks-tests.yaml b/.github/workflows/yaks-tests.yaml index 2d52f9154..1994831a3 100644 --- a/.github/workflows/yaks-tests.yaml +++ b/.github/workflows/yaks-tests.yaml @@ -91,6 +91,9 @@ jobs: yaks run test/aws-ddb-sink $YAKS_RUN_OPTIONS yaks run test/aws-s3 $YAKS_RUN_OPTIONS + yaks run test/avro-data-type $YAKS_RUN_OPTIONS + yaks run test/avro-serdes-action $YAKS_RUN_OPTIONS + yaks run test/extract-field-action $YAKS_RUN_OPTIONS yaks run test/insert-field-action $YAKS_RUN_OPTIONS diff --git a/kamelets/avro-deserialize-action.kamelet.yaml b/kamelets/avro-deserialize-action.kamelet.yaml index 3bec55f78..ebff66050 100644 --- a/kamelets/avro-deserialize-action.kamelet.yaml +++ b/kamelets/avro-deserialize-action.kamelet.yaml @@ -33,8 +33,6 @@ spec: title: "Avro Deserialize Action" description: "Deserialize payload to Avro" type: object - required: - - schema properties: schema: title: Schema @@ -54,23 +52,21 @@ spec: - "camel:core" - "camel:jackson-avro" template: + beans: + - name: schemaResolver + type: "#class:org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver" + property: + - key: validate + value: '{{validate}}' + - key: schema + value: '{{schema:}}' from: uri: kamelet:source steps: - - set-property: - name: schema - constant: "{{schema}}" - - set-property: - name: validate - constant: "{{validate}}" - unmarshal: avro: library: Jackson unmarshalType: com.fasterxml.jackson.databind.JsonNode - schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver" - - remove-property: - name: schema - - remove-property: - name: validate + schemaResolver: "#bean:{{schemaResolver}}" - remove-header: name: "Content-Type" diff --git a/kamelets/avro-serialize-action.kamelet.yaml b/kamelets/avro-serialize-action.kamelet.yaml index 06830b277..93a50270a 100644 --- a/kamelets/avro-serialize-action.kamelet.yaml +++ b/kamelets/avro-serialize-action.kamelet.yaml @@ -33,8 +33,6 @@ spec: title: "Avro Serialize Action" description: "Serialize payload to Avro" type: object - required: - - schema properties: schema: title: Schema @@ -54,24 +52,22 @@ spec: - "camel:core" - "camel:jackson-avro" template: + beans: + - name: schemaResolver + type: "#class:org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver" + property: + - key: validate + value: '{{validate}}' + - key: schema + value: '{{schema:}}' from: uri: kamelet:source steps: - - set-property: - name: schema - constant: "{{schema}}" - - set-property: - name: validate - constant: "{{validate}}" - marshal: avro: library: Jackson unmarshalType: com.fasterxml.jackson.databind.JsonNode - schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver" - - remove-property: - name: schema - - remove-property: - name: validate + schemaResolver: "#bean:{{schemaResolver}}" - set-header: name: "Content-Type" constant: "application/avro" diff --git a/kamelets/protobuf-deserialize-action.kamelet.yaml b/kamelets/protobuf-deserialize-action.kamelet.yaml index bf06dc4fc..c0727a5f1 100644 --- a/kamelets/protobuf-deserialize-action.kamelet.yaml +++ b/kamelets/protobuf-deserialize-action.kamelet.yaml @@ -32,8 +32,6 @@ spec: title: "Protobuf Deserialize Action" description: "Deserialize payload to Protobuf" type: object - required: - - schema properties: schema: title: Schema @@ -46,18 +44,19 @@ spec: - "camel:core" - "camel:jackson-protobuf" template: + beans: + - name: schemaResolver + type: "#class:org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver" + property: + - key: schema + value: '{{schema:}}' from: uri: kamelet:source steps: - - set-property: - name: schema - constant: "{{schema}}" - unmarshal: protobuf: library: Jackson unmarshalType: com.fasterxml.jackson.databind.JsonNode - schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightProtobufSchemaResolver" - - remove-property: - name: schema + schemaResolver: "#bean:{{schemaResolver}}" - remove-header: name: "Content-Type" diff --git a/kamelets/protobuf-serialize-action.kamelet.yaml b/kamelets/protobuf-serialize-action.kamelet.yaml index 56f321dbc..36218aeca 100644 --- a/kamelets/protobuf-serialize-action.kamelet.yaml +++ b/kamelets/protobuf-serialize-action.kamelet.yaml @@ -32,8 +32,6 @@ spec: title: "Protobuf Serialize Action" description: "Serialize payload to Protobuf" type: object - required: - - schema properties: schema: title: Schema @@ -46,19 +44,20 @@ spec: - "camel:core" - "camel:jackson-protobuf" template: + beans: + - name: schemaResolver + type: "#class:org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver" + property: + - key: schema + value: '{{schema:}}' from: uri: kamelet:source steps: - - set-property: - name: schema - constant: "{{schema}}" - marshal: protobuf: library: Jackson unmarshalType: com.fasterxml.jackson.databind.JsonNode - schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightProtobufSchemaResolver" - - remove-property: - name: schema + schemaResolver: "#bean:{{schemaResolver}}" - set-header: name: "Content-Type" constant: "application/protobuf" diff --git a/kamelets/resolve-pojo-schema-action.kamelet.yaml b/kamelets/resolve-pojo-schema-action.kamelet.yaml new file mode 100644 index 000000000..3bafc0902 --- /dev/null +++ b/kamelets/resolve-pojo-schema-action.kamelet.yaml @@ -0,0 +1,76 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: Kamelet +metadata: + name: resolve-pojo-schema-action + annotations: + camel.apache.org/kamelet.support.level: "Stable" + camel.apache.org/catalog.version: "4.0.0-SNAPSHOT" + camel.apache.org/kamelet.icon: "" + camel.apache.org/provider: "Apache Software Foundation" + camel.apache.org/kamelet.group: "Actions" + camel.apache.org/kamelet.namespace: "Transformation" + labels: + camel.apache.org/kamelet.type: "action" +spec: + definition: + title: "Resolve Schema Action" + description: "Resolves schema from given mime type and payload. Sets the resolved schema, the schema type and its content class as properties for later reference." + type: object + properties: + mimeType: + title: Mime Type + description: The mime type to determine the schema resolver implementation that should perform the operation. + type: string + default: "application/json" + example: "application/json" + schema: + title: Schema + description: Optional schema content (as single-line, using JSON format). + type: string + contentClass: + title: Content Class + description: Type information of the content object. Fully qualified class name. + type: string + example: "org.apache.camel.content.Foo" + targetMimeType: + title: Target Mime Type + description: Additional mime type information used to determine the schema resolver. Usually only used in combination with mime type "application/x-java-object" + type: string + example: "application/json" + dependencies: + - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.0.0-SNAPSHOT" + - "camel:kamelet" + - "camel:core" + - "camel:jackson-avro" + - "camel:jackson-protobuf" + template: + beans: + - name: schemaResolver + type: "#class:org.apache.camel.kamelets.utils.format.schema.DelegatingSchemaResolver" + properties: + mimeType: '{{mimeType}}' + schema: '{{schema:}}' + contentClass: '{{contentClass:}}' + targetMimeType: '{{targetMimeType:}}' + from: + uri: "kamelet:source" + steps: + - process: + ref: "{{schemaResolver}}" diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java index 24c77b706..cb01358a5 100644 --- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistry.java @@ -26,9 +26,8 @@ import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType; -import org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType; -import org.apache.camel.kamelets.utils.format.converter.standard.StringDataType; +import org.apache.camel.kamelets.utils.format.converter.bytes.ByteArrayDataType; +import org.apache.camel.kamelets.utils.format.converter.text.StringDataType; import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; import org.apache.camel.kamelets.utils.format.spi.DataTypeConverterResolver; import org.apache.camel.kamelets.utils.format.spi.DataTypeLoader; @@ -105,9 +104,8 @@ protected void doInit() throws Exception { if (classpathScan) { dataTypeLoaders.add(new AnnotationDataTypeLoader()); } else if (useDefaultConverters) { - addDataTypeConverter(new BinaryDataType()); + addDataTypeConverter(new ByteArrayDataType()); addDataTypeConverter(new StringDataType()); - addDataTypeConverter(new JsonModelDataType()); } for (DataTypeLoader loader : dataTypeLoaders) { diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/MimeType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/MimeType.java new file mode 100644 index 000000000..7c2054395 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/MimeType.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format; + +import java.util.Objects; + +public enum MimeType { + JSON("application/json"), + PROTOBUF("application/protobuf"), + AVRO("application/avro"), + AVRO_BINARY("avro/binary"), + AVRO_STRUCT("avro/x-struct"), + BINARY("application/octet-stream"), + TEXT("text/plain"), + JAVA_OBJECT("application/x-java-object"), + STRUCT("application/x-struct"); + + private static final MimeType[] VALUES = values(); + private final String type; + + MimeType(String type) { + this.type = type; + } + + public String type() { + return type; + } + + public static MimeType of(String type) { + for (MimeType mt : VALUES) { + if (Objects.equals(type, mt.type)) { + return mt; + } + } + + throw new IllegalArgumentException("Unsupported type: " + type); + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/SchemaType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/SchemaType.java new file mode 100644 index 000000000..10fa2c58f --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/SchemaType.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format; + +import java.util.Arrays; +import java.util.Objects; + +/** + * Supported schema type for Java object serialization/deserialization + */ +public enum SchemaType { + PROTOBUF("protobuf"), + AVRO("avsc"), + JSON("json"); + + private static final SchemaType[] VALUES = values(); + + private final String schemaType; + + SchemaType(String type) { + this.schemaType = type; + } + + public String type() { + return schemaType; + } + + public static SchemaType of(String type) { + return Arrays.stream(VALUES) + .filter(s -> Objects.equals(s.schemaType, type)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException(String.format("Unsupported schema type '%s'", type))); + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightAvroSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/Avro.java similarity index 52% rename from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightAvroSchemaResolver.java rename to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/Avro.java index a75df4d12..cfadf972b 100644 --- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightAvroSchemaResolver.java +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/Avro.java @@ -14,24 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.kamelets.utils.serialization; -import com.fasterxml.jackson.core.FormatSchema; -import com.fasterxml.jackson.dataformat.avro.AvroSchema; +package org.apache.camel.kamelets.utils.format.converter.avro; -import org.apache.avro.Schema; -import org.apache.camel.Exchange; -import org.apache.camel.component.jackson.SchemaResolver; +import com.fasterxml.jackson.dataformat.avro.AvroMapper; -public class InflightAvroSchemaResolver implements SchemaResolver { +public final class Avro { - @Override - public FormatSchema resolve(Exchange exchange) { - String schemaJson = (String) exchange.getProperty("schema"); - Boolean validate = Boolean.valueOf((String) exchange.getProperty("validate")); - Schema raw = new Schema.Parser().setValidate(validate).parse(schemaJson); - AvroSchema schema = new AvroSchema(raw); - return schema; - } + public static final AvroMapper MAPPER = new AvroMapper(); + private Avro() { + // prevent instantiation of utility class + } } diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroBinaryDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroBinaryDataType.java new file mode 100644 index 000000000..465da568f --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroBinaryDataType.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.converter.avro; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.dataformat.avro.AvroSchema; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.kamelets.utils.format.MimeType; +import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; +import org.apache.camel.kamelets.utils.format.spi.annotations.DataType; + +/** + * Data type uses Jackson Avro data format to marshal given JsonNode in Exchange body to a binary (byte array) representation. + * Uses given Avro schema from the Exchange properties when marshalling the payload (usually already resolved via schema + * resolver Kamelet action). + */ +@DataType(name = "avro-binary", mediaType = "avro/binary") +public class AvroBinaryDataType implements DataTypeConverter { + + @Override + public void convert(Exchange exchange) { + AvroSchema schema = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, AvroSchema.class); + + if (schema == null) { + throw new CamelExecutionException("Missing proper avro schema for data type processing", exchange); + } + + try { + byte[] marshalled = Avro.MAPPER.writer().forType(JsonNode.class).with(schema) + .writeValueAsBytes(getBodyAsJsonNode(exchange, schema)); + exchange.getMessage().setBody(marshalled); + + exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.AVRO_BINARY.type()); + exchange.getMessage().setHeader(SchemaHelper.CONTENT_SCHEMA, + exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, "", String.class)); + } catch (InvalidPayloadException | IOException e) { + throw new CamelExecutionException("Failed to apply Avro binary data type on exchange", exchange, e); + } + } + + private JsonNode getBodyAsJsonNode(Exchange exchange, AvroSchema schema) throws InvalidPayloadException, IOException { + if (exchange.getMessage().getBody() instanceof JsonNode) { + return (JsonNode) exchange.getMessage().getBody(); + } + + return Avro.MAPPER.reader().forType(JsonNode.class).with(schema) + .readValue(getBodyAsStream(exchange)); + } + + private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException { + InputStream bodyStream = exchange.getMessage().getBody(InputStream.class); + + if (bodyStream == null) { + bodyStream = new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class)); + } + + return bodyStream; + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroSchemaResolver.java new file mode 100644 index 000000000..e6a71ad0c --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroSchemaResolver.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.converter.avro; + +import java.io.InputStream; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.fasterxml.jackson.core.FormatSchema; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.dataformat.avro.AvroSchema; +import org.apache.avro.Schema; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.jackson.SchemaResolver; +import org.apache.camel.kamelets.utils.format.SchemaType; +import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper; +import org.apache.camel.spi.Resource; +import org.apache.camel.support.PluginHelper; +import org.apache.camel.util.ObjectHelper; + +public class AvroSchemaResolver implements SchemaResolver, Processor { + private final ConcurrentMap schemes; + + private AvroSchema schema; + private String contentClass; + + private boolean validate = true; + + public AvroSchemaResolver() { + this.schemes = new ConcurrentHashMap<>(); + } + + public String getSchema() { + if (this.schema != null) { + return this.schema.getAvroSchema().toString(); + } + + return null; + } + + public void setSchema(String schema) { + if (ObjectHelper.isNotEmpty(schema)) { + this.schema = new AvroSchema(new Schema.Parser().setValidate(validate).parse(schema)); + } else { + this.schema = null; + } + } + + public boolean isValidate() { + return validate; + } + + public void setValidate(boolean validate) { + this.validate = validate; + } + + public String getContentClass() { + return contentClass; + } + + public void setContentClass(String contentClass) { + if (ObjectHelper.isNotEmpty(contentClass)) { + this.contentClass = contentClass; + } else { + this.contentClass = null; + } + } + + @Override + public void process(Exchange exchange) throws Exception { + Object payload = exchange.getMessage().getBody(); + if (payload == null) { + return; + } + + AvroSchema answer = computeIfAbsent(exchange); + + if (answer != null) { + exchange.setProperty(SchemaHelper.CONTENT_SCHEMA, answer); + exchange.setProperty(SchemaHelper.CONTENT_SCHEMA_TYPE, SchemaType.AVRO.type()); + exchange.setProperty(SchemaHelper.CONTENT_CLASS, SchemaHelper.resolveContentClass(exchange, this.contentClass)); + } + } + + @Override + public FormatSchema resolve(Exchange exchange) { + AvroSchema answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, AvroSchema.class); + if (answer == null) { + answer = computeIfAbsent(exchange); + } + + return answer; + } + + private AvroSchema computeIfAbsent(Exchange exchange) { + if (this.schema != null) { + return this.schema; + } + + AvroSchema answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, AvroSchema.class); + + if (answer == null && exchange.getProperties().containsKey(SchemaHelper.SCHEMA)) { + String schemaJson = exchange.getProperty(SchemaHelper.SCHEMA, String.class); + Schema raw = new Schema.Parser().setValidate(validate).parse(schemaJson); + answer = new AvroSchema(raw); + } + + if (answer == null) { + String contentClass = SchemaHelper.resolveContentClass(exchange, this.contentClass); + if (contentClass != null) { + answer = this.schemes.computeIfAbsent(contentClass, t -> { + Resource res = PluginHelper.getResourceLoader(exchange.getContext()) + .resolveResource("classpath:schemas/" + SchemaType.AVRO.type() + "/" + t + "." + SchemaType.AVRO.type()); + + try { + if (res.exists()) { + try (InputStream is = res.getInputStream()) { + if (is != null) { + return Avro.MAPPER.schemaFrom(is); + } + } + } + } catch (Exception e) { + throw new RuntimeException( + "Unable to load Avro schema for type: " + t + ", resource: " + res.getLocation(), e); + } + + try { + return Avro.MAPPER.schemaFor(Class.forName(contentClass)); + } catch (JsonMappingException | ClassNotFoundException e) { + throw new RuntimeException( + "Unable to compute Avro schema for type: " + t, e); + } + }); + } + } + + if (answer != null) { + this.schema = answer; + } + + return answer; + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroStructDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroStructDataType.java new file mode 100644 index 000000000..c1997b049 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/avro/AvroStructDataType.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.converter.avro; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.dataformat.avro.AvroSchema; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.kamelets.utils.format.MimeType; +import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; +import org.apache.camel.kamelets.utils.format.spi.annotations.DataType; + +/** + * Data type uses Avro Jackson data format to unmarshal Exchange body to generic JsonNode. + * Uses given Avro schema from the Exchange properties when unmarshalling the payload (usually already resolved via schema + * resolver Kamelet action). + */ +@DataType(name = "avro-x-struct", mediaType = "application/x-struct") +public class AvroStructDataType implements DataTypeConverter { + + @Override + public void convert(Exchange exchange) { + AvroSchema schema = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, AvroSchema.class); + + if (schema == null) { + throw new CamelExecutionException("Missing proper avro schema for data type processing", exchange); + } + + try { + Object unmarshalled = Avro.MAPPER.reader().forType(JsonNode.class).with(schema) + .readValue(getBodyAsStream(exchange)); + exchange.getMessage().setBody(unmarshalled); + + exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.STRUCT.type()); + } catch (InvalidPayloadException | IOException e) { + throw new CamelExecutionException("Failed to apply Avro x-struct data type on exchange", exchange, e); + } + } + + private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException { + InputStream bodyStream = exchange.getMessage().getBody(InputStream.class); + + if (bodyStream == null) { + bodyStream = new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class)); + } + + return bodyStream; + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java index 471e569fd..0f6e979e0 100644 --- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputType.java @@ -27,12 +27,12 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.camel.CamelExecutionException; import org.apache.camel.Exchange; import org.apache.camel.component.aws2.ddb.Ddb2Constants; import org.apache.camel.component.aws2.ddb.Ddb2Operations; import org.apache.camel.component.jackson.JacksonDataFormat; +import org.apache.camel.kamelets.utils.format.converter.json.Json; import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; import org.apache.camel.kamelets.utils.format.spi.annotations.DataType; import software.amazon.awssdk.services.dynamodb.model.AttributeAction; @@ -77,10 +77,10 @@ * In case key and item attribute value maps are identical you can omit the special top level properties completely. The * converter will map the whole Json body as is then and use it as source for the attribute value map. */ -@DataType(scheme = "aws2-ddb", name = "json", mediaType = "application/json") +@DataType(scheme = "aws2-ddb", name = "application-json", mediaType = "application/json") public class Ddb2JsonInputType implements DataTypeConverter { - private final JacksonDataFormat dataFormat = new JacksonDataFormat(new ObjectMapper(), JsonNode.class); + private final JacksonDataFormat dataFormat = new JacksonDataFormat(Json.MAPPER, JsonNode.class); @Override public void convert(Exchange exchange) { diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java index d8847886a..241840e8e 100644 --- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputType.java @@ -29,7 +29,7 @@ * Output data type represents AWS S3 get object response as CloudEvent V1. The data type sets Camel specific * CloudEvent headers on the exchange. */ -@DataType(scheme = "aws2-s3", name = "cloudevents", mediaType = "application/octet-stream") +@DataType(scheme = "aws2-s3", name = "application-cloudevents", mediaType = "application/octet-stream") public class AWS2S3CloudEventOutputType implements DataTypeConverter { @Override diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/bytes/ByteArrayDataType.java similarity index 64% rename from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataType.java rename to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/bytes/ByteArrayDataType.java index 532e998ba..cdacabbda 100644 --- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataType.java +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/bytes/ByteArrayDataType.java @@ -15,24 +15,27 @@ * limitations under the License. */ -package org.apache.camel.kamelets.utils.format.converter.standard; +package org.apache.camel.kamelets.utils.format.converter.bytes; import org.apache.camel.Exchange; import org.apache.camel.kamelets.utils.format.DefaultDataTypeConverter; +import org.apache.camel.kamelets.utils.format.MimeType; import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; import org.apache.camel.kamelets.utils.format.spi.annotations.DataType; /** - * Binary data type. + * Generic binary data type uses Camel message body converter mechanism to convert content to byte array representation. */ -@DataType(name = "binary", mediaType = "application/octet-stream") -public class BinaryDataType implements DataTypeConverter { +@DataType(name = "application-octet-stream", mediaType = "application/octet-stream") +public class ByteArrayDataType implements DataTypeConverter { - private static final DataTypeConverter DELEGATE = - new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "binary", "application/octet-stream", byte[].class); + private static final DataTypeConverter DELEGATE = new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "binary", + MimeType.BINARY.type(), byte[].class); @Override public void convert(Exchange exchange) { DELEGATE.convert(exchange); + + exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.BINARY.type()); } } diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputType.java index bbad4637a..05e937ffb 100644 --- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputType.java +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputType.java @@ -30,7 +30,7 @@ * * By default, sets the Http content type header to application/json when not set explicitly. */ -@DataType(scheme = "http", name = "cloudevents", mediaType = "application/json") +@DataType(scheme = "http", name = "application-cloudevents", mediaType = "application/json") public class HttpCloudEventOutputType implements DataTypeConverter { @Override diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/Json.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/Json.java new file mode 100644 index 000000000..cbcbe57c9 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/Json.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.converter.json; + +import com.fasterxml.jackson.databind.ObjectMapper; + +public final class Json { + + public static final ObjectMapper MAPPER = new ObjectMapper(); + + private Json() { + // prevent instantiation of utility class + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonDataType.java new file mode 100644 index 000000000..42a20a2af --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonDataType.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.converter.json; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Exchange; +import org.apache.camel.kamelets.utils.format.MimeType; +import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; +import org.apache.camel.kamelets.utils.format.spi.annotations.DataType; + +/** + * Data type uses Jackson data format to marshal given Exchange payload to a Json (binary byte array representation). + * Requires Exchange payload as JsonNode representation. + */ +@DataType(name = "application-json", mediaType = "application/json") +public class JsonDataType implements DataTypeConverter { + + @Override + public void convert(Exchange exchange) { + try { + byte[] marshalled = Json.MAPPER.writer().forType(JsonNode.class).writeValueAsBytes(exchange.getMessage().getBody()); + exchange.getMessage().setBody(marshalled); + + exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.JSON.type()); + exchange.getMessage().setHeader(SchemaHelper.CONTENT_SCHEMA, + exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, "", String.class)); + } catch (JsonProcessingException e) { + throw new CamelExecutionException("Failed to apply Json output data type on exchange", exchange, e); + } + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonFormatSchema.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonFormatSchema.java new file mode 100644 index 000000000..cea76aa6b --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonFormatSchema.java @@ -0,0 +1,22 @@ +package org.apache.camel.kamelets.utils.format.converter.json; + +import com.fasterxml.jackson.core.FormatSchema; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.camel.kamelets.utils.format.SchemaType; + +public class JsonFormatSchema implements FormatSchema { + private final JsonNode schema; + + public JsonFormatSchema(JsonNode schema) { + this.schema = schema; + } + + @Override + public String getSchemaType() { + return SchemaType.JSON.type(); + } + + public JsonNode getSchema() { + return schema; + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonSchemaResolver.java new file mode 100644 index 000000000..74a42a33e --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonSchemaResolver.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.converter.json; + +import java.io.InputStream; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.fasterxml.jackson.core.FormatSchema; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.component.jackson.SchemaResolver; +import org.apache.camel.kamelets.utils.format.SchemaType; +import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper; +import org.apache.camel.spi.Resource; +import org.apache.camel.support.PluginHelper; +import org.apache.camel.util.ObjectHelper; + +public class JsonSchemaResolver implements SchemaResolver, Processor { + private final ConcurrentMap schemes; + + private JsonNode schema; + private String contentClass; + + public JsonSchemaResolver() { + this.schemes = new ConcurrentHashMap<>(); + } + + public String getSchema() { + if (this.schema != null) { + try { + return Json.MAPPER.writeValueAsString(this.schema); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + return null; + } + + public void setSchema(String schema) { + if (ObjectHelper.isNotEmpty(schema)) { + try { + this.schema = Json.MAPPER.readTree(schema); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } else { + this.schema = null; + } + } + + public String getContentClass() { + return contentClass; + } + + public void setContentClass(String contentClass) { + if (ObjectHelper.isNotEmpty(contentClass)) { + this.contentClass = contentClass; + } else { + this.contentClass = null; + } + } + + @Override + public void process(Exchange exchange) throws Exception { + Object payload = exchange.getMessage().getBody(); + if (payload == null) { + return; + } + + JsonNode answer = computeIfAbsent(exchange); + + if (answer != null) { + exchange.setProperty(SchemaHelper.CONTENT_SCHEMA, answer); + exchange.setProperty(SchemaHelper.CONTENT_SCHEMA_TYPE, SchemaType.JSON.type()); + exchange.setProperty(SchemaHelper.CONTENT_CLASS, SchemaHelper.resolveContentClass(exchange, this.contentClass)); + } + } + + private JsonNode computeIfAbsent(Exchange exchange) { + if (this.schema != null) { + return this.schema; + } + + JsonNode answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, JsonNode.class); + + if (answer == null && exchange.getProperties().containsKey(SchemaHelper.SCHEMA)) { + String schemaJson = exchange.getProperty(SchemaHelper.SCHEMA, String.class); + try { + answer = Json.MAPPER.readTree(schemaJson); + } catch (JsonProcessingException e) { + throw new RuntimeException("Unable to load Json schema", e); + } + } + + if (answer == null) { + String contentClass = SchemaHelper.resolveContentClass(exchange, this.contentClass); + if (contentClass != null) { + answer = this.schemes.computeIfAbsent(contentClass, t -> { + Resource res = PluginHelper.getResourceLoader(exchange.getContext()) + .resolveResource("classpath:schemas/" + SchemaType.JSON.type() + "/" + t + "." + SchemaType.JSON.type()); + + try { + if (res.exists()) { + try (InputStream is = res.getInputStream()) { + if (is != null) { + return Json.MAPPER.readTree(is); + } + } + } + } catch (Exception e) { + throw new RuntimeException( + "Unable to load Json schema for type: " + t + ", resource: " + res.getLocation(), e); + } + + return null; + }); + } + } + + if (answer != null) { + this.schema = answer; + } + + return answer; + } + + @Override + public FormatSchema resolve(Exchange exchange) { + JsonNode answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, JsonNode.class); + if (answer == null) { + answer = computeIfAbsent(exchange); + } + + return new JsonFormatSchema(answer); + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonStructDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonStructDataType.java new file mode 100644 index 000000000..56543746b --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/json/JsonStructDataType.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.converter.json; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.kamelets.utils.format.MimeType; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; +import org.apache.camel.kamelets.utils.format.spi.annotations.DataType; + +/** + * Data type uses Jackson data format to unmarshal Exchange body to generic JsonNode representation. + */ +@DataType(name = "application-x-struct", mediaType = "application/x-struct") +public class JsonStructDataType implements DataTypeConverter { + + @Override + public void convert(Exchange exchange) { + try { + Object unmarshalled = Json.MAPPER.reader().forType(JsonNode.class).readValue(getBodyAsStream(exchange)); + exchange.getMessage().setBody(unmarshalled); + + exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.STRUCT.type()); + } catch (InvalidPayloadException | IOException e) { + throw new CamelExecutionException("Failed to apply Json input data type on exchange", exchange, e); + } + } + + private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException { + InputStream bodyStream = exchange.getMessage().getBody(InputStream.class); + + if (bodyStream == null) { + bodyStream = new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class)); + } + + return bodyStream; + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/pojo/JavaObjectDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/pojo/JavaObjectDataType.java new file mode 100644 index 000000000..1fcbc869a --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/pojo/JavaObjectDataType.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.converter.pojo; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +import com.fasterxml.jackson.core.FormatSchema; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Exchange; +import org.apache.camel.InvalidPayloadException; +import org.apache.camel.kamelets.utils.format.MimeType; +import org.apache.camel.kamelets.utils.format.SchemaType; +import org.apache.camel.kamelets.utils.format.converter.avro.Avro; +import org.apache.camel.kamelets.utils.format.converter.json.Json; +import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper; +import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; +import org.apache.camel.kamelets.utils.format.spi.annotations.DataType; +import org.apache.camel.util.ObjectHelper; + +/** + * Data type able to unmarshal Exchange body to Java object. Supports both Avro and Json schema types and uses respective + * Jackson implementation for the unmarshal operation. + * Requires proper setting of content schema, class and schema type in Exchange properties + * (usually resolved via Avro or Json schema resolver Kamelet action). + */ +@DataType(name = "application-x-java-object", mediaType = "application/x-java-object") +public class JavaObjectDataType implements DataTypeConverter, CamelContextAware { + + private CamelContext camelContext; + + @Override + public void convert(Exchange exchange) { + ObjectHelper.notNull(camelContext, "camelContext"); + + FormatSchema schema = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, FormatSchema.class); + if (schema == null) { + throw new CamelExecutionException("Missing proper schema for Java object data type processing", exchange); + } + + String contentClass = SchemaHelper.resolveContentClass(exchange, null); + if (contentClass == null) { + throw new CamelExecutionException("Missing content class information for Java object data type processing", + exchange); + } + + SchemaType schemaType = SchemaType.of(exchange.getProperty(SchemaHelper.CONTENT_SCHEMA_TYPE, SchemaType.JSON.type(), String.class)); + + try { + Class contentType = camelContext.getClassResolver().resolveMandatoryClass(contentClass); + Object unmarshalled; + + if (schemaType == SchemaType.AVRO) { + unmarshalled = Avro.MAPPER.reader().forType(contentType).with(schema).readValue(getBodyAsStream(exchange)); + } else if (schemaType == SchemaType.JSON) { + unmarshalled = Json.MAPPER.reader().forType(contentType).with(schema).readValue(getBodyAsStream(exchange)); + } else { + throw new CamelExecutionException(String.format("Unsupported schema type '%s'", schemaType), exchange); + } + + exchange.getMessage().setBody(unmarshalled); + + exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.STRUCT.type()); + } catch (InvalidPayloadException | IOException | ClassNotFoundException e) { + throw new CamelExecutionException("Failed to apply Java object data type on exchange", exchange, e); + } + } + + private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException { + InputStream bodyStream = exchange.getMessage().getBody(InputStream.class); + + if (bodyStream == null) { + bodyStream = new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class)); + } + + return bodyStream; + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/Protobuf.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/Protobuf.java new file mode 100644 index 000000000..1bb5d1c18 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/Protobuf.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.converter.protobuf; + +import com.fasterxml.jackson.dataformat.protobuf.ProtobufMapper; + +public final class Protobuf { + + public static final ProtobufMapper MAPPER = new ProtobufMapper(); + + private Protobuf() { + // prevent instantiation of utility class + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/ProtobufSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/ProtobufSchemaResolver.java new file mode 100644 index 000000000..9fba10912 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/protobuf/ProtobufSchemaResolver.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.converter.protobuf; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.fasterxml.jackson.core.FormatSchema; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.dataformat.protobuf.schema.ProtobufSchema; +import com.fasterxml.jackson.dataformat.protobuf.schema.ProtobufSchemaLoader; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.jackson.SchemaResolver; +import org.apache.camel.kamelets.utils.format.SchemaType; +import org.apache.camel.kamelets.utils.format.converter.utils.SchemaHelper; +import org.apache.camel.spi.Resource; +import org.apache.camel.support.PluginHelper; +import org.apache.camel.util.ObjectHelper; + +public class ProtobufSchemaResolver implements SchemaResolver, Processor { + private final ConcurrentMap schemes; + + private ProtobufSchema schema; + private String contentClass; + + public ProtobufSchemaResolver() { + this.schemes = new ConcurrentHashMap<>(); + } + + public String getSchema() { + if (this.schema != null) { + return this.schema.getSource().toString(); + } + + return null; + } + + public void setSchema(String schema) { + if (ObjectHelper.isNotEmpty(schema)) { + try { + this.schema = ProtobufSchemaLoader.std.parse(schema); + } catch (IOException e) { + throw new RuntimeCamelException("Cannot parse protobuf schema", e); + } + } else { + this.schema = null; + } + } + + public String getContentClass() { + return contentClass; + } + + public void setContentClass(String contentClass) { + if (ObjectHelper.isNotEmpty(contentClass)) { + this.contentClass = contentClass; + } else { + this.contentClass = null; + } + } + + @Override + public void process(Exchange exchange) throws Exception { + Object payload = exchange.getMessage().getBody(); + if (payload == null) { + return; + } + + ProtobufSchema answer = computeIfAbsent(exchange); + + if (answer != null) { + exchange.setProperty(SchemaHelper.CONTENT_SCHEMA, answer); + exchange.setProperty(SchemaHelper.CONTENT_SCHEMA_TYPE, SchemaType.PROTOBUF.type()); + exchange.setProperty(SchemaHelper.CONTENT_CLASS, SchemaHelper.resolveContentClass(exchange, this.contentClass)); + } + } + + @Override + public FormatSchema resolve(Exchange exchange) { + ProtobufSchema answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, ProtobufSchema.class); + if (answer == null) { + answer = computeIfAbsent(exchange); + } + + return answer; + } + + private ProtobufSchema computeIfAbsent(Exchange exchange) { + if (this.schema != null) { + return this.schema; + } + + ProtobufSchema answer = exchange.getProperty(SchemaHelper.CONTENT_SCHEMA, ProtobufSchema.class); + + if (answer == null && exchange.getProperties().containsKey(SchemaHelper.SCHEMA)) { + String schemaJson = exchange.getProperty(SchemaHelper.SCHEMA, String.class); + try { + answer = ProtobufSchemaLoader.std.parse(schemaJson); + } catch (IOException e) { + throw new RuntimeException("Unable to parse Protobuf schema", e); + } + } + + if (answer == null) { + String contentClass = SchemaHelper.resolveContentClass(exchange, this.contentClass); + if (contentClass != null) { + answer = this.schemes.computeIfAbsent(contentClass, t -> { + Resource res = PluginHelper.getResourceLoader(exchange.getContext()) + .resolveResource("classpath:schemas/" + SchemaType.AVRO.type() + "/" + t + "." + SchemaType.AVRO.type()); + + try { + if (res.exists()) { + try (InputStream is = res.getInputStream()) { + if (is != null) { + return Protobuf.MAPPER.schemaLoader().load(is); + } + } + } + } catch (Exception e) { + throw new RuntimeException( + "Unable to load Protobuf schema for type: " + t + ", resource: " + res.getLocation(), e); + } + + try { + return Protobuf.MAPPER.generateSchemaFor(Class.forName(contentClass)); + } catch (JsonMappingException | ClassNotFoundException e) { + throw new RuntimeException( + "Unable to compute Protobuf schema for type: " + t, e); + } + }); + } + } + + if (answer != null) { + this.schema = answer; + } + + return answer; + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java deleted file mode 100644 index 183f11123..000000000 --- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataType.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kamelets.utils.format.converter.standard; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.camel.CamelContext; -import org.apache.camel.CamelContextAware; -import org.apache.camel.CamelExecutionException; -import org.apache.camel.Exchange; -import org.apache.camel.InvalidPayloadException; -import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; -import org.apache.camel.kamelets.utils.format.spi.annotations.DataType; -import org.apache.camel.util.ObjectHelper; - -/** - * Data type converter able to unmarshal to given unmarshalType using jackson data format. - *

- * Unmarshal type should be given as a fully qualified class name in the exchange properties. - */ -@DataType(name = "jsonObject", mediaType = "application/json") -public class JsonModelDataType implements DataTypeConverter, CamelContextAware { - - public static final String DATA_TYPE_MODEL_PROPERTY = "CamelDataTypeModel"; - - private String model; - - private CamelContext camelContext; - - private static final ObjectMapper mapper = new ObjectMapper(); - - @Override - public void convert(Exchange exchange) { - String type; - if (exchange.hasProperties() && exchange.getProperties().containsKey(DATA_TYPE_MODEL_PROPERTY)) { - type = exchange.getProperty(DATA_TYPE_MODEL_PROPERTY, String.class); - } else { - type = model; - } - - if (type == null) { - // neither model property nor exchange property defines proper type - do nothing - return; - } - - ObjectHelper.notNull(camelContext, "camelContext"); - - try { - Object unmarshalled = mapper.reader().forType(camelContext.getClassResolver().resolveMandatoryClass(type)).readValue(getBodyAsStream(exchange)); - exchange.getMessage().setBody(unmarshalled); - } catch (Exception e) { - throw new CamelExecutionException( - String.format("Failed to load Json unmarshalling type '%s'", type), exchange, e); - } - } - - private InputStream getBodyAsStream(Exchange exchange) throws InvalidPayloadException { - InputStream bodyStream = exchange.getMessage().getBody(InputStream.class); - - if (bodyStream == null) { - bodyStream = new ByteArrayInputStream(exchange.getMessage().getMandatoryBody(byte[].class)); - } - - return bodyStream; - } - - @Override - public CamelContext getCamelContext() { - return camelContext; - } - - public void setModel(String model) { - this.model = model; - } - - @Override - public void setCamelContext(CamelContext camelContext) { - this.camelContext = camelContext; - } -} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/text/StringDataType.java similarity index 60% rename from library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java rename to library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/text/StringDataType.java index d60b2aaa9..9569e1d17 100644 --- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataType.java +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/text/StringDataType.java @@ -15,24 +15,33 @@ * limitations under the License. */ -package org.apache.camel.kamelets.utils.format.converter.standard; +package org.apache.camel.kamelets.utils.format.converter.text; + +import java.nio.charset.StandardCharsets; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePropertyKey; import org.apache.camel.kamelets.utils.format.DefaultDataTypeConverter; +import org.apache.camel.kamelets.utils.format.MimeType; import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; import org.apache.camel.kamelets.utils.format.spi.annotations.DataType; /** - * String data type. + * Generic String data type converts Exchange payload to String representation using the Camel message body converter mechanism. + * By default, uses UTF-8 charset as encoding. */ -@DataType(name = "string", mediaType = "text/plain") +@DataType(name = "text-plain", mediaType = "text/plain") public class StringDataType implements DataTypeConverter { - private static final DataTypeConverter DELEGATE = - new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "string", "text/plain", String.class); + private static final DataTypeConverter DELEGATE = new DefaultDataTypeConverter(DataType.DEFAULT_SCHEME, "string", + MimeType.TEXT.type(), String.class); @Override public void convert(Exchange exchange) { + exchange.setProperty(ExchangePropertyKey.CHARSET_NAME, StandardCharsets.UTF_8.name()); + DELEGATE.convert(exchange); + + exchange.getMessage().setHeader(Exchange.CONTENT_TYPE, MimeType.TEXT.type()); } } diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/PojoHelper.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/PojoHelper.java new file mode 100644 index 000000000..17f222801 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/PojoHelper.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.converter.utils; + +import java.util.Objects; + +import org.apache.camel.Exchange; + +public final class PojoHelper { + private PojoHelper() { + } + + public static boolean isString(Class type) { + return String.class.isAssignableFrom(type); + } + + public static boolean isNumber(Class type) { + return Number.class.isAssignableFrom(type) + || int.class.isAssignableFrom(type) + || long.class.isAssignableFrom(type) + || short.class.isAssignableFrom(type) + || char.class.isAssignableFrom(type) + || float.class.isAssignableFrom(type) + || double.class.isAssignableFrom(type); + } + + public static boolean isPrimitive(Class type) { + return type.isPrimitive() + || (type.isArray() && type.getComponentType().isPrimitive()) + || char.class.isAssignableFrom(type) || Character.class.isAssignableFrom(type) + || byte.class.isAssignableFrom(type) || Byte.class.isAssignableFrom(type) + || boolean.class.isAssignableFrom(type) || Boolean.class.isAssignableFrom(type); + } + + public static boolean isPojo(Class type) { + Package pkg = type.getPackage(); + if (pkg != null) { + if (pkg.getName().startsWith("java") + || pkg.getName().startsWith("javax") + || pkg.getName().startsWith("com.sun") + || pkg.getName().startsWith("com.oracle")) { + return false; + } + } + + if (isNumber(type)) { + return false; + } + if (isPrimitive(type)) { + return false; + } + if (isString(type)) { + return false; + } + + return true; + } + + public static boolean hasProperty(Exchange exchange, String name) { + return exchange.getProperties().containsKey(name); + } + + public static boolean hasProperty(Exchange exchange, String name, Object value) { + return Objects.equals( + value, + exchange.getProperty(name, value.getClass())); + } + + public static boolean hasHeader(Exchange exchange, String name, Object value) { + return Objects.equals( + value, + exchange.getMessage().getHeader(name, value.getClass())); + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/SchemaHelper.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/SchemaHelper.java new file mode 100644 index 000000000..87dea3f69 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/converter/utils/SchemaHelper.java @@ -0,0 +1,34 @@ +package org.apache.camel.kamelets.utils.format.converter.utils; + +import org.apache.camel.Exchange; + +public class SchemaHelper { + + public static final String SCHEMA = "schema"; + public static final String VALIDATE = "validate"; + public static final String CONTENT_SCHEMA = "X-Content-Schema"; + public static final String CONTENT_SCHEMA_TYPE = "X-Content-Schema-Type"; + public static final String CONTENT_CLASS = "X-Content-Class"; + + private SchemaHelper() { + } + + /** + * Helper resolves content class from exchange properties and as a fallback tries to retrieve the content class + * from the payload body type. + * @param exchange the Camel exchange eventually holding content class information in its properties. + * @param fallback the fallback content class information when no exchange property is set. + * @return the content class as String representation. + */ + public static String resolveContentClass(Exchange exchange, String fallback) { + String contentClass = exchange.getProperty(CONTENT_CLASS, fallback, String.class); + if (contentClass == null) { + Object payload = exchange.getMessage().getBody(); + if (payload != null && PojoHelper.isPojo(payload.getClass())) { + contentClass = payload.getClass().getName(); + } + } + + return contentClass; + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/schema/DelegatingSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/schema/DelegatingSchemaResolver.java new file mode 100644 index 000000000..26b541ee8 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/format/schema/DelegatingSchemaResolver.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kamelets.utils.format.schema; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.kamelets.utils.format.MimeType; +import org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver; +import org.apache.camel.kamelets.utils.format.converter.json.JsonSchemaResolver; +import org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver; +import org.apache.camel.util.ObjectHelper; + +/** + * Schema resolver processor delegates to either Avro or Json schema resolver based on the given mimetype property. + * When mimetype is of type application/x-java-object uses additional target mimetype (usually the produces mimetype) to + * determine the schema resolver (Avro or Json). + * Delegates to schema resolver and sets proper content class and schema properties on the delegate. + */ +public class DelegatingSchemaResolver implements Processor { + private String mimeType; + private String targetMimeType; + + private String schema; + private String contentClass; + + @Override + public void process(Exchange exchange) throws Exception { + if (ObjectHelper.isEmpty(mimeType)) { + return; + } + + MimeType mimeType = MimeType.of(this.mimeType); + Processor resolver; + if (mimeType.equals(MimeType.JAVA_OBJECT)) { + if (ObjectHelper.isEmpty(targetMimeType)) { + return; + } + resolver = fromMimeType(MimeType.of(targetMimeType)); + } else { + resolver = fromMimeType(mimeType); + } + + if (resolver != null) { + resolver.process(exchange); + } + } + + private Processor fromMimeType(MimeType mimeType) { + switch (mimeType) { + case PROTOBUF: + ProtobufSchemaResolver protobufSchemaResolver = new ProtobufSchemaResolver(); + protobufSchemaResolver.setSchema(this.schema); + protobufSchemaResolver.setContentClass(this.contentClass); + return protobufSchemaResolver; + case AVRO: + case AVRO_BINARY: + case AVRO_STRUCT: + AvroSchemaResolver avroSchemaResolver = new AvroSchemaResolver(); + avroSchemaResolver.setSchema(this.schema); + avroSchemaResolver.setContentClass(this.contentClass); + return avroSchemaResolver; + case JSON: + case STRUCT: + JsonSchemaResolver jsonSchemaResolver = new JsonSchemaResolver(); + jsonSchemaResolver.setSchema(this.schema); + jsonSchemaResolver.setContentClass(this.contentClass); + return jsonSchemaResolver; + default: + return null; + } + } + + public String getMimeType() { + return mimeType; + } + + public void setMimeType(String mimeType) { + this.mimeType = mimeType; + } + + public String getSchema() { + return schema; + } + + public void setSchema(String schema) { + this.schema = schema; + } + + public String getContentClass() { + return contentClass; + } + + public void setContentClass(String contentClass) { + this.contentClass = contentClass; + } + + public String getTargetMimeType() { + return targetMimeType; + } + + public void setTargetMimeType(String targetMimeType) { + this.targetMimeType = targetMimeType; + } +} diff --git a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightProtobufSchemaResolver.java b/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightProtobufSchemaResolver.java deleted file mode 100644 index 4d09f9d5e..000000000 --- a/library/camel-kamelets-utils/src/main/java/org/apache/camel/kamelets/utils/serialization/InflightProtobufSchemaResolver.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.kamelets.utils.serialization; - -import java.io.IOException; - -import com.fasterxml.jackson.core.FormatSchema; -import com.fasterxml.jackson.dataformat.protobuf.schema.ProtobufSchema; -import com.fasterxml.jackson.dataformat.protobuf.schema.ProtobufSchemaLoader; - -import org.apache.camel.Exchange; -import org.apache.camel.RuntimeCamelException; -import org.apache.camel.component.jackson.SchemaResolver; - -public class InflightProtobufSchemaResolver implements SchemaResolver { - - @Override - public FormatSchema resolve(Exchange exchange) { - String schemaStr = (String) exchange.getProperty("schema"); - try { - ProtobufSchema schema = ProtobufSchemaLoader.std.parse(schemaStr); - return schema; - } catch(IOException e) { - throw new RuntimeCamelException("Cannot parse protobuf schema", e); - } - } -} diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter index 1cd0ed789..96cc4b098 100644 --- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter +++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/DataTypeConverter @@ -15,7 +15,11 @@ # limitations under the License. # -org.apache.camel.kamelets.utils.format.converter.standard +org.apache.camel.kamelets.utils.format.converter.bytes +org.apache.camel.kamelets.utils.format.converter.text +org.apache.camel.kamelets.utils.format.converter.pojo +org.apache.camel.kamelets.utils.format.converter.avro +org.apache.camel.kamelets.utils.format.converter.json org.apache.camel.kamelets.utils.format.converter.aws2.s3 org.apache.camel.kamelets.utils.format.converter.aws2.ddb org.apache.camel.kamelets.utils.format.converter.http diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-ddb-json b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-ddb-application-json similarity index 100% rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-ddb-json rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-ddb-application-json diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-cloudevents b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-application-cloudevents similarity index 100% rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-cloudevents rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/aws2-s3-application-cloudevents diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-json similarity index 90% rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-json index edf9a4ca6..18aebea05 100644 --- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-binary +++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-json @@ -15,4 +15,4 @@ # limitations under the License. # -class=org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType \ No newline at end of file +class=org.apache.camel.kamelets.utils.format.converter.json.JsonDataType diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-octet-stream similarity index 90% rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-octet-stream index 2f725f6aa..520b0341c 100644 --- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject +++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-octet-stream @@ -15,4 +15,4 @@ # limitations under the License. # -class=org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType \ No newline at end of file +class=org.apache.camel.kamelets.utils.format.converter.bytes.ByteArrayDataType diff --git a/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-java-object similarity index 90% rename from library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-java-object index 2f725f6aa..3f8079835 100644 --- a/library/camel-kamelets-utils/src/test/resources/META-INF/services/org/apache/camel/datatype/converter/camel-jsonObject +++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-java-object @@ -15,4 +15,4 @@ # limitations under the License. # -class=org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType \ No newline at end of file +class=org.apache.camel.kamelets.utils.format.converter.pojo.JavaObjectDataType diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-struct b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-struct new file mode 100644 index 000000000..fef2aab83 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-application-x-struct @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.kamelets.utils.format.converter.json.JsonStructDataType diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-binary b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-binary new file mode 100644 index 000000000..69fc53091 --- /dev/null +++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-binary @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.kamelets.utils.format.converter.avro.AvroBinaryDataType diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-x-struct b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-x-struct new file mode 100644 index 000000000..070e3147c --- /dev/null +++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-avro-x-struct @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.kamelets.utils.format.converter.avro.AvroStructDataType diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-string b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-plain-text similarity index 90% rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-string rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-plain-text index 8ef257256..3233f4272 100644 --- a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-string +++ b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/camel-plain-text @@ -15,4 +15,4 @@ # limitations under the License. # -class=org.apache.camel.kamelets.utils.format.converter.standard.StringDataType \ No newline at end of file +class=org.apache.camel.kamelets.utils.format.converter.text.StringDataType diff --git a/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/http-cloudevents b/library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/http-application-cloudevents similarity index 100% rename from library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/http-cloudevents rename to library/camel-kamelets-utils/src/main/resources/META-INF/services/org/apache/camel/datatype/converter/http-application-cloudevents diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java index b281f3143..341ad05df 100644 --- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeConverterResolverTest.java @@ -21,7 +21,7 @@ import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType; +import org.apache.camel.kamelets.utils.format.converter.json.JsonStructDataType; import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -49,9 +49,9 @@ public void shouldHandleUnresolvableDataTypeConverters() throws Exception { @Test public void shouldResolveDataTypeConverters() throws Exception { - Optional converter = resolver.resolve("jsonObject", camelContext); + Optional converter = resolver.resolve("application-x-struct", camelContext); Assertions.assertTrue(converter.isPresent()); - Assertions.assertEquals(JsonModelDataType.class, converter.get().getClass()); + Assertions.assertEquals(JsonStructDataType.class, converter.get().getClass()); converter = resolver.resolve("foo", "json", camelContext); Assertions.assertTrue(converter.isPresent()); @@ -73,4 +73,4 @@ public String getName() { return "foo"; } } -} \ No newline at end of file +} diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java index d83c474b2..9061c2942 100644 --- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/DefaultDataTypeRegistryTest.java @@ -21,9 +21,8 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType; -import org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType; -import org.apache.camel.kamelets.utils.format.converter.standard.StringDataType; +import org.apache.camel.kamelets.utils.format.converter.bytes.ByteArrayDataType; +import org.apache.camel.kamelets.utils.format.converter.text.StringDataType; import org.apache.camel.kamelets.utils.format.converter.test.UppercaseDataType; import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; import org.junit.jupiter.api.Assertions; @@ -41,15 +40,12 @@ void setup() { @Test public void shouldLookupDefaultDataTypeConverters() throws Exception { - Optional converter = dataTypeRegistry.lookup( "jsonObject"); - Assertions.assertTrue(converter.isPresent()); - Assertions.assertEquals(JsonModelDataType.class, converter.get().getClass()); - converter = dataTypeRegistry.lookup( "string"); + Optional converter = dataTypeRegistry.lookup( "plain-text"); Assertions.assertTrue(converter.isPresent()); Assertions.assertEquals(StringDataType.class, converter.get().getClass()); - converter = dataTypeRegistry.lookup( "binary"); + converter = dataTypeRegistry.lookup( "application-octet-stream"); Assertions.assertTrue(converter.isPresent()); - Assertions.assertEquals(BinaryDataType.class, converter.get().getClass()); + Assertions.assertEquals(ByteArrayDataType.class, converter.get().getClass()); converter = dataTypeRegistry.lookup( "lowercase"); Assertions.assertTrue(converter.isPresent()); converter = dataTypeRegistry.lookup( "uppercase"); @@ -57,4 +53,4 @@ public void shouldLookupDefaultDataTypeConverters() throws Exception { Assertions.assertEquals(UppercaseDataType.class, converter.get().getClass()); } -} \ No newline at end of file +} diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java index 7f1f9e9fc..6d2fee236 100644 --- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/ddb/Ddb2JsonInputTypeTest.java @@ -20,7 +20,6 @@ import java.util.Map; import java.util.Optional; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.camel.CamelContextAware; import org.apache.camel.CamelExecutionException; import org.apache.camel.Exchange; @@ -28,6 +27,7 @@ import org.apache.camel.component.aws2.ddb.Ddb2Operations; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry; +import org.apache.camel.kamelets.utils.format.converter.json.Json; import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; import org.apache.camel.support.DefaultExchange; import org.junit.jupiter.api.Assertions; @@ -42,8 +42,6 @@ public class Ddb2JsonInputTypeTest { private DefaultCamelContext camelContext; - private final ObjectMapper mapper = new ObjectMapper(); - private final Ddb2JsonInputType inputType = new Ddb2JsonInputType(); private final String keyJson = "{" + @@ -69,7 +67,7 @@ void setup() { void shouldMapPutItemHeaders() throws Exception { Exchange exchange = new DefaultExchange(camelContext); - exchange.getMessage().setBody(mapper.readTree(itemJson)); + exchange.getMessage().setBody(Json.MAPPER.readTree(itemJson)); exchange.setProperty("operation", Ddb2Operations.PutItem.name()); inputType.convert(exchange); @@ -85,7 +83,7 @@ void shouldMapPutItemHeaders() throws Exception { void shouldMapUpdateItemHeaders() throws Exception { Exchange exchange = new DefaultExchange(camelContext); - exchange.getMessage().setBody(mapper.readTree("{\"operation\": \"" + Ddb2Operations.UpdateItem.name() + "\", \"key\": " + exchange.getMessage().setBody(Json.MAPPER.readTree("{\"operation\": \"" + Ddb2Operations.UpdateItem.name() + "\", \"key\": " + keyJson + ", \"item\": " + itemJson + "}")); inputType.convert(exchange); @@ -106,7 +104,7 @@ void shouldMapUpdateItemHeaders() throws Exception { void shouldMapDeleteItemHeaders() throws Exception { Exchange exchange = new DefaultExchange(camelContext); - exchange.getMessage().setBody(mapper.readTree("{\"key\": " + keyJson + "}")); + exchange.getMessage().setBody(Json.MAPPER.readTree("{\"key\": " + keyJson + "}")); exchange.setProperty("operation", Ddb2Operations.DeleteItem.name()); inputType.convert(exchange); @@ -125,7 +123,7 @@ void shouldMapDeleteItemHeaders() throws Exception { void shouldMapNestedObjects() throws Exception { Exchange exchange = new DefaultExchange(camelContext); - exchange.getMessage().setBody(mapper.readTree("{\"user\":" + itemJson + "}")); + exchange.getMessage().setBody(Json.MAPPER.readTree("{\"user\":" + itemJson + "}")); exchange.setProperty("operation", Ddb2Operations.PutItem.name()); inputType.convert(exchange); @@ -176,7 +174,7 @@ void shouldFailForWrongBodyType() throws Exception { void shouldFailForUnsupportedOperation() throws Exception { Exchange exchange = new DefaultExchange(camelContext); - exchange.getMessage().setBody(mapper.readTree("{}")); + exchange.getMessage().setBody(Json.MAPPER.readTree("{}")); exchange.setProperty("operation", Ddb2Operations.BatchGetItems.name()); Assertions.assertThrows(UnsupportedOperationException.class, () -> inputType.convert(exchange)); @@ -186,7 +184,7 @@ void shouldFailForUnsupportedOperation() throws Exception { public void shouldLookupDataType() throws Exception { DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry(); CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext); - Optional converter = dataTypeRegistry.lookup("aws2-ddb", "json"); + Optional converter = dataTypeRegistry.lookup("aws2-ddb", "application-json"); Assertions.assertTrue(converter.isPresent()); } diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java index 53570dd92..f5d8a082a 100644 --- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/aws2/s3/AWS2S3CloudEventOutputTypeTest.java @@ -62,7 +62,7 @@ void shouldMapToCloudEvent() throws Exception { public void shouldLookupDataType() throws Exception { DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry(); CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext); - Optional converter = dataTypeRegistry.lookup("aws2-s3", "cloudevents"); + Optional converter = dataTypeRegistry.lookup("aws2-s3", "application-cloudevents"); Assertions.assertTrue(converter.isPresent()); } } diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputTypeTest.java index 01e331e62..0e73cb0a3 100644 --- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputTypeTest.java +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/http/HttpCloudEventOutputTypeTest.java @@ -67,7 +67,7 @@ void shouldMapToHttpCloudEvent() throws Exception { public void shouldLookupDataType() throws Exception { DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry(); CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext); - Optional converter = dataTypeRegistry.lookup("http", "cloudevents"); + Optional converter = dataTypeRegistry.lookup("http", "application-cloudevents"); Assertions.assertTrue(converter.isPresent()); } } diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java deleted file mode 100644 index 7785017c6..000000000 --- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/JsonModelDataTypeTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kamelets.utils.format.converter.standard; - -import java.util.Optional; - -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.camel.CamelContextAware; -import org.apache.camel.Exchange; -import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry; -import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; -import org.apache.camel.support.DefaultExchange; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class JsonModelDataTypeTest { - - private final DefaultCamelContext camelContext = new DefaultCamelContext(); - - private final JsonModelDataType dataType = new JsonModelDataType(); - - @BeforeEach - public void setup() { - dataType.setCamelContext(camelContext); - } - - @Test - void shouldMapStringToJsonModelWithModelProperty() throws Exception { - Exchange exchange = new DefaultExchange(camelContext); - - exchange.getMessage().setBody("{ \"name\": \"Rajesh\", \"age\": 28}"); - dataType.setModel(Person.class.getName()); - dataType.convert(exchange); - - assertEquals(Person.class, exchange.getMessage().getBody().getClass()); - assertEquals("Rajesh", exchange.getMessage().getBody(Person.class).getName()); - } - - @Test - void shouldMapStringToJsonModelWithExchangeProperty() throws Exception { - Exchange exchange = new DefaultExchange(camelContext); - - exchange.setProperty(JsonModelDataType.DATA_TYPE_MODEL_PROPERTY, Person.class.getName()); - exchange.getMessage().setBody("{ \"name\": \"Sheldon\", \"age\": 29}"); - dataType.convert(exchange); - - assertEquals(Person.class, exchange.getMessage().getBody().getClass()); - assertEquals("Sheldon", exchange.getMessage().getBody(Person.class).getName()); - } - - @Test - public void shouldLookupDataType() throws Exception { - DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry(); - CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext); - Optional converter = dataTypeRegistry.lookup("jsonObject"); - Assertions.assertTrue(converter.isPresent()); - } - - public static class Person { - @JsonProperty - private String name; - - @JsonProperty - private Long age; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public Long getAge() { - return age; - } - - public void setAge(Long age) { - this.age = age; - } - } - -} \ No newline at end of file diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/bytes/ByteArrayDataTypeTest.java similarity index 94% rename from library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataTypeTest.java rename to library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/bytes/ByteArrayDataTypeTest.java index d2dd616a2..09ba2b934 100644 --- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/BinaryDataTypeTest.java +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/bytes/ByteArrayDataTypeTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.kamelets.utils.format.converter.standard; +package org.apache.camel.kamelets.utils.format.converter.standard.bytes; import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; @@ -24,6 +24,7 @@ import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry; +import org.apache.camel.kamelets.utils.format.converter.bytes.ByteArrayDataType; import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; import org.apache.camel.support.DefaultExchange; import org.junit.jupiter.api.Assertions; @@ -31,11 +32,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -public class BinaryDataTypeTest { +public class ByteArrayDataTypeTest { private final DefaultCamelContext camelContext = new DefaultCamelContext(); - private final BinaryDataType dataType = new BinaryDataType(); + private final ByteArrayDataType dataType = new ByteArrayDataType(); @Test void shouldRetainBytesModel() throws Exception { @@ -89,7 +90,7 @@ void shouldMapFromInputStreamToBytesModel() throws Exception { public void shouldLookupDataType() throws Exception { DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry(); CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext); - Optional converter = dataTypeRegistry.lookup( "binary"); + Optional converter = dataTypeRegistry.lookup( "application-octet-stream"); Assertions.assertTrue(converter.isPresent()); } diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataTypeTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/text/StringDataTypeTest.java similarity index 97% rename from library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataTypeTest.java rename to library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/text/StringDataTypeTest.java index 8ee19cbab..32a9b569e 100644 --- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/StringDataTypeTest.java +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/format/converter/standard/text/StringDataTypeTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.kamelets.utils.format.converter.standard; +package org.apache.camel.kamelets.utils.format.converter.standard.text; import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; @@ -24,6 +24,7 @@ import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.kamelets.utils.format.DefaultDataTypeRegistry; +import org.apache.camel.kamelets.utils.format.converter.text.StringDataType; import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter; import org.apache.camel.support.DefaultExchange; import org.junit.jupiter.api.Assertions; @@ -77,7 +78,7 @@ void shouldMapFromInputStreamToStringModel() throws Exception { public void shouldLookupDataType() throws Exception { DefaultDataTypeRegistry dataTypeRegistry = new DefaultDataTypeRegistry(); CamelContextAware.trySetCamelContext(dataTypeRegistry, camelContext); - Optional converter = dataTypeRegistry.lookup( "string"); + Optional converter = dataTypeRegistry.lookup( "plain-text"); Assertions.assertTrue(converter.isPresent()); } diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DeDuplicateHeadersTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DeDuplicateHeadersTest.java index af6c92f24..ca675a7ee 100644 --- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DeDuplicateHeadersTest.java +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DeDuplicateHeadersTest.java @@ -16,7 +16,6 @@ */ package org.apache.camel.kamelets.utils.headers; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.support.DefaultExchange; @@ -28,15 +27,13 @@ class DeDuplicateHeadersTest { private DefaultCamelContext camelContext; - private final ObjectMapper mapper = new ObjectMapper(); - private DeDuplicateNamingHeaders processor; @BeforeEach void setup() { camelContext = new DefaultCamelContext(); } - + @Test void shouldDuplicateHeaders() throws Exception { Exchange exchange = new DefaultExchange(camelContext); @@ -56,7 +53,7 @@ void shouldDuplicateHeaders() throws Exception { Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("kafka.OVERRIDE_TOPIC")); Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("my-header")); } - + @Test void shouldDuplicateSelectedHeaders() throws Exception { Exchange exchange = new DefaultExchange(camelContext); @@ -78,7 +75,7 @@ void shouldDuplicateSelectedHeaders() throws Exception { Assertions.assertFalse(exchange.getMessage().getHeaders().containsKey("kafka.OVERRIDE_TOPIC")); Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("my-header")); } - + @Test void shouldDeDuplicateSelectedHeaders() throws Exception { Exchange exchange = new DefaultExchange(camelContext); diff --git a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DuplicateHeadersTest.java b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DuplicateHeadersTest.java index fdff94014..3cb3ca877 100644 --- a/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DuplicateHeadersTest.java +++ b/library/camel-kamelets-utils/src/test/java/org/apache/camel/kamelets/utils/headers/DuplicateHeadersTest.java @@ -16,10 +16,6 @@ */ package org.apache.camel.kamelets.utils.headers; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.util.HashMap; - import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.support.DefaultExchange; @@ -31,15 +27,13 @@ class DuplicateHeadersTest { private DefaultCamelContext camelContext; - private final ObjectMapper mapper = new ObjectMapper(); - private DuplicateNamingHeaders processor; @BeforeEach void setup() { camelContext = new DefaultCamelContext(); } - + @Test void shouldDuplicateHeaders() throws Exception { Exchange exchange = new DefaultExchange(camelContext); @@ -57,7 +51,7 @@ void shouldDuplicateHeaders() throws Exception { Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("aws.s3.bucket.name")); Assertions.assertTrue(exchange.getMessage().getHeaders().containsKey("my-header")); } - + @Test void shouldDuplicateSelectedHeaders() throws Exception { Exchange exchange = new DefaultExchange(camelContext); diff --git a/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml index 3bec55f78..ebff66050 100644 --- a/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml +++ b/library/camel-kamelets/src/main/resources/kamelets/avro-deserialize-action.kamelet.yaml @@ -33,8 +33,6 @@ spec: title: "Avro Deserialize Action" description: "Deserialize payload to Avro" type: object - required: - - schema properties: schema: title: Schema @@ -54,23 +52,21 @@ spec: - "camel:core" - "camel:jackson-avro" template: + beans: + - name: schemaResolver + type: "#class:org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver" + property: + - key: validate + value: '{{validate}}' + - key: schema + value: '{{schema:}}' from: uri: kamelet:source steps: - - set-property: - name: schema - constant: "{{schema}}" - - set-property: - name: validate - constant: "{{validate}}" - unmarshal: avro: library: Jackson unmarshalType: com.fasterxml.jackson.databind.JsonNode - schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver" - - remove-property: - name: schema - - remove-property: - name: validate + schemaResolver: "#bean:{{schemaResolver}}" - remove-header: name: "Content-Type" diff --git a/library/camel-kamelets/src/main/resources/kamelets/avro-serialize-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/avro-serialize-action.kamelet.yaml index 06830b277..93a50270a 100644 --- a/library/camel-kamelets/src/main/resources/kamelets/avro-serialize-action.kamelet.yaml +++ b/library/camel-kamelets/src/main/resources/kamelets/avro-serialize-action.kamelet.yaml @@ -33,8 +33,6 @@ spec: title: "Avro Serialize Action" description: "Serialize payload to Avro" type: object - required: - - schema properties: schema: title: Schema @@ -54,24 +52,22 @@ spec: - "camel:core" - "camel:jackson-avro" template: + beans: + - name: schemaResolver + type: "#class:org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver" + property: + - key: validate + value: '{{validate}}' + - key: schema + value: '{{schema:}}' from: uri: kamelet:source steps: - - set-property: - name: schema - constant: "{{schema}}" - - set-property: - name: validate - constant: "{{validate}}" - marshal: avro: library: Jackson unmarshalType: com.fasterxml.jackson.databind.JsonNode - schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver" - - remove-property: - name: schema - - remove-property: - name: validate + schemaResolver: "#bean:{{schemaResolver}}" - set-header: name: "Content-Type" constant: "application/avro" diff --git a/library/camel-kamelets/src/main/resources/kamelets/protobuf-deserialize-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/protobuf-deserialize-action.kamelet.yaml index bf06dc4fc..c0727a5f1 100644 --- a/library/camel-kamelets/src/main/resources/kamelets/protobuf-deserialize-action.kamelet.yaml +++ b/library/camel-kamelets/src/main/resources/kamelets/protobuf-deserialize-action.kamelet.yaml @@ -32,8 +32,6 @@ spec: title: "Protobuf Deserialize Action" description: "Deserialize payload to Protobuf" type: object - required: - - schema properties: schema: title: Schema @@ -46,18 +44,19 @@ spec: - "camel:core" - "camel:jackson-protobuf" template: + beans: + - name: schemaResolver + type: "#class:org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver" + property: + - key: schema + value: '{{schema:}}' from: uri: kamelet:source steps: - - set-property: - name: schema - constant: "{{schema}}" - unmarshal: protobuf: library: Jackson unmarshalType: com.fasterxml.jackson.databind.JsonNode - schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightProtobufSchemaResolver" - - remove-property: - name: schema + schemaResolver: "#bean:{{schemaResolver}}" - remove-header: name: "Content-Type" diff --git a/library/camel-kamelets/src/main/resources/kamelets/protobuf-serialize-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/protobuf-serialize-action.kamelet.yaml index 56f321dbc..36218aeca 100644 --- a/library/camel-kamelets/src/main/resources/kamelets/protobuf-serialize-action.kamelet.yaml +++ b/library/camel-kamelets/src/main/resources/kamelets/protobuf-serialize-action.kamelet.yaml @@ -32,8 +32,6 @@ spec: title: "Protobuf Serialize Action" description: "Serialize payload to Protobuf" type: object - required: - - schema properties: schema: title: Schema @@ -46,19 +44,20 @@ spec: - "camel:core" - "camel:jackson-protobuf" template: + beans: + - name: schemaResolver + type: "#class:org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver" + property: + - key: schema + value: '{{schema:}}' from: uri: kamelet:source steps: - - set-property: - name: schema - constant: "{{schema}}" - marshal: protobuf: library: Jackson unmarshalType: com.fasterxml.jackson.databind.JsonNode - schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightProtobufSchemaResolver" - - remove-property: - name: schema + schemaResolver: "#bean:{{schemaResolver}}" - set-header: name: "Content-Type" constant: "application/protobuf" diff --git a/library/camel-kamelets/src/main/resources/kamelets/resolve-pojo-schema-action.kamelet.yaml b/library/camel-kamelets/src/main/resources/kamelets/resolve-pojo-schema-action.kamelet.yaml new file mode 100644 index 000000000..3bafc0902 --- /dev/null +++ b/library/camel-kamelets/src/main/resources/kamelets/resolve-pojo-schema-action.kamelet.yaml @@ -0,0 +1,76 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: Kamelet +metadata: + name: resolve-pojo-schema-action + annotations: + camel.apache.org/kamelet.support.level: "Stable" + camel.apache.org/catalog.version: "4.0.0-SNAPSHOT" + camel.apache.org/kamelet.icon: "" + camel.apache.org/provider: "Apache Software Foundation" + camel.apache.org/kamelet.group: "Actions" + camel.apache.org/kamelet.namespace: "Transformation" + labels: + camel.apache.org/kamelet.type: "action" +spec: + definition: + title: "Resolve Schema Action" + description: "Resolves schema from given mime type and payload. Sets the resolved schema, the schema type and its content class as properties for later reference." + type: object + properties: + mimeType: + title: Mime Type + description: The mime type to determine the schema resolver implementation that should perform the operation. + type: string + default: "application/json" + example: "application/json" + schema: + title: Schema + description: Optional schema content (as single-line, using JSON format). + type: string + contentClass: + title: Content Class + description: Type information of the content object. Fully qualified class name. + type: string + example: "org.apache.camel.content.Foo" + targetMimeType: + title: Target Mime Type + description: Additional mime type information used to determine the schema resolver. Usually only used in combination with mime type "application/x-java-object" + type: string + example: "application/json" + dependencies: + - "mvn:org.apache.camel.kamelets:camel-kamelets-utils:4.0.0-SNAPSHOT" + - "camel:kamelet" + - "camel:core" + - "camel:jackson-avro" + - "camel:jackson-protobuf" + template: + beans: + - name: schemaResolver + type: "#class:org.apache.camel.kamelets.utils.format.schema.DelegatingSchemaResolver" + properties: + mimeType: '{{mimeType}}' + schema: '{{schema:}}' + contentClass: '{{contentClass:}}' + targetMimeType: '{{targetMimeType:}}' + from: + uri: "kamelet:source" + steps: + - process: + ref: "{{schemaResolver}}" diff --git a/test/avro-data-type/README.md b/test/avro-data-type/README.md new file mode 100644 index 000000000..dcdc29e40 --- /dev/null +++ b/test/avro-data-type/README.md @@ -0,0 +1,42 @@ +# Avro data type + +This test verifies the Avro data type serialization/deserialization + +## Objectives + +The test verifies the proper serialization and deserialization of Avro data types `avro/binary` and `avro/x-struct`. + +The test uses two KameletBindings that interact with each other. The first binding `json-to-avro` periodically creates a test data event as Json and applies the `avro/binary` data type using the schema in [User.avsc](User.avsc). + +The binary Avro data is then sent to a Http webhook sink that references an Http endpoint that is provided by the 2nd binding `avro-to-log`. The `avro-to-log` binding provides the Http service and deserializes the binary Avro data using the same User schema. The deserialized data is printed to the log output. + +The test starts both KameletBindings and is able to verify the proper log output as an expected outcome. + +### YAKS Test + +The test performs the following high level steps: + +*Avro data type feature* +- Create test data based on the User.avsc Avro schema +- Load and run the `avro-to-log` KameletBinding +- Load and run the `json-to-avro` KameletBinding +- Verify that the bindings do interact with each other and the proper test data is logged in the binding output + +## Installation + +The test assumes that you have [JBang](https://www.jbang.dev/) installed and the YAKS CLI setup locally. + +You can review the installation steps for the tooling in the documentation: + +- [JBang](https://www.jbang.dev/documentation/guide/latest/installation.html) +- [Install YAKS CLI](https://github.com/citrusframework/yaks#installation) + +## Run the tests with JBang + +To run tests with URI based configuration: + +```shell script +$ yaks run --local test/avro-data-type/avro-serdes-action.feature +``` + +You will be provided with the test log output and the test results. diff --git a/test/avro-data-type/User.avsc b/test/avro-data-type/User.avsc new file mode 100644 index 000000000..a2d21f3b3 --- /dev/null +++ b/test/avro-data-type/User.avsc @@ -0,0 +1,23 @@ +{ + "name": "User", + "type": "record", + "namespace": "demo.kamelets", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "firstname", + "type": "string" + }, + { + "name": "lastname", + "type": "string" + }, + { + "name": "age", + "type": "int" + }, + ] +} diff --git a/test/avro-data-type/avro-data-type.feature b/test/avro-data-type/avro-data-type.feature new file mode 100644 index 000000000..28765a9de --- /dev/null +++ b/test/avro-data-type/avro-data-type.feature @@ -0,0 +1,22 @@ +Feature: Avro data type + + Scenario: Create Kamelet bindings + Given variable uuid is "citrus:randomUUID()" + Given variable user is + """ + { "id": "${uuid}", "firstname": "Sheldon", "lastname": "Cooper", "age": 28 } + """ + # Create avro-to-log binding + When load KameletBinding avro-to-log-binding.yaml + Then Camel K integration avro-to-log-binding should be running + + # Create json-to-avro binding + When load KameletBinding json-to-avro-binding.yaml + Then Camel K integration json-to-avro-binding should be running + + # Verify output message sent + Then Camel K integration avro-to-log-binding should print Body: { "id" : "${uuid}", "firstname" : "Sheldon", "lastname" : "Cooper", "age" : 28} + + Scenario: Remove resources + Given delete KameletBinding avro-to-log-binding + Given delete KameletBinding json-to-avro-binding diff --git a/test/avro-data-type/avro-to-log-binding.yaml b/test/avro-data-type/avro-to-log-binding.yaml new file mode 100644 index 000000000..e84a816db --- /dev/null +++ b/test/avro-data-type/avro-to-log-binding.yaml @@ -0,0 +1,52 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: avro-to-log-binding +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: webhook-source + properties: + subpath: user + steps: + - ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: resolve-pojo-schema-action + properties: + mimeType: "avro/binary" + schema: > + { "name": "User", "type": "record", "namespace": "demo.kamelets", "fields": [{ "name": "id", "type": "string" }, { "name": "firstname", "type": "string" }, { "name": "lastname", "type": "string" }, { "name": "age", "type": "int" }] } + - ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: data-type-action + properties: + scheme: "camel" + format: "avro-x-struct" + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: log-action + properties: + showHeaders: true diff --git a/test/avro-data-type/json-to-avro-binding.yaml b/test/avro-data-type/json-to-avro-binding.yaml new file mode 100644 index 000000000..c57c692c9 --- /dev/null +++ b/test/avro-data-type/json-to-avro-binding.yaml @@ -0,0 +1,54 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: json-to-avro-binding +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: timer-source + properties: + period: 5000 + contentType: application/json + message: > + ${user} + steps: + - ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: json-deserialize-action + - ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: resolve-pojo-schema-action + properties: + mimeType: "avro/binary" + schema: > + { "name": "User", "type": "record", "namespace": "demo.kamelets", "fields": [{ "name": "id", "type": "string" }, { "name": "firstname", "type": "string" }, { "name": "lastname", "type": "string" }, { "name": "age", "type": "int" }] } + - ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: data-type-action + properties: + scheme: "camel" + format: "avro-binary" + sink: + uri: http://localhost:8080/user diff --git a/test/avro-data-type/yaks-config.yaml b/test/avro-data-type/yaks-config.yaml new file mode 100644 index 000000000..b57394464 --- /dev/null +++ b/test/avro-data-type/yaks-config.yaml @@ -0,0 +1,47 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +config: + namespace: + temporary: false + runtime: + env: + - name: YAKS_JBANG_KAMELETS_LOCAL_DIR + value: "../../../kamelets" + - name: YAKS_CAMELK_KAMELET_API_VERSION + value: v1alpha1 + - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_JBANG_CAMEL_DUMP_INTEGRATION_OUTPUT + value: true + settings: + loggers: + - name: INTEGRATION_STATUS + level: INFO + - name: INTEGRATION_LOGS + level: INFO + resources: + - json-to-avro-binding.yaml + - avro-to-log-binding.yaml + - User.avsc + dump: + enabled: true + failedOnly: true + includes: + - app=camel-k diff --git a/test/avro-serdes-action/README.md b/test/avro-serdes-action/README.md new file mode 100644 index 000000000..1a8b1b292 --- /dev/null +++ b/test/avro-serdes-action/README.md @@ -0,0 +1,42 @@ +# Avro serialization/deserialization + +This test verifies the Avro serialization/deserialization actions + +## Objectives + +The test verifies the proper Avro serialization and deserialization of Avro. + +The test uses two KameletBindings that interact with each other. The first binding `json-to-avro` periodically creates a test data event as Json and applies the `avro/binary` data type using the schema in [User.avsc](User.avsc). + +The binary Avro data is then sent to a Http webhook sink that references an Http endpoint that is provided by the 2nd binding `avro-to-log`. The `avro-to-log` binding provides the Http service and deserializes the binary Avro data using the same User schema. The deserialized data is printed to the log output. + +The test starts both KameletBindings and is able to verify the proper log output as an expected outcome. + +### YAKS Test + +The test performs the following high level steps: + +*Avro data type feature* +- Create test data based on the User.avsc Avro schema +- Load and run the `avro-to-log` KameletBinding +- Load and run the `json-to-avro` KameletBinding +- Verify that the bindings do interact with each other and the proper test data is logged in the binding output + +## Installation + +The test assumes that you have [JBang](https://www.jbang.dev/) installed and the YAKS CLI setup locally. + +You can review the installation steps for the tooling in the documentation: + +- [JBang](https://www.jbang.dev/documentation/guide/latest/installation.html) +- [Install YAKS CLI](https://github.com/citrusframework/yaks#installation) + +## Run the tests with JBang + +To run tests with URI based configuration: + +```shell script +$ yaks run --local test/avro-serdes-action/avro-serdes-action.feature +``` + +You will be provided with the test log output and the test results. diff --git a/test/avro-serdes-action/User.avsc b/test/avro-serdes-action/User.avsc new file mode 100644 index 000000000..a2d21f3b3 --- /dev/null +++ b/test/avro-serdes-action/User.avsc @@ -0,0 +1,23 @@ +{ + "name": "User", + "type": "record", + "namespace": "demo.kamelets", + "fields": [ + { + "name": "id", + "type": "string" + }, + { + "name": "firstname", + "type": "string" + }, + { + "name": "lastname", + "type": "string" + }, + { + "name": "age", + "type": "int" + }, + ] +} diff --git a/test/avro-serdes-action/avro-serdes-action.feature b/test/avro-serdes-action/avro-serdes-action.feature new file mode 100644 index 000000000..d09e24507 --- /dev/null +++ b/test/avro-serdes-action/avro-serdes-action.feature @@ -0,0 +1,22 @@ +Feature: Avro serialize/deserialize action + + Scenario: Create Kamelet bindings + Given variable uuid is "citrus:randomUUID()" + Given variable user is + """ + { "id": "${uuid}", "firstname": "Sheldon", "lastname": "Cooper", "age": 28 } + """ + # Create avro-to-log binding + When load KameletBinding avro-to-log-binding.yaml + Then Camel K integration avro-to-log-binding should be running + + # Create json-to-avro binding + When load KameletBinding json-to-avro-binding.yaml + Then Camel K integration json-to-avro-binding should be running + + # Verify output message sent + Then Camel K integration avro-to-log-binding should print Body: { "id" : "${uuid}", "firstname" : "Sheldon", "lastname" : "Cooper", "age" : 28} + + Scenario: Remove resources + Given delete KameletBinding avro-to-log-binding + Given delete KameletBinding json-to-avro-binding diff --git a/test/avro-serdes-action/avro-to-log-binding.yaml b/test/avro-serdes-action/avro-to-log-binding.yaml new file mode 100644 index 000000000..725393763 --- /dev/null +++ b/test/avro-serdes-action/avro-to-log-binding.yaml @@ -0,0 +1,44 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: avro-to-log-binding +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: webhook-source + properties: + subpath: user + steps: + - ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: avro-deserialize-action + properties: + schema: > + { "name": "User", "type": "record", "namespace": "demo.kamelets", "fields": [{ "name": "id", "type": "string" }, { "name": "firstname", "type": "string" }, { "name": "lastname", "type": "string" }, { "name": "age", "type": "int" }] } + sink: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: log-action + properties: + showHeaders: true diff --git a/test/avro-serdes-action/json-to-avro-binding.yaml b/test/avro-serdes-action/json-to-avro-binding.yaml new file mode 100644 index 000000000..be4388404 --- /dev/null +++ b/test/avro-serdes-action/json-to-avro-binding.yaml @@ -0,0 +1,52 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +apiVersion: camel.apache.org/v1alpha1 +kind: KameletBinding +metadata: + name: json-to-avro-binding +spec: + source: + ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: timer-source + properties: + period: 5000 + contentType: application/json + message: > + ${user} + steps: + - ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: json-deserialize-action + - ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: avro-serialize-action + properties: + schema: > + { "name": "User", "type": "record", "namespace": "demo.kamelets", "fields": [{ "name": "id", "type": "string" }, { "name": "firstname", "type": "string" }, { "name": "lastname", "type": "string" }, { "name": "age", "type": "int" }] } + - ref: + kind: Kamelet + apiVersion: camel.apache.org/v1alpha1 + name: log-action + properties: + showHeaders: true + sink: + uri: http://localhost:8080/user diff --git a/test/avro-serdes-action/yaks-config.yaml b/test/avro-serdes-action/yaks-config.yaml new file mode 100644 index 000000000..b57394464 --- /dev/null +++ b/test/avro-serdes-action/yaks-config.yaml @@ -0,0 +1,47 @@ +# --------------------------------------------------------------------------- +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# --------------------------------------------------------------------------- + +config: + namespace: + temporary: false + runtime: + env: + - name: YAKS_JBANG_KAMELETS_LOCAL_DIR + value: "../../../kamelets" + - name: YAKS_CAMELK_KAMELET_API_VERSION + value: v1alpha1 + - name: YAKS_CAMELK_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_KUBERNETES_AUTO_REMOVE_RESOURCES + value: false + - name: YAKS_JBANG_CAMEL_DUMP_INTEGRATION_OUTPUT + value: true + settings: + loggers: + - name: INTEGRATION_STATUS + level: INFO + - name: INTEGRATION_LOGS + level: INFO + resources: + - json-to-avro-binding.yaml + - avro-to-log-binding.yaml + - User.avsc + dump: + enabled: true + failedOnly: true + includes: + - app=camel-k diff --git a/test/aws-s3/aws-s3-knative-broker.feature b/test/aws-s3/aws-s3-knative-broker.feature index fe935dc7d..719f91d2f 100644 --- a/test/aws-s3/aws-s3-knative-broker.feature +++ b/test/aws-s3/aws-s3-knative-broker.feature @@ -5,7 +5,7 @@ Feature: AWS S3 Kamelet - Knative broker binding Given Knative event consumer timeout is 20000 ms Given variables | aws.s3.scheme | camel | - | aws.s3.format | string | + | aws.s3.format | plain-text | | aws.s3.bucketNameOrArn | mybucket | | aws.s3.message | Hello from S3 Kamelet | | aws.s3.key | hello.txt | diff --git a/test/aws-s3/aws-s3-knative-cloudevents.feature b/test/aws-s3/aws-s3-knative-cloudevents.feature index 647c8717c..0af416393 100644 --- a/test/aws-s3/aws-s3-knative-cloudevents.feature +++ b/test/aws-s3/aws-s3-knative-cloudevents.feature @@ -5,7 +5,7 @@ Feature: AWS S3 Kamelet - cloud events data type Given Knative event consumer timeout is 20000 ms Given variables | aws.s3.scheme | aws2-s3 | - | aws.s3.format | cloudevents | + | aws.s3.format | application-cloudevents | | aws.s3.bucketNameOrArn | mybucket | | aws.s3.message | Hello from S3 Kamelet | | aws.s3.key | hello.txt | diff --git a/test/aws-s3/aws-s3-to-http.yaml b/test/aws-s3/aws-s3-to-http.yaml index d0e2bd419..1a7b931d9 100644 --- a/test/aws-s3/aws-s3-to-http.yaml +++ b/test/aws-s3/aws-s3-to-http.yaml @@ -39,14 +39,14 @@ spec: name: data-type-action properties: scheme: "aws2-s3" - format: "cloudevents" + format: "application-cloudevents" - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 name: data-type-action properties: scheme: "http" - format: "cloudevents" + format: "application-cloudevents" - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1 diff --git a/test/data-type-action/data-type-action-binding.yaml b/test/data-type-action/data-type-action-binding.yaml index da02745c7..097195746 100644 --- a/test/data-type-action/data-type-action-binding.yaml +++ b/test/data-type-action/data-type-action-binding.yaml @@ -37,7 +37,7 @@ spec: name: data-type-action properties: scheme: "http" - format: "cloudevents" + format: "application-cloudevents" - ref: kind: Kamelet apiVersion: camel.apache.org/v1alpha1