Skip to content
Permalink
Browse files
feat: make the stream ack deadline congruent with maxDurationPerAckEx…
…tension (#447)

* feat: allow to override the stream ack deadline

* set streamAckDeadline to be congruent with maxDurationPerAckExtension

* remove extra impot

* remove unused DEFAULT_STREAM_ACK_DEADLINE

* consider the case in which maxDurationPerAckExtension is not set

* add test

* test fix

Co-authored-by: Jaume Marhuenda-Beltran <jaumemarhuenda@google.com>
  • Loading branch information
beltran and beltran committed Nov 23, 2020
1 parent 366d95b commit c63dc255dff55982fca2e4cf0e955c47614dc291
@@ -16,13 +16,15 @@

package com.google.cloud.pubsub.v1;

import static com.google.cloud.pubsub.v1.Subscriber.DEFAULT_MAX_DURATION_PER_ACK_EXTENSION;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
@@ -62,10 +64,14 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private static final Logger logger =
Logger.getLogger(StreamingSubscriberConnection.class.getName());

@InternalApi static final Duration DEFAULT_STREAM_ACK_DEADLINE = Duration.ofSeconds(60);
@InternalApi static final Duration MAX_STREAM_ACK_DEADLINE = Duration.ofSeconds(600);
@InternalApi static final Duration MIN_STREAM_ACK_DEADLINE = Duration.ofSeconds(10);
private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100);
private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10);
private static final int MAX_PER_REQUEST_CHANGES = 1000;

private final Duration streamAckDeadline;
private final SubscriberStub stub;
private final int channelAffinity;
private final String subscription;
@@ -106,6 +112,15 @@ public StreamingSubscriberConnection(
ApiClock clock) {
this.subscription = subscription;
this.systemExecutor = systemExecutor;
if (maxDurationPerAckExtension.compareTo(DEFAULT_MAX_DURATION_PER_ACK_EXTENSION) == 0) {
this.streamAckDeadline = DEFAULT_STREAM_ACK_DEADLINE;
} else if (maxDurationPerAckExtension.compareTo(MIN_STREAM_ACK_DEADLINE) < 0) {
this.streamAckDeadline = MIN_STREAM_ACK_DEADLINE;
} else if (maxDurationPerAckExtension.compareTo(MAX_STREAM_ACK_DEADLINE) > 0) {
this.streamAckDeadline = MAX_STREAM_ACK_DEADLINE;
} else {
this.streamAckDeadline = maxDurationPerAckExtension;
}
this.stub = stub;
this.channelAffinity = channelAffinity;
this.messageDispatcher =
@@ -217,7 +232,7 @@ private void initialize() {
initClientStream.send(
StreamingPullRequest.newBuilder()
.setSubscription(subscription)
.setStreamAckDeadlineSeconds(60)
.setStreamAckDeadlineSeconds((int) streamAckDeadline.getSeconds())
.setClientId(clientId)
.setMaxOutstandingMessages(
this.useLegacyFlowControl
@@ -91,6 +91,7 @@
* details.
*/
public class Subscriber extends AbstractApiService implements SubscriberInterface {
@InternalApi static final Duration DEFAULT_MAX_DURATION_PER_ACK_EXTENSION = Duration.ofMillis(0);
private static final int THREADS_PER_CHANNEL = 5;
private static final int MAX_INBOUND_MESSAGE_SIZE =
20 * 1024 * 1024; // 20MB API maximum message size.
@@ -421,7 +422,7 @@ public static final class Builder {
private MessageReceiver receiver;

private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
private Duration maxDurationPerAckExtension = Duration.ofMillis(0);
private Duration maxDurationPerAckExtension = DEFAULT_MAX_DURATION_PER_ACK_EXTENSION;

private boolean useLegacyFlowControl = false;
private FlowControlSettings flowControlSettings =
@@ -51,6 +51,7 @@ class FakeSubscriberServiceImpl extends SubscriberImplBase {
private final AtomicInteger messageAckDeadline =
new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS);
private final AtomicInteger getSubscriptionCalled = new AtomicInteger();
private StreamingPullRequest lastSeenRequest;
private final List<Stream> openedStreams = new ArrayList<>();
private final List<Stream> closedStreams = new ArrayList<>();
private final List<String> acks = new ArrayList<>();
@@ -139,6 +140,7 @@ public void onNext(StreamingPullRequest request) {
subscriptionInitialized.notifyAll();
}
}
setLastSeenRequest(request);
addOpenedStream(stream);
stream.notifyAll();
}
@@ -292,6 +294,18 @@ private static void waitAtLeast(Collection<?> collection, int target)
}
}

public StreamingPullRequest getLastSeenRequest() {
synchronized (lastSeenRequest) {
return lastSeenRequest;
}
}

public void setLastSeenRequest(StreamingPullRequest lastSeenRequest) {
synchronized (lastSeenRequest) {
this.lastSeenRequest = lastSeenRequest;
}
}

private void addOpenedStream(Stream stream) {
synchronized (openedStreams) {
openedStreams.add(stream);
@@ -16,6 +16,10 @@

package com.google.cloud.pubsub.v1;

import static com.google.cloud.pubsub.v1.StreamingSubscriberConnection.DEFAULT_STREAM_ACK_DEADLINE;
import static com.google.cloud.pubsub.v1.StreamingSubscriberConnection.MAX_STREAM_ACK_DEADLINE;
import static com.google.cloud.pubsub.v1.StreamingSubscriberConnection.MIN_STREAM_ACK_DEADLINE;
import static com.google.cloud.pubsub.v1.Subscriber.DEFAULT_MAX_DURATION_PER_ACK_EXTENSION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@@ -224,6 +228,67 @@ public ScheduledExecutorService getExecutor() {
}
}

@Test
public void testStreamAckDeadlineIsSetCorrectly() throws Exception {
int expectedChannelCount = 1;
// Deadline is smaller than the allowed streamAckDeadline.
int maxDurationPerAckExtension = 5;

Subscriber subscriber =
startSubscriber(
getTestSubscriberBuilder(testReceiver)
.setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension)));
assertEquals(
expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
assertEquals(
MIN_STREAM_ACK_DEADLINE.getSeconds(),
fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());

subscriber.stopAsync().awaitTerminated();

// Deadline is higher than the allowed streamAckDeadline.
maxDurationPerAckExtension = 700;
subscriber =
startSubscriber(
getTestSubscriberBuilder(testReceiver)
.setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension)));
assertEquals(
expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
assertEquals(
MAX_STREAM_ACK_DEADLINE.getSeconds(),
fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());

subscriber.stopAsync().awaitTerminated();

// Deadline is within the allowed limits for streamAckDeadline.
maxDurationPerAckExtension = 100;
subscriber =
startSubscriber(
getTestSubscriberBuilder(testReceiver)
.setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension)));
assertEquals(
expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
assertEquals(
maxDurationPerAckExtension,
fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());

subscriber.stopAsync().awaitTerminated();

// maxDurationPerAckExtension is unset.
maxDurationPerAckExtension = (int) DEFAULT_MAX_DURATION_PER_ACK_EXTENSION.getSeconds();
subscriber =
startSubscriber(
getTestSubscriberBuilder(testReceiver)
.setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension)));
assertEquals(
expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
assertEquals(
DEFAULT_STREAM_ACK_DEADLINE.getSeconds(),
fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());

subscriber.stopAsync().awaitTerminated();
}

private Subscriber startSubscriber(Builder testSubscriberBuilder) {
Subscriber subscriber = testSubscriberBuilder.build();
subscriber.startAsync().awaitRunning();

0 comments on commit c63dc25

Please sign in to comment.