Skip to content

Commit

Permalink
Merge pull request #210 from AxonIQ/feature/aggregate-event-timeout
Browse files Browse the repository at this point in the history
Throw TimeoutException when loading next even for aggregate takes too long
  • Loading branch information
CodeDrivenMitch committed Jul 11, 2022
2 parents 3c180cd + 230cbda commit 9269897
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
import io.axoniq.axonserver.connector.event.AggregateEventStream;
import io.axoniq.axonserver.connector.impl.FlowControlledBuffer;
import io.axoniq.axonserver.connector.impl.StreamClosedException;
import io.axoniq.axonserver.connector.impl.StreamTimeoutException;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.event.GetAggregateEventsRequest;

import java.util.concurrent.TimeUnit;

/**
* Buffering implementation of the {@link AggregateEventStream} used for Event Sourced Aggregates.
*/
Expand All @@ -31,12 +34,16 @@ public class BufferedAggregateEventStream
implements AggregateEventStream {

private static final Event TERMINAL_MESSAGE = Event.newBuilder().setAggregateSequenceNumber(-1729).build();
private static final int TAKE_TIMEOUT_MILLIS = Integer.parseInt(
System.getProperty("AGGREGATE_TAKE_EVENT_TIMEOUT_MILLIS", "10000")
);

private Event peeked;
private long lastSequenceNumber = -1;

/**
* Constructs a {@link BufferedAggregateEventStream} with a buffer size of {@code 512} and a {@code refillBatch}
* of 16.
* Constructs a {@link BufferedAggregateEventStream} with a buffer size of {@code 512} and a {@code refillBatch} of
* 16.
*/
public BufferedAggregateEventStream() {
this(512, 16);
Expand All @@ -60,6 +67,9 @@ public Event next() throws InterruptedException {
} else {
taken = take();
}
if (taken != null) {
lastSequenceNumber = taken.getAggregateSequenceNumber();
}
return taken;
}

Expand All @@ -69,11 +79,16 @@ public boolean hasNext() {
return true;
}
try {
peeked = tryTake();
peeked = tryTake(TAKE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, true);
} catch (InterruptedException e) {
cancel();
Thread.currentThread().interrupt();
return false;
} catch (StreamTimeoutException e) {
throw new RuntimeException(String.format(
"Was unable to load aggregate due to timeout while waiting for events. Last sequence number received: %d",
lastSequenceNumber
), e);
}
if (peeked == null) {
Throwable errorResult = getErrorResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.axoniq.axonserver.connector.ErrorCategory;
import io.grpc.stub.ClientCallStreamObserver;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -56,20 +57,28 @@ public FlowControlledBuffer(String clientId, int bufferSize, int refillBatch) {

@Override
public void onNext(T value) {
Objects.requireNonNull(value, "Next value of buffer is not allowed to be null");
buffer.offer(value);
}

@Override
public void onError(Throwable t) {
errorResult.set(t);
buffer.offer(terminalMessage());
buffer.offer(getAndValidateTerminalMessage());
}

@Override
public void onCompleted() {
buffer.offer(terminalMessage());
buffer.offer(getAndValidateTerminalMessage());
}

private T getAndValidateTerminalMessage() {
T message = terminalMessage();
Objects.requireNonNull(message, "Result of terminalMessage is not allowed to be null");
return message;
}


public void close() {
errorResult.set(new AxonServerException(ErrorCategory.OTHER, "Stream closed on client request", ""));
}
Expand Down Expand Up @@ -111,11 +120,31 @@ protected T tryTakeNow() {
*
* @param timeout the duration to wait for an entry to become available in the buffer
* @param timeUnit the {@link TimeUnit} used to specify the duration together with the {@code timeout}
* @return an entry of type {@code T} from this buffer if present, otherwise {@code null}
* @return an entry of type {@code T} from this buffer if present, otherwise {@code null}. Timeouts will result in
* null as well.
* @throws InterruptedException while waiting for an entry to be taken
*/
protected T tryTake(long timeout, TimeUnit timeUnit) throws InterruptedException {
T taken = validate(buffer.poll(timeout, timeUnit), true);
return tryTake(timeout, timeUnit, false);
}

/**
* Try to retrieve an entry of type {@code T} from the buffer, waiting for the duration of {@code timeout} in the
* given {@code timeUnit}. If none is present, {@code null} will be returned.
*
* @param timeout the duration to wait for an entry to become available in the buffer
* @param timeUnit the {@link TimeUnit} used to specify the duration together with the {@code timeout}
* @param exceptionOnTimeout Whether a {@code TimeoutException} should be thrown when the operation times out
* @return an entry of type {@code T} from this buffer if present, otherwise {@code null}
* @throws InterruptedException while waiting for an entry to be taken
* @throws StreamTimeoutException If there is no message available after waiting the allotted period
*/
protected T tryTake(long timeout, TimeUnit timeUnit, boolean exceptionOnTimeout) throws InterruptedException {
T poll = buffer.poll(timeout, timeUnit);
if (poll == null && exceptionOnTimeout) {
throw new StreamTimeoutException("Timeout while trying to peek next event from the stream");
}
T taken = validate(poll, true);
if (taken != null) {
markConsumed();
}
Expand Down Expand Up @@ -162,7 +191,7 @@ private T validate(T peek, boolean nullOnTerminal) {
if (terminalMessage().equals(peek)) {
if (buffer.isEmpty()) {
// just to make sure there is always a TERMINAL entry left in a terminated buffer
buffer.offer(terminalMessage());
buffer.offer(getAndValidateTerminalMessage());
}
if (nullOnTerminal) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.axoniq.axonserver.connector.impl;

/**
* A {@link RuntimeException} to throw if reading from a stream results in a timeout.
*
* @author Mitchell Herrijgers
* @since 4.5.6
*/
public class StreamTimeoutException extends RuntimeException {

/**
* Constructs a new instance of {@link StreamTimeoutException}.
*/
public StreamTimeoutException() {
}

/**
* Constructs a new instance of {@link StreamTimeoutException} with a detailed description of the cause of the
* exception
*
* @param message the message describing the cause of the exception.
*/
public StreamTimeoutException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.event.GetAggregateEventsRequest;
import io.grpc.stub.ClientCallStreamObserver;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;

/**
* Test class validationg the {@link BufferedAggregateEventStream}.
Expand Down Expand Up @@ -66,4 +66,37 @@ void testEventStreamPropagatesErrorOnHasNextAfterReadingAvailableEvents() throws
assertThrows(StreamClosedException.class, () -> testSubject.hasNext());
}

}
@Test
void throwsExceptionOnTimeoutWhileRetrievingEvents() throws Exception {
reduceTimeout();

// Push messages
for (int i = 0; i < 20; i++) {
testSubject.onNext(Event.newBuilder().setAggregateSequenceNumber(i).build());
testSubject.hasNext();
testSubject.next();
}

// Now, wait while there is no message
RuntimeException exception = assertThrows(RuntimeException.class,
() -> testSubject.hasNext());
assertEquals(
"Was unable to load aggregate due to timeout while waiting for events. Last sequence number received: 19",
exception.getMessage());
}

/**
* Modify timeout to lower value. Otherwise, test will hang for 10 seconds waiting for the timeout. It's private and
* final, so we have to work around the modifiers as well.
*/
private void reduceTimeout() throws NoSuchFieldException, IllegalAccessException {
Field timeoutField = BufferedAggregateEventStream.class.getDeclaredField("TAKE_TIMEOUT_MILLIS");
timeoutField.setAccessible(true);

Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(timeoutField, timeoutField.getModifiers() & ~Modifier.FINAL);

timeoutField.setInt(testSubject, 100);
}
}

0 comments on commit 9269897

Please sign in to comment.