Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: implement support for event upcasters, fix #193 #195

Merged
merged 8 commits into from Nov 3, 2021
Expand Up @@ -35,6 +35,7 @@
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.spring.config.AxonConfiguration;
import org.axonframework.springboot.autoconfig.AxonAutoConfiguration;
import org.axonframework.springboot.autoconfig.InfraConfiguration;
Expand Down Expand Up @@ -84,9 +85,16 @@ public KafkaAutoConfiguration(KafkaProperties properties) {
@Bean
@ConditionalOnMissingBean
public KafkaMessageConverter<String, byte[]> kafkaMessageConverter(
@Qualifier("eventSerializer") Serializer eventSerializer
@Qualifier("eventSerializer") Serializer eventSerializer,
EventUpcasterChain eventUpcasterChain
zambrovski marked this conversation as resolved.
Show resolved Hide resolved
) {
return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).build();
return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).upcasterChain(eventUpcasterChain).build();
}

@Bean
@ConditionalOnMissingBean
public EventUpcasterChain emptyUpcasterChain() {
return new EventUpcasterChain();
}

@Bean("axonKafkaProducerFactory")
Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.apache.kafka.common.header.internals.RecordHeader;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericDomainEventEntry;
import org.axonframework.eventhandling.GenericDomainEventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.async.SequencingPolicy;
Expand All @@ -32,25 +33,28 @@
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.SimpleSerializedObject;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.serialization.upcasting.event.InitialEventRepresentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.stream.Stream;

import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.extensions.kafka.eventhandling.HeaderUtils.*;
import static org.axonframework.messaging.Headers.*;

/**
* Converts and {@link EventMessage} to a {@link ProducerRecord} Kafka message and {from a @link ConsumerRecord} Kafka
* Converts and {@link EventMessage} to a {@link ProducerRecord} Kafka message and from a {@link ConsumerRecord} Kafka
zambrovski marked this conversation as resolved.
Show resolved Hide resolved
* message back to an EventMessage (if possible).
* <p>
* During conversion meta data entries with the {@code 'axon-metadata-'} prefix are passed to the {@link Headers}. Other
* message-specific attributes are added as metadata. The {@link EventMessage#getPayload()} is serialized using the
* configured {@link Serializer} and passed as the Kafka recordd's body.
* configured {@link Serializer} and passed as the Kafka record's body.
* <p>
* This implementation will suffice in most cases.
*
Expand All @@ -65,6 +69,7 @@ public class DefaultKafkaMessageConverter implements KafkaMessageConverter<Strin
private final Serializer serializer;
private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;
private final BiFunction<String, Object, RecordHeader> headerValueMapper;
private final EventUpcasterChain upcasterChain;

/**
* Instantiate a {@link DefaultKafkaMessageConverter} based on the fields contained in the {@link Builder}.
Expand All @@ -80,6 +85,7 @@ protected DefaultKafkaMessageConverter(Builder builder) {
this.serializer = builder.serializer;
this.sequencingPolicy = builder.sequencingPolicy;
this.headerValueMapper = builder.headerValueMapper;
this.upcasterChain = builder.upcasterChain;
}

/**
Expand Down Expand Up @@ -114,9 +120,9 @@ public static Builder builder() {
public ProducerRecord<String, byte[]> createKafkaMessage(EventMessage<?> eventMessage, String topic) {
SerializedObject<byte[]> serializedObject = eventMessage.serializePayload(serializer, byte[].class);
return new ProducerRecord<>(
topic, null, null, recordKey(eventMessage),
serializedObject.getData(),
toHeaders(eventMessage, serializedObject, headerValueMapper)
topic, null, null, recordKey(eventMessage),
serializedObject.getData(),
toHeaders(eventMessage, serializedObject, headerValueMapper)
);
}

Expand All @@ -130,9 +136,16 @@ public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, byte[]>
try {
Headers headers = consumerRecord.headers();
if (isAxonMessage(headers)) {

byte[] messageBody = consumerRecord.value();
SerializedMessage<?> message = extractSerializedMessage(headers, messageBody);
return buildMessage(headers, message);
final Optional<SerializedMessage<?>> message;
// domain events may be upcasted
zambrovski marked this conversation as resolved.
Show resolved Hide resolved
if (isDomainEvent(headers)) {
zambrovski marked this conversation as resolved.
Show resolved Hide resolved
message = createDomainEventAndUpcast(headers, messageBody);
} else {
message = createEvent(headers, messageBody);
}
return message.flatMap(serializedMessage -> buildMessage(headers, serializedMessage));
}
} catch (Exception e) {
logger.trace("Error converting ConsumerRecord [{}] to an EventMessage", consumerRecord, e);
Expand All @@ -145,35 +158,61 @@ private boolean isAxonMessage(Headers headers) {
return keys(headers).containsAll(Arrays.asList(MESSAGE_ID, MESSAGE_TYPE));
}

private SerializedMessage<?> extractSerializedMessage(Headers headers, byte[] messageBody) {
private Optional<SerializedMessage<?>> createEvent(Headers headers, byte[] messageBody) {
SimpleSerializedObject<byte[]> serializedObject = new SimpleSerializedObject<>(
messageBody,
byte[].class,
valueAsString(headers, MESSAGE_TYPE),
valueAsString(headers, MESSAGE_REVISION, null)
messageBody,
byte[].class,
valueAsString(headers, MESSAGE_TYPE),
valueAsString(headers, MESSAGE_REVISION, null)
);

return new SerializedMessage<>(
valueAsString(headers, MESSAGE_ID),
new LazyDeserializingObject<>(serializedObject, serializer),
new LazyDeserializingObject<>(MetaData.from(extractAxonMetadata(headers)))
return Optional.of(new SerializedMessage<>(
valueAsString(headers, MESSAGE_ID),
new LazyDeserializingObject<>(serializedObject, serializer),
new LazyDeserializingObject<>(MetaData.from(extractAxonMetadata(headers)))
));
}

private Optional<SerializedMessage<?>> createDomainEventAndUpcast(Headers headers, byte[] messageBody) {
GenericDomainEventEntry<Object> domainEventEntry = new GenericDomainEventEntry<>(
valueAsString(headers, AGGREGATE_TYPE),
valueAsString(headers, AGGREGATE_ID),
valueAsLong(headers, AGGREGATE_SEQ),
valueAsString(headers, MESSAGE_ID),
valueAsLong(headers, MESSAGE_TIMESTAMP),
valueAsString(headers, MESSAGE_TYPE),
valueAsString(headers, MESSAGE_REVISION, null),
messageBody,
serializer.serialize(MetaData.from(extractAxonMetadata(headers)), byte[].class).getData()
);

return upcasterChain.upcast(
Stream.of(new InitialEventRepresentation(domainEventEntry, serializer))
zambrovski marked this conversation as resolved.
Show resolved Hide resolved
).findFirst().map(upcastedEventData -> new SerializedMessage<>(
valueAsString(headers, MESSAGE_ID),
new LazyDeserializingObject<>(upcastedEventData.getData(), serializer),
upcastedEventData.getMetaData()
));
}

private boolean isDomainEvent(Headers headers) {
return headers.lastHeader(AGGREGATE_ID) != null;
}

private Optional<EventMessage<?>> buildMessage(Headers headers, SerializedMessage<?> message) {
long timestamp = valueAsLong(headers, MESSAGE_TIMESTAMP);
return headers.lastHeader(AGGREGATE_ID) != null
? buildDomainEvent(headers, message, timestamp)
: buildEvent(message, timestamp);
return isDomainEvent(headers)
? buildDomainEvent(headers, message, timestamp)
: buildEvent(message, timestamp);
}

private Optional<EventMessage<?>> buildDomainEvent(Headers headers, SerializedMessage<?> message, long timestamp) {
return Optional.of(new GenericDomainEventMessage<>(
valueAsString(headers, AGGREGATE_TYPE),
valueAsString(headers, AGGREGATE_ID),
valueAsLong(headers, AGGREGATE_SEQ),
message,
() -> Instant.ofEpochMilli(timestamp)
valueAsString(headers, AGGREGATE_TYPE),
valueAsString(headers, AGGREGATE_ID),
valueAsLong(headers, AGGREGATE_SEQ),
message,
() -> Instant.ofEpochMilli(timestamp)
));
}

Expand All @@ -193,11 +232,13 @@ public static class Builder {
private Serializer serializer;
private SequencingPolicy<? super EventMessage<?>> sequencingPolicy = SequentialPerAggregatePolicy.instance();
private BiFunction<String, Object, RecordHeader> headerValueMapper = byteMapper();
private EventUpcasterChain upcasterChain = new EventUpcasterChain();

/**
* Sets the serializer to serialize the Event Message's payload with.
*
* @param serializer The serializer to serialize the Event Message's payload with
*
* @return the current Builder instance, for fluent interfacing
*/
public Builder serializer(Serializer serializer) {
Expand All @@ -211,6 +252,7 @@ public Builder serializer(Serializer serializer) {
* the key for the {@link ProducerRecord}. Defaults to a {@link SequentialPerAggregatePolicy} instance.
*
* @param sequencingPolicy a {@link SequencingPolicy} used to generate the key for the {@link ProducerRecord}
*
* @return the current Builder instance, for fluent interfacing
*/
public Builder sequencingPolicy(SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
Expand All @@ -226,6 +268,7 @@ public Builder sequencingPolicy(SequencingPolicy<? super EventMessage<?>> sequen
*
* @param headerValueMapper a {@link BiFunction} of {@link String}, {@link Object} and {@link RecordHeader},
* used for mapping values to Kafka headers
*
* @return the current Builder instance, for fluent interfacing
*/
public Builder headerValueMapper(BiFunction<String, Object, RecordHeader> headerValueMapper) {
Expand All @@ -234,6 +277,19 @@ public Builder headerValueMapper(BiFunction<String, Object, RecordHeader> header
return this;
}

/**
* Sets the {@code upcasterChain} to be used during the consumption of events.
*
* @param upcasterChain upcaster chain to be used on event reading.
*
* @return the current Builder instance, for fluent interfacing
*/
public Builder upcasterChain(EventUpcasterChain upcasterChain) {
assertNonNull(upcasterChain, "UpcasterChain must not be null");
this.upcasterChain = upcasterChain;
return this;
}

/**
* Initializes a {@link DefaultKafkaMessageConverter} as specified through this Builder.
*
Expand Down
Expand Up @@ -27,9 +27,16 @@
import org.axonframework.serialization.FixedValueRevisionResolver;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.SimpleSerializedType;
import org.axonframework.serialization.upcasting.Upcaster;
import org.axonframework.serialization.upcasting.event.EventUpcaster;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.serialization.upcasting.event.IntermediateEventRepresentation;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.junit.jupiter.api.*;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import static org.apache.kafka.clients.consumer.ConsumerRecord.NULL_SIZE;
import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE;
Expand Down Expand Up @@ -177,11 +184,26 @@ void testReadingMessagePayloadDifferentThanByteShouldReturnEmptyMessage() {

@Test
void testWritingEventMessageShouldBeReadAsEventMessage() {
AtomicInteger upcasterCalled = new AtomicInteger(0);

EventUpcasterChain chain = new EventUpcasterChain(new EventUpcaster() {
@Override
public Stream<IntermediateEventRepresentation> upcast(
Stream<IntermediateEventRepresentation> intermediateRepresentations) {
upcasterCalled.addAndGet(1);
return intermediateRepresentations;
}
});

testSubject = DefaultKafkaMessageConverter.builder().serializer(serializer).upcasterChain(chain).build();

EventMessage<?> expected = eventMessage();
ProducerRecord<String, byte[]> senderMessage = testSubject.createKafkaMessage(expected, SOME_TOPIC);
EventMessage<?> actual = receiverMessage(senderMessage);

assertEventMessage(actual, expected);
// upcasting should not happen on event messages, but on domain event messages only.
assertEquals(0, upcasterCalled.get());
}

@Test
Expand All @@ -205,6 +227,31 @@ void testWritingDomainEventMessageShouldBeReadAsDomainMessage() {
assertDomainMessage((DomainEventMessage<?>) actual, expected);
}

@Test
void testWritingDomainEventMessageShouldBeReadAsDomainMessageAndPassUpcaster() {

AtomicInteger upcasterCalled = new AtomicInteger(0);

EventUpcasterChain chain = new EventUpcasterChain(new EventUpcaster() {
@Override
public Stream<IntermediateEventRepresentation> upcast(
Stream<IntermediateEventRepresentation> intermediateRepresentations) {
upcasterCalled.addAndGet(1);
return intermediateRepresentations;
}
});
testSubject = DefaultKafkaMessageConverter.builder().serializer(serializer).upcasterChain(chain).build();

DomainEventMessage<?> expected = domainMessage();
ProducerRecord<String, byte[]> senderMessage = testSubject.createKafkaMessage(expected, SOME_TOPIC);
EventMessage<?> actual = receiverMessage(senderMessage);

assertEventMessage(actual, expected);
assertDomainMessage((DomainEventMessage<?>) actual, expected);
assertEquals(1, upcasterCalled.get());
}


@Test
void testBuildWithoutSerializerThrowsAxonConfigurationException() {
DefaultKafkaMessageConverter.Builder testSubject = DefaultKafkaMessageConverter.builder();
Expand Down Expand Up @@ -233,6 +280,14 @@ void testBuildWithNullHeaderValueMapperThrowsAxonConfigurationException() {
assertThrows(AxonConfigurationException.class, () -> testSubject.headerValueMapper(null));
}

@Test
void testBuildWithNullUpcasterChainThrowsAxonConfigurationException() {
DefaultKafkaMessageConverter.Builder testSubject = DefaultKafkaMessageConverter.builder();

assertThrows(AxonConfigurationException.class, () -> testSubject.upcasterChain(null));
}


private void assertDomainMessage(DomainEventMessage<?> actual, DomainEventMessage<?> expected) {
assertEquals(expected.getAggregateIdentifier(), actual.getAggregateIdentifier());
assertEquals(expected.getSequenceNumber(), actual.getSequenceNumber());
Expand Down