From 573d55b1da0b4762ec890e0595f20ac8eba7ac0d Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Thu, 8 Oct 2015 19:25:13 -0400 Subject: [PATCH] Using a retry with back off for auth refreshing. The Credential retrieval has occassional issues under load. Dataflow, for example, will start a whole bunch of jobs that need to get credentials at the same time, and we've seen intermittent failures because of that issue; the retry should fix, or at least improve, those types of situations. Adding a unit test for retry logic in the auth interceptor. Updating the Test as per Kevin's comments. Adding more logging around credential refreshes and retries. Using an explicit start time for the tests in RefreshingOAuth2CredentialsInterceptorTest. --- .../cloud/bigtable/grpc/BigtableSession.java | 3 +- .../grpc/async/RetryingRpcFutureFallback.java | 16 +- .../cloud/bigtable/grpc/async/Sleeper.java | 36 ++++ ...efreshingOAuth2CredentialsInterceptor.java | 165 +++++++++++++----- .../async/RetryingRpcFutureFallbackTest.java | 2 +- ...shingOAuth2CredentialsInterceptorTest.java | 137 +++++++++++---- 6 files changed, 266 insertions(+), 93 deletions(-) create mode 100644 bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/Sleeper.java diff --git a/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/BigtableSession.java b/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/BigtableSession.java index 1b85918fc5..707ffc2b2d 100644 --- a/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/BigtableSession.java +++ b/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/BigtableSession.java @@ -302,7 +302,8 @@ private Future initializeCredentials(Future credentialsFuture if (credentials != null) { if (credentials instanceof OAuth2Credentials) { final RefreshingOAuth2CredentialsInterceptor oauth2Interceptor = - new RefreshingOAuth2CredentialsInterceptor(batchPool, (OAuth2Credentials) credentials); + new RefreshingOAuth2CredentialsInterceptor(batchPool, (OAuth2Credentials) credentials, + this.options.getRetryOptions()); credentialRefreshFuture = batchPool.submit(new Callable() { @Override public Void call() throws Exception { diff --git a/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/RetryingRpcFutureFallback.java b/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/RetryingRpcFutureFallback.java index 7ce09975c4..bd874e16b9 100644 --- a/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/RetryingRpcFutureFallback.java +++ b/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/RetryingRpcFutureFallback.java @@ -32,7 +32,7 @@ import com.google.common.util.concurrent.ListenableFuture; /** - * A FutureFallback that retries a RetryableRpc request. + * A {@link FutureFallback} that retries a {@link RetryableRpc} request. */ public class RetryingRpcFutureFallback implements FutureFallback { @@ -41,18 +41,6 @@ public static RetryingRpcFutureFallback(retryOptions, request, retryableRpc); } - @VisibleForTesting - interface Sleeper { - void sleep(long ms) throws InterruptedException; - } - - static Sleeper THREAD_SLEEPER = new Sleeper() { - @Override - public void sleep(long ms) throws InterruptedException { - Thread.sleep(ms); - } - }; - protected final Log LOG = LogFactory.getLog(RetryingRpcFutureFallback.class); private final RequestT request; @@ -60,7 +48,7 @@ public void sleep(long ms) throws InterruptedException { @VisibleForTesting BackOff currentBackoff; @VisibleForTesting - Sleeper sleeper = THREAD_SLEEPER; + Sleeper sleeper = Sleeper.DEFAULT; private final RetryableRpc retryableRpc; private final RetryOptions retryOptions; diff --git a/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/Sleeper.java b/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/Sleeper.java new file mode 100644 index 0000000000..10b0d13c44 --- /dev/null +++ b/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/Sleeper.java @@ -0,0 +1,36 @@ +/* + * Copyright 2015 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.grpc.async; + +/** + * An interface that wraps Thread.sleep for test purposes + */ +public interface Sleeper { + /** + * Usually a wrapper for Thread.sleep. Can be overridden for testing purposes. + */ + void sleep(long ms) throws InterruptedException; + + /** + * A Sleeper that uses {@link Thread#sleep()} + */ + public static Sleeper DEFAULT = new Sleeper() { + @Override + public void sleep(long ms) throws InterruptedException { + Thread.sleep(ms); + } + }; +} diff --git a/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/io/RefreshingOAuth2CredentialsInterceptor.java b/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/io/RefreshingOAuth2CredentialsInterceptor.java index 4e461dea85..ae8c7838cd 100644 --- a/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/io/RefreshingOAuth2CredentialsInterceptor.java +++ b/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/io/RefreshingOAuth2CredentialsInterceptor.java @@ -4,9 +4,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -28,45 +28,56 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; + +import com.google.api.client.util.BackOff; import com.google.api.client.util.Clock; import com.google.api.client.util.Preconditions; import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.OAuth2Credentials; +import com.google.cloud.bigtable.config.Logger; +import com.google.cloud.bigtable.config.RetryOptions; +import com.google.cloud.bigtable.grpc.async.Sleeper; import com.google.common.annotations.VisibleForTesting; /** * Client interceptor that authenticates all calls by binding header data provided by a credential. * Typically this will populate the Authorization header but other headers may also be filled out. - * - *

Uses the new and simplified Google auth library: - * https://github.com/google/google-auth-library-java

- * - *

TODO: COPIED FROM io.grpc.auth.ClientAuthInterceptor. The logic added here for - * initialization and locking could be moved back to gRPC. This implementation takes advantage - * of the fact that all of the Bigtable endpoints are OAuth2 based. It uses the OAuth AccessToken - * to get the token value and next refresh time. The refresh is scheduled asynchronously.

+ *

+ * Uses the new and simplified Google auth library: + * https://github.com/google/google-auth-library-java + *

+ *

+ * TODO: COPIED FROM io.grpc.auth.ClientAuthInterceptor. The logic added here for initialization and + * locking could be moved back to gRPC. This implementation takes advantage of the fact that all of + * the Bigtable endpoints are OAuth2 based. It uses the OAuth AccessToken to get the token value and + * next refresh time. The refresh is scheduled asynchronously. + *

*/ public class RefreshingOAuth2CredentialsInterceptor implements ClientInterceptor { /** - *

This enum describes the states of the OAuth header.

- * + *

+ * This enum describes the states of the OAuth header. + *

*
    - *
  1. Good - fine to use, and does not need to be refreshed. - *
  2. Stale - fine to use, but requires an async refresh - *
  3. Expired - Cannot be used. Wait for a new token to be loaded + *
  4. Good - fine to use, and does not need to be refreshed. + *
  5. Stale - fine to use, but requires an async refresh + *
  6. Expired - Cannot be used. Wait for a new token to be loaded *
*/ @VisibleForTesting enum CacheState { - Good, - Stale, - Expired, - Exception + Good, Stale, Expired, Exception + } + + enum RetryState { + PerformRetry, RetriesExhausted, Interrupted } - private static final Metadata.Key AUTHORIZATION_HEADER_KEY = - Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER); + private static final Logger LOG = new Logger(RefreshingOAuth2CredentialsInterceptor.class); + private static final Metadata.Key AUTHORIZATION_HEADER_KEY = Metadata.Key.of( + "Authorization", Metadata.ASCII_STRING_MARSHALLER); @VisibleForTesting static Clock clock = Clock.SYSTEM; @@ -74,8 +85,8 @@ enum CacheState { @VisibleForTesting static class HeaderCacheElement { /** - * This specifies how far in advance of a header expiration do we consider the token stale. - * The Stale state indicates that the interceptor needs to do an asynchronous refresh. + * This specifies how far in advance of a header expiration do we consider the token stale. The + * Stale state indicates that the interceptor needs to do an asynchronous refresh. */ public static final int TOKEN_STALENESS_MS = 75 * 1000; @@ -112,7 +123,7 @@ public CacheState getCacheState() { return CacheState.Exception; } long now = clock.currentTimeMillis(); - if (now < staleTimeMs){ + if (now < staleTimeMs) { return CacheState.Good; } else if (now < expiresTimeMs) { return CacheState.Stale; @@ -127,19 +138,24 @@ public CacheState getCacheState() { @VisibleForTesting final AtomicBoolean isRefreshing = new AtomicBoolean(false); + + @VisibleForTesting + Sleeper sleeper = Sleeper.DEFAULT; + private final ExecutorService executor; private final OAuth2Credentials credentials; + private final RetryOptions retryOptions; public RefreshingOAuth2CredentialsInterceptor(ExecutorService scheduler, - OAuth2Credentials credentials) { + OAuth2Credentials credentials, RetryOptions retryOptions) { this.executor = Preconditions.checkNotNull(scheduler); this.credentials = Preconditions.checkNotNull(credentials); + this.retryOptions = Preconditions.checkNotNull(retryOptions); } - @Override - public ClientCall interceptCall( - MethodDescriptor method, CallOptions callOptions, Channel next) { + public ClientCall interceptCall(MethodDescriptor method, + CallOptions callOptions, Channel next) { // TODO(sduskis): If the call fails for Auth reasons, this does not properly propagate info that // would be in WWW-Authenticate, because it does not yet have access to the header. return new CheckedForwardingClientCall(next.newCall(method, callOptions)) { @@ -228,28 +244,89 @@ boolean doRefresh() { if (!requiresRefresh) { return false; } - HeaderCacheElement cacheElement = null; + HeaderCacheElement cacheElement = refreshCredentialsWithRetry(null); + synchronized (isRefreshing) { + headerCache.set(cacheElement); + isRefreshing.set(false); + isRefreshing.notifyAll(); + } + return true; + } + + /** + * Calls {@link OAuth2Credentials#refreshAccessToken()}. In case of an IOException, retry the call + * as per the {@link Backoff} policy defined by {@link RetryOptions#createBackoff()}. + * @param backoff defines the current state of the retries. Initially, the value is null. In the + * case of an {@link IOException}, create a new {@link BackOff}; see + * {@link RetryOptions#createBackoff()} for more details on the initial construction. If + * a retry already occurred, the Backoff have the logic to get the next sleep period and + * retry exhaustion logic. + * @return HeaderCacheElement containing either a valid {@link AccessToken} or an exception. + */ + protected HeaderCacheElement refreshCredentialsWithRetry(@Nullable BackOff backoff) { try { + LOG.info("Refreshing the OAuth token"); AccessToken newToken = credentials.refreshAccessToken(); if (newToken == null) { - // The current implementations of refreshAccessToken() throw an IOException or return - // a valid token. This handling is future proofing. - cacheElement = new HeaderCacheElement( - new IOException("Could not load the token for credentials: " + credentials)); + // The current implementations of refreshAccessToken() throws an IOException or return + // a valid token. This handling is future proofing just in case credentials returns null for + // some reason. This case was caught by a poorly coded unit test. + LOG.info("Refreshed the OAuth token"); + return new HeaderCacheElement(new IOException("Could not load the token for credentials: " + + credentials)); } else { - cacheElement = new HeaderCacheElement(newToken); + // Success! + return new HeaderCacheElement(newToken); } - } catch (IOException e) { - cacheElement = new HeaderCacheElement(e); - } catch (Exception e) { - cacheElement = new HeaderCacheElement(new IOException("Could not read headers", e)); - } finally { - synchronized (isRefreshing) { - headerCache.set(cacheElement); - isRefreshing.set(false); - isRefreshing.notifyAll(); + } catch (IOException exception) { + LOG.warn("Got an unexpected IOException when refreshing google credentials.", exception); + // An IOException occurred. Retry with backoff. + if (backoff == null) { + // initialize backoff. + backoff = retryOptions.createBackoff(); + } + + // Given the backoff, either sleep for a short duration, or terminate if the backoff has + // reached its configured timeout limit. + try { + RetryState retryState = getRetryState(backoff); + if (retryState == RetryState.PerformRetry) { + return refreshCredentialsWithRetry(backoff); + } else { + return new HeaderCacheElement(exception); + } + } catch (IOException e) { + LOG.warn("Got an exception while trying to run backoff.nextBackOffMillis()", e); + return new HeaderCacheElement(exception); } + } catch (Exception e) { + LOG.warn("Got an unexpected exception while trying to refresh google credentials.", e); + return new HeaderCacheElement(new IOException("Could not read headers", e)); + } + } + + /** + * Sleep and/or determine if the backoff has timed out. + * @param backoff + * @return RetryState indicating the current state of the retry logic. + * @throws IOExcetion in some cases from {@link BackOff#nextBackOffMillis()} + */ + protected RetryState getRetryState(BackOff backoff) throws IOException{ + long nextBackOffMillis = backoff.nextBackOffMillis(); + if (nextBackOffMillis == BackOff.STOP) { + LOG.warn("Exhausted the number of retries for credentials refresh after " + + this.retryOptions.getMaxElaspedBackoffMillis() + " milliseconds."); + return RetryState.RetriesExhausted; + } + try { + sleeper.sleep(nextBackOffMillis); + // Try to perform another call. + return RetryState.PerformRetry; + } catch (InterruptedException e) { + LOG.warn("Interrupted while trying to refresh credentials."); + Thread.interrupted(); + // If the thread is interrupted, terminate immediately. + return RetryState.Interrupted; } - return true; } } diff --git a/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/RetryingRpcFutureFallbackTest.java b/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/RetryingRpcFutureFallbackTest.java index d53d80a236..a8d9367364 100644 --- a/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/RetryingRpcFutureFallbackTest.java +++ b/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/async/RetryingRpcFutureFallbackTest.java @@ -97,7 +97,7 @@ public Long answer(InvocationOnMock invocation) throws Throwable { return start + totalSleep.get(); } }); - underTest.sleeper = new RetryingRpcFutureFallback.Sleeper() { + underTest.sleeper = new Sleeper() { @Override public void sleep(long ms) throws InterruptedException { totalSleep.addAndGet(ms * 1000000); diff --git a/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/io/RefreshingOAuth2CredentialsInterceptorTest.java b/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/io/RefreshingOAuth2CredentialsInterceptorTest.java index 745fe552f7..28adb55e6d 100644 --- a/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/io/RefreshingOAuth2CredentialsInterceptorTest.java +++ b/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/io/RefreshingOAuth2CredentialsInterceptorTest.java @@ -1,13 +1,27 @@ /* - * Copyright 2015 Google Inc. All Rights Reserved. Licensed under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with the License. You may obtain - * a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable - * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" - * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License - * for the specific language governing permissions and limitations under the License. + * Copyright 2015 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package com.google.cloud.bigtable.grpc.io; +import static com.google.cloud.bigtable.config.RetryOptions.DEFAULT_BACKOFF_MULTIPLIER; +import static com.google.cloud.bigtable.config.RetryOptions.DEFAULT_INITIAL_BACKOFF_MILLIS; +import static com.google.cloud.bigtable.config.RetryOptions.DEFAULT_MAX_ELAPSED_BACKOFF_MILLIS; +import static com.google.cloud.bigtable.config.RetryOptions.DEFAULT_READ_PARTIAL_ROW_TIMEOUT_MS; +import static com.google.cloud.bigtable.config.RetryOptions.DEFAULT_STREAMING_BUFFER_SIZE; +import static org.mockito.Mockito.when; + import java.io.IOException; import java.util.Date; import java.util.concurrent.Callable; @@ -32,8 +46,12 @@ import org.mockito.stubbing.Answer; import com.google.api.client.util.Clock; +import com.google.api.client.util.ExponentialBackOff; +import com.google.api.client.util.NanoClock; import com.google.auth.oauth2.AccessToken; import com.google.auth.oauth2.OAuth2Credentials; +import com.google.cloud.bigtable.config.RetryOptions; +import com.google.cloud.bigtable.grpc.async.Sleeper; import com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.CacheState; import com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.HeaderCacheElement; @@ -57,21 +75,45 @@ public static void shtudown() { @Mock private OAuth2Credentials credentials; + @Mock + private NanoClock nanoClock; + + private RetryOptions retryOptions; + @Before public void setupMocks() { MockitoAnnotations.initMocks(this); - setTime(0L); + when(nanoClock.nanoTime()).then(new Answer() { + @Override + public Long answer(InvocationOnMock invocation) throws Throwable { + return getTimeInMilliseconds() * 1000000; + } + }); + retryOptions = + new RetryOptions(true, true, DEFAULT_INITIAL_BACKOFF_MILLIS, DEFAULT_BACKOFF_MULTIPLIER, + DEFAULT_MAX_ELAPSED_BACKOFF_MILLIS, DEFAULT_STREAMING_BUFFER_SIZE, + DEFAULT_READ_PARTIAL_ROW_TIMEOUT_MS) { + @Override + protected ExponentialBackOff.Builder createBackoffBuilder() { + return super.createBackoffBuilder().setNanoClock(nanoClock); + } + }; + setTimeInMillieconds(0L); } - private void setTime(final long time) { + private void setTimeInMillieconds(final long timeMs) { RefreshingOAuth2CredentialsInterceptor.clock = new Clock() { @Override public long currentTimeMillis() { - return time; + return timeMs; } }; } + private long getTimeInMilliseconds() { + return RefreshingOAuth2CredentialsInterceptor.clock.currentTimeMillis(); + } + @Test public void testSyncRefresh() throws IOException { initialize(HeaderCacheElement.TOKEN_STALENESS_MS + 1); @@ -85,37 +127,69 @@ public void testStaleAndExpired() throws IOException { initialize(expiration); Assert.assertEquals(CacheState.Good, RefreshingOAuth2CredentialsInterceptor.getCacheState(underTest.headerCache.get())); - setTime(2L); + long startTime = 2L; + setTimeInMillieconds(startTime); Assert.assertEquals(CacheState.Stale, RefreshingOAuth2CredentialsInterceptor.getCacheState(underTest.headerCache.get())); long expiredStaleDiff = HeaderCacheElement.TOKEN_STALENESS_MS - HeaderCacheElement.TOKEN_EXPIRES_MS; - setTime(2L + expiredStaleDiff); + setTimeInMillieconds(startTime + expiredStaleDiff); Assert.assertEquals(CacheState.Expired, RefreshingOAuth2CredentialsInterceptor.getCacheState(underTest.headerCache.get())); } + @Test + public void testRetriesExhausted() throws Exception { + IOException ioException = new IOException("something bad happend"); + Mockito.when(credentials.refreshAccessToken()).thenThrow(ioException); + final int startTime = 100000000; + setTimeInMillieconds(startTime); + underTest = + new RefreshingOAuth2CredentialsInterceptor(executorService, credentials, retryOptions); + final int maxElaspedBackoffMillis = retryOptions.getMaxElaspedBackoffMillis(); + underTest.sleeper = new Sleeper() { + @Override + public void sleep(long ms) throws InterruptedException { + long now = getTimeInMilliseconds() + ms; + setTimeInMillieconds(now); + // Make sure that the system "slept" for more than the retryOption max millis. The Backoff logic + // adds some random variability to the exact elapsed time, so add in a bit of wiggle room. + Assert.assertTrue(String.format("%d > %d", now, startTime + maxElaspedBackoffMillis * 2), + now < startTime + maxElaspedBackoffMillis * 2); + } + }; + HeaderCacheElement header = underTest.refreshCredentialsWithRetry(null); + + Assert.assertNull(header.header); + Assert.assertSame(ioException, header.exception); + + // Make sure that the system "slept" for more than the retryOption max millis. The Backoff logic + // adds some random variability to the exact elapsed time, so add in a bit of wiggle room. + long timeInMillis = getTimeInMilliseconds(); + Assert.assertTrue(timeInMillis > startTime + maxElaspedBackoffMillis); + } + @Test /** * Test that checks that concurrent requests to RefreshingOAuth2CredentialsInterceptor refresh - * logic doesn't cause hanging behavior. Specifically, when an Expired condition occurs it + * logic doesn't cause hanging behavior. Specifically, when an Expired condition occurs it * triggers a call to syncRefresh() which potentially waits for refresh that was initiated * from another thread either through syncRefresh() or asyncRefresh(). This test case simulates - * that condition. + * that condition. */ public void testRefreshDoesntHang() throws Exception, TimeoutException { // Assume that the user starts at this time... it's an arbitrarily big number which will - // assure that subtracting HeaderCacheElement.TOKEN_STALENESS_MS and TOKEN_EXPIRES_MS will not + // assure that subtracting HeaderCacheElement.TOKEN_STALENESS_MS and TOKEN_EXPIRES_MS will not // be negative. long start = HeaderCacheElement.TOKEN_STALENESS_MS * 10; - setTime(start); - + setTimeInMillieconds(start); + // RefreshingOAuth2CredentialsInterceptor will show that the access token is stale. final long expiration = start + HeaderCacheElement.TOKEN_EXPIRES_MS + 1; - - // Create a mechanism that will allow us to control when the accessToken is returned. - // credentials.refreshAccessToken() will get called asynchronously and will wait until the + + // Create a mechanism that will allow us to control when the accessToken is returned. + // credentials.refreshAccessToken() will get called asynchronously and will wait until the // lock is notified before returning. That will allow us to set up multiple concurrent calls final Object lock = new String(""); Mockito.when(credentials.refreshAccessToken()).thenAnswer(new Answer() { @@ -138,7 +212,9 @@ public Void call() throws Exception { } }; - underTest = new RefreshingOAuth2CredentialsInterceptor(executorService, credentials); + underTest = + new RefreshingOAuth2CredentialsInterceptor(executorService, credentials, + new RetryOptions.Builder().build()); // At this point, the access token wasn't retrieved yet. The // RefreshingOAuth2CredentialsInterceptor considers null to be Expired. @@ -167,18 +243,7 @@ public Void call() throws Exception { private void syncCall(final Object lock, Callable syncRefreshCallable) throws InterruptedException, ExecutionException, TimeoutException { - Future future; - future = executorService.submit(syncRefreshCallable); - unlock(lock, future); - - // Wait for no more than a second to make sure that the call to underTest.syncRefresh() - // completes properly. If a second passes without syncRefresh() completing, future.get(..) - // will throw a TimeoutException. - future.get(1, TimeUnit.SECONDS); - } - - private void unlock(final Object lock, Future future) throws InterruptedException, - ExecutionException { + Future future = executorService.submit(syncRefreshCallable); // let the Thread running syncRefreshCallable() have a turn so that it can initiate the call // to refreshAccessToken(). try { @@ -195,12 +260,18 @@ private void unlock(final Object lock, Future future) throws InterruptedEx synchronized(lock) { lock.notifyAll(); } + + // Wait for no more than a second to make sure that the call to underTest.syncRefresh() + // completes properly. If a second passes without syncRefresh() completing, future.get(..) + // will throw a TimeoutException. + future.get(1, TimeUnit.SECONDS); } private void initialize(long expiration) throws IOException { Mockito.when(credentials.refreshAccessToken()).thenReturn( new AccessToken("", new Date(expiration))); - underTest = new RefreshingOAuth2CredentialsInterceptor(executorService, credentials); + underTest = + new RefreshingOAuth2CredentialsInterceptor(executorService, credentials, retryOptions); Assert.assertTrue(underTest.doRefresh()); } }