Skip to content

Commit

Permalink
Improvements on the FlowControlledBuffer timeout exception after revi…
Browse files Browse the repository at this point in the history
…ew comments
  • Loading branch information
CodeDrivenMitch committed Jul 8, 2022
1 parent f864d99 commit 230cbda
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import io.axoniq.axonserver.connector.event.AggregateEventStream;
import io.axoniq.axonserver.connector.impl.FlowControlledBuffer;
import io.axoniq.axonserver.connector.impl.StreamClosedException;
import io.axoniq.axonserver.connector.impl.StreamTimeoutException;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.event.GetAggregateEventsRequest;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Buffering implementation of the {@link AggregateEventStream} used for Event Sourced Aggregates.
Expand All @@ -34,8 +34,9 @@ public class BufferedAggregateEventStream
implements AggregateEventStream {

private static final Event TERMINAL_MESSAGE = Event.newBuilder().setAggregateSequenceNumber(-1729).build();
private static final int TIMEOUT_MILLIS = Integer.parseInt(System.getProperty("AGGREGATE_TAKE_EVENT_TIMEOUT_MILLIS",
"10000"));
private static final int TAKE_TIMEOUT_MILLIS = Integer.parseInt(
System.getProperty("AGGREGATE_TAKE_EVENT_TIMEOUT_MILLIS", "10000")
);

private Event peeked;
private long lastSequenceNumber = -1;
Expand Down Expand Up @@ -78,16 +79,16 @@ public boolean hasNext() {
return true;
}
try {
peeked = tryTake(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, true);
peeked = tryTake(TAKE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, true);
} catch (InterruptedException e) {
cancel();
Thread.currentThread().interrupt();
return false;
} catch (TimeoutException e) {
throw new RuntimeException(
String.format(
"Was unable to load aggregate due to timeout while waiting for events. Last sequence number received: %d",
lastSequenceNumber));
} catch (StreamTimeoutException e) {
throw new RuntimeException(String.format(
"Was unable to load aggregate due to timeout while waiting for events. Last sequence number received: %d",
lastSequenceNumber
), e);
}
if (peeked == null) {
Throwable errorResult = getErrorResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import io.axoniq.axonserver.connector.ErrorCategory;
import io.grpc.stub.ClientCallStreamObserver;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -46,9 +46,6 @@ public abstract class FlowControlledBuffer<T, R> extends FlowControlledStream<T,
*/
public FlowControlledBuffer(String clientId, int bufferSize, int refillBatch) {
super(clientId, bufferSize, refillBatch);
if (terminalMessage() == null) {
throw new IllegalStateException("Terminal message is not allowed to be null");
}
}

/**
Expand All @@ -60,23 +57,28 @@ public FlowControlledBuffer(String clientId, int bufferSize, int refillBatch) {

@Override
public void onNext(T value) {
if (value == null) {
throw new NullPointerException("Next value of buffer is not allowed to be null");
}
Objects.requireNonNull(value, "Next value of buffer is not allowed to be null");
buffer.offer(value);
}

@Override
public void onError(Throwable t) {
errorResult.set(t);
buffer.offer(terminalMessage());
buffer.offer(getAndValidateTerminalMessage());
}

@Override
public void onCompleted() {
buffer.offer(terminalMessage());
buffer.offer(getAndValidateTerminalMessage());
}

private T getAndValidateTerminalMessage() {
T message = terminalMessage();
Objects.requireNonNull(message, "Result of terminalMessage is not allowed to be null");
return message;
}


public void close() {
errorResult.set(new AxonServerException(ErrorCategory.OTHER, "Stream closed on client request", ""));
}
Expand Down Expand Up @@ -123,11 +125,7 @@ protected T tryTakeNow() {
* @throws InterruptedException while waiting for an entry to be taken
*/
protected T tryTake(long timeout, TimeUnit timeUnit) throws InterruptedException {
try {
return tryTake(timeout, timeUnit, false);
} catch (TimeoutException e) {
return null;
}
}

/**
Expand All @@ -138,14 +136,13 @@ protected T tryTake(long timeout, TimeUnit timeUnit) throws InterruptedException
* @param timeUnit the {@link TimeUnit} used to specify the duration together with the {@code timeout}
* @param exceptionOnTimeout Whether a {@code TimeoutException} should be thrown when the operation times out
* @return an entry of type {@code T} from this buffer if present, otherwise {@code null}
* @throws InterruptedException while waiting for an entry to be taken
* @throws TimeoutException If there is no message available after waiting the allotted period
* @throws InterruptedException while waiting for an entry to be taken
* @throws StreamTimeoutException If there is no message available after waiting the allotted period
*/
protected T tryTake(long timeout, TimeUnit timeUnit, boolean exceptionOnTimeout)
throws InterruptedException, TimeoutException {
protected T tryTake(long timeout, TimeUnit timeUnit, boolean exceptionOnTimeout) throws InterruptedException {
T poll = buffer.poll(timeout, timeUnit);
if (poll == null && exceptionOnTimeout) {
throw new TimeoutException("Timeout while trying to peek next event from the stream");
throw new StreamTimeoutException("Timeout while trying to peek next event from the stream");
}
T taken = validate(poll, true);
if (taken != null) {
Expand Down Expand Up @@ -194,7 +191,7 @@ private T validate(T peek, boolean nullOnTerminal) {
if (terminalMessage().equals(peek)) {
if (buffer.isEmpty()) {
// just to make sure there is always a TERMINAL entry left in a terminated buffer
buffer.offer(terminalMessage());
buffer.offer(getAndValidateTerminalMessage());
}
if (nullOnTerminal) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.axoniq.axonserver.connector.impl;

/**
* A {@link RuntimeException} to throw if reading from a stream results in a timeout.
*
* @author Mitchell Herrijgers
* @since 4.5.6
*/
public class StreamTimeoutException extends RuntimeException {

/**
* Constructs a new instance of {@link StreamTimeoutException}.
*/
public StreamTimeoutException() {
}

/**
* Constructs a new instance of {@link StreamTimeoutException} with a detailed description of the cause of the
* exception
*
* @param message the message describing the cause of the exception.
*/
public StreamTimeoutException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void throwsExceptionOnTimeoutWhileRetrievingEvents() throws Exception {
* final, so we have to work around the modifiers as well.
*/
private void reduceTimeout() throws NoSuchFieldException, IllegalAccessException {
Field timeoutField = BufferedAggregateEventStream.class.getDeclaredField("TIMEOUT_MILLIS");
Field timeoutField = BufferedAggregateEventStream.class.getDeclaredField("TAKE_TIMEOUT_MILLIS");
timeoutField.setAccessible(true);

Field modifiersField = Field.class.getDeclaredField("modifiers");
Expand Down

0 comments on commit 230cbda

Please sign in to comment.