Skip to content

Commit

Permalink
Fix serializer issues
Browse files Browse the repository at this point in the history
Fix XStreamSerializer issues due to the Axon Framework 4.5.5 update
  • Loading branch information
smcvb committed Nov 25, 2021
1 parent 82d42c6 commit ca833db
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 21 deletions.
Expand Up @@ -155,9 +155,9 @@ public static class Builder<K, V> {
private Fetcher<K, V, KafkaEventMessage> fetcher;
@SuppressWarnings("unchecked")
private KafkaMessageConverter<K, V> messageConverter =
(KafkaMessageConverter<K, V>) DefaultKafkaMessageConverter.builder().serializer(
XStreamSerializer.builder().build()
).build();
(KafkaMessageConverter<K, V>) DefaultKafkaMessageConverter.builder()
.serializer(XStreamSerializer.defaultSerializer())
.build();
private Supplier<Buffer<KafkaEventMessage>> bufferFactory = SortedKafkaMessageBuffer::new;

/**
Expand Down
Expand Up @@ -212,9 +212,9 @@ public static class Builder<K, V> {
private Fetcher<K, V, EventMessage<?>> fetcher;
@SuppressWarnings("unchecked")
private KafkaMessageConverter<K, V> messageConverter =
(KafkaMessageConverter<K, V>) DefaultKafkaMessageConverter.builder().serializer(
XStreamSerializer.builder().build()
).build();
(KafkaMessageConverter<K, V>) DefaultKafkaMessageConverter.builder()
.serializer(XStreamSerializer.defaultSerializer())
.build();
private boolean autoStart = false;
private int consumerCount = 1;

Expand Down
Expand Up @@ -25,14 +25,14 @@
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericDomainEventMessage;
import org.axonframework.extensions.kafka.utils.TestSerializer;
import org.axonframework.messaging.MetaData;
import org.axonframework.serialization.FixedValueRevisionResolver;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.SimpleSerializedType;
import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;

import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -45,8 +45,7 @@
import static org.axonframework.extensions.kafka.eventhandling.util.HeaderAssertUtil.assertEventHeaders;
import static org.axonframework.messaging.Headers.*;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

/**
* Tests for {@link DefaultKafkaMessageConverter}.
Expand Down Expand Up @@ -207,7 +206,7 @@ void testWritingEventMessageShouldBeReadAsEventMessageAndPassUpcaster() {
@Test
void testWritingEventMessageWithNullRevisionShouldWriteRevisionAsNull() {
testSubject = DefaultKafkaMessageConverter.builder()
.serializer(XStreamSerializer.builder().build())
.serializer(TestSerializer.XSTREAM.getSerializer())
.build();
EventMessage<?> eventMessage = eventMessage();
ProducerRecord<String, byte[]> senderMessage = testSubject.createKafkaMessage(eventMessage, SOME_TOPIC);
Expand Down
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.TopicPartition;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.extensions.kafka.utils.TestSerializer;
import org.junit.*;
import org.junit.runner.*;
import org.junit.runners.*;
Expand Down
Expand Up @@ -33,14 +33,14 @@
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.util.KafkaAdminUtils;
import org.axonframework.extensions.kafka.eventhandling.util.KafkaContainerTest;
import org.axonframework.extensions.kafka.utils.TestSerializer;
import org.axonframework.messaging.EventPublicationFailedException;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MetaData;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.DefaultUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.junit.jupiter.api.*;

import java.util.Collections;
Expand Down Expand Up @@ -383,7 +383,7 @@ void testConfiguringInvalidAckTimeout() {
private KafkaPublisher<String, byte[]> buildPublisher(String topic) {
DefaultKafkaMessageConverter messageConverter =
DefaultKafkaMessageConverter.builder()
.serializer(XStreamSerializer.builder().build())
.serializer(TestSerializer.XSTREAM.getSerializer())
.build();
KafkaPublisher<String, byte[]> kafkaPublisher = KafkaPublisher.<String, byte[]>builder()
.producerFactory(testProducerFactory)
Expand Down
Expand Up @@ -14,14 +14,16 @@
* limitations under the License.
*/

package org.axonframework.extensions.kafka.eventhandling.consumer.streamable;
package org.axonframework.extensions.kafka.utils;

import com.thoughtworks.xstream.XStream;
import org.axonframework.serialization.JavaSerializer;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.SimpleSerializedObject;
import org.axonframework.serialization.SimpleSerializedType;
import org.axonframework.serialization.json.JacksonSerializer;
import org.axonframework.serialization.xml.CompactDriver;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.junit.*;

Expand All @@ -47,22 +49,25 @@ public Serializer getSerializer() {
}

@Override
protected String serialize(Object object) {
public String serialize(Object object) {
return Base64.getEncoder().encodeToString(getSerializer().serialize(object, byte[].class).getData());
}

@Override
protected <T> T deserialize(String serialized, Class<T> type) {
public <T> T deserialize(String serialized, Class<T> type) {
return getSerializer().deserialize(asSerializedData(Base64.getDecoder().decode(serialized), type));
}
},
XSTREAM {
private final Serializer serializer = createSerializer();

private XStreamSerializer createSerializer() {
XStreamSerializer xStreamSerializer = XStreamSerializer.builder().build();
xStreamSerializer.getXStream().setClassLoader(this.getClass().getClassLoader());
return xStreamSerializer;
XStream xStream = new XStream(new CompactDriver());
xStream.allowTypesByWildcard(new String[]{"org.apache.kafka.**"});
return XStreamSerializer.builder()
.xStream(xStream)
.classLoader(this.getClass().getClassLoader())
.build();
}

@Override
Expand All @@ -79,11 +84,11 @@ public Serializer getSerializer() {
}
};

protected String serialize(Object object) {
public String serialize(Object object) {
return new String(getSerializer().serialize(object, byte[].class).getData());
}

protected <T> T deserialize(String serialized, Class<T> type) {
public <T> T deserialize(String serialized, Class<T> type) {
return getSerializer().deserialize(asSerializedData(serialized.getBytes(), type));
}

Expand Down

0 comments on commit ca833db

Please sign in to comment.