Skip to content

Commit

Permalink
Merge pull request #504 from sduskis/retry_auth
Browse files Browse the repository at this point in the history
Using a retry with back off for auth refreshing.
  • Loading branch information
sduskis committed Oct 12, 2015
2 parents 6aa04a7 + 573d55b commit 967ecce
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ private Future<Void> initializeCredentials(Future<Credentials> credentialsFuture
if (credentials != null) {
if (credentials instanceof OAuth2Credentials) {
final RefreshingOAuth2CredentialsInterceptor oauth2Interceptor =
new RefreshingOAuth2CredentialsInterceptor(batchPool, (OAuth2Credentials) credentials);
new RefreshingOAuth2CredentialsInterceptor(batchPool, (OAuth2Credentials) credentials,
this.options.getRetryOptions());
credentialRefreshFuture = batchPool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import com.google.common.util.concurrent.ListenableFuture;

/**
* A FutureFallback that retries a RetryableRpc request.
* A {@link FutureFallback} that retries a {@link RetryableRpc} request.
*/
public class RetryingRpcFutureFallback<RequestT, ResponseT> implements FutureFallback<ResponseT> {

Expand All @@ -41,26 +41,14 @@ public static <RequestT, ResponseT> RetryingRpcFutureFallback<RequestT, Response
return new RetryingRpcFutureFallback<RequestT, ResponseT>(retryOptions, request, retryableRpc);
}

@VisibleForTesting
interface Sleeper {
void sleep(long ms) throws InterruptedException;
}

static Sleeper THREAD_SLEEPER = new Sleeper() {
@Override
public void sleep(long ms) throws InterruptedException {
Thread.sleep(ms);
}
};

protected final Log LOG = LogFactory.getLog(RetryingRpcFutureFallback.class);

private final RequestT request;

@VisibleForTesting
BackOff currentBackoff;
@VisibleForTesting
Sleeper sleeper = THREAD_SLEEPER;
Sleeper sleeper = Sleeper.DEFAULT;

private final RetryableRpc<RequestT, ResponseT> retryableRpc;
private final RetryOptions retryOptions;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2015 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.grpc.async;

/**
* An interface that wraps Thread.sleep for test purposes
*/
public interface Sleeper {
/**
* Usually a wrapper for Thread.sleep. Can be overridden for testing purposes.
*/
void sleep(long ms) throws InterruptedException;

/**
* A Sleeper that uses {@link Thread#sleep()}
*/
public static Sleeper DEFAULT = new Sleeper() {
@Override
public void sleep(long ms) throws InterruptedException {
Thread.sleep(ms);
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -28,54 +28,65 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import javax.annotation.Nullable;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.Clock;
import com.google.api.client.util.Preconditions;
import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.cloud.bigtable.config.Logger;
import com.google.cloud.bigtable.config.RetryOptions;
import com.google.cloud.bigtable.grpc.async.Sleeper;
import com.google.common.annotations.VisibleForTesting;

/**
* Client interceptor that authenticates all calls by binding header data provided by a credential.
* Typically this will populate the Authorization header but other headers may also be filled out.
*
* <p> Uses the new and simplified Google auth library:
* https://github.com/google/google-auth-library-java</p>
*
* <p> TODO: COPIED FROM io.grpc.auth.ClientAuthInterceptor. The logic added here for
* initialization and locking could be moved back to gRPC. This implementation takes advantage
* of the fact that all of the Bigtable endpoints are OAuth2 based. It uses the OAuth AccessToken
* to get the token value and next refresh time. The refresh is scheduled asynchronously.</p>
* <p>
* Uses the new and simplified Google auth library:
* https://github.com/google/google-auth-library-java
* </p>
* <p>
* TODO: COPIED FROM io.grpc.auth.ClientAuthInterceptor. The logic added here for initialization and
* locking could be moved back to gRPC. This implementation takes advantage of the fact that all of
* the Bigtable endpoints are OAuth2 based. It uses the OAuth AccessToken to get the token value and
* next refresh time. The refresh is scheduled asynchronously.
* </p>
*/
public class RefreshingOAuth2CredentialsInterceptor implements ClientInterceptor {

/**
* <p>This enum describes the states of the OAuth header.</p>
*
* <p>
* This enum describes the states of the OAuth header.
* </p>
* <ol>
* <li> Good - fine to use, and does not need to be refreshed.
* <li> Stale - fine to use, but requires an async refresh
* <li> Expired - Cannot be used. Wait for a new token to be loaded
* <li>Good - fine to use, and does not need to be refreshed.
* <li>Stale - fine to use, but requires an async refresh
* <li>Expired - Cannot be used. Wait for a new token to be loaded
* </ol>
*/
@VisibleForTesting
enum CacheState {
Good,
Stale,
Expired,
Exception
Good, Stale, Expired, Exception
}

enum RetryState {
PerformRetry, RetriesExhausted, Interrupted
}

private static final Metadata.Key<String> AUTHORIZATION_HEADER_KEY =
Metadata.Key.of("Authorization", Metadata.ASCII_STRING_MARSHALLER);
private static final Logger LOG = new Logger(RefreshingOAuth2CredentialsInterceptor.class);
private static final Metadata.Key<String> AUTHORIZATION_HEADER_KEY = Metadata.Key.of(
"Authorization", Metadata.ASCII_STRING_MARSHALLER);

@VisibleForTesting
static Clock clock = Clock.SYSTEM;

@VisibleForTesting
static class HeaderCacheElement {
/**
* This specifies how far in advance of a header expiration do we consider the token stale.
* The Stale state indicates that the interceptor needs to do an asynchronous refresh.
* This specifies how far in advance of a header expiration do we consider the token stale. The
* Stale state indicates that the interceptor needs to do an asynchronous refresh.
*/
public static final int TOKEN_STALENESS_MS = 75 * 1000;

Expand Down Expand Up @@ -112,7 +123,7 @@ public CacheState getCacheState() {
return CacheState.Exception;
}
long now = clock.currentTimeMillis();
if (now < staleTimeMs){
if (now < staleTimeMs) {
return CacheState.Good;
} else if (now < expiresTimeMs) {
return CacheState.Stale;
Expand All @@ -127,19 +138,24 @@ public CacheState getCacheState() {

@VisibleForTesting
final AtomicBoolean isRefreshing = new AtomicBoolean(false);

@VisibleForTesting
Sleeper sleeper = Sleeper.DEFAULT;

private final ExecutorService executor;
private final OAuth2Credentials credentials;
private final RetryOptions retryOptions;

public RefreshingOAuth2CredentialsInterceptor(ExecutorService scheduler,
OAuth2Credentials credentials) {
OAuth2Credentials credentials, RetryOptions retryOptions) {
this.executor = Preconditions.checkNotNull(scheduler);
this.credentials = Preconditions.checkNotNull(credentials);
this.retryOptions = Preconditions.checkNotNull(retryOptions);
}


@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
// TODO(sduskis): If the call fails for Auth reasons, this does not properly propagate info that
// would be in WWW-Authenticate, because it does not yet have access to the header.
return new CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
Expand Down Expand Up @@ -228,28 +244,89 @@ boolean doRefresh() {
if (!requiresRefresh) {
return false;
}
HeaderCacheElement cacheElement = null;
HeaderCacheElement cacheElement = refreshCredentialsWithRetry(null);
synchronized (isRefreshing) {
headerCache.set(cacheElement);
isRefreshing.set(false);
isRefreshing.notifyAll();
}
return true;
}

/**
* Calls {@link OAuth2Credentials#refreshAccessToken()}. In case of an IOException, retry the call
* as per the {@link Backoff} policy defined by {@link RetryOptions#createBackoff()}.
* @param backoff defines the current state of the retries. Initially, the value is null. In the
* case of an {@link IOException}, create a new {@link BackOff}; see
* {@link RetryOptions#createBackoff()} for more details on the initial construction. If
* a retry already occurred, the Backoff have the logic to get the next sleep period and
* retry exhaustion logic.
* @return HeaderCacheElement containing either a valid {@link AccessToken} or an exception.
*/
protected HeaderCacheElement refreshCredentialsWithRetry(@Nullable BackOff backoff) {
try {
LOG.info("Refreshing the OAuth token");
AccessToken newToken = credentials.refreshAccessToken();
if (newToken == null) {
// The current implementations of refreshAccessToken() throw an IOException or return
// a valid token. This handling is future proofing.
cacheElement = new HeaderCacheElement(
new IOException("Could not load the token for credentials: " + credentials));
// The current implementations of refreshAccessToken() throws an IOException or return
// a valid token. This handling is future proofing just in case credentials returns null for
// some reason. This case was caught by a poorly coded unit test.
LOG.info("Refreshed the OAuth token");
return new HeaderCacheElement(new IOException("Could not load the token for credentials: "
+ credentials));
} else {
cacheElement = new HeaderCacheElement(newToken);
// Success!
return new HeaderCacheElement(newToken);
}
} catch (IOException e) {
cacheElement = new HeaderCacheElement(e);
} catch (Exception e) {
cacheElement = new HeaderCacheElement(new IOException("Could not read headers", e));
} finally {
synchronized (isRefreshing) {
headerCache.set(cacheElement);
isRefreshing.set(false);
isRefreshing.notifyAll();
} catch (IOException exception) {
LOG.warn("Got an unexpected IOException when refreshing google credentials.", exception);
// An IOException occurred. Retry with backoff.
if (backoff == null) {
// initialize backoff.
backoff = retryOptions.createBackoff();
}

// Given the backoff, either sleep for a short duration, or terminate if the backoff has
// reached its configured timeout limit.
try {
RetryState retryState = getRetryState(backoff);
if (retryState == RetryState.PerformRetry) {
return refreshCredentialsWithRetry(backoff);
} else {
return new HeaderCacheElement(exception);
}
} catch (IOException e) {
LOG.warn("Got an exception while trying to run backoff.nextBackOffMillis()", e);
return new HeaderCacheElement(exception);
}
} catch (Exception e) {
LOG.warn("Got an unexpected exception while trying to refresh google credentials.", e);
return new HeaderCacheElement(new IOException("Could not read headers", e));
}
}

/**
* Sleep and/or determine if the backoff has timed out.
* @param backoff
* @return RetryState indicating the current state of the retry logic.
* @throws IOExcetion in some cases from {@link BackOff#nextBackOffMillis()}
*/
protected RetryState getRetryState(BackOff backoff) throws IOException{
long nextBackOffMillis = backoff.nextBackOffMillis();
if (nextBackOffMillis == BackOff.STOP) {
LOG.warn("Exhausted the number of retries for credentials refresh after "
+ this.retryOptions.getMaxElaspedBackoffMillis() + " milliseconds.");
return RetryState.RetriesExhausted;
}
try {
sleeper.sleep(nextBackOffMillis);
// Try to perform another call.
return RetryState.PerformRetry;
} catch (InterruptedException e) {
LOG.warn("Interrupted while trying to refresh credentials.");
Thread.interrupted();
// If the thread is interrupted, terminate immediately.
return RetryState.Interrupted;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public Long answer(InvocationOnMock invocation) throws Throwable {
return start + totalSleep.get();
}
});
underTest.sleeper = new RetryingRpcFutureFallback.Sleeper() {
underTest.sleeper = new Sleeper() {
@Override
public void sleep(long ms) throws InterruptedException {
totalSleep.addAndGet(ms * 1000000);
Expand Down
Loading

0 comments on commit 967ecce

Please sign in to comment.