Skip to content

Commit

Permalink
Fix for missing upcaster
Browse files Browse the repository at this point in the history
Resolved high memory consumption in EventBuffer when event stream was closed by client
Simplified code for readEvents
  • Loading branch information
MGathier committed Jul 13, 2018
1 parent 0cf1f5d commit 46fb6fe
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 34 deletions.
Expand Up @@ -33,7 +33,16 @@
import org.axonframework.eventhandling.EventMessage; import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.DomainEventMessage; import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.GenericDomainEventMessage; import org.axonframework.eventsourcing.GenericDomainEventMessage;
import org.axonframework.eventsourcing.eventstore.*; import org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine;
import org.axonframework.eventsourcing.eventstore.AbstractEventStore;
import org.axonframework.eventsourcing.eventstore.DomainEventData;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EventStoreException;
import org.axonframework.eventsourcing.eventstore.EventUtils;
import org.axonframework.eventsourcing.eventstore.GlobalSequenceTrackingToken;
import org.axonframework.eventsourcing.eventstore.TrackedEventData;
import org.axonframework.eventsourcing.eventstore.TrackingEventStream;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork; import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.serialization.MessageSerializer; import org.axonframework.serialization.MessageSerializer;
import org.axonframework.serialization.SerializedObject; import org.axonframework.serialization.SerializedObject;
Expand All @@ -45,10 +54,10 @@


import java.time.Instant; import java.time.Instant;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;


import static org.axonframework.common.ObjectUtils.getOrDefault; import static org.axonframework.common.ObjectUtils.getOrDefault;
Expand Down Expand Up @@ -131,7 +140,7 @@ private AxonIQEventStorageEngine(Serializer serializer,
AxonHubConfiguration configuration, AxonHubConfiguration configuration,
AxonDBClient eventStoreClient) { AxonDBClient eventStoreClient) {
super(serializer, upcasterChain, null); super(serializer, upcasterChain, null);
this.upcasterChain = upcasterChain; this.upcasterChain = getOrDefault(upcasterChain, NoOpEventUpcaster.INSTANCE);
this.configuration = configuration; this.configuration = configuration;
this.eventStoreClient = eventStoreClient; this.eventStoreClient = eventStoreClient;
this.converter = new GrpcMetaDataConverter(serializer); this.converter = new GrpcMetaDataConverter(serializer);
Expand All @@ -143,7 +152,7 @@ private AxonIQEventStorageEngine(Serializer snapshotSerializer,
AxonHubConfiguration configuration, AxonHubConfiguration configuration,
AxonDBClient eventStoreClient) { AxonDBClient eventStoreClient) {
super(snapshotSerializer, upcasterChain, null, serializer, null); super(snapshotSerializer, upcasterChain, null, serializer, null);
this.upcasterChain = upcasterChain; this.upcasterChain = getOrDefault(upcasterChain, NoOpEventUpcaster.INSTANCE);
this.configuration = configuration; this.configuration = configuration;
this.eventStoreClient = eventStoreClient; this.eventStoreClient = eventStoreClient;
this.converter = new GrpcMetaDataConverter(serializer); this.converter = new GrpcMetaDataConverter(serializer);
Expand Down Expand Up @@ -279,24 +288,22 @@ public void onCompleted() {
@Override @Override
public DomainEventStream readEvents(String aggregateIdentifier) { public DomainEventStream readEvents(String aggregateIdentifier) {
Stream<? extends DomainEventData<?>> input = this.readEventData(aggregateIdentifier, ALLOW_SNAPSHOTS_MAGIC_VALUE); Stream<? extends DomainEventData<?>> input = this.readEventData(aggregateIdentifier, ALLOW_SNAPSHOTS_MAGIC_VALUE);
List<? extends DomainEventData<?>> eventDataList = input.collect(Collectors.toList()); return DomainEventStream.of(input.map(this::upcastAndDeserializeDomainEvent).filter(Objects::nonNull));
if( startsWithSnapshot(eventDataList)) {
DomainEventData<?> snapshot = eventDataList.remove(0);
DomainEventStream stream =
EventUtils.upcastAndDeserializeDomainEvents(Stream.of(snapshot), new GrpcMetaDataAwareSerializer(getSerializer()), upcasterChain, false);

return DomainEventStream.concat(stream,
EventUtils.upcastAndDeserializeDomainEvents(eventDataList.stream(), new GrpcMetaDataAwareSerializer(getEventSerializer()), this.upcasterChain, false));
}

return EventUtils.upcastAndDeserializeDomainEvents(eventDataList.stream(), new GrpcMetaDataAwareSerializer(this.getEventSerializer()), this.upcasterChain, false);
} }


private boolean startsWithSnapshot(List<? extends DomainEventData<?>> eventDataList) { private DomainEventMessage<?> upcastAndDeserializeDomainEvent(DomainEventData<?> domainEventData) {
if( eventDataList.isEmpty()) return false; DomainEventStream upcastedStream = EventUtils.upcastAndDeserializeDomainEvents(Stream.of(domainEventData),
new GrpcMetaDataAwareSerializer(
isSnapshot(
domainEventData) ? getSerializer() : getEventSerializer()),
upcasterChain,
false);
return upcastedStream.hasNext() ? upcastedStream.next() : null;
}


if( eventDataList.get(0) instanceof GrpcBackedDomainEventData) { private boolean isSnapshot(DomainEventData<?> domainEventData) {
GrpcBackedDomainEventData grpcBackedDomainEventData = (GrpcBackedDomainEventData)eventDataList.get(0); if( domainEventData instanceof GrpcBackedDomainEventData) {
GrpcBackedDomainEventData grpcBackedDomainEventData = (GrpcBackedDomainEventData)domainEventData;
return grpcBackedDomainEventData.isSnapshot(); return grpcBackedDomainEventData.isSnapshot();
} }
return false; return false;
Expand Down
33 changes: 18 additions & 15 deletions src/main/java/io/axoniq/axonhub/client/event/axon/EventBuffer.java
Expand Up @@ -55,7 +55,8 @@ public class EventBuffer implements TrackingEventStream {
private TrackedEventData<byte[]> peekData; private TrackedEventData<byte[]> peekData;
private TrackedEventMessage<?> peekEvent; private TrackedEventMessage<?> peekEvent;
private Consumer<EventBuffer> closeCallback; private Consumer<EventBuffer> closeCallback;
private RuntimeException exception; private volatile RuntimeException exception;
private volatile boolean closed;
private Consumer<Integer> consumeListener = i -> { private Consumer<Integer> consumeListener = i -> {
}; };


Expand Down Expand Up @@ -127,14 +128,14 @@ public Optional<TrackedEventMessage<?>> peek() {


@Override @Override
public boolean hasNextAvailable(int timeout, TimeUnit timeUnit) throws InterruptedException { public boolean hasNextAvailable(int timeout, TimeUnit timeUnit) throws InterruptedException {
if (exception != null) {
RuntimeException runtimeException = exception;
this.exception = null;
throw runtimeException;
}
long deadline = System.currentTimeMillis() + timeUnit.toMillis(timeout); long deadline = System.currentTimeMillis() + timeUnit.toMillis(timeout);
try { try {
while (peekEvent == null && !eventStream.hasNext() && System.currentTimeMillis() < deadline) { while (peekEvent == null && !eventStream.hasNext() && System.currentTimeMillis() < deadline) {
if (exception != null) {
RuntimeException runtimeException = exception;
this.exception = null;
throw runtimeException;
}
waitForData(deadline); waitForData(deadline);
} }
return peekEvent != null || eventStream.hasNext(); return peekEvent != null || eventStream.hasNext();
Expand All @@ -147,11 +148,6 @@ public boolean hasNextAvailable(int timeout, TimeUnit timeUnit) throws Interrupt


@Override @Override
public TrackedEventMessage<?> nextAvailable() throws InterruptedException { public TrackedEventMessage<?> nextAvailable() throws InterruptedException {
if (exception != null) {
RuntimeException runtimeException = exception;
this.exception = null;
throw runtimeException;
}
try { try {
hasNextAvailable(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); hasNextAvailable(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
return peekEvent == null ? eventStream.next() : peekEvent; return peekEvent == null ? eventStream.next() : peekEvent;
Expand All @@ -166,17 +162,24 @@ public TrackedEventMessage<?> nextAvailable() throws InterruptedException {


@Override @Override
public void close() { public void close() {
closed = true;
if (closeCallback != null) closeCallback.accept(this); if (closeCallback != null) closeCallback.accept(this);
events.clear();
} }


public void push(EventWithToken event) { public boolean push(EventWithToken event) {
if( closed) {
logger.debug("Received event while closed: {}", event.getToken());
return false;
}
try { try {
TrackingToken trackingToken = new GlobalSequenceTrackingToken(event.getToken()); TrackingToken trackingToken = new GlobalSequenceTrackingToken(event.getToken());
events.put(new TrackedDomainEventData<>(trackingToken, new GrpcBackedDomainEventData(event.getEvent()))); events.put(new TrackedDomainEventData<>(trackingToken, new GrpcBackedDomainEventData(event.getEvent())));
} catch (Exception e) { } catch (InterruptedException e) {
logger.info(e.getMessage()); closeCallback.accept(this);
throw new RuntimeException(e); return false;
} }
return true;
} }


public void fail(RuntimeException e) { public void fail(RuntimeException e) {
Expand Down

0 comments on commit 46fb6fe

Please sign in to comment.