Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

doc updates for reactive messaging mp #4730

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/mp/reactivemessaging/aq.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ include::{rootdir}/includes/dependencies.adoc[]
== Reactive Oracle Advanced Queueing Connector

Connecting streams to Oracle AQ with Reactive Messaging couldn't be easier.
This connector extends Helidon's JMS connector with Oracle AQ's specific API.
This connector extends Helidon's JMS connector with Oracle's AQ-specific API.

=== Config
=== Configuration

Connector name: `helidon-aq`

Expand Down Expand Up @@ -72,7 +72,7 @@ will not be added to the durable subscription. Default value: `false`
they share same JMS session and same JDBC connection as well.
|===

=== Configured JMS factory
=== Configured JMS Factory

The simplest possible usage is leaving construction of AQjmsConnectionFactory to the connector.

Expand Down Expand Up @@ -135,7 +135,7 @@ mp:

=== Injected JMS factory

In case you need more advanced setup, connector can work with injected AQjmsConnectionFactory
If you need more advanced configurations, connector can work with injected `AQjmsConnectionFactory`:

[source,java]
.Inject:
Expand Down
98 changes: 51 additions & 47 deletions docs/mp/reactivemessaging/introduction.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

///////////////////////////////////////////////////////////////////////////////

= Reactive Messaging
= Reactive Messaging MP
:spec-name: MicroProfile Reactive Messaging
:description: {spec-name} support in Helidon MP
:keywords: helidon, mp, microprofile, messaging
Expand All @@ -25,31 +25,25 @@
:microprofile-bundle: false
:rootdir: {docdir}/../..

include::{rootdir}/includes/mp.adoc[]

== Contents

- <<Overview, Overview>>
- <<Maven Coordinates, Maven Coordinates>>
- <<Channel, Channel>>
- <<Consuming method, Consuming method>>
- <<Injected publisher, Injected publisher>>
- <<Producing method, Producing method>>
- <<Emitter, Emitter>>
- <<Processing method, Processing method>>
- <<Connector, Connector>>
- <<Message, Message>>
- <<Acknowledgement, Acknowledgement>>
- <<Health check, Health check>>
- <<Upgrading to Messaging 3.0, Upgrading to 3.0>>
- <<Usage, Usage>>
- <<Configuration, Configuration>>
- <<Reference, Reference>>
- <<Additional Information, Additional Information>>

== Overview
There is more to expect from the modern messaging than we got from old-fashioned Message Driven Beans,
blocking is not always favorable way to apply backpressure to the message source, actually is not the only way anymore.
Reactive messaging uses reactive streams as message channels, users can construct very effective pipelines for working
with the messages, or use messaging in the old-fashioned way. Similarly to the MDB's
Reactive messaging offers a new way of processing messages that is different from the older method of using message-driven beans. One significant difference is that blocking is no longer the only way to apply backpressure to the message source.

Reactive messaging uses reactive streams as message channels so you can construct very effective pipelines for working
with the messages or, if you prefer, you can continue to use older messaging methods. Like the message-driven beans,
link:{microprofile-reactive-messaging-spec-url}[MicroProfile Reactive Messaging]
uses CDI beans to produce, consume or process messages over Reactive Streams.
Such messaging bean is expected to be either in `ApplicationScoped` or `Dependent` scope.
These essaging beans are expected to be either `ApplicationScoped` or `Dependent` scoped.
Messages are managed by methods annotated by `@Incoming` and `@Outgoing`
and the invocation is always driven by message core - either at assembly time, or for every message coming from the stream.

Expand All @@ -74,23 +68,30 @@ To include health checks for Messaging add the following dependency:
</dependency>
----

== Channel
== Usage

Reactive messaging uses named channels to connect always one source(upstream) with one consumer(downstream).
Each channel needs to have both ends connected otherwise container won't successfully start.
- <<Channels, Channels>>
- <<Emitter, Emitter>>
- <<Connector, Connector>>


=== Channels

Reactive messaging uses named channels to connect one source (upstream) with one consumer (downstream).
Each channel needs to have both ends connected otherwise the container cannot successfully start.

image::msg/channel.svg[Messaging Channel]

Channels can be connected either to <<Emitter, emitter>> (1), <<Producing method, producing method>> (2) or <<Connector, connector>> (3) on the upstream side. And <<Injected publisher, injected publisher>> (4), <<Consuming method, consuming method>> (5) or <<Connector, connector>> (6)
Channels can be connected either to <<Emitter, emitter>> (1), <<Producing Method, producing method>> (2) or <<Connector, connector>> (3) on the upstream side. And <<Injected Publisher, injected publisher>> (4), <<Consuming Method, consuming method>> (5) or <<Connector, connector>> (6)
on the downstream.

=== Consuming method
==== Consuming Method
Consuming methods can be connected to the channel's downstream to consume the message coming through the channel.
Incoming annotation has one required attribute `value` that defines the channel name.
The incoming annotation has one required attribute `value` that defines the channel name.

Consuming method can function in two ways:

* consume every message coming from the stream connected to the <<Channel, channel>> - invoked per each message
* consume every message coming from the stream connected to the <<Channels, channels>> - invoked per each message
* prepare reactive stream's subscriber and connect it to the channel - invoked only once during the channel construction

[source,java]
Expand All @@ -113,9 +114,9 @@ public Subscriber<String> printMessage() {
}
----

=== Injected publisher
==== Injected Publisher
Directly injected publisher can be connected as a channel downstream,
we can consume the data from the channel by subscribing to it.
you can consume the data from the channel by subscribing to it.

Helidon can inject following types of publishers:

Expand All @@ -139,12 +140,12 @@ public MyBean(@Channel("example-channel-1") Multi<String> multiChannel) {
}
----

=== Producing method
==== Producing Method

The annotation has one required attribute `value` that defines the
link:{microprofile-reactive-messaging-spec-url}#_channel[channel] name.

Such annotated <<terms,messaging method>> can function in two ways:
The annotated <<terms,messaging method>> can function in two ways:

* produce exactly one message to the stream connected to the link:{microprofile-reactive-messaging-spec-url}#_channel[channel]
* prepare reactive stream's publisher and connect it to the link:{microprofile-reactive-messaging-spec-url}#_channel[channel]
Expand All @@ -170,7 +171,7 @@ public Publisher<String> printMessage() {
WARNING: Messaging methods are not meant to be invoked directly!

=== Emitter
For sending messages from imperative code we can inject special channel source called emitter.
To send messages from imperative code, you can inject a special channel source called an emitter.
Emitter can serve only as an upstream, source of the messages, for messaging channel.

[source,java]
Expand All @@ -188,9 +189,8 @@ public Response sendMessage(final String payload) {
}
----

Emitters as a source of messages for reactive channels needs to address possible backpressure from the downstream side
of the channel. In case there is not enough demand from the downstream we can configure best fitting strategy for our use-case
with annotation `@OnOverflow`.
Emitters, as a source of messages for reactive channels, need to address possible backpressure from the downstream side
of the channel. In case there is not enough demand from the downstream, you can configure a buffer size strategy using the `@OnOverflow` annotation. Additional overflow strategies are described below.

.Overflow strategies
|===
Expand All @@ -204,7 +204,7 @@ with annotation `@OnOverflow`.
|NONE |Messages are sent to downstream even if there is no demand. Backpressure is effectively ignored.
|===

=== Processing method
==== Processing Method

Such link:{microprofile-reactive-messaging-spec-url}#_method_consuming_and_producing[methods]
acts as processors, consuming messages from one channel and producing to another.
Expand Down Expand Up @@ -253,7 +253,7 @@ public String processMessage(String msg) {
----

=== Connector
Messaging connector is an application scoped bean which implements one or both of following interfaces:
Messaging connector is an application-scoped bean that implements one or both of following interfaces:

* `IncomingConnectorFactory` - connector can create an upstream publisher to produce messages to a channel
* `OutgoingConnectorFactory` - connector can create a downstream subscriber to consume messages from a channel
Expand All @@ -280,7 +280,9 @@ public class ExampleConnector implements IncomingConnectorFactory, OutgoingConne
}
----

Channel needs to be instructed to use connector as its upstream or downstream by configuration.
== Configuration

The channel must be configured to use connector as its upstream or downstream.

[source,yaml]
.Example of channel to connector mapping config:
Expand Down Expand Up @@ -317,8 +319,8 @@ public void consume(String value) {
> Consuming: bar
----

When connector constructs publisher or subscriber for given channel,
it can access general connector configuration and channel specific properties merged together with
When the connector constructs a publisher or subscriber for a given channel,
it can access general connector configuration and channel-specific properties merged together with
special synthetic property `channel-name`.

image::msg/connector-config.svg[Connector config]
Expand Down Expand Up @@ -397,10 +399,9 @@ public void consumeImplicitlyUnwrappedMessage(String value) {
----

=== Acknowledgement
Message carries a callback for reception acknowledgement(ack) and negative acknowledgement(nack),
acknowledgement in messaging methods is possible manually by
Messages carry a callback for reception acknowledgement (ack) and negative acknowledgement (nack). An acknowledgement in messaging methods is possible manually by
`org.eclipse.microprofile.reactive.messaging.Message#ack` or automatically according explicit
or implicit acknowledgement strategy by messaging core. Explicit strategy configuration is possible
or implicit acknowledgement strategy by the messaging core. Explicit strategy configuration is possible
with `@Acknowledgment` annotation which has one required attribute `value` that expects the strategy type from enum
`org.eclipse.microprofile.reactive.messaging.Acknowledgment.Strategy`. More information about supported signatures
and implicit automatic acknowledgement can be found in specification
Expand Down Expand Up @@ -524,15 +525,18 @@ every messaging channel to have its own probe.
}
----

WARNING: Due to the nack support are exceptions thrown in messaging methods NOT translated to error and cancel signals implicitly anymore

== Upgrading to Messaging 3.0
.Non-backward compatible changes:
* Exceptions thrown in messaging methods are not propagated as error or cancel signals to the stream(use `mp.messaging.helidon.propagate-errors=true` for backward compatible mode) - errors are propagated only to the upstream by `nack` functionality.
* Default acknowledgement strategies changed for selected signatures(all with Message as a parameter or return type) - See the specification issue link:{https://github.com/eclipse/microprofile-reactive-messaging/pull/97}[#97]
Caution: Due to the nack support are exceptions thrown in messaging methods NOT translated to error and cancel signals implicitly anymore

== Reference

* link:https://helidon.io/docs/v2/apidocs/io.helidon.microprofile.messaging/module-summary.html[Helidon MicroProfile Reactive Messaging]
* link:{microprofile-reactive-messaging-spec-url}[MicroProfile Reactive Messaging Specification]
* link:https://github.com/eclipse/microprofile-reactive-messaging[MicroProfile Reactive Messaging on GitHub]
* link:https://github.com/eclipse/microprofile-reactive-messaging[MicroProfile Reactive Messaging on GitHub]

== Additional Information

=== Upgrading to Messaging 3.0
.Non-backward compatible changes:
* Exceptions thrown in messaging methods are not propagated as error or cancel signals to the stream(use `mp.messaging.helidon.propagate-errors=true` for backward compatible mode) - errors are propagated only to the upstream by `nack` functionality.
* Default acknowledgement strategies changed for selected signatures(all with Message as a parameter or return type) - See the specification issue link:{https://github.com/eclipse/microprofile-reactive-messaging/pull/97}[#97]

14 changes: 9 additions & 5 deletions docs/mp/reactivemessaging/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
:microprofile-bundle: false
:rootdir: {docdir}/../..

include::{rootdir}/includes/mp.adoc[]

== Contents

- <<Overview, Overview>>
- <<Maven Coordinates, Maven Coordinates>>
- <<NACK Strategy, NACK Strategy>>
- <<Examples, Examples>>

include::{rootdir}/includes/mp.adoc[]
include::{rootdir}/includes/dependencies.adoc[]

[source,xml]
Expand All @@ -42,7 +45,7 @@ include::{rootdir}/includes/dependencies.adoc[]
----

== Overview
Connecting streams to Kafka with Reactive Messaging couldn't be easier.
Connecting streams to Kafka with Reactive Messaging is easy to do.
There is a standard Kafka client behind the scenes, all the link:{kafka-client-base-url}#producerconfigs[producer] and link:{kafka-client-base-url}#consumerconfigs[consumer] configs can
be propagated through messaging config.

Expand Down Expand Up @@ -114,7 +117,7 @@ topic is present on the same Kafka cluster.
Serializers are derived from deserializers used for consumption
`org.apache.kafka.common.serialization.StringDeserializer` >
`org.apache.kafka.common.serialization.StringSerializer`.
In this ideal case only the name of error topic is needed.
Note that the name of the error topic is needed only in this case.

[source,yaml]
.Example of derived DLQ config:
Expand All @@ -125,8 +128,7 @@ mp.messaging:
nack-dlq: dql_topic_name
----

In case custom connection is needed, all the usual producer configuration is possible
under the `nack-dlq` key.
If a custom connection is needed, then use the 'nack-dlq' key for all of the producer configuration.

[source,yaml]
.Example of custom DLQ config:
Expand Down Expand Up @@ -156,6 +158,8 @@ mp.messaging:
nack-log-only: true
----

== Examples

Don't forget to check out the examples with pre-configured Kafka docker image, for easy testing:

* {helidon-github-tree-url}/examples/messaging
21 changes: 13 additions & 8 deletions docs/mp/reactivemessaging/mock.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@
:microprofile-bundle: false
:rootdir: {docdir}/../..

include::{rootdir}/includes/mp.adoc[]

== Contents

- <<Overview, Overview>>
- <<Emitting data, Emitting data>>
- <<Asserting data, Asserting data>>
- <<Usage, Usage>>
- <<Configuration, Configuration>>
- <<Helidon test with Mock Connector, Helidon Test>>
- <<Helidon Test with Mock Connector, Helidon Test>>

== Overview
Mock connector is a simple application scoped bean that can be used for emitting to a channel
Expand All @@ -58,7 +59,11 @@ For injecting Mock Connector use `@TestConnector` qualifier:
MockConnector mockConnector;
----

=== Emitting data
== Usage
- <<Emitting Data, Emitting Data>>
- <<Asserting Data, Asserting Data>>

=== Emitting Data

.Emitting String values `a`, `b`, `c`
[source,java]
Expand All @@ -69,7 +74,7 @@ mockConnector
----
<1> Get incoming channel of given name and payload type

=== Asserting data
=== Asserting Data

.Awaiting and asserting payloads with custom mapper
[source,java]
Expand All @@ -88,11 +93,11 @@ mockConnector
|mock-data-type |java.lang.String| Type of the emitted initial data to be emitted
|===

== Helidon test with Mock Connector
== Helidon Test with Mock Connector
Mock connector works great with built-in Helidon test support for
link:/{rootdir}/testing/testing.adoc[JUnit 5] or link:/{rootdir}/testing/testing-ng.adoc[Test NG].
link:/{rootdir}/testing/testing.adoc[JUnit 5] or link:/{rootdir}/testing/testing-ng.adoc[TestNG].

As Helidon test support makes a bean out of your test, you can inject MockConnector right in to it.
As Helidon test support makes a bean out of your test, you can inject MockConnector directly into it.

[source,java]
----
Expand Down