From d80be3beed1085af30783bedc52829b660c3262c Mon Sep 17 00:00:00 2001 From: Simon Zambrovski Date: Tue, 12 Oct 2021 12:06:56 +0200 Subject: [PATCH] feature: implement support for event upcasters, fix #193 --- .../autoconfig/KafkaAutoConfiguration.java | 12 +- .../DefaultKafkaMessageConverter.java | 104 ++++++++++++++---- .../DefaultKafkaMessageConverterTest.java | 55 +++++++++ 3 files changed, 145 insertions(+), 26 deletions(-) diff --git a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java index d2fe17e6..9611382b 100644 --- a/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java +++ b/kafka-spring-boot-autoconfigure/src/main/java/org/axonframework/extensions/kafka/autoconfig/KafkaAutoConfiguration.java @@ -35,6 +35,7 @@ import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher; import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory; import org.axonframework.serialization.Serializer; +import org.axonframework.serialization.upcasting.event.EventUpcasterChain; import org.axonframework.spring.config.AxonConfiguration; import org.axonframework.springboot.autoconfig.AxonAutoConfiguration; import org.axonframework.springboot.autoconfig.InfraConfiguration; @@ -84,9 +85,16 @@ public KafkaAutoConfiguration(KafkaProperties properties) { @Bean @ConditionalOnMissingBean public KafkaMessageConverter kafkaMessageConverter( - @Qualifier("eventSerializer") Serializer eventSerializer + @Qualifier("eventSerializer") Serializer eventSerializer, + EventUpcasterChain eventUpcasterChain ) { - return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).build(); + return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).upcasterChain(eventUpcasterChain).build(); + } + + @Bean + @ConditionalOnMissingBean + public EventUpcasterChain emptyUpcasterChain() { + return new EventUpcasterChain(); } @Bean("axonKafkaProducerFactory") diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java index 16ff2d52..9db8d641 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverter.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.header.internals.RecordHeader; import org.axonframework.common.AxonConfigurationException; import org.axonframework.eventhandling.EventMessage; +import org.axonframework.eventhandling.GenericDomainEventEntry; import org.axonframework.eventhandling.GenericDomainEventMessage; import org.axonframework.eventhandling.GenericEventMessage; import org.axonframework.eventhandling.async.SequencingPolicy; @@ -32,6 +33,8 @@ import org.axonframework.serialization.SerializedObject; import org.axonframework.serialization.Serializer; import org.axonframework.serialization.SimpleSerializedObject; +import org.axonframework.serialization.upcasting.event.EventUpcasterChain; +import org.axonframework.serialization.upcasting.event.InitialEventRepresentation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,18 +42,19 @@ import java.util.Arrays; import java.util.Optional; import java.util.function.BiFunction; +import java.util.stream.Stream; import static org.axonframework.common.BuilderUtils.assertNonNull; import static org.axonframework.extensions.kafka.eventhandling.HeaderUtils.*; import static org.axonframework.messaging.Headers.*; /** - * Converts and {@link EventMessage} to a {@link ProducerRecord} Kafka message and {from a @link ConsumerRecord} Kafka + * Converts and {@link EventMessage} to a {@link ProducerRecord} Kafka message and from a {@link ConsumerRecord} Kafka * message back to an EventMessage (if possible). *

* During conversion meta data entries with the {@code 'axon-metadata-'} prefix are passed to the {@link Headers}. Other * message-specific attributes are added as metadata. The {@link EventMessage#getPayload()} is serialized using the - * configured {@link Serializer} and passed as the Kafka recordd's body. + * configured {@link Serializer} and passed as the Kafka record's body. *

* This implementation will suffice in most cases. * @@ -65,6 +69,7 @@ public class DefaultKafkaMessageConverter implements KafkaMessageConverter> sequencingPolicy; private final BiFunction headerValueMapper; + private final EventUpcasterChain upcasterChain; /** * Instantiate a {@link DefaultKafkaMessageConverter} based on the fields contained in the {@link Builder}. @@ -80,6 +85,7 @@ protected DefaultKafkaMessageConverter(Builder builder) { this.serializer = builder.serializer; this.sequencingPolicy = builder.sequencingPolicy; this.headerValueMapper = builder.headerValueMapper; + this.upcasterChain = builder.upcasterChain; } /** @@ -114,9 +120,9 @@ public static Builder builder() { public ProducerRecord createKafkaMessage(EventMessage eventMessage, String topic) { SerializedObject serializedObject = eventMessage.serializePayload(serializer, byte[].class); return new ProducerRecord<>( - topic, null, null, recordKey(eventMessage), - serializedObject.getData(), - toHeaders(eventMessage, serializedObject, headerValueMapper) + topic, null, null, recordKey(eventMessage), + serializedObject.getData(), + toHeaders(eventMessage, serializedObject, headerValueMapper) ); } @@ -130,9 +136,16 @@ public Optional> readKafkaMessage(ConsumerRecord try { Headers headers = consumerRecord.headers(); if (isAxonMessage(headers)) { + byte[] messageBody = consumerRecord.value(); - SerializedMessage message = extractSerializedMessage(headers, messageBody); - return buildMessage(headers, message); + final Optional> message; + // domain events may be upcasted + if (isDomainEvent(headers)) { + message = createDomainEventAndUpcast(headers, messageBody); + } else { + message = createEvent(headers, messageBody); + } + return message.flatMap(serializedMessage -> buildMessage(headers, serializedMessage)); } } catch (Exception e) { logger.trace("Error converting ConsumerRecord [{}] to an EventMessage", consumerRecord, e); @@ -145,35 +158,61 @@ private boolean isAxonMessage(Headers headers) { return keys(headers).containsAll(Arrays.asList(MESSAGE_ID, MESSAGE_TYPE)); } - private SerializedMessage extractSerializedMessage(Headers headers, byte[] messageBody) { + private Optional> createEvent(Headers headers, byte[] messageBody) { SimpleSerializedObject serializedObject = new SimpleSerializedObject<>( - messageBody, - byte[].class, - valueAsString(headers, MESSAGE_TYPE), - valueAsString(headers, MESSAGE_REVISION, null) + messageBody, + byte[].class, + valueAsString(headers, MESSAGE_TYPE), + valueAsString(headers, MESSAGE_REVISION, null) ); - return new SerializedMessage<>( - valueAsString(headers, MESSAGE_ID), - new LazyDeserializingObject<>(serializedObject, serializer), - new LazyDeserializingObject<>(MetaData.from(extractAxonMetadata(headers))) + return Optional.of(new SerializedMessage<>( + valueAsString(headers, MESSAGE_ID), + new LazyDeserializingObject<>(serializedObject, serializer), + new LazyDeserializingObject<>(MetaData.from(extractAxonMetadata(headers))) + )); + } + + private Optional> createDomainEventAndUpcast(Headers headers, byte[] messageBody) { + GenericDomainEventEntry domainEventEntry = new GenericDomainEventEntry<>( + valueAsString(headers, AGGREGATE_TYPE), + valueAsString(headers, AGGREGATE_ID), + valueAsLong(headers, AGGREGATE_SEQ), + valueAsString(headers, MESSAGE_ID), + valueAsLong(headers, MESSAGE_TIMESTAMP), + valueAsString(headers, MESSAGE_TYPE), + valueAsString(headers, MESSAGE_REVISION, null), + messageBody, + serializer.serialize(MetaData.from(extractAxonMetadata(headers)), byte[].class).getData() ); + + return upcasterChain.upcast( + Stream.of(new InitialEventRepresentation(domainEventEntry, serializer)) + ).findFirst().map(upcastedEventData -> new SerializedMessage<>( + valueAsString(headers, MESSAGE_ID), + new LazyDeserializingObject<>(upcastedEventData.getData(), serializer), + upcastedEventData.getMetaData() + )); + } + + private boolean isDomainEvent(Headers headers) { + return headers.lastHeader(AGGREGATE_ID) != null; } private Optional> buildMessage(Headers headers, SerializedMessage message) { long timestamp = valueAsLong(headers, MESSAGE_TIMESTAMP); - return headers.lastHeader(AGGREGATE_ID) != null - ? buildDomainEvent(headers, message, timestamp) - : buildEvent(message, timestamp); + return isDomainEvent(headers) + ? buildDomainEvent(headers, message, timestamp) + : buildEvent(message, timestamp); } private Optional> buildDomainEvent(Headers headers, SerializedMessage message, long timestamp) { return Optional.of(new GenericDomainEventMessage<>( - valueAsString(headers, AGGREGATE_TYPE), - valueAsString(headers, AGGREGATE_ID), - valueAsLong(headers, AGGREGATE_SEQ), - message, - () -> Instant.ofEpochMilli(timestamp) + valueAsString(headers, AGGREGATE_TYPE), + valueAsString(headers, AGGREGATE_ID), + valueAsLong(headers, AGGREGATE_SEQ), + message, + () -> Instant.ofEpochMilli(timestamp) )); } @@ -193,11 +232,13 @@ public static class Builder { private Serializer serializer; private SequencingPolicy> sequencingPolicy = SequentialPerAggregatePolicy.instance(); private BiFunction headerValueMapper = byteMapper(); + private EventUpcasterChain upcasterChain = new EventUpcasterChain(); /** * Sets the serializer to serialize the Event Message's payload with. * * @param serializer The serializer to serialize the Event Message's payload with + * * @return the current Builder instance, for fluent interfacing */ public Builder serializer(Serializer serializer) { @@ -211,6 +252,7 @@ public Builder serializer(Serializer serializer) { * the key for the {@link ProducerRecord}. Defaults to a {@link SequentialPerAggregatePolicy} instance. * * @param sequencingPolicy a {@link SequencingPolicy} used to generate the key for the {@link ProducerRecord} + * * @return the current Builder instance, for fluent interfacing */ public Builder sequencingPolicy(SequencingPolicy> sequencingPolicy) { @@ -226,6 +268,7 @@ public Builder sequencingPolicy(SequencingPolicy> sequen * * @param headerValueMapper a {@link BiFunction} of {@link String}, {@link Object} and {@link RecordHeader}, * used for mapping values to Kafka headers + * * @return the current Builder instance, for fluent interfacing */ public Builder headerValueMapper(BiFunction headerValueMapper) { @@ -234,6 +277,19 @@ public Builder headerValueMapper(BiFunction header return this; } + /** + * Sets the {@code upcasterChain} to be used during the consumption of events. + * + * @param upcasterChain upcaster chain to be used on event reading. + * + * @return the current Builder instance, for fluent interfacing + */ + public Builder upcasterChain(EventUpcasterChain upcasterChain) { + assertNonNull(upcasterChain, "UpcasterChain must not be null"); + this.upcasterChain = upcasterChain; + return this; + } + /** * Initializes a {@link DefaultKafkaMessageConverter} as specified through this Builder. * diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java index 8c8db17b..67a38935 100644 --- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java +++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java @@ -27,9 +27,16 @@ import org.axonframework.serialization.FixedValueRevisionResolver; import org.axonframework.serialization.SerializedObject; import org.axonframework.serialization.SimpleSerializedType; +import org.axonframework.serialization.upcasting.Upcaster; +import org.axonframework.serialization.upcasting.event.EventUpcaster; +import org.axonframework.serialization.upcasting.event.EventUpcasterChain; +import org.axonframework.serialization.upcasting.event.IntermediateEventRepresentation; import org.axonframework.serialization.xml.XStreamSerializer; import org.junit.jupiter.api.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + import static org.apache.kafka.clients.consumer.ConsumerRecord.NULL_SIZE; import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP; import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE; @@ -177,11 +184,26 @@ void testReadingMessagePayloadDifferentThanByteShouldReturnEmptyMessage() { @Test void testWritingEventMessageShouldBeReadAsEventMessage() { + AtomicInteger upcasterCalled = new AtomicInteger(0); + + EventUpcasterChain chain = new EventUpcasterChain(new EventUpcaster() { + @Override + public Stream upcast( + Stream intermediateRepresentations) { + upcasterCalled.addAndGet(1); + return intermediateRepresentations; + } + }); + + testSubject = DefaultKafkaMessageConverter.builder().serializer(serializer).upcasterChain(chain).build(); + EventMessage expected = eventMessage(); ProducerRecord senderMessage = testSubject.createKafkaMessage(expected, SOME_TOPIC); EventMessage actual = receiverMessage(senderMessage); assertEventMessage(actual, expected); + // upcasting should not happen on event messages, but on domain event messages only. + assertEquals(0, upcasterCalled.get()); } @Test @@ -205,6 +227,31 @@ void testWritingDomainEventMessageShouldBeReadAsDomainMessage() { assertDomainMessage((DomainEventMessage) actual, expected); } + @Test + void testWritingDomainEventMessageShouldBeReadAsDomainMessageAndPassUpcaster() { + + AtomicInteger upcasterCalled = new AtomicInteger(0); + + EventUpcasterChain chain = new EventUpcasterChain(new EventUpcaster() { + @Override + public Stream upcast( + Stream intermediateRepresentations) { + upcasterCalled.addAndGet(1); + return intermediateRepresentations; + } + }); + testSubject = DefaultKafkaMessageConverter.builder().serializer(serializer).upcasterChain(chain).build(); + + DomainEventMessage expected = domainMessage(); + ProducerRecord senderMessage = testSubject.createKafkaMessage(expected, SOME_TOPIC); + EventMessage actual = receiverMessage(senderMessage); + + assertEventMessage(actual, expected); + assertDomainMessage((DomainEventMessage) actual, expected); + assertEquals(1, upcasterCalled.get()); + } + + @Test void testBuildWithoutSerializerThrowsAxonConfigurationException() { DefaultKafkaMessageConverter.Builder testSubject = DefaultKafkaMessageConverter.builder(); @@ -233,6 +280,14 @@ void testBuildWithNullHeaderValueMapperThrowsAxonConfigurationException() { assertThrows(AxonConfigurationException.class, () -> testSubject.headerValueMapper(null)); } + @Test + void testBuildWithNullUpcasterChainThrowsAxonConfigurationException() { + DefaultKafkaMessageConverter.Builder testSubject = DefaultKafkaMessageConverter.builder(); + + assertThrows(AxonConfigurationException.class, () -> testSubject.upcasterChain(null)); + } + + private void assertDomainMessage(DomainEventMessage actual, DomainEventMessage expected) { assertEquals(expected.getAggregateIdentifier(), actual.getAggregateIdentifier()); assertEquals(expected.getSequenceNumber(), actual.getSequenceNumber());