Updated serializer configuration and made them lazy #280

Merged 5 commits on Apr 26, 2022
@@ -16,6 +16,10 @@

package org.axonframework.extensions.kafka.autoconfig;

import com.thoughtworks.xstream.XStream;
import java.io.File;
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
@@ -44,23 +48,28 @@
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.CompactDriver;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.axonframework.spring.config.AxonConfiguration;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.File;
import java.util.Collections;
import java.util.Map;

import static org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher.DEFAULT_PROCESSING_GROUP;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* Tests for the {@link KafkaAutoConfiguration}, verifying the minimal set of requirements and full fledged adjustments
@@ -555,7 +564,9 @@ protected static class TestConfiguration {

@Bean
public Serializer eventSerializer() {
return XStreamSerializer.defaultSerializer();
return XStreamSerializer.builder()
.xStream(new XStream(new CompactDriver()))
.build();
}

@Bean
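The `eventSerializer()` bean above now builds the serializer around an explicitly constructed `XStream` instance instead of `XStreamSerializer.defaultSerializer()`. That instance is also where an application would restrict which types may be deserialized, as the warning added later in this PR recommends. A minimal sketch, assuming XStream's standard security API and a hypothetical `com.example` event package:

```java
import com.thoughtworks.xstream.XStream;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.CompactDriver;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EventSerializerConfiguration {

    @Bean
    public Serializer eventSerializer() {
        // Build the XStream instance explicitly so its security context can be configured.
        XStream xStream = new XStream(new CompactDriver());
        // Only allow the application's own types to be deserialized.
        // "com.example.**" is a placeholder; use the real event packages here.
        xStream.allowTypesByWildcard(new String[]{"com.example.**"});
        return XStreamSerializer.builder()
                                .xStream(xStream)
                                .build();
    }
}
```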
@@ -16,6 +16,15 @@

package org.axonframework.extensions.kafka.eventhandling.consumer.streamable;

import com.thoughtworks.xstream.XStream;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.axonframework.common.AxonConfigurationException;
@@ -29,19 +38,12 @@
import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.CompactDriver;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Supplier;

import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.common.BuilderUtils.assertThat;

@@ -70,23 +72,6 @@ public class StreamableKafkaMessageSource<K, V> implements StreamableMessageSour
private final KafkaMessageConverter<K, V> messageConverter;
private final Supplier<Buffer<KafkaEventMessage>> bufferFactory;

/**
* Instantiate a Builder to be able to create a {@link StreamableKafkaMessageSource}.
* <p>
* The {@code topics} list defaults to a single entry of {@code "Axon.Events"}, the {@code groupIdPrefix} defaults to
* {@code "Axon.Streamable.Consumer-"}, its {@code groupIdSuffixFactory} to a {@link UUID#randomUUID()}
* operation, the {@link KafkaMessageConverter} to a {@link DefaultKafkaMessageConverter} using the {@link
* XStreamSerializer}, and the {@code bufferFactory} to the {@link SortedKafkaMessageBuffer} constructor. The {@link
* ConsumerFactory} and {@link Fetcher} are <b>hard requirements</b> and as such should be provided.
*
* @param <K> the key of the {@link ConsumerRecords} to consume, fetch and convert
* @param <V> the value type of {@link ConsumerRecords} to consume, fetch and convert
* @return a Builder to be able to create an {@link StreamableKafkaMessageSource}
*/
public static <K, V> Builder<K, V> builder() {
return new Builder<>();
}

/**
* Instantiate a {@link StreamableKafkaMessageSource} based on the fields contained in the {@link Builder}.
* <p>
@@ -106,6 +91,23 @@ protected StreamableKafkaMessageSource(Builder<K, V> builder) {
this.bufferFactory = builder.bufferFactory;
}

/**
* Instantiate a Builder to be able to create a {@link StreamableKafkaMessageSource}.
* <p>
* The {@code topics} list defaults to a single entry of {@code "Axon.Events"}, the {@code groupIdPrefix} defaults to
* {@code "Axon.Streamable.Consumer-"}, its {@code groupIdSuffixFactory} to a {@link UUID#randomUUID()}
* operation, the {@link KafkaMessageConverter} to a {@link DefaultKafkaMessageConverter} using the {@link
* XStreamSerializer}, and the {@code bufferFactory} to the {@link SortedKafkaMessageBuffer} constructor. The {@link
* ConsumerFactory} and {@link Fetcher} are <b>hard requirements</b> and as such should be provided.
*
* @param <K> the key of the {@link ConsumerRecords} to consume, fetch and convert
* @param <V> the value type of {@link ConsumerRecords} to consume, fetch and convert
* @return a Builder to be able to create an {@link StreamableKafkaMessageSource}
*/
public static <K, V> Builder<K, V> builder() {
return new Builder<>();
}
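
The relocated builder() Javadoc above lists the ConsumerFactory and Fetcher as the only hard requirements; every other property, including the serializer, is now resolved lazily in validate(). A minimal usage sketch; the consumerFactory(...) and fetcher(...) setter names, and the import locations of ConsumerFactory, Fetcher, and KafkaEventMessage, are inferred from the surrounding code rather than shown in this diff:

```java
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource;

class StreamableSourceExample {

    // The ConsumerFactory and Fetcher are assumed to be configured elsewhere; with only
    // the hard requirements set, the serializer, message converter, and buffer factory
    // all fall back to their lazy defaults during validate().
    StreamableKafkaMessageSource<String, byte[]> minimalSource(
            ConsumerFactory<String, byte[]> consumerFactory,
            Fetcher<String, byte[], KafkaEventMessage> fetcher) {
        return StreamableKafkaMessageSource.<String, byte[]>builder()
                                           .consumerFactory(consumerFactory)
                                           .fetcher(fetcher)
                                           .build();
    }
}
```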

/**
* {@inheritDoc}
* <p>
@@ -153,12 +155,22 @@ public static class Builder<K, V> {
private Supplier<String> groupIdSuffixFactory = () -> UUID.randomUUID().toString();
private ConsumerFactory<K, V> consumerFactory;
private Fetcher<K, V, KafkaEventMessage> fetcher;
@SuppressWarnings({"unchecked", "squid:S1874"})
private KafkaMessageConverter<K, V> messageConverter =
(KafkaMessageConverter<K, V>) DefaultKafkaMessageConverter.builder()
.serializer(XStreamSerializer.defaultSerializer())
.build();
private KafkaMessageConverter<K, V> messageConverter;
private Supplier<Buffer<KafkaEventMessage>> bufferFactory = SortedKafkaMessageBuffer::new;
private Supplier<Serializer> serializer;

/**
* Sets the {@link Serializer} used to serialize and deserialize messages. Defaults to an {@link
* XStreamSerializer}.
*
* @param serializer a {@link Serializer} used to serialize and deserialize messages
* @return the current Builder instance, for fluent interfacing
*/
public Builder<K, V> serializer(Serializer serializer) {
assertNonNull(serializer, "The Serializer may not be null");
this.serializer = () -> serializer;
return this;
}
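
The new serializer(...) setter above is what lets an application opt out of the XStream fallback entirely. Building on the previous sketch, a hedged example that supplies Axon's JacksonSerializer (used purely for illustration), so that validate() builds its default DefaultKafkaMessageConverter around that serializer and the warning below is never logged:

```java
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource;
import org.axonframework.serialization.json.JacksonSerializer;

class ExplicitSerializerExample {

    // Supplying a Serializer up front means the lazy XStream default (and its warning)
    // in validate() is skipped; the default message converter is built around it instead.
    StreamableKafkaMessageSource<String, byte[]> jacksonBackedSource(
            ConsumerFactory<String, byte[]> consumerFactory,
            Fetcher<String, byte[], KafkaEventMessage> fetcher) {
        return StreamableKafkaMessageSource.<String, byte[]>builder()
                                           .consumerFactory(consumerFactory)
                                           .fetcher(fetcher)
                                           .serializer(JacksonSerializer.defaultSerializer())
                                           .build();
    }
}
```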

/**
* Set the Kafka {@code topics} to read {@link org.axonframework.eventhandling.EventMessage}s from. Defaults to
@@ -308,6 +320,23 @@ public StreamableKafkaMessageSource<K, V> build() {
protected void validate() throws AxonConfigurationException {
assertNonNull(consumerFactory, "The ConsumerFactory is a hard requirement and should be provided");
assertNonNull(fetcher, "The Fetcher is a hard requirement and should be provided");
if (serializer == null) {
logger.warn(
"The default XStreamSerializer is used, whereas it is strongly recommended to configure"
+ " the security context of the XStream instance.",
new AxonConfigurationException(
"A default XStreamSerializer is used, without specifying the security context"
)
);
serializer = () -> XStreamSerializer.builder()
.xStream(new XStream(new CompactDriver()))
.build();
}
if (messageConverter == null) {
messageConverter = (KafkaMessageConverter<K, V>) DefaultKafkaMessageConverter.builder()
.serializer(serializer.get())
.build();
}
}
}
}
@@ -16,6 +16,20 @@

package org.axonframework.extensions.kafka.eventhandling.consumer.subscribable;

import com.thoughtworks.xstream.XStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.axonframework.common.AxonConfigurationException;
@@ -27,23 +41,12 @@
import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.CompactDriver;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.common.BuilderUtils.assertThat;

@@ -82,22 +85,6 @@ public class SubscribableKafkaMessageSource<K, V> implements SubscribableMessage
private final List<Registration> fetcherRegistrations = new CopyOnWriteArrayList<>();
private final AtomicBoolean inProgress = new AtomicBoolean(false);

/**
* Instantiate a Builder to be able to create a {@link SubscribableKafkaMessageSource}.
* <p>
* The {@code topics} list defaults to a single entry of {@code "Axon.Events"} and the {@link
* KafkaMessageConverter} to a {@link DefaultKafkaMessageConverter} using the {@link XStreamSerializer}. The {@code
* groupId}, {@link ConsumerFactory} and {@link Fetcher} are <b>hard requirements</b> and as such should be
* provided.
*
* @param <K> the key of the {@link ConsumerRecords} to consume, fetch and convert
* @param <V> the value type of {@link ConsumerRecords} to consume, fetch and convert
* @return a Builder to be able to create a {@link SubscribableKafkaMessageSource}
*/
public static <K, V> Builder<K, V> builder() {
return new Builder<>();
}

/**
* Instantiate a {@link SubscribableKafkaMessageSource} based on the fields contained in the {@link Builder}.
* <p>
@@ -118,6 +105,22 @@ protected SubscribableKafkaMessageSource(Builder<K, V> builder) {
this.consumerCount = builder.consumerCount;
}

/**
* Instantiate a Builder to be able to create a {@link SubscribableKafkaMessageSource}.
* <p>
* The {@code topics} list defaults to a single entry of {@code "Axon.Events"} and the {@link
* KafkaMessageConverter} to a {@link DefaultKafkaMessageConverter} using the {@link XStreamSerializer}. The {@code
* groupId}, {@link ConsumerFactory} and {@link Fetcher} are <b>hard requirements</b> and as such should be
* provided.
*
* @param <K> the key of the {@link ConsumerRecords} to consume, fetch and convert
* @param <V> the value type of {@link ConsumerRecords} to consume, fetch and convert
* @return a Builder to be able to create a {@link SubscribableKafkaMessageSource}
*/
public static <K, V> Builder<K, V> builder() {
return new Builder<>();
}
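
The subscribable variant's Javadoc above adds the groupId to the hard requirements. A minimal sketch under the same assumptions as the streamable examples; the groupId(...), consumerFactory(...), and fetcher(...) setter names are inferred from the builder fields, and the group id value is a placeholder:

```java
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.subscribable.SubscribableKafkaMessageSource;

class SubscribableSourceExample {

    // groupId, ConsumerFactory, and Fetcher are the hard requirements here; the
    // serializer and message converter again default lazily during validate().
    SubscribableKafkaMessageSource<String, byte[]> minimalSource(
            ConsumerFactory<String, byte[]> consumerFactory,
            Fetcher<String, byte[], EventMessage<?>> fetcher) {
        return SubscribableKafkaMessageSource.<String, byte[]>builder()
                                             .groupId("my-processing-group")
                                             .consumerFactory(consumerFactory)
                                             .fetcher(fetcher)
                                             .build();
    }
}
```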

/**
* {@inheritDoc}
* <p>
@@ -210,13 +213,23 @@ public static class Builder<K, V> {
private String groupId;
private ConsumerFactory<K, V> consumerFactory;
private Fetcher<K, V, EventMessage<?>> fetcher;
@SuppressWarnings({"unchecked", "squid:S1874"})
private KafkaMessageConverter<K, V> messageConverter =
(KafkaMessageConverter<K, V>) DefaultKafkaMessageConverter.builder()
.serializer(XStreamSerializer.defaultSerializer())
.build();
private KafkaMessageConverter<K, V> messageConverter;
private boolean autoStart = false;
private int consumerCount = 1;
private Supplier<Serializer> serializer;

/**
* Sets the {@link Serializer} used to serialize and deserialize messages. Defaults to an {@link
* XStreamSerializer}.
*
* @param serializer a {@link Serializer} used to serialize and deserialize messages
* @return the current Builder instance, for fluent interfacing
*/
public Builder<K, V> serializer(Serializer serializer) {
assertNonNull(serializer, "The Serializer may not be null");
this.serializer = () -> serializer;
return this;
}
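
A note on the design visible in this hunk and in the validate() hunk below: the serializer is stored as a Supplier<Serializer> rather than a plain field, so nothing XStream-related is constructed unless validate() actually has to fall back to the default, and only then is the warning logged and the CompactDriver-backed XStreamSerializer built. As in the streamable builder, the configured serializer is only read when no explicit KafkaMessageConverter was set, because the lazily built DefaultKafkaMessageConverter is the sole consumer of it in this diff.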

/**
* Set the Kafka {@code topics} to read {@link org.axonframework.eventhandling.EventMessage}s from. Defaults to
@@ -362,6 +375,23 @@ protected void validate() throws AxonConfigurationException {
assertNonNull(groupId, "The Consumer Group Id is a hard requirement and should be provided");
assertNonNull(consumerFactory, "The ConsumerFactory is a hard requirement and should be provided");
assertNonNull(fetcher, "The Fetcher is a hard requirement and should be provided");
if (serializer == null) {
logger.warn(
"The default XStreamSerializer is used, whereas it is strongly recommended to configure"
+ " the security context of the XStream instance.",
new AxonConfigurationException(
"A default XStreamSerializer is used, without specifying the security context"
)
);
serializer = () -> XStreamSerializer.builder()
.xStream(new XStream(new CompactDriver()))
.build();
}
if (messageConverter == null) {
messageConverter = (KafkaMessageConverter<K, V>) DefaultKafkaMessageConverter.builder()
.serializer(serializer.get())
.build();
}
}
}
}