Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: described the upcaster support in extension-kafka #227

Merged
merged 3 commits into from
Dec 14, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 12 additions & 7 deletions extensions/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
The Kafka Extension currently only has a Release Candidate. Due to this, minor releases of the extension or Axon Framework may include breaking changes to the APIs.
{% endhint %}

Apache Kafka is a very popular system for publishing and consuming events. Its architecture is fundamentally different from most messaging systems, and combines speed with reliability.
Apache Kafka is a very popular system for publishing and consuming events. Its architecture is fundamentally different from most messaging systems and combines speed with reliability.

Axon provides an extension dedicated to _publishing_ and _receiving_ event messages from Kafka. The Kafka Extension should be regarded as an alternative approach to distributing events, besides \(the default\) Axon Server.

Expand Down Expand Up @@ -83,7 +83,7 @@ public class KafkaEventPublicationConfiguration {
The second infrastructure component to introduce is the `KafkaPublisher`, which has a hard requirement on the `ProducerFactory`. Additionally, this would be the place to define the Kafka topic upon which Axon event messages will be published. Note that the `KafkaPublisher` needs to be `shutDown` properly, to ensure all `Producer` instances are properly closed.

```java
public class KafkaEventPublicationConfiguration {
public class KafkaEventPublicationConfiguration {
// ...

public KafkaPublisher<String, byte[]> kafkaPublisher(String topic,
Expand Down Expand Up @@ -121,7 +121,7 @@ public class KafkaEventPublicationConfiguration {
clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class)
)
.registerSubscribingEventProcessor(processingGroup);
// Replace `registerSubscribingEventProcessor` for `registerTrackingEventProcessor` to use a tracking processor
// Replace `registerSubscribingEventProcessor` for `registerTrackingEventProcessor` to use a tracking processor
}
// ...
}
Expand Down Expand Up @@ -284,17 +284,23 @@ Albeit the default, this implementation allows for some customization, such as h

The `SequencingPolicy` can be adjusted to change the behaviour of the record key being used. The default sequencing policy is the `SequentialPerAggregatePolicy`, which leads to the aggregate identifier of an event being the key of a `ProducerRecord` and `ConsumerRecord`.

The format of an event message defines an API between the producer and the consumer of the message. This API may change over time leading to incompatibility between the structure of the event class on the receiver side to the structure of the message, if old format is transmitted. Axon addresses the topic of [Event Versioning](../axon-framework/events/event-versioning.md) by introducing Event Upcasters. The `DefaultKafkaMessageConverter` will use provided `EventUpcasterChain` and run the upcasting process on the `MetaData` and `Payload` of individual messages converted from `ConsumerRecord` before those are passed to the `Serializer` and are converted to `Event` instances.
zambrovski marked this conversation as resolved.
Show resolved Hide resolved

Note that the upcasters are feed with messages one-by-one, which limits the upcasters to one-to-one or one-to-many only. If your upcaster are implemented in many-to-one manner, they won't be able to operate inside the extension yet.
zambrovski marked this conversation as resolved.
Show resolved Hide resolved

Lastly, the `Serializer` used by the converter can be adjusted. See the [Serializer](../axon-framework/events/event-serialization.md) section for more details on this.

```java
public class KafkaMessageConversationConfiguration {
// ...
public KafkaMessageConverter<String, byte[]> kafkaMessageConverter(Serializer serializer,
SequencingPolicy<? super EventMessage<?>> sequencingPolicy,
BiFunction<String, Object, RecordHeader> headerValueMapper) {
BiFunction<String, Object, RecordHeader> headerValueMapper,
EventUpcasterChain upcasterChain) {
return DefaultKafkaMessageConverter.builder()
.serializer(serializer) // Hard requirement
.sequencingPolicy(sequencingPolicy) // Defaults to a "SequentialPerAggregatePolicy"
.serializer(serializer) // Hard requirement
.sequencingPolicy(sequencingPolicy) // Defaults to a "SequentialPerAggregatePolicy"
.upcasterChain(upcasterChain) // Defaults to empty upcaster chain
.headerValueMapper(headerValueMapper) // Defaults to "HeaderUtils#byteMapper()"
.build();
}
Expand Down Expand Up @@ -386,4 +392,3 @@ axon:
> The auto configured `StreamableKafkaMessageSource` can be toggled off by setting the `axon.kafka.consumer.event-processing-mode` to `subscribing`.
>
> Note that this **does not** create a `SubscribableKafkaMessageSource` for you out of the box. To set up a subscribable message, we recommend to read [this](kafka.md#consuming-events-with-a-subscribable-message-source) section.