Skip to content

Commit

Permalink
fix: Make connections start up asynchronously (#289)
Browse files Browse the repository at this point in the history
* fix: Make connections start up asynchronously so that publishers can initialize many connections at once.

* chore: deflake some tests

* fix: Deflake more

* fix: Deflake more, add helper for common operation.

* chore: Add header

* fix: Canonicalize paths earlier so they only call into the lookup library once.

* fix: deflake
  • Loading branch information
dpcollins-google committed Oct 12, 2020
1 parent 578bc73 commit 27b1fec
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsublite.cloudpubsub;

import static com.google.cloud.pubsublite.ProjectLookupUtils.toCanonical;

import com.google.api.gax.batching.BatchingSettings;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.Constants;
Expand Down Expand Up @@ -124,7 +126,7 @@ Publisher instantiate() throws StatusException {

RoutingPublisherBuilder.Builder wireBuilder =
RoutingPublisherBuilder.newBuilder()
.setTopic(topicPath())
.setTopic(toCanonical(topicPath()))
.setPublisherBuilder(singlePartitionPublisherBuilder);

numPartitions().ifPresent(wireBuilder::setNumPartitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsublite.cloudpubsub;

import static com.google.cloud.pubsublite.ProjectLookupUtils.toCanonical;

import com.google.auto.value.AutoValue;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.MessageTransformer;
Expand Down Expand Up @@ -142,13 +144,15 @@ public SubscriberSettings build() throws StatusException {

@SuppressWarnings("CheckReturnValue")
Subscriber instantiate() throws StatusException {
SubscriptionPath canonicalPath = toCanonical(subscriptionPath());

SubscriberBuilder.Builder wireSubscriberBuilder = SubscriberBuilder.newBuilder();
wireSubscriberBuilder.setSubscriptionPath(subscriptionPath());
wireSubscriberBuilder.setSubscriptionPath(canonicalPath);
subscriberServiceStub().ifPresent(wireSubscriberBuilder::setSubscriberServiceStub);
wireSubscriberBuilder.setContext(PubsubContext.of(FRAMEWORK));

CommitterBuilder.Builder wireCommitterBuilder = CommitterBuilder.newBuilder();
wireCommitterBuilder.setSubscriptionPath(subscriptionPath());
wireCommitterBuilder.setSubscriptionPath(canonicalPath);
cursorServiceStub().ifPresent(wireCommitterBuilder::setCursorStub);

PartitionSubscriberFactory partitionSubscriberFactory =
Expand All @@ -166,7 +170,7 @@ Subscriber instantiate() throws StatusException {

if (!partitions().isPresent()) {
AssignerBuilder.Builder assignerBuilder = AssignerBuilder.newBuilder();
assignerBuilder.setSubscriptionPath(subscriptionPath());
assignerBuilder.setSubscriptionPath(canonicalPath);
assignmentServiceStub().ifPresent(assignerBuilder::setAssignmentStub);
AssignerFactory assignerFactory =
receiver -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ class RetryingConnectionImpl<

@Override
protected void doStart() {
reinitialize();
notifyStarted();
this.systemExecutor.execute(
() -> {
reinitialize();
notifyStarted();
});
}

// Reinitialize the stream. Must be called in a downcall to prevent deadlock.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.wire.RetryingConnectionHelpers.whenFailed;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -46,6 +47,7 @@
import io.grpc.testing.GrpcCleanupRule;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Future;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -130,8 +132,10 @@ public void delivery_CallsReceiver() throws StatusException {
}

@Test
public void responseObserverFailure_Fails() {
public void responseObserverFailure_Fails() throws Exception {
Future<Void> failed = whenFailed(permanentErrorHandler);
leakedResponseObserver.onError(Status.INVALID_ARGUMENT.asException());
failed.get();
verify(permanentErrorHandler)
.failed(any(), argThat(new StatusExceptionMatcher(Code.INVALID_ARGUMENT)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.StatusExceptionMatcher.assertFutureThrowsCode;
import static com.google.cloud.pubsublite.internal.wire.RetryingConnectionHelpers.whenFailed;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
Expand Down Expand Up @@ -149,9 +150,11 @@ public void stopWaitsForCommit() throws Exception {
}

@Test
public void responseMoreThanSentError() {
public void responseMoreThanSentError() throws Exception {
Future<Void> failed = whenFailed(permanentErrorHandler);
ApiFuture<Void> future = committer.commitOffset(Offset.of(10));
leakedResponseObserver.onNext(ResponseWithCount(2));
failed.get();
verify(permanentErrorHandler)
.failed(any(), argThat(new StatusExceptionMatcher(Code.FAILED_PRECONDITION)));
assertFutureThrowsCode(future, Code.FAILED_PRECONDITION);
Expand Down Expand Up @@ -184,16 +187,10 @@ public void multipleSentCompletedInOrder() {
@Test
public void stopInCommitCallback() throws Exception {
ApiFuture<Void> future = committer.commitOffset(Offset.of(10));
CountDownLatch latch = new CountDownLatch(1);
ExtractStatus.addFailureHandler(
future,
(error) -> {
committer.stopAsync();
latch.countDown();
});
Future<Void> failed = whenFailed(permanentErrorHandler);
leakedResponseObserver.onError(Status.FAILED_PRECONDITION.asException());
latch.await();
assertFutureThrowsCode(future, Code.FAILED_PRECONDITION);
failed.get();
verify(permanentErrorHandler)
.failed(any(), argThat(new StatusExceptionMatcher(Code.FAILED_PRECONDITION)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.wire.RetryingConnectionHelpers.whenFailed;
import static com.google.common.truth.Truth.assertThat;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -52,7 +53,6 @@
import io.grpc.testing.GrpcCleanupRule;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.junit.Before;
Expand Down Expand Up @@ -83,7 +83,7 @@ public class PublisherImplTest {
private final BatchPublisher mockBatchPublisher = mock(BatchPublisher.class);
private final BatchPublisherFactory mockPublisherFactory = mock(BatchPublisherFactory.class);
private final Listener permanentErrorHandler = mock(Listener.class);
private final CountDownLatch errorOccurredLatch = new CountDownLatch(1);
private Future<Void> errorOccurredFuture;

private PublisherImpl publisher;
private StreamObserver<Offset> leakedOffsetStream;
Expand All @@ -97,14 +97,7 @@ public void setUp() throws StatusException {
})
.when(mockPublisherFactory)
.New(any(), any(), eq(INITIAL_PUBLISH_REQUEST));
doAnswer(
(Answer<Void>)
args -> {
errorOccurredLatch.countDown();
return null;
})
.when(permanentErrorHandler)
.failed(any(), any());
errorOccurredFuture = whenFailed(permanentErrorHandler);
ManagedChannel channel =
grpcCleanup.register(
InProcessChannelBuilder.forName("localhost:12345").directExecutor().build());
Expand Down Expand Up @@ -191,7 +184,7 @@ public void publishAfterError_IsError() throws Exception {
startPublisher();
leakedOffsetStream.onError(Status.PERMISSION_DENIED.asRuntimeException());
assertThrows(IllegalStateException.class, publisher::awaitTerminated);
errorOccurredLatch.await();
errorOccurredFuture.get();
verify(permanentErrorHandler).failed(any(), ArgumentMatchers.<StatusRuntimeException>any());
Message message = Message.builder().build();
Future<Offset> future = publisher.publish(message);
Expand Down Expand Up @@ -320,7 +313,7 @@ public void invalidOffsetSequence_SetsPermanentException() throws Exception {
assertThrows(IllegalStateException.class, publisher::awaitTerminated);
assertThat(future3.isDone()).isTrue();
assertThrows(Exception.class, future3::get);
errorOccurredLatch.await();
errorOccurredFuture.get();
verify(permanentErrorHandler).failed(any(), any());

verifyNoMoreInteractions(mockBatchPublisher);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;

import com.google.api.core.ApiService.Listener;
import com.google.api.core.SettableApiFuture;
import java.util.concurrent.Future;

class RetryingConnectionHelpers {
static Future<Void> whenFailed(Listener mockListener) {
SettableApiFuture<Void> future = SettableApiFuture.create();
doAnswer(
args -> {
future.set(null);
return null;
})
.when(mockListener)
.failed(any(), any());
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.StatusExceptionMatcher.assertFutureThrowsCode;
import static com.google.cloud.pubsublite.internal.wire.RetryingConnectionHelpers.whenFailed;
import static com.google.common.truth.Truth.assertThat;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -61,6 +62,7 @@
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -215,14 +217,15 @@ public void messagesUnordered_IsError() {
}

@Test
public void messageBatchesOutOfOrder_IsError() {
public void messageBatchesOutOfOrder_IsError() throws Exception {
Future<Void> failed = whenFailed(permanentErrorHandler);
subscriber.allowFlow(bigFlowControlRequest());
ImmutableList<SequencedMessage> messages =
ImmutableList.of(
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(0), 0));
leakedResponseObserver.onNext(Response.ofMessages(messages));
leakedResponseObserver.onNext(Response.ofMessages(messages));
assertThrows(IllegalStateException.class, subscriber::awaitTerminated);
failed.get();
verify(permanentErrorHandler)
.failed(any(), argThat(new StatusExceptionMatcher(Code.FAILED_PRECONDITION)));
}
Expand Down

0 comments on commit 27b1fec

Please sign in to comment.