Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix race in mock publisher #1560

Merged
merged 2 commits into from
Jan 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -125,6 +126,7 @@ public static long getApiMaxRequestBytes() {

private final BundlingSettings bundlingSettings;
private final RetrySettings retrySettings;
private final LongRandom longRandom;

private final FlowController.Settings flowControlSettings;
private final boolean failOnFlowControlLimits;
Expand All @@ -151,6 +153,7 @@ private Publisher(Builder builder) throws IOException {

this.bundlingSettings = builder.bundlingSettings;
this.retrySettings = builder.retrySettings;
this.longRandom = builder.longRandom;

flowControlSettings = builder.flowControlSettings;
failOnFlowControlLimits = builder.failOnFlowControlLimits;
Expand Down Expand Up @@ -380,7 +383,8 @@ public void onSuccess(PublishResponse result) {

@Override
public void onFailure(Throwable t) {
long nextBackoffDelay = computeNextBackoffDelayMs(outstandingBundle, retrySettings);
long nextBackoffDelay =
computeNextBackoffDelayMs(outstandingBundle, retrySettings, longRandom);

if (!isRetryable(t)
|| System.currentTimeMillis() + nextBackoffDelay
Expand Down Expand Up @@ -494,14 +498,14 @@ private boolean hasBundlingBytes() {
}

private static long computeNextBackoffDelayMs(
OutstandingBundle outstandingBundle, RetrySettings retrySettings) {
OutstandingBundle outstandingBundle, RetrySettings retrySettings, LongRandom longRandom) {
long delayMillis =
Math.round(
retrySettings.getInitialRetryDelay().getMillis()
* Math.pow(retrySettings.getRetryDelayMultiplier(), outstandingBundle.attempt - 1));
delayMillis = Math.min(retrySettings.getMaxRetryDelay().getMillis(), delayMillis);
outstandingBundle.attempt++;
return ThreadLocalRandom.current().nextLong(0, delayMillis);
return longRandom.nextLong(0, delayMillis);
}

private boolean isRetryable(Throwable t) {
Expand All @@ -520,6 +524,10 @@ private boolean isRetryable(Throwable t) {
}
}

interface LongRandom {
long nextLong(long least, long bound);
}

/** A builder of {@link Publisher}s. */
public static final class Builder {
static final Duration MIN_TOTAL_TIMEOUT = new Duration(10 * 1000); // 10 seconds
Expand Down Expand Up @@ -547,6 +555,13 @@ public static final class Builder {
.setRpcTimeoutMultiplier(2)
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT)
.build();
static final LongRandom DEFAULT_LONG_RANDOM =
new LongRandom() {
@Override
public long nextLong(long least, long bound) {
return ThreadLocalRandom.current().nextLong(least, bound);
}
};

private static final int THREADS_PER_CPU = 5;
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
Expand All @@ -564,6 +579,7 @@ public static final class Builder {
boolean failOnFlowControlLimits = false;

RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
LongRandom longRandom = DEFAULT_LONG_RANDOM;

// Channels and credentials
Optional<Credentials> userCredentials = Optional.absent();
Expand Down Expand Up @@ -659,6 +675,12 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
return this;
}

@VisibleForTesting
Builder setLongRandom(LongRandom longRandom) {
this.longRandom = Preconditions.checkNotNull(longRandom);
return this;
}

/** Gives the ability to set a custom executor to be used by the library. */
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = Preconditions.checkNotNull(executorProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,8 @@ public Builder setFlowControlSettings(FlowController.Settings flowControlSetting
/**
* Set acknowledgement expiration padding.
*
* <p>This is the time accounted before a message expiration is to happen, so the
* {@link Subscriber} is able to send an ack extension beforehand.
* <p>This is the time accounted before a message expiration is to happen, so the {@link
* Subscriber} is able to send an ack extension beforehand.
*
* <p>This padding duration is configurable so you can account for network latency. A reasonable
* number must be provided so messages don't expire because of network latency between when the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,98 +21,84 @@
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc.PublisherImplBase;
import io.grpc.stub.StreamObserver;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a
* Cloud Pub/Sub Publisher.
* A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a Cloud
* Pub/Sub Publisher.
*/
class FakePublisherServiceImpl extends PublisherImplBase {

private final Queue<Response> publishResponses = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<Response> publishResponses = new LinkedBlockingQueue<>();

/**
* Class used to save the state of a possible response.
*/
/** Class used to save the state of a possible response. */
private static class Response {
Optional<PublishResponse> publishResponse;
Optional<Throwable> error;

public Response(PublishResponse publishResponse) {
this.publishResponse = Optional.of(publishResponse);
this.error = Optional.absent();
}

public Response(Throwable exception) {
this.publishResponse = Optional.absent();
this.error = Optional.of(exception);
}

public PublishResponse getPublishResponse() {
return publishResponse.get();
}

public Throwable getError() {
return error.get();
}

boolean isError() {
return error.isPresent();
}

@Override
public String toString() {
if (isError()) {
return error.get().toString();
}
return publishResponse.get().toString();
}
}

@Override
public void publish(PublishRequest request, StreamObserver<PublishResponse> responseObserver) {
Response response = null;
synchronized (publishResponses) {
response = publishResponses.poll();
try {
if (response.isError()) {
responseObserver.onError(response.getError());
return;
}

responseObserver.onNext(response.getPublishResponse());
responseObserver.onCompleted();
} finally {
publishResponses.notifyAll();
}
Response response;
try {
response = publishResponses.take();
} catch (InterruptedException e) {
throw new IllegalArgumentException(e);
}
if (response.isError()) {
responseObserver.onError(response.getError());
} else {
responseObserver.onNext(response.getPublishResponse());
responseObserver.onCompleted();
}
}

public FakePublisherServiceImpl addPublishResponse(PublishResponse publishResponse) {
synchronized (publishResponses) {
publishResponses.add(new Response(publishResponse));
}
publishResponses.add(new Response(publishResponse));
return this;
}

public FakePublisherServiceImpl addPublishResponse(
PublishResponse.Builder publishResponseBuilder) {
addPublishResponse(publishResponseBuilder.build());
return this;
return addPublishResponse(publishResponseBuilder.build());
}

public FakePublisherServiceImpl addPublishError(Throwable error) {
synchronized (publishResponses) {
publishResponses.add(new Response(error));
}
publishResponses.add(new Response(error));
return this;
}

public void reset() {
synchronized (publishResponses) {
publishResponses.clear();
publishResponses.notifyAll();
}
}

public void waitForNoOutstandingResponses() throws InterruptedException {
synchronized (publishResponses) {
while (!publishResponses.isEmpty()) {
publishResponses.wait();
}
}
publishResponses.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@
import io.grpc.stub.StreamObserver;
import java.util.concurrent.ExecutionException;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -64,20 +63,22 @@ public class PublisherImplTest {
private static final ExecutorProvider SINGLE_THREAD_EXECUTOR =
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();

private static InProcessChannelBuilder testChannelBuilder;
private InProcessChannelBuilder testChannelBuilder;

@Captor private ArgumentCaptor<PublishRequest> requestCaptor;

private FakeScheduledExecutorService fakeExecutor;

private FakeCredentials testCredentials;

private static FakePublisherServiceImpl testPublisherServiceImpl;
private FakePublisherServiceImpl testPublisherServiceImpl;

private static ServerImpl testServer;
private ServerImpl testServer;

@BeforeClass
public static void setUpClass() throws Exception {
class FakeException extends Exception {}

@Before
public void setUp() throws Exception {
testPublisherServiceImpl = Mockito.spy(new FakePublisherServiceImpl());

InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("test-server");
Expand All @@ -86,19 +87,16 @@ public static void setUpClass() throws Exception {
serverBuilder.addService(testPublisherServiceImpl);
testServer = serverBuilder.build();
testServer.start();
}

@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
testPublisherServiceImpl.reset();
Mockito.reset(testPublisherServiceImpl);
fakeExecutor = new FakeScheduledExecutorService();
testCredentials = new FakeCredentials();
}

@AfterClass
public static void tearDownClass() throws Exception {
@After
public void tearDown() throws Exception {
testServer.shutdownNow().awaitTermination();
}

Expand Down Expand Up @@ -272,19 +270,18 @@ public void testPublishFailureRetries() throws Exception {
.build())
.build(); // To demonstrate that reaching duration will trigger publish

ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");

testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1"));

ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");

assertEquals("1", publishFuture1.get());

Mockito.verify(testPublisherServiceImpl, times(2))
.publish(Mockito.<PublishRequest>any(), Mockito.<StreamObserver<PublishResponse>>any());
publisher.shutdown();
}

@Test(expected = Throwable.class)
public void testPublishFailureRetries_exceededsRetryDuration() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
Expand All @@ -302,15 +299,18 @@ public void testPublishFailureRetries_exceededsRetryDuration() throws Exception
.build())
.build(); // To demonstrate that reaching duration will trigger publish

ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");

// With exponential backoff, starting at 5ms we should have no more than 11 retries
for (int i = 0; i < 11; ++i) {
testPublisherServiceImpl.addPublishError(new Throwable("Transiently failing"));
testPublisherServiceImpl.addPublishError(new FakeException());
}
ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");

try {
publishFuture1.get();
} catch (ExecutionException e) {
if (!(e.getCause() instanceof FakeException)) {
throw new IllegalStateException("unexpected exception", e);
}
} finally {
Mockito.verify(testPublisherServiceImpl, atLeast(10))
.publish(Mockito.<PublishRequest>any(), Mockito.<StreamObserver<PublishResponse>>any());
Expand All @@ -336,9 +336,8 @@ public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exce
.build())
.build(); // To demonstrate that reaching duration will trigger publish

ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");

testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT));
ListenableFuture<String> publishFuture1 = sendTestMessage(publisher, "A");

try {
publishFuture1.get();
Expand Down Expand Up @@ -605,6 +604,13 @@ private Builder getTestPublisherBuilder() {
return Publisher.Builder.newBuilder(TEST_TOPIC)
.setCredentials(testCredentials)
.setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
.setChannelBuilder(testChannelBuilder);
.setChannelBuilder(testChannelBuilder)
.setLongRandom(
new Publisher.LongRandom() {
@Override
public long nextLong(long least, long bound) {
return bound - 1;
}
});
}
}