From 07079713701ec416fea6d47c898badafb4ebb45d Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Tue, 24 Jan 2017 15:37:43 +1100 Subject: [PATCH 1/2] fix race in mock publisher FakePublisherServiceImpl::publish has a race. If the call to publish happens before a response has been placed, the server will try to respond with a null object. The fix is to either - always set the response before calling - make publish wait for the response For good measure, this commit does both. This fix reveals another flake. Publisher uses exponential backoff with jitter. The jitter randomly picks a number between 0 and a maximum. If we pick low values too many times, it will retry too often and the server will run out of canned transient errors to respond with. The test still passed since it expected any Throwable. This commit fixes the test to expect FakeException, sets the jitter to a random value in the range [max/2, max), and increases the number of canned errors to compensate. Retrying can still cause random test failures, independently of the above changes. If a request fails due to DEADLINE_EXCEEDED, the future is completed with a corresponding error. However, the last RPC might not have been successfully cancelled. When a new test starts, it gives canned responses to the server. The server might use some of these responses to respond to RPCs of previous tests. Consequently, a misbehaving test can fail every test that comes after it. This commit changes the test setup code so that it creates a new fake server for every test to avoid this problem. 
--- .../google/cloud/pubsub/spi/v1/Publisher.java | 2 +- .../spi/v1/FakePublisherServiceImpl.java | 78 ++++++++----------- .../pubsub/spi/v1/PublisherImplTest.java | 43 +++++----- 3 files changed, 54 insertions(+), 69 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index 39941ae878fa..307c9b73fb86 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -501,7 +501,7 @@ private static long computeNextBackoffDelayMs( * Math.pow(retrySettings.getRetryDelayMultiplier(), outstandingBundle.attempt - 1)); delayMillis = Math.min(retrySettings.getMaxRetryDelay().getMillis(), delayMillis); outstandingBundle.attempt++; - return ThreadLocalRandom.current().nextLong(0, delayMillis); + return ThreadLocalRandom.current().nextLong(delayMillis / 2, delayMillis); } private boolean isRetryable(Throwable t) { diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakePublisherServiceImpl.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakePublisherServiceImpl.java index 18d180513518..162898fa7c07 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakePublisherServiceImpl.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/FakePublisherServiceImpl.java @@ -21,98 +21,84 @@ import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc.PublisherImplBase; import io.grpc.stub.StreamObserver; -import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; /** - * A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a - * Cloud Pub/Sub Publisher. + * A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a Cloud + * Pub/Sub Publisher. 
*/ class FakePublisherServiceImpl extends PublisherImplBase { - private final Queue publishResponses = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue publishResponses = new LinkedBlockingQueue<>(); - /** - * Class used to save the state of a possible response. - */ + /** Class used to save the state of a possible response. */ private static class Response { Optional publishResponse; Optional error; - + public Response(PublishResponse publishResponse) { this.publishResponse = Optional.of(publishResponse); this.error = Optional.absent(); } - + public Response(Throwable exception) { this.publishResponse = Optional.absent(); this.error = Optional.of(exception); } - + public PublishResponse getPublishResponse() { return publishResponse.get(); } - + public Throwable getError() { return error.get(); } - + boolean isError() { return error.isPresent(); } + + @Override + public String toString() { + if (isError()) { + return error.get().toString(); + } + return publishResponse.get().toString(); + } } @Override public void publish(PublishRequest request, StreamObserver responseObserver) { - Response response = null; - synchronized (publishResponses) { - response = publishResponses.poll(); - try { - if (response.isError()) { - responseObserver.onError(response.getError()); - return; - } - - responseObserver.onNext(response.getPublishResponse()); - responseObserver.onCompleted(); - } finally { - publishResponses.notifyAll(); - } + Response response; + try { + response = publishResponses.take(); + } catch (InterruptedException e) { + throw new IllegalArgumentException(e); + } + if (response.isError()) { + responseObserver.onError(response.getError()); + } else { + responseObserver.onNext(response.getPublishResponse()); + responseObserver.onCompleted(); } } public FakePublisherServiceImpl addPublishResponse(PublishResponse publishResponse) { - synchronized (publishResponses) { - publishResponses.add(new Response(publishResponse)); - } + publishResponses.add(new 
Response(publishResponse)); return this; } public FakePublisherServiceImpl addPublishResponse( PublishResponse.Builder publishResponseBuilder) { - addPublishResponse(publishResponseBuilder.build()); - return this; + return addPublishResponse(publishResponseBuilder.build()); } public FakePublisherServiceImpl addPublishError(Throwable error) { - synchronized (publishResponses) { - publishResponses.add(new Response(error)); - } + publishResponses.add(new Response(error)); return this; } public void reset() { - synchronized (publishResponses) { - publishResponses.clear(); - publishResponses.notifyAll(); - } - } - - public void waitForNoOutstandingResponses() throws InterruptedException { - synchronized (publishResponses) { - while (!publishResponses.isEmpty()) { - publishResponses.wait(); - } - } + publishResponses.clear(); } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java index e831763702af..bd7b96c715c0 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java @@ -44,9 +44,8 @@ import io.grpc.stub.StreamObserver; import java.util.concurrent.ExecutionException; import org.joda.time.Duration; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -64,7 +63,7 @@ public class PublisherImplTest { private static final ExecutorProvider SINGLE_THREAD_EXECUTOR = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build(); - private static InProcessChannelBuilder testChannelBuilder; + private InProcessChannelBuilder testChannelBuilder; @Captor private ArgumentCaptor requestCaptor; @@ -72,12 +71,14 @@ public class PublisherImplTest { 
private FakeCredentials testCredentials; - private static FakePublisherServiceImpl testPublisherServiceImpl; + private FakePublisherServiceImpl testPublisherServiceImpl; - private static ServerImpl testServer; + private ServerImpl testServer; - @BeforeClass - public static void setUpClass() throws Exception { + class FakeException extends Exception {} + + @Before + public void setUp() throws Exception { testPublisherServiceImpl = Mockito.spy(new FakePublisherServiceImpl()); InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("test-server"); @@ -86,10 +87,7 @@ public static void setUpClass() throws Exception { serverBuilder.addService(testPublisherServiceImpl); testServer = serverBuilder.build(); testServer.start(); - } - @Before - public void setUp() throws Exception { MockitoAnnotations.initMocks(this); testPublisherServiceImpl.reset(); Mockito.reset(testPublisherServiceImpl); @@ -97,8 +95,8 @@ public void setUp() throws Exception { testCredentials = new FakeCredentials(); } - @AfterClass - public static void tearDownClass() throws Exception { + @After + public void tearDown() throws Exception { testServer.shutdownNow().awaitTermination(); } @@ -272,11 +270,11 @@ public void testPublishFailureRetries() throws Exception { .build()) .build(); // To demonstrate that reaching duration will trigger publish - ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); - testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); + ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); + assertEquals("1", publishFuture1.get()); Mockito.verify(testPublisherServiceImpl, times(2)) @@ -284,7 +282,6 @@ public void testPublishFailureRetries() throws Exception { publisher.shutdown(); } - @Test(expected = Throwable.class) public void testPublishFailureRetries_exceededsRetryDuration() throws Exception { Publisher publisher = 
getTestPublisherBuilder() @@ -302,15 +299,18 @@ public void testPublishFailureRetries_exceededsRetryDuration() throws Exception .build()) .build(); // To demonstrate that reaching duration will trigger publish - ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); - - // With exponential backoff, starting at 5ms we should have no more than 11 retries - for (int i = 0; i < 11; ++i) { - testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing")); + // We use exponential backoff with randomness. 30 should be more than enough. + for (int i = 0; i < 30; ++i) { + testPublisherServiceImpl.addPublishError(new FakeException()); } + ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); try { publishFuture1.get(); + } catch (ExecutionException e) { + if (!(e.getCause() instanceof FakeException)) { + throw new IllegalStateException("unexpected exception", e); + } } finally { Mockito.verify(testPublisherServiceImpl, atLeast(10)) .publish(Mockito.any(), Mockito.>any()); @@ -336,9 +336,8 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce .build()) .build(); // To demonstrate that reaching duration will trigger publish - ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); - testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT)); + ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); try { publishFuture1.get(); From 866195744dc5024a099e2f0be5dda5039f160fed Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 25 Jan 2017 18:25:14 +1100 Subject: [PATCH 2/2] PR comments --- .../google/cloud/pubsub/spi/v1/Publisher.java | 28 +++++++++++++++++-- .../cloud/pubsub/spi/v1/Subscriber.java | 4 +-- .../pubsub/spi/v1/PublisherImplTest.java | 13 +++++++-- 3 files changed, 37 insertions(+), 8 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java 
b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index 307c9b73fb86..377c93e99483 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -23,6 +23,7 @@ import com.google.api.gax.grpc.InstantiatingExecutorProvider; import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -125,6 +126,7 @@ public static long getApiMaxRequestBytes() { private final BundlingSettings bundlingSettings; private final RetrySettings retrySettings; + private final LongRandom longRandom; private final FlowController.Settings flowControlSettings; private final boolean failOnFlowControlLimits; @@ -151,6 +153,7 @@ private Publisher(Builder builder) throws IOException { this.bundlingSettings = builder.bundlingSettings; this.retrySettings = builder.retrySettings; + this.longRandom = builder.longRandom; flowControlSettings = builder.flowControlSettings; failOnFlowControlLimits = builder.failOnFlowControlLimits; @@ -380,7 +383,8 @@ public void onSuccess(PublishResponse result) { @Override public void onFailure(Throwable t) { - long nextBackoffDelay = computeNextBackoffDelayMs(outstandingBundle, retrySettings); + long nextBackoffDelay = + computeNextBackoffDelayMs(outstandingBundle, retrySettings, longRandom); if (!isRetryable(t) || System.currentTimeMillis() + nextBackoffDelay @@ -494,14 +498,14 @@ private boolean hasBundlingBytes() { } private static long computeNextBackoffDelayMs( - OutstandingBundle outstandingBundle, RetrySettings retrySettings) { + OutstandingBundle outstandingBundle, RetrySettings retrySettings, LongRandom longRandom) { long delayMillis = Math.round( retrySettings.getInitialRetryDelay().getMillis() * 
Math.pow(retrySettings.getRetryDelayMultiplier(), outstandingBundle.attempt - 1)); delayMillis = Math.min(retrySettings.getMaxRetryDelay().getMillis(), delayMillis); outstandingBundle.attempt++; - return ThreadLocalRandom.current().nextLong(delayMillis / 2, delayMillis); + return longRandom.nextLong(0, delayMillis); } private boolean isRetryable(Throwable t) { @@ -520,6 +524,10 @@ private boolean isRetryable(Throwable t) { } } + interface LongRandom { + long nextLong(long least, long bound); + } + /** A builder of {@link Publisher}s. */ public static final class Builder { static final Duration MIN_TOTAL_TIMEOUT = new Duration(10 * 1000); // 10 seconds @@ -547,6 +555,13 @@ public static final class Builder { .setRpcTimeoutMultiplier(2) .setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT) .build(); + static final LongRandom DEFAULT_LONG_RANDOM = + new LongRandom() { + @Override + public long nextLong(long least, long bound) { + return ThreadLocalRandom.current().nextLong(least, bound); + } + }; private static final int THREADS_PER_CPU = 5; static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = @@ -564,6 +579,7 @@ public static final class Builder { boolean failOnFlowControlLimits = false; RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS; + LongRandom longRandom = DEFAULT_LONG_RANDOM; // Channels and credentials Optional userCredentials = Optional.absent(); @@ -659,6 +675,12 @@ public Builder setRetrySettings(RetrySettings retrySettings) { return this; } + @VisibleForTesting + Builder setLongRandom(LongRandom longRandom) { + this.longRandom = Preconditions.checkNotNull(longRandom); + return this; + } + /** Gives the ability to set a custom executor to be used by the library. 
*/ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index 862e62d47ed9..827fc3dbaa32 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -458,8 +458,8 @@ public Builder setFlowControlSettings(FlowController.Settings flowControlSetting /** * Set acknowledgement expiration padding. * - *

This is the time accounted before a message expiration is to happen, so the - * {@link Subscriber} is able to send an ack extension beforehand. + *

This is the time accounted before a message expiration is to happen, so the {@link + * Subscriber} is able to send an ack extension beforehand. * *

This padding duration is configurable so you can account for network latency. A reasonable * number must be provided so messages don't expire because of network latency between when the diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java index bd7b96c715c0..165079e584b9 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherImplTest.java @@ -299,8 +299,8 @@ public void testPublishFailureRetries_exceededsRetryDuration() throws Exception .build()) .build(); // To demonstrate that reaching duration will trigger publish - // We use exponential backoff with randomness. 30 should be more than enough. - for (int i = 0; i < 30; ++i) { + // With exponential backoff, starting at 5ms we should have no more than 11 retries + for (int i = 0; i < 11; ++i) { testPublisherServiceImpl.addPublishError(new FakeException()); } ListenableFuture publishFuture1 = sendTestMessage(publisher, "A"); @@ -604,6 +604,13 @@ private Builder getTestPublisherBuilder() { return Publisher.Builder.newBuilder(TEST_TOPIC) .setCredentials(testCredentials) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) - .setChannelBuilder(testChannelBuilder); + .setChannelBuilder(testChannelBuilder) + .setLongRandom( + new Publisher.LongRandom() { + @Override + public long nextLong(long least, long bound) { + return bound - 1; + } + }); } }