Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: implement support for event upcasters, fix #193 #195

Merged
merged 8 commits into from Nov 3, 2021
Expand Up @@ -35,6 +35,7 @@
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.spring.config.AxonConfiguration;
import org.axonframework.springboot.autoconfig.AxonAutoConfiguration;
import org.axonframework.springboot.autoconfig.InfraConfiguration;
Expand Down Expand Up @@ -84,9 +85,10 @@ public KafkaAutoConfiguration(KafkaProperties properties) {
@Bean
@ConditionalOnMissingBean
public KafkaMessageConverter<String, byte[]> kafkaMessageConverter(
@Qualifier("eventSerializer") Serializer eventSerializer
@Qualifier("eventSerializer") Serializer eventSerializer,
AxonConfiguration config
) {
return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).build();
return DefaultKafkaMessageConverter.builder().serializer(eventSerializer).upcasterChain(config.upcasterChain() != null ? config.upcasterChain() : new EventUpcasterChain()).build();
}

@Bean("axonKafkaProducerFactory")
Expand All @@ -96,18 +98,18 @@ public ProducerFactory<String, byte[]> kafkaProducerFactory() {
String transactionIdPrefix = properties.getProducer().getTransactionIdPrefix();

DefaultProducerFactory.Builder<String, byte[]> builder =
DefaultProducerFactory.<String, byte[]>builder()
.configuration(properties.buildProducerProperties())
.confirmationMode(confirmationMode);
DefaultProducerFactory.<String, byte[]>builder()
.configuration(properties.buildProducerProperties())
.confirmationMode(confirmationMode);

if (isNonEmptyString(transactionIdPrefix)) {
builder.transactionalIdPrefix(transactionIdPrefix)
.confirmationMode(ConfirmationMode.TRANSACTIONAL);
if (!confirmationMode.isTransactional()) {
logger.warn(
"The confirmation mode is set to [{}], whilst a transactional id prefix is present. "
+ "The transactional id prefix overwrites the confirmation mode choice to TRANSACTIONAL",
confirmationMode
"The confirmation mode is set to [{}], whilst a transactional id prefix is present. "
+ "The transactional id prefix overwrites the confirmation mode choice to TRANSACTIONAL",
confirmationMode
);
}
}
Expand All @@ -122,10 +124,11 @@ private boolean isNonEmptyString(String s) {
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@ConditionalOnMissingBean
@Bean(destroyMethod = "shutDown")
@ConditionalOnBean({ProducerFactory.class, KafkaMessageConverter.class})
public KafkaPublisher<String, byte[]> kafkaPublisher(ProducerFactory<String, byte[]> axonKafkaProducerFactory,
KafkaMessageConverter<String, byte[]> kafkaMessageConverter,
AxonConfiguration configuration) {
@ConditionalOnBean({ ProducerFactory.class, KafkaMessageConverter.class })
public KafkaPublisher<String, byte[]> kafkaPublisher(
ProducerFactory<String, byte[]> axonKafkaProducerFactory,
KafkaMessageConverter<String, byte[]> kafkaMessageConverter,
AxonConfiguration configuration) {
return KafkaPublisher.<String, byte[]>builder()
.producerFactory(axonKafkaProducerFactory)
.messageConverter(kafkaMessageConverter)
Expand All @@ -137,12 +140,13 @@ public KafkaPublisher<String, byte[]> kafkaPublisher(ProducerFactory<String, byt
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Bean
@ConditionalOnMissingBean
@ConditionalOnBean({KafkaPublisher.class})
public KafkaEventPublisher<String, byte[]> kafkaEventPublisher(KafkaPublisher<String, byte[]> kafkaPublisher,
KafkaProperties kafkaProperties,
EventProcessingConfigurer eventProcessingConfigurer) {
@ConditionalOnBean({ KafkaPublisher.class })
zambrovski marked this conversation as resolved.
Show resolved Hide resolved
public KafkaEventPublisher<String, byte[]> kafkaEventPublisher(
KafkaPublisher<String, byte[]> kafkaPublisher,
KafkaProperties kafkaProperties,
EventProcessingConfigurer eventProcessingConfigurer) {
KafkaEventPublisher<String, byte[]> kafkaEventPublisher =
KafkaEventPublisher.<String, byte[]>builder().kafkaPublisher(kafkaPublisher).build();
KafkaEventPublisher.<String, byte[]>builder().kafkaPublisher(kafkaPublisher).build();

/*
* Register an invocation error handler which re-throws any exception.
Expand All @@ -152,11 +156,11 @@ public KafkaEventPublisher<String, byte[]> kafkaEventPublisher(KafkaPublisher<St
*/
eventProcessingConfigurer.registerEventHandler(configuration -> kafkaEventPublisher)
.registerListenerInvocationErrorHandler(
DEFAULT_PROCESSING_GROUP, configuration -> PropagatingErrorHandler.instance()
DEFAULT_PROCESSING_GROUP, configuration -> PropagatingErrorHandler.instance()
)
.assignHandlerTypesMatching(
DEFAULT_PROCESSING_GROUP,
clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class)
DEFAULT_PROCESSING_GROUP,
clazz -> clazz.isAssignableFrom(KafkaEventPublisher.class)
);

KafkaProperties.EventProcessorMode processorMode = kafkaProperties.getProducer().getEventProcessorMode();
Expand Down Expand Up @@ -189,20 +193,20 @@ public ConsumerFactory<String, byte[]> kafkaConsumerFactory() {

@Bean
@ConditionalOnMissingBean
@ConditionalOnBean({ConsumerFactory.class, KafkaMessageConverter.class, Fetcher.class})
@ConditionalOnBean({ ConsumerFactory.class, KafkaMessageConverter.class, Fetcher.class })
@Conditional(StreamingProcessorModeCondition.class)
public StreamableKafkaMessageSource<String, byte[]> streamableKafkaMessageSource(
ConsumerFactory<String, byte[]> kafkaConsumerFactory,
Fetcher<String, byte[], KafkaEventMessage> kafkaFetcher,
KafkaMessageConverter<String, byte[]> kafkaMessageConverter
ConsumerFactory<String, byte[]> kafkaConsumerFactory,
Fetcher<String, byte[], KafkaEventMessage> kafkaFetcher,
KafkaMessageConverter<String, byte[]> kafkaMessageConverter
) {
return StreamableKafkaMessageSource.<String, byte[]>builder()
.topics(Collections.singletonList(properties.getDefaultTopic()))
.consumerFactory(kafkaConsumerFactory)
.fetcher(kafkaFetcher)
.messageConverter(kafkaMessageConverter)
.bufferFactory(() -> new SortedKafkaMessageBuffer<>(
properties.getFetcher().getBufferSize()
properties.getFetcher().getBufferSize()
))
.build();
}
Expand Down