Skip to content

Commit

Permalink
feature: implement support for event upcasters, fix #193
Browse files Browse the repository at this point in the history
  • Loading branch information
zambrovski committed Oct 12, 2021
1 parent bbe856f commit d80be3b
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 26 deletions.
Expand Up @@ -35,6 +35,7 @@
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.spring.config.AxonConfiguration;
import org.axonframework.springboot.autoconfig.AxonAutoConfiguration;
import org.axonframework.springboot.autoconfig.InfraConfiguration;
Expand Down Expand Up @@ -84,9 +85,16 @@ public KafkaAutoConfiguration(KafkaProperties properties) {
@Bean
@ConditionalOnMissingBean
public KafkaMessageConverter<String, byte[]> kafkaMessageConverter(
@Qualifier("eventSerializer") Serializer eventSerializer
@Qualifier("eventSerializer") Serializer eventSerializer,
EventUpcasterChain eventUpcasterChain
) {
return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).build();
return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).upcasterChain(eventUpcasterChain).build();
}

@Bean
@ConditionalOnMissingBean
public EventUpcasterChain emptyUpcasterChain() {
return new EventUpcasterChain();
}

@Bean("axonKafkaProducerFactory")
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.header.internals.RecordHeader;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericDomainEventEntry;
import org.axonframework.eventhandling.GenericDomainEventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.async.SequencingPolicy;
Expand All @@ -32,25 +33,28 @@
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.SimpleSerializedObject;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.serialization.upcasting.event.InitialEventRepresentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.stream.Stream;

import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.extensions.kafka.eventhandling.HeaderUtils.*;
import static org.axonframework.messaging.Headers.*;

/**
* Converts and {@link EventMessage} to a {@link ProducerRecord} Kafka message and {from a @link ConsumerRecord} Kafka
* Converts and {@link EventMessage} to a {@link ProducerRecord} Kafka message and from a {@link ConsumerRecord} Kafka
* message back to an EventMessage (if possible).
* <p>
* During conversion meta data entries with the {@code 'axon-metadata-'} prefix are passed to the {@link Headers}. Other
* message-specific attributes are added as metadata. The {@link EventMessage#getPayload()} is serialized using the
* configured {@link Serializer} and passed as the Kafka recordd's body.
* configured {@link Serializer} and passed as the Kafka record's body.
* <p>
* This implementation will suffice in most cases.
*
Expand All @@ -65,6 +69,7 @@ public class DefaultKafkaMessageConverter implements KafkaMessageConverter<Strin
private final Serializer serializer;
private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;
private final BiFunction<String, Object, RecordHeader> headerValueMapper;
private final EventUpcasterChain upcasterChain;

/**
* Instantiate a {@link DefaultKafkaMessageConverter} based on the fields contained in the {@link Builder}.
Expand All @@ -80,6 +85,7 @@ protected DefaultKafkaMessageConverter(Builder builder) {
this.serializer = builder.serializer;
this.sequencingPolicy = builder.sequencingPolicy;
this.headerValueMapper = builder.headerValueMapper;
this.upcasterChain = builder.upcasterChain;
}

/**
Expand Down Expand Up @@ -114,9 +120,9 @@ public static Builder builder() {
public ProducerRecord<String, byte[]> createKafkaMessage(EventMessage<?> eventMessage, String topic) {
SerializedObject<byte[]> serializedObject = eventMessage.serializePayload(serializer, byte[].class);
return new ProducerRecord<>(
topic, null, null, recordKey(eventMessage),
serializedObject.getData(),
toHeaders(eventMessage, serializedObject, headerValueMapper)
topic, null, null, recordKey(eventMessage),
serializedObject.getData(),
toHeaders(eventMessage, serializedObject, headerValueMapper)
);
}

Expand All @@ -130,9 +136,16 @@ public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, byte[]>
try {
Headers headers = consumerRecord.headers();
if (isAxonMessage(headers)) {

byte[] messageBody = consumerRecord.value();
SerializedMessage<?> message = extractSerializedMessage(headers, messageBody);
return buildMessage(headers, message);
final Optional<SerializedMessage<?>> message;
// domain events may be upcasted
if (isDomainEvent(headers)) {
message = createDomainEventAndUpcast(headers, messageBody);
} else {
message = createEvent(headers, messageBody);
}
return message.flatMap(serializedMessage -> buildMessage(headers, serializedMessage));
}
} catch (Exception e) {
logger.trace("Error converting ConsumerRecord [{}] to an EventMessage", consumerRecord, e);
Expand All @@ -145,35 +158,61 @@ private boolean isAxonMessage(Headers headers) {
return keys(headers).containsAll(Arrays.asList(MESSAGE_ID, MESSAGE_TYPE));
}

private SerializedMessage<?> extractSerializedMessage(Headers headers, byte[] messageBody) {
private Optional<SerializedMessage<?>> createEvent(Headers headers, byte[] messageBody) {
SimpleSerializedObject<byte[]> serializedObject = new SimpleSerializedObject<>(
messageBody,
byte[].class,
valueAsString(headers, MESSAGE_TYPE),
valueAsString(headers, MESSAGE_REVISION, null)
messageBody,
byte[].class,
valueAsString(headers, MESSAGE_TYPE),
valueAsString(headers, MESSAGE_REVISION, null)
);

return new SerializedMessage<>(
valueAsString(headers, MESSAGE_ID),
new LazyDeserializingObject<>(serializedObject, serializer),
new LazyDeserializingObject<>(MetaData.from(extractAxonMetadata(headers)))
return Optional.of(new SerializedMessage<>(
valueAsString(headers, MESSAGE_ID),
new LazyDeserializingObject<>(serializedObject, serializer),
new LazyDeserializingObject<>(MetaData.from(extractAxonMetadata(headers)))
));
}

private Optional<SerializedMessage<?>> createDomainEventAndUpcast(Headers headers, byte[] messageBody) {
GenericDomainEventEntry<Object> domainEventEntry = new GenericDomainEventEntry<>(
valueAsString(headers, AGGREGATE_TYPE),
valueAsString(headers, AGGREGATE_ID),
valueAsLong(headers, AGGREGATE_SEQ),
valueAsString(headers, MESSAGE_ID),
valueAsLong(headers, MESSAGE_TIMESTAMP),
valueAsString(headers, MESSAGE_TYPE),
valueAsString(headers, MESSAGE_REVISION, null),
messageBody,
serializer.serialize(MetaData.from(extractAxonMetadata(headers)), byte[].class).getData()
);

return upcasterChain.upcast(
Stream.of(new InitialEventRepresentation(domainEventEntry, serializer))
).findFirst().map(upcastedEventData -> new SerializedMessage<>(
valueAsString(headers, MESSAGE_ID),
new LazyDeserializingObject<>(upcastedEventData.getData(), serializer),
upcastedEventData.getMetaData()
));
}

private boolean isDomainEvent(Headers headers) {
return headers.lastHeader(AGGREGATE_ID) != null;
}

private Optional<EventMessage<?>> buildMessage(Headers headers, SerializedMessage<?> message) {
long timestamp = valueAsLong(headers, MESSAGE_TIMESTAMP);
return headers.lastHeader(AGGREGATE_ID) != null
? buildDomainEvent(headers, message, timestamp)
: buildEvent(message, timestamp);
return isDomainEvent(headers)
? buildDomainEvent(headers, message, timestamp)
: buildEvent(message, timestamp);
}

private Optional<EventMessage<?>> buildDomainEvent(Headers headers, SerializedMessage<?> message, long timestamp) {
return Optional.of(new GenericDomainEventMessage<>(
valueAsString(headers, AGGREGATE_TYPE),
valueAsString(headers, AGGREGATE_ID),
valueAsLong(headers, AGGREGATE_SEQ),
message,
() -> Instant.ofEpochMilli(timestamp)
valueAsString(headers, AGGREGATE_TYPE),
valueAsString(headers, AGGREGATE_ID),
valueAsLong(headers, AGGREGATE_SEQ),
message,
() -> Instant.ofEpochMilli(timestamp)
));
}

Expand All @@ -193,11 +232,13 @@ public static class Builder {
private Serializer serializer;
private SequencingPolicy<? super EventMessage<?>> sequencingPolicy = SequentialPerAggregatePolicy.instance();
private BiFunction<String, Object, RecordHeader> headerValueMapper = byteMapper();
private EventUpcasterChain upcasterChain = new EventUpcasterChain();

/**
* Sets the serializer to serialize the Event Message's payload with.
*
* @param serializer The serializer to serialize the Event Message's payload with
*
* @return the current Builder instance, for fluent interfacing
*/
public Builder serializer(Serializer serializer) {
Expand All @@ -211,6 +252,7 @@ public Builder serializer(Serializer serializer) {
* the key for the {@link ProducerRecord}. Defaults to a {@link SequentialPerAggregatePolicy} instance.
*
* @param sequencingPolicy a {@link SequencingPolicy} used to generate the key for the {@link ProducerRecord}
*
* @return the current Builder instance, for fluent interfacing
*/
public Builder sequencingPolicy(SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
Expand All @@ -226,6 +268,7 @@ public Builder sequencingPolicy(SequencingPolicy<? super EventMessage<?>> sequen
*
* @param headerValueMapper a {@link BiFunction} of {@link String}, {@link Object} and {@link RecordHeader},
* used for mapping values to Kafka headers
*
* @return the current Builder instance, for fluent interfacing
*/
public Builder headerValueMapper(BiFunction<String, Object, RecordHeader> headerValueMapper) {
Expand All @@ -234,6 +277,19 @@ public Builder headerValueMapper(BiFunction<String, Object, RecordHeader> header
return this;
}

/**
* Sets the {@code upcasterChain} to be used during the consumption of events.
*
* @param upcasterChain upcaster chain to be used on event reading.
*
* @return the current Builder instance, for fluent interfacing
*/
public Builder upcasterChain(EventUpcasterChain upcasterChain) {
assertNonNull(upcasterChain, "UpcasterChain must not be null");
this.upcasterChain = upcasterChain;
return this;
}

/**
* Initializes a {@link DefaultKafkaMessageConverter} as specified through this Builder.
*
Expand Down
Expand Up @@ -27,9 +27,16 @@
import org.axonframework.serialization.FixedValueRevisionResolver;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.SimpleSerializedType;
import org.axonframework.serialization.upcasting.Upcaster;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.serialization.upcasting.event.IntermediateEventRepresentation;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.junit.jupiter.api.*;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import static org.apache.kafka.clients.consumer.ConsumerRecord.NULL_SIZE;
import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE;
Expand Down Expand Up @@ -177,11 +184,26 @@ void testReadingMessagePayloadDifferentThanByteShouldReturnEmptyMessage() {

@Test
void testWritingEventMessageShouldBeReadAsEventMessage() {
AtomicInteger upcasterCalled = new AtomicInteger(0);

EventUpcasterChain chain = new EventUpcasterChain(new EventUpcaster() {
@Override
public Stream<IntermediateEventRepresentation> upcast(
Stream<IntermediateEventRepresentation> intermediateRepresentations) {
upcasterCalled.addAndGet(1);
return intermediateRepresentations;
}
});

testSubject = DefaultKafkaMessageConverter.builder().serializer(serializer).upcasterChain(chain).build();

EventMessage<?> expected = eventMessage();
ProducerRecord<String, byte[]> senderMessage = testSubject.createKafkaMessage(expected, SOME_TOPIC);
EventMessage<?> actual = receiverMessage(senderMessage);

assertEventMessage(actual, expected);
// upcasting should not happen on event messages, but on domain event messages only.
assertEquals(0, upcasterCalled.get());
}

@Test
Expand All @@ -205,6 +227,31 @@ void testWritingDomainEventMessageShouldBeReadAsDomainMessage() {
assertDomainMessage((DomainEventMessage<?>) actual, expected);
}

@Test
void testWritingDomainEventMessageShouldBeReadAsDomainMessageAndPassUpcaster() {

AtomicInteger upcasterCalled = new AtomicInteger(0);

EventUpcasterChain chain = new EventUpcasterChain(new EventUpcaster() {
@Override
public Stream<IntermediateEventRepresentation> upcast(
Stream<IntermediateEventRepresentation> intermediateRepresentations) {
upcasterCalled.addAndGet(1);
return intermediateRepresentations;
}
});
testSubject = DefaultKafkaMessageConverter.builder().serializer(serializer).upcasterChain(chain).build();

DomainEventMessage<?> expected = domainMessage();
ProducerRecord<String, byte[]> senderMessage = testSubject.createKafkaMessage(expected, SOME_TOPIC);
EventMessage<?> actual = receiverMessage(senderMessage);

assertEventMessage(actual, expected);
assertDomainMessage((DomainEventMessage<?>) actual, expected);
assertEquals(1, upcasterCalled.get());
}


@Test
void testBuildWithoutSerializerThrowsAxonConfigurationException() {
DefaultKafkaMessageConverter.Builder testSubject = DefaultKafkaMessageConverter.builder();
Expand Down Expand Up @@ -233,6 +280,14 @@ void testBuildWithNullHeaderValueMapperThrowsAxonConfigurationException() {
assertThrows(AxonConfigurationException.class, () -> testSubject.headerValueMapper(null));
}

@Test
void testBuildWithNullUpcasterChainThrowsAxonConfigurationException() {
DefaultKafkaMessageConverter.Builder testSubject = DefaultKafkaMessageConverter.builder();

assertThrows(AxonConfigurationException.class, () -> testSubject.upcasterChain(null));
}


private void assertDomainMessage(DomainEventMessage<?> actual, DomainEventMessage<?> expected) {
assertEquals(expected.getAggregateIdentifier(), actual.getAggregateIdentifier());
assertEquals(expected.getSequenceNumber(), actual.getSequenceNumber());
Expand Down

0 comments on commit d80be3b

Please sign in to comment.