Skip to content

Commit

Permalink
feat: implement max duration per ack extension (#211)
Browse files Browse the repository at this point in the history
* feat: implement max duration per ack extension

* fix: update coverage

* fix: requested changes
  • Loading branch information
hannahrogers-google committed Jul 6, 2020
1 parent b101905 commit 1427b8c
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class MessageDispatcher {

private final Duration ackExpirationPadding;
private final Duration maxAckExtensionPeriod;
private final int maxSecondsPerAckExtension;
private final MessageReceiver receiver;
private final AckProcessor ackProcessor;

Expand Down Expand Up @@ -190,6 +191,7 @@ void sendAckOperations(
AckProcessor ackProcessor,
Duration ackExpirationPadding,
Duration maxAckExtensionPeriod,
Duration maxDurationPerAckExtension,
Distribution ackLatencyDistribution,
FlowController flowController,
Executor executor,
Expand All @@ -199,6 +201,7 @@ void sendAckOperations(
this.systemExecutor = systemExecutor;
this.ackExpirationPadding = ackExpirationPadding;
this.maxAckExtensionPeriod = maxAckExtensionPeriod;
this.maxSecondsPerAckExtension = Math.toIntExact(maxDurationPerAckExtension.getSeconds());
this.receiver = receiver;
this.ackProcessor = ackProcessor;
this.flowController = flowController;
Expand Down Expand Up @@ -407,6 +410,10 @@ public void run() {
int computeDeadlineSeconds() {
int sec = ackLatencyDistribution.getPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES);

if ((maxSecondsPerAckExtension > 0) && (sec > maxSecondsPerAckExtension)) {
sec = maxSecondsPerAckExtension;
}

// Use Ints.constrainToRange when we get guava 21.
if (sec < Subscriber.MIN_ACK_DEADLINE_SECONDS) {
sec = Subscriber.MIN_ACK_DEADLINE_SECONDS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public StreamingSubscriberConnection(
MessageReceiver receiver,
Duration ackExpirationPadding,
Duration maxAckExtensionPeriod,
Duration maxDurationPerAckExtension,
Distribution ackLatencyDistribution,
SubscriberStub stub,
int channelAffinity,
Expand All @@ -111,6 +112,7 @@ public StreamingSubscriberConnection(
this,
ackExpirationPadding,
maxAckExtensionPeriod,
maxDurationPerAckExtension,
ackLatencyDistribution,
flowController,
executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final Duration maxAckExtensionPeriod;
private final Duration maxDurationPerAckExtension;
// The ExecutorProvider used to generate executors for processing messages.
private final ExecutorProvider executorProvider;
// An instantiation of the SystemExecutorProvider used for processing acks
Expand All @@ -128,6 +129,7 @@ private Subscriber(Builder builder) {
subscriptionName = builder.subscriptionName;

maxAckExtensionPeriod = builder.maxAckExtensionPeriod;
maxDurationPerAckExtension = builder.maxDurationPerAckExtension;
clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock();

flowController =
Expand Down Expand Up @@ -329,6 +331,7 @@ private void startStreamingConnections() {
receiver,
ACK_EXPIRATION_PADDING,
maxAckExtensionPeriod,
maxDurationPerAckExtension,
ackLatencyDistribution,
subStub,
i,
Expand Down Expand Up @@ -415,6 +418,7 @@ public static final class Builder {
private MessageReceiver receiver;

private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
private Duration maxDurationPerAckExtension = Duration.ofMillis(0);

private FlowControlSettings flowControlSettings =
FlowControlSettings.newBuilder()
Expand Down Expand Up @@ -515,6 +519,22 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
return this;
}

/**
* Set the upper bound for a single mod ack extention period.
*
* <p>The ack deadline will continue to be extended by up to this duration until
* MaxAckExtensionPeriod is reached. Setting MaxDurationPerAckExtension bounds the maximum
* amount of time before a mesage re-delivery in the event the Subscriber fails to extend the
* deadline.
*
* <p>MaxDurationPerAckExtension configuration can be disabled by specifying a zero duration.
*/
public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) {
Preconditions.checkArgument(maxDurationPerAckExtension.toMillis() >= 0);
this.maxDurationPerAckExtension = maxDurationPerAckExtension;
return this;
}

/**
* Gives the ability to set a custom executor. {@link ExecutorProvider#getExecutor()} will be
* called {@link Builder#parallelPullCount} times.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public void run() {
// No-op; don't do anything.
}
};
private static final int MAX_SECONDS_PER_ACK_EXTENSION = 60;

private MessageDispatcher dispatcher;
private LinkedBlockingQueue<AckReplyConsumer> consumers;
Expand Down Expand Up @@ -128,6 +129,7 @@ public void sendAckOperations(
processor,
Duration.ofSeconds(5),
Duration.ofMinutes(60),
Duration.ofSeconds(MAX_SECONDS_PER_ACK_EXTENSION),
new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1),
flowController,
MoreExecutors.directExecutor(),
Expand Down Expand Up @@ -235,4 +237,15 @@ public void testDeadlineAdjustment() throws Exception {

assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(42);
}

@Test
public void testMaxDurationPerAckExtension() throws Exception {
assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(10);

dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
clock.advance(MAX_SECONDS_PER_ACK_EXTENSION + 5, TimeUnit.SECONDS);
consumers.take().ack();

assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(MAX_SECONDS_PER_ACK_EXTENSION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.threeten.bp.Duration;

/** Tests for {@link Subscriber}. */
public class SubscriberTest {
Expand Down Expand Up @@ -236,6 +237,7 @@ private Builder getTestSubscriberBuilder(MessageReceiver receiver) {
FixedTransportChannelProvider.create(GrpcTransportChannel.create(testChannel)))
.setCredentialsProvider(NoCredentialsProvider.create())
.setClock(fakeExecutor.getClock())
.setParallelPullCount(1);
.setParallelPullCount(1)
.setMaxDurationPerAckExtension(Duration.ofSeconds(5));
}
}

0 comments on commit 1427b8c

Please sign in to comment.