Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connector cross channel config properties #33

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 27 additions & 9 deletions spec/src/main/asciidoc/architecture.asciidoc
Expand Up @@ -162,7 +162,7 @@ They are responsible for mapping a specific _channel_ to remote sink or source o
This mapping is configured in the application configuration.
Note that an implementation may provide various ways to configure the mapping, but support for MicroProfile Config as a configuration source is mandatory.

Connectors implementation are associated with a name corresponding to a messaging transport, such as Apache Kafka, Amazon Kinesis, RabbitMQ or Apache ActiveMQ.
Connector implementations are associated with a name corresponding to a messaging transport, such as Apache Kafka, Amazon Kinesis, RabbitMQ or Apache ActiveMQ.
For instance, an hypothetic Kafka connector could be associated with the following name: `acme.kafka`.
This name is indicated using a qualifier on the connector implementation.

Expand All @@ -179,7 +179,7 @@ The configuration format is detailed later in this document.
The Reactive Messaging implementation is responsible for finding the connector implementation associated with the given name in the user configuration.
If the connector cannot be found, the deployment of the application must be failed.

The Reactive Messaging specification provides a SPI to implement connectors.
The Reactive Messaging specification provides an SPI to implement connectors.

=== Message stream operation

Expand Down Expand Up @@ -1039,39 +1039,57 @@ The implementation processes the global configuration and determines:
The builder methods defined in the `IncomingConnectorFactory` and `OutgoingConnectorFactory` receive a `org.eclipse.microprofile.config.Config` as parameter.
The `Config` object contains key-value pairs to configure the connector.
The configuration is specific to the connector.
For example, a Kafka connector expects a _bootstrap servers_ entry as well as a _topic_ entry.
For example, a Kafka connector expects a _bootstrap.servers_ entry as well as a _topic_ entry.

The Reactive Messaging implementation reads the global application configuration and must support the following format:

* `mp.messaging.incoming.[channel-name].[attribute]=[value]`
* `mp.messaging.outgoing.[channel-name].[attribute]=[value]`
* `mp.messaging.connector.[connector-name].[attribute]=[value]`
hutchig marked this conversation as resolved.
Show resolved Hide resolved

For each extracted `channel-name`:

1. The `connector` attribute value is read, and the connector implementation identified. If no connector matches, the deployment must be failed with a `DeploymentException`;
2. The other attributes are processed to generate a `Config` object containing only `attribute=value` entries.
1. The `connector` attribute of the channel is read, and the connector implementation identified. If no loadable connector implementation matches, the deployment must be failed with a `DeploymentException`;
2. Relevant attributes are those matching either the `channel-name` or the resolved `connector-name`.
3. Relevant attributes are processed to generate a `Config` object containing only `attribute=value` entries.
If is valid to have an attribute specified at a connector level and also for a specific channel.
If an attribute appears for both a channel and its relevant connector, the channel specific value will be used.
In the example below, the `acme.kafka` default value for `bootstrap.servers` is overridden for `my-channel` to be `9096`.

For example, the following snippet gives an example for a hypothetical Kafka connector:
The following snippet gives an example for a hypothetical Kafka connector:

[source]
----
mp.messaging.incoming.my-channel.connector=acme.kafka
mp.messaging.incoming.my-channel.bootstrap-servers=localhost:9092
mp.messaging.incoming.my-channel.bootstrap.servers=localhost:9096
mp.messaging.incoming.my-channel.topic=my-topic
mp.messaging.connector.acme.kafka.bootstrap.servers=localhost:9092
----

For properties that have a `mp.messaging.incoming.` or `mp.messaging.outgoing` prefix,
this prefix is stripped off the property name and the remainder of the property name
up to the first occurrence of `.` is treated as the channel name. Channel names may not
include the `.` character.

For properties that have a `mp.messaging.connector.` prefix, this prefix is stripped off the property name and
the longest remaining prefix that matches any configured `connector`
is treated as a connector name.
The remainder of the property name, minus the expected initial `.` separator, is taken
as the name of an attribute for this connector. For example `bootstrap.servers` appears as a
default attribute for all channels that use the `acme.kafka` connector.

The Reactive Messaging implementation:

1. Reads the configuration
2. Identifies that a `my-channel` source needs to be managed
3. Searches for the `connector` attribute and finds `acme.kafka`
4. Looks for a bean implementing the `IncomingConnectorFactory` interface qualified with `@Connector("acme.kafka")`.
If the configuration had contained a `mp.messaging.outgoing.my-channel...` entry, a bean implementing the `OutgoingConnectorFactory` interface would have been searched.
If the configuration had contained a `mp.messaging.outgoing.my-channel...` entry, a bean implementing the `OutgoingConnectorFactory` interface would have been searched for.
5. Creates a new `Config` object with just the relevant `key=value` pairs:
+
[source]
----
bootstrap.servers=localhost:9092
bootstrap.servers=localhost:9096
topic=my-topic
----
+
Expand Down