From ca833dbd0befdddd2ec7297a055536416a47ca46 Mon Sep 17 00:00:00 2001 From: Steven van Beelen Date: Thu, 25 Nov 2021 15:10:32 +0100 Subject: [PATCH] Fix serializer issues Fix XStreamSerializer issues due to the Axon Framework 4.5.5 update --- .../StreamableKafkaMessageSource.java | 6 +++--- .../SubscribableKafkaMessageSource.java | 6 +++--- .../DefaultKafkaMessageConverterTest.java | 9 ++++---- .../KafkaTrackingTokenSerializationTest.java | 1 + .../KafkaPublisherIntegrationTest.java | 4 ++-- .../streamable => utils}/TestSerializer.java | 21 ++++++++++++------- 6 files changed, 26 insertions(+), 21 deletions(-) rename kafka/src/test/java/org/axonframework/extensions/kafka/{eventhandling/consumer/streamable => utils}/TestSerializer.java (79%) diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/streamable/StreamableKafkaMessageSource.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/streamable/StreamableKafkaMessageSource.java index 1040a864..a7ddc553 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/streamable/StreamableKafkaMessageSource.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/streamable/StreamableKafkaMessageSource.java @@ -155,9 +155,9 @@ public static class Builder { private Fetcher fetcher; @SuppressWarnings("unchecked") private KafkaMessageConverter messageConverter = - (KafkaMessageConverter) DefaultKafkaMessageConverter.builder().serializer( - XStreamSerializer.builder().build() - ).build(); + (KafkaMessageConverter) DefaultKafkaMessageConverter.builder() + .serializer(XStreamSerializer.defaultSerializer()) + .build(); private Supplier> bufferFactory = SortedKafkaMessageBuffer::new; /** diff --git a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/subscribable/SubscribableKafkaMessageSource.java b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/subscribable/SubscribableKafkaMessageSource.java index 30601a6d..cc33d9d9 100644 --- a/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/subscribable/SubscribableKafkaMessageSource.java +++ b/kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/subscribable/SubscribableKafkaMessageSource.java @@ -212,9 +212,9 @@ public static class Builder { private Fetcher> fetcher; @SuppressWarnings("unchecked") private KafkaMessageConverter messageConverter = - (KafkaMessageConverter) DefaultKafkaMessageConverter.builder().serializer( - XStreamSerializer.builder().build() - ).build(); + (KafkaMessageConverter) DefaultKafkaMessageConverter.builder() + .serializer(XStreamSerializer.defaultSerializer()) + .build(); private boolean autoStart = false; private int consumerCount = 1; diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java index 58e44969..a3dcf625 100644 --- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java +++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/DefaultKafkaMessageConverterTest.java @@ -25,14 +25,14 @@ import org.axonframework.eventhandling.DomainEventMessage; import org.axonframework.eventhandling.EventMessage; import org.axonframework.eventhandling.GenericDomainEventMessage; +import org.axonframework.extensions.kafka.utils.TestSerializer; import org.axonframework.messaging.MetaData; import org.axonframework.serialization.FixedValueRevisionResolver; import org.axonframework.serialization.SerializedObject; import org.axonframework.serialization.SimpleSerializedType; import org.axonframework.serialization.upcasting.event.EventUpcasterChain; import org.axonframework.serialization.xml.XStreamSerializer; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import java.util.concurrent.atomic.AtomicInteger; @@ -45,8 +45,7 @@ import static org.axonframework.extensions.kafka.eventhandling.util.HeaderAssertUtil.assertEventHeaders; import static org.axonframework.messaging.Headers.*; import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; /** * Tests for {@link DefaultKafkaMessageConverter}. @@ -207,7 +206,7 @@ void testWritingEventMessageShouldBeReadAsEventMessageAndPassUpcaster() { @Test void testWritingEventMessageWithNullRevisionShouldWriteRevisionAsNull() { testSubject = DefaultKafkaMessageConverter.builder() - .serializer(XStreamSerializer.builder().build()) + .serializer(TestSerializer.XSTREAM.getSerializer()) .build(); EventMessage eventMessage = eventMessage(); ProducerRecord senderMessage = testSubject.createKafkaMessage(eventMessage, SOME_TOPIC); diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/streamable/KafkaTrackingTokenSerializationTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/streamable/KafkaTrackingTokenSerializationTest.java index d5513700..857fb3ff 100644 --- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/streamable/KafkaTrackingTokenSerializationTest.java +++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/streamable/KafkaTrackingTokenSerializationTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.TopicPartition; import org.axonframework.eventhandling.ReplayToken; import org.axonframework.eventhandling.TrackingToken; +import org.axonframework.extensions.kafka.utils.TestSerializer; import org.junit.*; import org.junit.runner.*; import org.junit.runners.*; diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisherIntegrationTest.java b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisherIntegrationTest.java index 815691ab..fbb8573d 100644 --- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisherIntegrationTest.java +++ b/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/producer/KafkaPublisherIntegrationTest.java @@ -33,6 +33,7 @@ import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory; import org.axonframework.extensions.kafka.eventhandling.util.KafkaAdminUtils; import org.axonframework.extensions.kafka.eventhandling.util.KafkaContainerTest; +import org.axonframework.extensions.kafka.utils.TestSerializer; import org.axonframework.messaging.EventPublicationFailedException; import org.axonframework.messaging.Message; import org.axonframework.messaging.MetaData; @@ -40,7 +41,6 @@ import org.axonframework.messaging.unitofwork.DefaultUnitOfWork; import org.axonframework.messaging.unitofwork.UnitOfWork; import org.axonframework.monitoring.MessageMonitor; -import org.axonframework.serialization.xml.XStreamSerializer; import org.junit.jupiter.api.*; import java.util.Collections; @@ -383,7 +383,7 @@ void testConfiguringInvalidAckTimeout() { private KafkaPublisher buildPublisher(String topic) { DefaultKafkaMessageConverter messageConverter = DefaultKafkaMessageConverter.builder() - .serializer(XStreamSerializer.builder().build()) + .serializer(TestSerializer.XSTREAM.getSerializer()) .build(); KafkaPublisher kafkaPublisher = KafkaPublisher.builder() .producerFactory(testProducerFactory) diff --git a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/streamable/TestSerializer.java b/kafka/src/test/java/org/axonframework/extensions/kafka/utils/TestSerializer.java similarity index 79% rename from kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/streamable/TestSerializer.java rename to kafka/src/test/java/org/axonframework/extensions/kafka/utils/TestSerializer.java index 15b39bc1..e4802662 100644 --- a/kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/streamable/TestSerializer.java +++ b/kafka/src/test/java/org/axonframework/extensions/kafka/utils/TestSerializer.java @@ -14,14 +14,16 @@ * limitations under the License. */ -package org.axonframework.extensions.kafka.eventhandling.consumer.streamable; +package org.axonframework.extensions.kafka.utils; +import com.thoughtworks.xstream.XStream; import org.axonframework.serialization.JavaSerializer; import org.axonframework.serialization.SerializedObject; import org.axonframework.serialization.Serializer; import org.axonframework.serialization.SimpleSerializedObject; import org.axonframework.serialization.SimpleSerializedType; import org.axonframework.serialization.json.JacksonSerializer; +import org.axonframework.serialization.xml.CompactDriver; import org.axonframework.serialization.xml.XStreamSerializer; import org.junit.*; @@ -47,12 +49,12 @@ public Serializer getSerializer() { } @Override - protected String serialize(Object object) { + public String serialize(Object object) { return Base64.getEncoder().encodeToString(getSerializer().serialize(object, byte[].class).getData()); } @Override - protected T deserialize(String serialized, Class type) { + public T deserialize(String serialized, Class type) { return getSerializer().deserialize(asSerializedData(Base64.getDecoder().decode(serialized), type)); } }, @@ -60,9 +62,12 @@ protected T deserialize(String serialized, Class type) { private final Serializer serializer = createSerializer(); private XStreamSerializer createSerializer() { - XStreamSerializer xStreamSerializer = XStreamSerializer.builder().build(); - xStreamSerializer.getXStream().setClassLoader(this.getClass().getClassLoader()); - return xStreamSerializer; + XStream xStream = new XStream(new CompactDriver()); + xStream.allowTypesByWildcard(new String[]{"org.apache.kafka.**"}); + return XStreamSerializer.builder() + .xStream(xStream) + .classLoader(this.getClass().getClassLoader()) + .build(); } @Override @@ -79,11 +84,11 @@ public Serializer getSerializer() { } }; - protected String serialize(Object object) { + public String serialize(Object object) { return new String(getSerializer().serialize(object, byte[].class).getData()); } - protected T deserialize(String serialized, Class type) { + public T deserialize(String serialized, Class type) { return getSerializer().deserialize(asSerializedData(serialized.getBytes(), type)); }