Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make the stream ack deadline congruent with maxDurationPerAckExtension #447

Merged
merged 7 commits into from Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -16,13 +16,15 @@

package com.google.cloud.pubsub.v1;

import static com.google.cloud.pubsub.v1.Subscriber.DEFAULT_MAX_DURATION_PER_ACK_EXTENSION;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
Expand Down Expand Up @@ -62,10 +64,14 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private static final Logger logger =
Logger.getLogger(StreamingSubscriberConnection.class.getName());

@InternalApi static final Duration DEFAULT_STREAM_ACK_DEADLINE = Duration.ofSeconds(60);
@InternalApi static final Duration MAX_STREAM_ACK_DEADLINE = Duration.ofSeconds(600);
@InternalApi static final Duration MIN_STREAM_ACK_DEADLINE = Duration.ofSeconds(10);
private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100);
private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10);
private static final int MAX_PER_REQUEST_CHANGES = 1000;

private final Duration streamAckDeadline;
private final SubscriberStub stub;
private final int channelAffinity;
private final String subscription;
Expand Down Expand Up @@ -106,6 +112,15 @@ public StreamingSubscriberConnection(
ApiClock clock) {
this.subscription = subscription;
this.systemExecutor = systemExecutor;
if (maxDurationPerAckExtension.compareTo(DEFAULT_MAX_DURATION_PER_ACK_EXTENSION) == 0) {
this.streamAckDeadline = DEFAULT_STREAM_ACK_DEADLINE;
} else if (maxDurationPerAckExtension.compareTo(MIN_STREAM_ACK_DEADLINE) < 0) {
this.streamAckDeadline = MIN_STREAM_ACK_DEADLINE;
} else if (maxDurationPerAckExtension.compareTo(MAX_STREAM_ACK_DEADLINE) > 0) {
this.streamAckDeadline = MAX_STREAM_ACK_DEADLINE;
} else {
this.streamAckDeadline = maxDurationPerAckExtension;
}
this.stub = stub;
this.channelAffinity = channelAffinity;
this.messageDispatcher =
Expand Down Expand Up @@ -217,7 +232,7 @@ private void initialize() {
initClientStream.send(
StreamingPullRequest.newBuilder()
.setSubscription(subscription)
.setStreamAckDeadlineSeconds(60)
.setStreamAckDeadlineSeconds((int) streamAckDeadline.getSeconds())
.setClientId(clientId)
.setMaxOutstandingMessages(
this.useLegacyFlowControl
Expand Down
Expand Up @@ -91,6 +91,7 @@
* details.
*/
public class Subscriber extends AbstractApiService implements SubscriberInterface {
@InternalApi static final Duration DEFAULT_MAX_DURATION_PER_ACK_EXTENSION = Duration.ofMillis(0);
private static final int THREADS_PER_CHANNEL = 5;
private static final int MAX_INBOUND_MESSAGE_SIZE =
20 * 1024 * 1024; // 20MB API maximum message size.
Expand Down Expand Up @@ -421,7 +422,7 @@ public static final class Builder {
private MessageReceiver receiver;

private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD;
private Duration maxDurationPerAckExtension = Duration.ofMillis(0);
private Duration maxDurationPerAckExtension = DEFAULT_MAX_DURATION_PER_ACK_EXTENSION;

private boolean useLegacyFlowControl = false;
private FlowControlSettings flowControlSettings =
Expand Down
Expand Up @@ -51,6 +51,7 @@ class FakeSubscriberServiceImpl extends SubscriberImplBase {
private final AtomicInteger messageAckDeadline =
new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS);
private final AtomicInteger getSubscriptionCalled = new AtomicInteger();
private StreamingPullRequest lastSeenRequest;
private final List<Stream> openedStreams = new ArrayList<>();
private final List<Stream> closedStreams = new ArrayList<>();
private final List<String> acks = new ArrayList<>();
Expand Down Expand Up @@ -139,6 +140,7 @@ public void onNext(StreamingPullRequest request) {
subscriptionInitialized.notifyAll();
}
}
setLastSeenRequest(request);
addOpenedStream(stream);
stream.notifyAll();
}
Expand Down Expand Up @@ -292,6 +294,18 @@ private static void waitAtLeast(Collection<?> collection, int target)
}
}

public StreamingPullRequest getLastSeenRequest() {
synchronized (lastSeenRequest) {
return lastSeenRequest;
}
}

public void setLastSeenRequest(StreamingPullRequest lastSeenRequest) {
synchronized (lastSeenRequest) {
this.lastSeenRequest = lastSeenRequest;
}
}

private void addOpenedStream(Stream stream) {
synchronized (openedStreams) {
openedStreams.add(stream);
Expand Down
Expand Up @@ -16,6 +16,10 @@

package com.google.cloud.pubsub.v1;

import static com.google.cloud.pubsub.v1.StreamingSubscriberConnection.DEFAULT_STREAM_ACK_DEADLINE;
import static com.google.cloud.pubsub.v1.StreamingSubscriberConnection.MAX_STREAM_ACK_DEADLINE;
import static com.google.cloud.pubsub.v1.StreamingSubscriberConnection.MIN_STREAM_ACK_DEADLINE;
import static com.google.cloud.pubsub.v1.Subscriber.DEFAULT_MAX_DURATION_PER_ACK_EXTENSION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -224,6 +228,67 @@ public ScheduledExecutorService getExecutor() {
}
}

@Test
public void testStreamAckDeadlineIsSetCorrectly() throws Exception {
int expectedChannelCount = 1;
// Deadline is smaller than the allowed streamAckDeadline.
int maxDurationPerAckExtension = 5;

Subscriber subscriber =
startSubscriber(
getTestSubscriberBuilder(testReceiver)
.setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension)));
assertEquals(
expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
assertEquals(
MIN_STREAM_ACK_DEADLINE.getSeconds(),
fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());

subscriber.stopAsync().awaitTerminated();

// Deadline is higher than the allowed streamAckDeadline.
maxDurationPerAckExtension = 700;
subscriber =
startSubscriber(
getTestSubscriberBuilder(testReceiver)
.setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension)));
assertEquals(
expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
assertEquals(
MAX_STREAM_ACK_DEADLINE.getSeconds(),
fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());

subscriber.stopAsync().awaitTerminated();

// Deadline is within the allowed limits for streamAckDeadline.
maxDurationPerAckExtension = 100;
subscriber =
startSubscriber(
getTestSubscriberBuilder(testReceiver)
.setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension)));
assertEquals(
expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
assertEquals(
maxDurationPerAckExtension,
fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());

subscriber.stopAsync().awaitTerminated();

// maxDurationPerAckExtension is unset.
maxDurationPerAckExtension = (int) DEFAULT_MAX_DURATION_PER_ACK_EXTENSION.getSeconds();
subscriber =
startSubscriber(
getTestSubscriberBuilder(testReceiver)
.setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension)));
assertEquals(
expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount));
assertEquals(
DEFAULT_STREAM_ACK_DEADLINE.getSeconds(),
fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds());

subscriber.stopAsync().awaitTerminated();
}

private Subscriber startSubscriber(Builder testSubscriberBuilder) {
Subscriber subscriber = testSubscriberBuilder.build();
subscriber.startAsync().awaitRunning();
Expand Down