Skip to content

Commit

Permalink
Merge pull request #227 from holixon/feature/extension-kafka
Browse files Browse the repository at this point in the history
docs: described the upcaster support in extension-kafka
  • Loading branch information
smcvb committed Dec 14, 2021
2 parents a6a10c7 + 37a8d3e commit 472f04e
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions extensions/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
The Kafka Extension currently only has a Release Candidate. Due to this, minor releases of the extension or Axon Framework may include breaking changes to the APIs.
{% endhint %}

Apache Kafka is a very popular system for publishing and consuming events. Its architecture is fundamentally different from most messaging systems, and combines speed with reliability.
Apache Kafka is a very popular system for publishing and consuming events. Its architecture is fundamentally different from most messaging systems and combines speed with reliability.

Axon provides an extension dedicated to _publishing_ and _receiving_ event messages from Kafka. The Kafka Extension should be regarded as an alternative approach to distributing events, besides \(the default\) Axon Server.

Expand Down Expand Up @@ -83,7 +83,7 @@ public class KafkaEventPublicationConfiguration {
The second infrastructure component to introduce is the `KafkaPublisher`, which has a hard requirement on the `ProducerFactory`. Additionally, this would be the place to define the Kafka topic upon which Axon event messages will be published. Note that the `KafkaPublisher` needs to be `shutDown` properly, to ensure all `Producer` instances are properly closed.

```java
public class KafkaEventPublicationConfiguration {
public class KafkaEventPublicationConfiguration {
// ...

public KafkaPublisher<String, byte[]> kafkaPublisher(String topic,
Expand Down Expand Up @@ -121,7 +121,7 @@ public class KafkaEventPublicationConfiguration {
clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class)
)
.registerSubscribingEventProcessor(processingGroup);
// Replace `registerSubscribingEventProcessor` for `registerTrackingEventProcessor` to use a tracking processor
// Replace `registerSubscribingEventProcessor` for `registerTrackingEventProcessor` to use a tracking processor
}
// ...
}
Expand Down Expand Up @@ -284,17 +284,27 @@ Albeit the default, this implementation allows for some customization, such as h

The `SequencingPolicy` can be adjusted to change the behaviour of the record key being used. The default sequencing policy is the `SequentialPerAggregatePolicy`, which leads to the aggregate identifier of an event being the key of a `ProducerRecord` and `ConsumerRecord`.

The format of an event message defines an API between the producer and the consumer of the message.
This API may change over time, leading to incompatibility between the event class' structure on the receiving side and the event structure of a message containing the old format.
Axon addresses the topic of [Event Versioning](../axon-framework/events/event-versioning.md) by introducing Event Upcasters.
The `DefaultKafkaMessageConverter` will use a provided `EventUpcasterChain` and run the upcasting process on the `MetaData` and `Payload` of individual messages converted from `ConsumerRecord` before those are passed to the `Serializer` and converted into `Event` instances.

Note that the `KafkaMessageConverter` feeds the upcasters with messages one-by-one, limiting it to one-to-one or one-to-many upcasting <b>only</b>.
Upcasters performing a many-to-one or many-to-many operation thus won't be able to operate inside the extension (yet).

Lastly, the `Serializer` used by the converter can be adjusted. See the [Serializer](../axon-framework/events/event-serialization.md) section for more details on this.

```java
public class KafkaMessageConversationConfiguration {
// ...
public KafkaMessageConverter<String, byte[]> kafkaMessageConverter(Serializer serializer,
SequencingPolicy<? super EventMessage<?>> sequencingPolicy,
BiFunction<String, Object, RecordHeader> headerValueMapper) {
BiFunction<String, Object, RecordHeader> headerValueMapper,
EventUpcasterChain upcasterChain) {
return DefaultKafkaMessageConverter.builder()
.serializer(serializer) // Hard requirement
.sequencingPolicy(sequencingPolicy) // Defaults to a "SequentialPerAggregatePolicy"
.serializer(serializer) // Hard requirement
.sequencingPolicy(sequencingPolicy) // Defaults to a "SequentialPerAggregatePolicy"
.upcasterChain(upcasterChain) // Defaults to empty upcaster chain
.headerValueMapper(headerValueMapper) // Defaults to "HeaderUtils#byteMapper()"
.build();
}
Expand Down Expand Up @@ -386,4 +396,3 @@ axon:
> The auto configured `StreamableKafkaMessageSource` can be toggled off by setting the `axon.kafka.consumer.event-processing-mode` to `subscribing`.
>
> Note that this **does not** create a `SubscribableKafkaMessageSource` for you out of the box. To set up a subscribable message, we recommend to read [this](kafka.md#consuming-events-with-a-subscribable-message-source) section.

0 comments on commit 472f04e

Please sign in to comment.