Skip to content

Commit

Permalink
Improve single entry upcaster
Browse files Browse the repository at this point in the history
  • Loading branch information
renedewaele committed Nov 7, 2016
1 parent 30b42cc commit 475a73f
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 78 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@ public abstract class AbstractSingleEntryUpcaster<T> implements Upcaster<T> {


@Override @Override
public Stream<T> upcast(Stream<T> intermediateRepresentations) { public Stream<T> upcast(Stream<T> intermediateRepresentations) {
return intermediateRepresentations.map(entry -> requireNonNull(doUpcast(entry), return intermediateRepresentations.map(entry -> {
() -> "Result from #doUpcast() should not be " + if (!canUpcast(entry)) {
"null. To remove an " + return entry;
"intermediateRepresentation add a " + }
"filter to the input stream.")); return requireNonNull(doUpcast(entry),
() -> "Result from #doUpcast() should not be null. To remove an " +
"intermediateRepresentation add a filter to the input stream.");
});
} }


protected abstract boolean canUpcast(T intermediateRepresentation);

protected abstract T doUpcast(T intermediateRepresentation); protected abstract T doUpcast(T intermediateRepresentation);
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -16,25 +16,22 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream; import java.util.stream.Stream;


import static java.util.stream.Collectors.toList;

/** /**
* @author Rene de Waele * @author Rene de Waele
*/ */
public class GenericUpcasterChain<T> implements Upcaster<T> { public class GenericUpcasterChain<T> implements Upcaster<T> {


private final List<Supplier<Upcaster<T>>> upcasterSuppliers; private final List<? extends Upcaster<T>> upcasters;


@SafeVarargs @SafeVarargs
public GenericUpcasterChain(Upcaster<T>... upcasters) { public GenericUpcasterChain(Upcaster<T>... upcasters) {
this(Arrays.stream(upcasters).map(upcaster -> (Supplier<Upcaster<T>>) () -> upcaster).collect(toList())); this(Arrays.asList(upcasters));
} }


public GenericUpcasterChain(List<Supplier<Upcaster<T>>> upcasterSuppliers) { public GenericUpcasterChain(List<? extends Upcaster<T>> upcasters) {
this.upcasterSuppliers = new ArrayList<>(upcasterSuppliers); this.upcasters = new ArrayList<>(upcasters);
} }


@Override @Override
Expand All @@ -46,8 +43,8 @@ public Stream<T> upcast(Stream<T> initialRepresentations) {
return result; return result;
} }


protected List<Upcaster<T>> getUpcasters() { protected List<? extends Upcaster<T>> getUpcasters() {
return upcasterSuppliers.stream().map(Supplier::get).collect(toList()); return upcasters;
} }


} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -17,19 +17,18 @@
import org.axonframework.serialization.upcasting.Upcaster; import org.axonframework.serialization.upcasting.Upcaster;


import java.util.List; import java.util.List;
import java.util.function.Supplier;


/** /**
* @author Rene de Waele * @author Rene de Waele
*/ */
public class DefaultEventUpcasterChain extends GenericUpcasterChain<IntermediateEventRepresentation> implements EventUpcaster { public class EventUpcasterChain extends GenericUpcasterChain<IntermediateEventRepresentation> implements EventUpcaster {


@SafeVarargs @SafeVarargs
public DefaultEventUpcasterChain(Upcaster<IntermediateEventRepresentation>... upcasters) { public EventUpcasterChain(Upcaster<IntermediateEventRepresentation>... upcasters) {
super(upcasters); super(upcasters);
} }


public DefaultEventUpcasterChain(List<Supplier<Upcaster<IntermediateEventRepresentation>>> upcasterSuppliers) { public EventUpcasterChain(List<? extends Upcaster<IntermediateEventRepresentation>> upcasters) {
super(upcasterSuppliers); super(upcasters);
} }
} }
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@


import org.axonframework.common.IdentifierFactory; import org.axonframework.common.IdentifierFactory;
import org.axonframework.serialization.Serializer; import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.upcasting.event.DefaultEventUpcasterChain; import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.serialization.upcasting.event.NoOpEventUpcaster; import org.axonframework.serialization.upcasting.event.NoOpEventUpcaster;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
Expand Down Expand Up @@ -56,7 +56,7 @@ public void testDomainEventStream_lastSequenceNumberEqualToLastProcessedEntryAft
Exception { Exception {
DomainEventStream eventStream = EventUtils DomainEventStream eventStream = EventUtils
.upcastAndDeserializeDomainEvents(Stream.of(createEntry(1), createEntry(2), createEntry(3)), serializer, .upcastAndDeserializeDomainEvents(Stream.of(createEntry(1), createEntry(2), createEntry(3)), serializer,
new DefaultEventUpcasterChain(e -> e new EventUpcasterChain(e -> e
.filter(entry -> entry.getSequenceNumber().get() < 2L)), .filter(entry -> entry.getSequenceNumber().get() < 2L)),
false); false);
assertNull(eventStream.getLastSequenceNumber()); assertNull(eventStream.getLastSequenceNumber());
Expand All @@ -70,7 +70,7 @@ public void testDomainEventStream_lastSequenceNumberEqualToLastProcessedEntryAft
Exception { Exception {
DomainEventStream eventStream = EventUtils DomainEventStream eventStream = EventUtils
.upcastAndDeserializeDomainEvents(Stream.of(createEntry(1)), serializer, .upcastAndDeserializeDomainEvents(Stream.of(createEntry(1)), serializer,
new DefaultEventUpcasterChain(s -> s.filter(e -> false)), false); new EventUpcasterChain(s -> s.filter(e -> false)), false);
assertNull(eventStream.getLastSequenceNumber()); assertNull(eventStream.getLastSequenceNumber());
assertFalse(eventStream.hasNext()); assertFalse(eventStream.hasNext());
eventStream.forEachRemaining(Objects::requireNonNull); eventStream.forEachRemaining(Objects::requireNonNull);
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@


import org.junit.Test; import org.junit.Test;


import java.util.*; import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Stream; import java.util.stream.Stream;


import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;
import static junit.framework.TestCase.assertEquals; import static junit.framework.TestCase.assertEquals;
import static org.mockito.Mockito.*;


/** /**
* @author Rene de Waele * @author Rene de Waele
Expand Down Expand Up @@ -53,21 +54,6 @@ public void testUpcastingResultOfOtherUpcasterOnlyWorksIfUpcastersAreInCorrectOr
assertEquals(Arrays.asList(b, c, b, c), result.collect(toList())); assertEquals(Arrays.asList(b, c, b, c), result.collect(toList()));
} }


@Test
@SuppressWarnings("unchecked")
public void testNewUpcasterIsSuppliedForEachRoundOfUpcasting() {
Object a = "a", b = "b", c = "c";
Supplier<Upcaster<Object>> mockSupplier = mock(Supplier.class);
when(mockSupplier.get()).thenReturn(new AToBUpcaster(a, b));
Upcaster<Object> testSubject = new GenericUpcasterChain<>(Collections.singletonList(mockSupplier));
Stream<Object> result = testSubject.upcast(Stream.of(a, b, a, c));
assertEquals(Arrays.asList(b, b, b, c), result.collect(toList()));
verify(mockSupplier).get();
result = testSubject.upcast(Stream.of(a, b, a, c));
assertEquals(Arrays.asList(b, b, b, c), result.collect(toList()));
verify(mockSupplier, times(2)).get();
}

@Test @Test
public void testRemainderAddedAndUpcasted() { public void testRemainderAddedAndUpcasted() {
Object a = "a", b = "b", c = "c"; Object a = "a", b = "b", c = "c";
Expand Down Expand Up @@ -102,9 +88,14 @@ private AToBUpcaster(Object a, Object b) {
this.b = b; this.b = b;
} }


@Override
protected boolean canUpcast(Object intermediateRepresentation) {
return intermediateRepresentation == a;
}

@Override @Override
protected Object doUpcast(Object intermediateRepresentation) { protected Object doUpcast(Object intermediateRepresentation) {
return intermediateRepresentation == a ? b : intermediateRepresentation; return b;
} }
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -134,15 +134,17 @@ private StubEventUpcaster(String newNameValue) {
this.newNameValue = newNameValue; this.newNameValue = newNameValue;
} }


@Override
protected boolean canUpcast(IntermediateEventRepresentation intermediateRepresentation) {
return intermediateRepresentation.getOutputType().equals(targetType);
}

@Override @Override
protected IntermediateEventRepresentation doUpcast(IntermediateEventRepresentation ir) { protected IntermediateEventRepresentation doUpcast(IntermediateEventRepresentation ir) {
if (ir.getOutputType().equals(targetType)) { return ir.upcastPayload(new SimpleSerializedType(targetType.getName(), "1"), expectedType, doc -> {
return ir.upcastPayload(new SimpleSerializedType(targetType.getName(), "1"), expectedType, doc -> { doc.getRootElement().element("name").setText(newNameValue);
doc.getRootElement().element("name").setText(newNameValue); return doc;
return doc; });
});
}
return ir;
} }
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.axonframework.serialization.SerializedType; import org.axonframework.serialization.SerializedType;
import org.axonframework.serialization.SimpleSerializedType; import org.axonframework.serialization.SimpleSerializedType;
import org.axonframework.serialization.upcasting.event.AbstractSingleEventUpcaster; import org.axonframework.serialization.upcasting.event.AbstractSingleEventUpcaster;
import org.axonframework.serialization.upcasting.event.DefaultEventUpcasterChain; import org.axonframework.serialization.upcasting.event.EventUpcasterChain;
import org.axonframework.serialization.upcasting.event.IntermediateEventRepresentation; import org.axonframework.serialization.upcasting.event.IntermediateEventRepresentation;
import org.axonframework.serialization.xml.XStreamSerializer; import org.axonframework.serialization.xml.XStreamSerializer;
import org.dom4j.Document; import org.dom4j.Document;
Expand Down Expand Up @@ -61,8 +61,9 @@ public static void main(String[] args) throws IOException {


// initialize EventStorage to use our upcaster // initialize EventStorage to use our upcaster
JdbcEventStorageEngine storageEngine = JdbcEventStorageEngine storageEngine =
new JdbcEventStorageEngine(serializer, new DefaultEventUpcasterChain(new ToDoItemUpcaster()), null, new UnitOfWorkAwareConnectionProviderWrapper( new JdbcEventStorageEngine(serializer, new EventUpcasterChain(new ToDoItemUpcaster()), null,
new DataSourceConnectionProvider(dataSource))); new UnitOfWorkAwareConnectionProviderWrapper(
new DataSourceConnectionProvider(dataSource)));
storageEngine.createSchema(HsqlEventTableFactory.INSTANCE); storageEngine.createSchema(HsqlEventTableFactory.INSTANCE);


// create an EmdeddedEventStore (we don't want to run a separate server) // create an EmdeddedEventStore (we don't want to run a separate server)
Expand All @@ -71,10 +72,12 @@ public static void main(String[] args) throws IOException {


// we append some events. Notice we append a "ToDoItemCreatedEvent". // we append some events. Notice we append a "ToDoItemCreatedEvent".
eventStore.publish(new GenericDomainEventMessage<>("type", "todo1", 0, new ToDoItemCreatedEvent("todo1", eventStore.publish(new GenericDomainEventMessage<>("type", "todo1", 0, new ToDoItemCreatedEvent("todo1",
"I need to do this today")), "I need to do" +
" this today")),
new GenericDomainEventMessage<>("type", "todo1", 1, new ToDoItemCompletedEvent("todo1"))); new GenericDomainEventMessage<>("type", "todo1", 1, new ToDoItemCompletedEvent("todo1")));
eventStore.publish(new GenericDomainEventMessage<>("type", "todo2", 0, new ToDoItemCreatedEvent("todo2", eventStore.publish(new GenericDomainEventMessage<>("type", "todo2", 0, new ToDoItemCreatedEvent("todo2",
"I also need to do this"))); "I also need " +
"to do this")));


// now, we read the events from the "todo1" stream // now, we read the events from the "todo1" stream
DomainEventStream eventStream = eventStore.readEvents("todo1"); DomainEventStream eventStream = eventStore.readEvents("todo1");
Expand All @@ -92,33 +95,34 @@ public static void main(String[] args) throws IOException {
public static class ToDoItemUpcaster extends AbstractSingleEventUpcaster { public static class ToDoItemUpcaster extends AbstractSingleEventUpcaster {


@Override @Override
protected IntermediateEventRepresentation doUpcast(IntermediateEventRepresentation intermediateRepresentation) { protected boolean canUpcast(IntermediateEventRepresentation intermediateRepresentation) {
// we can upcast the object if it's type name is the fully qualified class name of the ToDoItemCreatedEvent. // we can upcast the object if it's type name is the fully qualified class name of the ToDoItemCreatedEvent.
// normally, you would also want to check the revision // normally, you would also want to check the revision
if (intermediateRepresentation.getOutputType().getName().equals(ToDoItemCreatedEvent.class.getName())) { return intermediateRepresentation.getOutputType().getName().equals(ToDoItemCreatedEvent.class.getName());
// we describe the refactoring that we have done. Since we want to simulate a new revision and need to }
// change the class name, we pass both details in the returned SerializedType.
SerializedType targetType = @Override
new SimpleSerializedType(NewToDoItemWithDeadlineCreatedEvent.class.getName(), "1.1"); protected IntermediateEventRepresentation doUpcast(IntermediateEventRepresentation intermediateRepresentation) {
Instant timestamp = intermediateRepresentation.getTimestamp(); // we describe the refactoring that we have done. Since we want to simulate a new revision and need to
// here, we convert the XML format of the old event to that of the new event. // change the class name, we pass both details in the returned SerializedType.
intermediateRepresentation = SerializedType targetType =
// we want to get the data as a Dom4J document. new SimpleSerializedType(NewToDoItemWithDeadlineCreatedEvent.class.getName(), "1.1");
// Axon will automatically convert the serialized form. Instant timestamp = intermediateRepresentation.getTimestamp();
intermediateRepresentation.upcastPayload(targetType, Document.class, oldDocument -> {
Element rootElement = oldDocument.getRootElement(); // here, we convert the XML format of the old event to that of the new event.
// change the name of the root element to reflect the changed class name // We want to get the data as a Dom4J document. Axon will automatically convert the serialized form.
rootElement.setName(NewToDoItemWithDeadlineCreatedEvent.class.getName()); return intermediateRepresentation.upcastPayload(targetType, Document.class, oldDocument -> {
// and add an element for the new "deadline" field Element rootElement = oldDocument.getRootElement();
rootElement.addElement("deadline") // change the name of the root element to reflect the changed class name
// we set the value of the field to the default value: rootElement.setName(NewToDoItemWithDeadlineCreatedEvent.class.getName());
// one day after the event was created // and add an element for the new "deadline" field
.setText(timestamp.plus(1, ChronoUnit.DAYS).toString()); rootElement.addElement("deadline")
// we return the modified Document // we set the value of the field to the default value:
return oldDocument; // one day after the event was created
}); .setText(timestamp.plus(1, ChronoUnit.DAYS).toString());
} // we return the modified Document
return intermediateRepresentation; return oldDocument;
});
} }
} }


Expand Down

0 comments on commit 475a73f

Please sign in to comment.