Skip to content

Commit

Permalink
chore: Enhance Kamelet data type implementations
Browse files Browse the repository at this point in the history
- Refactor existing data type converter names to align with respective mime type (e.g. application/json -> application-json, plain/text -> plain-text)
- Add data type converter implementations for avro-binary, avro-x-struct, application-x-struct, application-json, application-x-java-object
- Enhance Avro/Protobuf schema resolver to resolve schemas from given exchange property, content class, schema file classpath reference or explicit schema content
- Add Json schema resolver
- Add resolve-pojo-schema-action.kamelet to resolve either Json, Avro or Protobuf schemas delegating to respective schema resolver implementations
- Use new schema resolver in avro/protobuf serialize/deserialize action Kamelets
  • Loading branch information
christophd committed May 17, 2023
1 parent 341564f commit a76b405
Show file tree
Hide file tree
Showing 64 changed files with 1,637 additions and 424 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/yaks-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ concurrency:
cancel-in-progress: true

env:
YAKS_VERSION: 0.14.2
YAKS_RUN_OPTIONS: "--timeout=15m --local -e YAKS_CAMELK_MAX_ATTEMPTS=10 -e YAKS_JBANG_CAMEL_VERSION=4.0.0-M1 -e YAKS_JBANG_KAMELETS_LOCAL_DIR=../../../kamelets"
CAMEL_VERSION: 4.0.0-M3
YAKS_VERSION: 0.15.0
YAKS_RUN_OPTIONS: "--timeout=15m --local -e YAKS_CAMELK_MAX_ATTEMPTS=10 -e YAKS_JBANG_CAMEL_VERSION=${CAMEL_VERSION} -e YAKS_JBANG_KAMELETS_LOCAL_DIR=../../../kamelets -e YAKS_CAMELK_KAMELET_API_VERSION=v1alpha1"

jobs:
test:
Expand Down
22 changes: 9 additions & 13 deletions kamelets/avro-deserialize-action.kamelet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ spec:
title: "Avro Deserialize Action"
description: "Deserialize payload to Avro"
type: object
required:
- schema
properties:
schema:
title: Schema
Expand All @@ -54,23 +52,21 @@ spec:
- "camel:core"
- "camel:jackson-avro"
template:
beans:
- name: schemaResolver
type: "#class:org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver"
property:
- key: validate
value: '{{validate}}'
- key: schema
value: '{{schema:}}'
from:
uri: kamelet:source
steps:
- set-property:
name: schema
constant: "{{schema}}"
- set-property:
name: validate
constant: "{{validate}}"
- unmarshal:
avro:
library: Jackson
unmarshalType: com.fasterxml.jackson.databind.JsonNode
schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver"
- remove-property:
name: schema
- remove-property:
name: validate
schemaResolver: "#bean:{{schemaResolver}}"
- remove-header:
name: "Content-Type"
22 changes: 9 additions & 13 deletions kamelets/avro-serialize-action.kamelet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ spec:
title: "Avro Serialize Action"
description: "Serialize payload to Avro"
type: object
required:
- schema
properties:
schema:
title: Schema
Expand All @@ -54,24 +52,22 @@ spec:
- "camel:core"
- "camel:jackson-avro"
template:
beans:
- name: schemaResolver
type: "#class:org.apache.camel.kamelets.utils.format.converter.avro.AvroSchemaResolver"
property:
- key: validate
value: '{{validate}}'
- key: schema
value: '{{schema:}}'
from:
uri: kamelet:source
steps:
- set-property:
name: schema
constant: "{{schema}}"
- set-property:
name: validate
constant: "{{validate}}"
- marshal:
avro:
library: Jackson
unmarshalType: com.fasterxml.jackson.databind.JsonNode
schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightAvroSchemaResolver"
- remove-property:
name: schema
- remove-property:
name: validate
schemaResolver: "#bean:{{schemaResolver}}"
- set-header:
name: "Content-Type"
constant: "application/avro"
15 changes: 7 additions & 8 deletions kamelets/protobuf-deserialize-action.kamelet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ spec:
title: "Protobuf Deserialize Action"
description: "Deserialize payload to Protobuf"
type: object
required:
- schema
properties:
schema:
title: Schema
Expand All @@ -46,18 +44,19 @@ spec:
- "camel:core"
- "camel:jackson-protobuf"
template:
beans:
- name: schemaResolver
type: "#class:org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver"
property:
- key: schema
value: '{{schema:}}'
from:
uri: kamelet:source
steps:
- set-property:
name: schema
constant: "{{schema}}"
- unmarshal:
protobuf:
library: Jackson
unmarshalType: com.fasterxml.jackson.databind.JsonNode
schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightProtobufSchemaResolver"
- remove-property:
name: schema
schemaResolver: "#bean:{{schemaResolver}}"
- remove-header:
name: "Content-Type"
15 changes: 7 additions & 8 deletions kamelets/protobuf-serialize-action.kamelet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ spec:
title: "Protobuf Serialize Action"
description: "Serialize payload to Protobuf"
type: object
required:
- schema
properties:
schema:
title: Schema
Expand All @@ -46,19 +44,20 @@ spec:
- "camel:core"
- "camel:jackson-protobuf"
template:
beans:
- name: schemaResolver
type: "#class:org.apache.camel.kamelets.utils.format.converter.protobuf.ProtobufSchemaResolver"
property:
- key: schema
value: '{{schema:}}'
from:
uri: kamelet:source
steps:
- set-property:
name: schema
constant: "{{schema}}"
- marshal:
protobuf:
library: Jackson
unmarshalType: com.fasterxml.jackson.databind.JsonNode
schemaResolver: "#class:org.apache.camel.kamelets.utils.serialization.InflightProtobufSchemaResolver"
- remove-property:
name: schema
schemaResolver: "#bean:{{schemaResolver}}"
- set-header:
name: "Content-Type"
constant: "application/protobuf"
76 changes: 76 additions & 0 deletions kamelets/resolve-pojo-schema-action.kamelet.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------

apiVersion: camel.apache.org/v1alpha1
kind: Kamelet
metadata:
name: resolve-pojo-schema-action
annotations:
camel.apache.org/kamelet.support.level: "Stable"
camel.apache.org/catalog.version: "4.0.0-SNAPSHOT"
camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,PD94bWwgdmVyc2lvbj0iMS4wIiBlbmNvZGluZz0iVVRGLTgiIHN0YW5kYWxvbmU9Im5vIj8+CjxzdmcKICAgeG1sbnM6ZGM9Imh0dHA6Ly9wdXJsLm9yZy9kYy9lbGVtZW50cy8xLjEvIgogICB4bWxuczpjYz0iaHR0cDovL2NyZWF0aXZlY29tbW9ucy5vcmcvbnMjIgogICB4bWxuczpyZGY9Imh0dHA6Ly93d3cudzMub3JnLzE5OTkvMDIvMjItcmRmLXN5bnRheC1ucyMiCiAgIHhtbG5zOnN2Zz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciCiAgIHhtbG5zPSJodHRwOi8vd3d3LnczLm9yZy8yMDAwL3N2ZyIKICAgeG1sbnM6c29kaXBvZGk9Imh0dHA6Ly9zb2RpcG9kaS5zb3VyY2Vmb3JnZS5uZXQvRFREL3NvZGlwb2RpLTAuZHRkIgogICB4bWxuczppbmtzY2FwZT0iaHR0cDovL3d3dy5pbmtzY2FwZS5vcmcvbmFtZXNwYWNlcy9pbmtzY2FwZSIKICAgdmlld0JveD0iMCAtMjU2IDE3OTIgMTc5MiIKICAgaWQ9InN2ZzMwMjUiCiAgIHZlcnNpb249IjEuMSIKICAgaW5rc2NhcGU6dmVyc2lvbj0iMC40OC4zLjEgcjk4ODYiCiAgIHdpZHRoPSIxMDAlIgogICBoZWlnaHQ9IjEwMCUiCiAgIHNvZGlwb2RpOmRvY25hbWU9ImNvZ19mb250X2F3ZXNvbWUuc3ZnIj4KICA8bWV0YWRhdGEKICAgICBpZD0ibWV0YWRhdGEzMDM1Ij4KICAgIDxyZGY6UkRGPgogICAgICA8Y2M6V29yawogICAgICAgICByZGY6YWJvdXQ9IiI+CiAgICAgICAgPGRjOmZvcm1hdD5pbWFnZS9zdmcreG1sPC9kYzpmb3JtYXQ+CiAgICAgICAgPGRjOnR5cGUKICAgICAgICAgICByZGY6cmVzb3VyY2U9Imh0dHA6Ly9wdXJsLm9yZy9kYy9kY21pdHlwZS9TdGlsbEltYWdlIiAvPgogICAgICA8L2NjOldvcms+CiAgICA8L3JkZjpSREY+CiAgPC9tZXRhZGF0YT4KICA8ZGVmcwogICAgIGlkPSJkZWZzMzAzMyIgLz4KICA8c29kaXBvZGk6bmFtZWR2aWV3CiAgICAgcGFnZWNvbG9yPSIjZmZmZmZmIgogICAgIGJvcmRlcmNvbG9yPSIjNjY2NjY2IgogICAgIGJvcmRlcm9wYWNpdHk9IjEiCiAgICAgb2JqZWN0dG9sZXJhbmNlPSIxMCIKICAgICBncmlkdG9sZXJhbmNlPSIxMCIKICAgICBndWlkZXRvbGVyYW5jZT0iMTAiCiAgICAgaW5rc2NhcGU6cGFnZW9wYWNpdHk9IjAiCiAgICAgaW5rc2NhcGU6cGFnZXNoYWRvdz0iMiIKICAgICBpbmtzY2FwZTp3aW5kb3ctd2lkdGg9IjY0MCIKICAgICBpbmtzY2FwZTp3aW5kb3ctaGVpZ2h0PSI0ODAiCiAgICAgaWQ9Im5hbWVkdmlldzMwMzEiCiAgICAgc2hvd2dyaWQ9ImZhbHNlIgogICAgIGlua3NjYXBlOnpvb209IjAuMTMxNjk2NDMiCiAgICAgaW5rc2NhcGU6Y3g9Ijg5NiIKICAgICBpbmtzY2FwZTpjeT0iODk2IgogICAgIGlua3NjYXBlOndpbmRvdy14PSIwIgogICAgIGlua3NjYXBlOndpbmRvdy15PSIyNSIKICAgICBpbmtzY2FwZTp3aW5kb3ctbWF4aW1pemVkPSIwIgogICAgIGlua3NjYXBlOmN1cnJlbnQtbGF5ZXI9InN2ZzMwMjUiIC8+CiAgPGcKICAgICB0cmFuc2Zvcm09Im1hdHJpeCgxLDAsMCwtMSwxMjEuNDkxNTMsMTI4NS40MjM3KSIKICAgICBpZD0iZzMwMjciPgogICAgPHBhdGgKICAgICAgIGQ9Im0gMTAyNCw2NDAgcSAwLDEwNiAtNzUsMTgxIC03NSw3NSAtMTgxLDc1IC0xMDYsMCAtMTgxLC03NSAtNzUsLTc1IC03NSwtMTgxIDAsLTEwNiA3NSwtMTgxIDc1LC03NSAxODEsLTc1IDEwNiwwIDE4MSw3NSA3NSw3NSA3NSwxODEgeiBtIDUxMiwxMDkgViA1MjcgcSAwLC0xMiAtOCwtMjMgLTgsLTExIC0yMCwtMTMgbCAtMTg1LC0yOCBxIC0xOSwtNTQgLTM5LC05MSAzNSwtNTAgMTA3LC0xMzggMTAsLTEyIDEwLC0yNSAwLC0xMyAtOSwtMjMgLTI3LC0zNyAtOTksLTEwOCAtNzIsLTcxIC05NCwtNzEgLTEyLDAgLTI2LDkgbCAtMTM4LDEwOCBxIC00NCwtMjMgLTkxLC0zOCAtMTYsLTEzNiAtMjksLTE4NiAtNywtMjggLTM2LC0yOCBIIDY1NyBxIC0xNCwwIC0yNC41LDguNSBRIDYyMiwtMTExIDYyMSwtOTggTCA1OTMsODYgcSAtNDksMTYgLTkwLDM3IEwgMzYyLDE2IFEgMzUyLDcgMzM3LDcgMzIzLDcgMzEyLDE4IDE4NiwxMzIgMTQ3LDE4NiBxIC03LDEwIC03LDIzIDAsMTIgOCwyMyAxNSwyMSA1MSw2Ni41IDM2LDQ1LjUgNTQsNzAuNSAtMjcsNTAgLTQxLDk5IEwgMjksNDk1IFEgMTYsNDk3IDgsNTA3LjUgMCw1MTggMCw1MzEgdiAyMjIgcSAwLDEyIDgsMjMgOCwxMSAxOSwxMyBsIDE4NiwyOCBxIDE0LDQ2IDM5LDkyIC00MCw1NyAtMTA3LDEzOCAtMTAsMTIgLTEwLDI0IDAsMTAgOSwyMyAyNiwzNiA5OC41LDEwNy41IDcyLjUsNzEuNSA5NC41LDcxLjUgMTMsMCAyNiwtMTAgbCAxMzgsLTEwNyBxIDQ0LDIzIDkxLDM4IDE2LDEzNiAyOSwxODYgNywyOCAzNiwyOCBoIDIyMiBxIDE0LDAgMjQuNSwtOC41IFEgOTE0LDEzOTEgOTE1LDEzNzggbCAyOCwtMTg0IHEgNDksLTE2IDkwLC0zNyBsIDE0MiwxMDcgcSA5LDkgMjQsOSAxMywwIDI1LC0xMCAxMjksLTExOSAxNjUsLTE3MCA3LC04IDcsLTIyIDAsLTEyIC04LC0yMyAtMTUsLTIxIC01MSwtNjYuNSAtMzYsLTQ1LjUgLTU0LC03MC41IDI2LC01MCA0MSwtOTggbCAxODMsLTI4IHEgMTMsLTIgMjEsLTEyLjUgOCwtMTAuNSA4LC0yMy41IHoiCiAgICAgICBpZD0icGF0aDMwMjkiCiAgICAgICBpbmtzY2FwZTpjb25uZWN0b3ItY3VydmF0dXJlPSIwIgogICAgICAgc3R5bGU9ImZpbGw6Y3VycmVudENvbG9yIiAvPgogIDwvZz4KPC9zdmc+Cg=="
camel.apache.org/provider: "Apache Software Foundation"
camel.apache.org/kamelet.group: "Actions"
camel.apache.org/kamelet.namespace: "Transformation"
labels:
camel.apache.org/kamelet.type: "action"
spec:
definition:
title: "Resolve Schema Action"
description: "Resolves schema from given mime type and payload. Sets the resolved schema, the schema type and its content class as properties for later reference."
type: object
properties:
mimeType:
title: Mime Type
description: The mime type to determine the schema resolver implementation that should perform the operation.
type: string
default: "application/json"
example: "application/json"
schema:
title: Schema
description: Optional schema content (as single-line, using JSON format).
type: string
contentClass:
title: Content Class
description: Type information of the content object. Fully qualified class name.
type: string
example: "org.apache.camel.content.Foo"
targetMimeType:
title: Target Mime Type
description: Additional mime type information used to determine the schema resolver. Usually only used in combination with mime type "application/x-java-object"
type: string
example: "application/json"
dependencies:
- "camel:kamelet"
- "camel:core"
- "camel:jackson"
- "camel:jackson-avro"
- "camel:jackson-protobuf"
template:
beans:
- name: schemaResolver
type: "#class:org.apache.camel.kamelets.utils.format.schema.DelegatingSchemaResolver"
properties:
mimeType: '{{mimeType}}'
schema: '{{schema:}}'
contentClass: '{{contentClass:}}'
targetMimeType: '{{targetMimeType:}}'
from:
uri: "kamelet:source"
steps:
- process:
ref: "{{schemaResolver}}"
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.kamelets.utils.format.converter.standard.BinaryDataType;
import org.apache.camel.kamelets.utils.format.converter.standard.JsonModelDataType;
import org.apache.camel.kamelets.utils.format.converter.standard.StringDataType;
import org.apache.camel.kamelets.utils.format.converter.bytes.ByteArrayDataType;
import org.apache.camel.kamelets.utils.format.converter.text.StringDataType;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverter;
import org.apache.camel.kamelets.utils.format.spi.DataTypeConverterResolver;
import org.apache.camel.kamelets.utils.format.spi.DataTypeLoader;
Expand Down Expand Up @@ -105,9 +104,8 @@ protected void doInit() throws Exception {
if (classpathScan) {
dataTypeLoaders.add(new AnnotationDataTypeLoader());
} else if (useDefaultConverters) {
addDataTypeConverter(new BinaryDataType());
addDataTypeConverter(new ByteArrayDataType());
addDataTypeConverter(new StringDataType());
addDataTypeConverter(new JsonModelDataType());
}

for (DataTypeLoader loader : dataTypeLoaders) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kamelets.utils.format;

import java.util.Objects;

public enum MimeType {
JSON("application/json"),
PROTOBUF("application/protobuf"),
AVRO("application/avro"),
AVRO_BINARY("avro/binary"),
AVRO_STRUCT("avro/x-struct"),
BINARY("application/octet-stream"),
TEXT("text/plain"),
JAVA_OBJECT("application/x-java-object"),
STRUCT("application/x-struct");

private static final MimeType[] VALUES = values();
private final String type;

MimeType(String type) {
this.type = type;
}

public String type() {
return type;
}

public static MimeType of(String type) {
for (MimeType mt : VALUES) {
if (Objects.equals(type, mt.type)) {
return mt;
}
}

throw new IllegalArgumentException("Unsupported type: " + type);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.camel.kamelets.utils.format;

import java.util.Arrays;
import java.util.Objects;

/**
* Supported schema type for Java object serialization/deserialization
*/
public enum SchemaType {
PROTOBUF("protobuf"),
AVRO("avsc"),
JSON("json");

private static final SchemaType[] VALUES = values();

private final String schemaType;

SchemaType(String type) {
this.schemaType = type;
}

public String type() {
return schemaType;
}

public static SchemaType of(String type) {
return Arrays.stream(VALUES)
.filter(s -> Objects.equals(s.schemaType, type))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(String.format("Unsupported schema type '%s'", type)));
}
}
Loading

0 comments on commit a76b405

Please sign in to comment.