Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ public class YamlRoutesBuilderLoader extends YamlRoutesBuilderLoaderSupport {
private static final String BINDING_VERSION = "camel.apache.org/v1alpha1";
private static final String PIPE_VERSION = "camel.apache.org/v1";
private static final String STRIMZI_VERSION = "kafka.strimzi.io/v1";
private static final String KNATIVE_VERSION = "messaging.knative.dev/v1";
private static final String KNATIVE_MESSAGING_VERSION = "messaging.knative.dev/v1";
private static final String KNATIVE_EVENTING_VERSION = "eventing.knative.dev/v1";
private static final String KNATIVE_EVENT_TYPE = "org.apache.camel.event";

private final Map<String, Boolean> preparseDone = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -875,11 +877,17 @@ private String extractCamelEndpointUri(MappingNode node) {
boolean strimzi
= !kamelet && mn != null && anyTupleMatches(mn.getValue(), "apiVersion", v -> v.startsWith(STRIMZI_VERSION))
&& anyTupleMatches(mn.getValue(), "kind", "KafkaTopic");
boolean knative
boolean knativeBroker
= !kamelet && mn != null
&& anyTupleMatches(mn.getValue(), "apiVersion", v -> v.startsWith(KNATIVE_EVENTING_VERSION))
&& anyTupleMatches(mn.getValue(), "kind", "Broker");
boolean knativeChannel
= !kamelet && !strimzi && mn != null
&& anyTupleMatches(mn.getValue(), "apiVersion", v -> v.startsWith(KNATIVE_VERSION));
&& anyTupleMatches(mn.getValue(), "apiVersion", v -> v.startsWith(KNATIVE_MESSAGING_VERSION));
String uri;
if (kamelet || strimzi || knative) {
if (knativeBroker) {
uri = KNATIVE_EVENT_TYPE;
} else if (kamelet || strimzi || knativeChannel) {
uri = extractTupleValue(mn.getValue(), "name");
} else {
uri = extractTupleValue(node.getValue(), "uri");
Expand All @@ -888,6 +896,12 @@ private String extractCamelEndpointUri(MappingNode node) {
// properties
MappingNode prop = asMappingNode(nodeAt(node, "/properties"));
Map<String, Object> params = asMap(prop);

if (knativeBroker && params != null && params.containsKey("type")) {
// Use explicit event type from properties - remove setting from params and set as uri
uri = params.remove("type").toString();
}

if (params != null && !params.isEmpty()) {
String query = URISupport.createQueryString(params);
uri = uri + "?" + query;
Expand All @@ -897,7 +911,14 @@ private String extractCamelEndpointUri(MappingNode node) {
return "kamelet:" + uri;
} else if (strimzi) {
return "kafka:" + uri;
} else if (knative) {
} else if (knativeBroker) {
if (uri.contains("?")) {
uri += "&kind=Broker&name=" + extractTupleValue(mn.getValue(), "name");
} else {
uri += "?kind=Broker&name=" + extractTupleValue(mn.getValue(), "name");
}
return "knative:event/" + uri;
} else if (knativeChannel) {
return "knative:channel/" + uri;
} else {
return uri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ class PipeLoaderTest extends YamlTestSupport {
}
}

def "Pipe from kamelet to knative"() {
def "Pipe from kamelet to knative channel"() {
when:

// stub knative for testing as it requires to setup connection to a real knative broker
Expand Down Expand Up @@ -555,6 +555,127 @@ class PipeLoaderTest extends YamlTestSupport {
}
}

def "Pipe from knative channel to kamelet"() {
when:

// stub knative for testing as it requires to setup connection to a real knative broker
context.removeComponent("knative")
context.addComponent("knative", context.getComponent("stub"))

loadBindings('''
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: knative-event-source
spec:
source:
ref:
kind: InMemoryChannel
apiVersion: messaging.knative.dev/v1
name: my-messages
sink:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: log-sink
properties:
showHeaders: true
''')
then:
context.routeDefinitions.size() == 2

with (context.routeDefinitions[0]) {
routeId == 'knative-event-source'
input.endpointUri == 'knative:channel/my-messages'
outputs.size() == 1
with (outputs[0], ToDefinition) {
endpointUri == 'kamelet:log-sink?showHeaders=true'
}
}
}

def "Pipe from kamelet to knative broker"() {
when:

// stub knative for testing as it requires to setup connection to a real knative broker
context.removeComponent("knative")
context.addComponent("knative", context.getComponent("stub"))

loadBindings('''
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: timer-event-source
spec:
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: timer-source
properties:
message: "Hello world!"
sink:
ref:
kind: Broker
apiVersion: eventing.knative.dev/v1
name: foo-broker
properties:
type: org.apache.camel.event.messages
''')
then:
context.routeDefinitions.size() == 2

with (context.routeDefinitions[0]) {
routeId == 'timer-event-source'
input.endpointUri == 'kamelet:timer-source?message=Hello+world%21'
outputs.size() == 1
with (outputs[0], ToDefinition) {
endpointUri == 'knative:event/org.apache.camel.event.messages?kind=Broker&name=foo-broker'
}
}
}

def "Pipe from knative broker to kamelet"() {
when:

// stub knative for testing as it requires to setup connection to a real knative broker
context.removeComponent("knative")
context.addComponent("knative", context.getComponent("stub"))

loadBindings('''
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: knative-event-source
spec:
source:
ref:
kind: Broker
apiVersion: eventing.knative.dev/v1
name: foo-broker
properties:
type: org.apache.camel.event.messages
sink:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: log-sink
properties:
showHeaders: true
''')
then:
context.routeDefinitions.size() == 2

with (context.routeDefinitions[0]) {
routeId == 'knative-event-source'
input.endpointUri == 'knative:event/org.apache.camel.event.messages?kind=Broker&name=foo-broker'
outputs.size() == 1
with (outputs[0], ToDefinition) {
endpointUri == 'kamelet:log-sink?showHeaders=true'
}
}
}

def "kamelet start route"() {
when:
loadBindings('''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package org.apache.camel.dsl.yaml.support

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.networknt.schema.JsonSchemaFactory;
import com.networknt.schema.JsonSchemaFactory
import com.networknt.schema.SchemaValidatorsConfig
import com.networknt.schema.SpecVersionDetector
import groovy.util.logging.Slf4j
import org.apache.camel.CamelContext
Expand All @@ -40,9 +41,14 @@ import java.nio.charset.StandardCharsets
@Slf4j
class YamlTestSupport extends Specification implements HasCamelContext {
static def MAPPER = new ObjectMapper(new YAMLFactory())
static def SCHEMA_NODE = MAPPER.readTree(ResourceHelper.getResourceAsStream('/schema/camelYamlDsl.json'));
static def FACTORY = JsonSchemaFactory.getInstance(SpecVersionDetector.detect(SCHEMA_NODE));
static def SCHEMA = FACTORY.getSchema(SCHEMA_NODE);
static def SCHEMA_NODE = MAPPER.readTree(ResourceHelper.getResourceAsStream('/schema/camelYamlDsl.json'))
static def FACTORY = JsonSchemaFactory.getInstance(SpecVersionDetector.detect(SCHEMA_NODE))
static def SCHEMA_VALIDATORS_CONFIG = {
SchemaValidatorsConfig config = new SchemaValidatorsConfig()
config.setLocale(Locale.ENGLISH)
return config
}()
static def SCHEMA = FACTORY.getSchema(SCHEMA_NODE, SCHEMA_VALIDATORS_CONFIG)

@AutoCleanup
def context = new DefaultCamelContext()
Expand Down