Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hadoop 16857. ABFS: Stop CustomTokenProvider retry logic to depend on AbfsRestOp retry policy #1923

Merged
merged 8 commits into from
Apr 22, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_MAX_RETRY_ATTEMPTS)
private int maxIoRetries;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT,
MinValue = 0,
DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT)
private int customTokenFetchRetryCount;

@LongConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_SIZE_PROPERTY_NAME,
MinValue = 0,
MaxValue = MAX_AZURE_BLOCK_SIZE,
Expand Down Expand Up @@ -417,6 +422,10 @@ public int getMaxIoRetries() {
return this.maxIoRetries;
}

public int getCustomTokenFetchRetryCount() {
return this.customTokenFetchRetryCount;
}

public long getAzureBlockSize() {
return this.azureBlockSize;
}
Expand Down Expand Up @@ -581,7 +590,7 @@ public AccessTokenProvider getTokenProvider() throws TokenAccessProviderExceptio
LOG.trace("Initializing {}", customTokenProviderClass.getName());
azureTokenProvider.initialize(rawConfig, accountName);
LOG.trace("{} init complete", customTokenProviderClass.getName());
return new CustomTokenProviderAdapter(azureTokenProvider);
return new CustomTokenProviderAdapter(azureTokenProvider, getCustomTokenFetchRetryCount());
} catch(IllegalArgumentException e) {
throw e;
} catch (Exception e) {
Expand Down Expand Up @@ -716,6 +725,11 @@ void setListMaxResults(int listMaxResults) {
this.listMaxResults = listMaxResults;
}

@VisibleForTesting
public void setMaxIoRetries(int maxIoRetries) {
this.maxIoRetries = maxIoRetries;
}

private String getTrimmedPasswordString(String key, String defaultValue) throws IOException {
String value = getPasswordString(key);
if (StringUtils.isBlank(value)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public final class ConfigurationKeys {
public static final String AZURE_MAX_BACKOFF_INTERVAL = "fs.azure.io.retry.max.backoff.interval";
public static final String AZURE_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval";
public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count";

// Read and write buffer sizes defined by the user
public static final String AZURE_WRITE_BUFFER_SIZE = "fs.azure.write.request.size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public final class FileSystemConfigurations {
public static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
public static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s
public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3;

private static final int ONE_KB = 1024;
private static final int ONE_MB = ONE_KB * ONE_KB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,23 @@ public String getMessage() {
final StringBuilder sb = new StringBuilder();
sb.append("HTTP Error ");
sb.append(httpErrorCode);
sb.append("; url='").append(url).append('\'');
sb.append(' ');
if (!url.isEmpty()) {
sb.append("; url='").append(url).append('\'').append(' ');
}

sb.append(super.getMessage());
sb.append("; requestId='").append(requestId).append('\'');
sb.append("; contentType='").append(contentType).append('\'');
sb.append("; response '").append(body).append('\'');
if (!requestId.isEmpty()) {
sb.append("; requestId='").append(requestId).append('\'');
}

if (!contentType.isEmpty()) {
sb.append("; contentType='").append(contentType).append('\'');
}

if (!body.isEmpty()) {
sb.append("; response '").append(body).append('\'');
}

return sb.toString();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension;
import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
import org.apache.hadoop.fs.azurebfs.oauth2.AzureADAuthenticator.HttpException;

/**
* Provides tokens based on custom implementation, following the Adapter Design
Expand All @@ -38,24 +39,65 @@
public final class CustomTokenProviderAdapter extends AccessTokenProvider
implements BoundDTExtension {

private final int fetchTokenRetryCount;
private CustomTokenProviderAdaptee adaptee;
private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class);

/**
* Constructs a token provider based on the custom token provider.
*
* @param adaptee the custom token provider
* @param customTokenFetchRetryCount max retry count for customTokenFetch
*/
public CustomTokenProviderAdapter(CustomTokenProviderAdaptee adaptee) {
public CustomTokenProviderAdapter(CustomTokenProviderAdaptee adaptee, int customTokenFetchRetryCount) {
Preconditions.checkNotNull(adaptee, "adaptee");
this.adaptee = adaptee;
fetchTokenRetryCount = customTokenFetchRetryCount;
}

protected AzureADToken refreshToken() throws IOException {
LOG.debug("AADToken: refreshing custom based token");

AzureADToken azureADToken = new AzureADToken();
azureADToken.setAccessToken(adaptee.getAccessToken());

String accessToken = null;

Exception ex;
boolean succeeded = false;
// Custom token providers should have their own retry policies,
// Providing a linear retry option for the the retry count
// mentioned in config "fs.azure.custom.token.fetch.retry.count"
int retryCount = fetchTokenRetryCount;
do {
ex = null;
try {
accessToken = adaptee.getAccessToken();
LOG.trace("CustomTokenProvider Access token fetch was successful with retry count {}",
(fetchTokenRetryCount - retryCount));
} catch (Exception e) {
DadanielZ marked this conversation as resolved.
Show resolved Hide resolved
LOG.debug("CustomTokenProvider Access token fetch failed with retry count {}",
(fetchTokenRetryCount - retryCount));
ex = e;
}

succeeded = (ex == null);
retryCount--;
} while (!succeeded && (retryCount) >= 0);

if (!succeeded) {
HttpException httpEx = new HttpException(
-1,
"",
String.format("CustomTokenProvider getAccessToken threw %s : %s",
ex.getClass().getTypeName(), ex.getMessage()),
"",
"",
""
);
throw httpEx;
}

azureADToken.setAccessToken(accessToken);
azureADToken.setExpiry(adaptee.getExpiryTime());

return azureADToken;
Expand Down
2 changes: 2 additions & 0 deletions hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,8 @@ token when its `getAccessToken()` method is invoked.
The declared class must implement `org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee`
and optionally `org.apache.hadoop.fs.azurebfs.extensions.BoundDTExtension`.

The declared class also holds responsibility to implement retry logic while fetching access tokens.

## <a name="technical"></a> Technical notes

### <a name="proxy"></a> Proxy setup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,13 @@ public void testAbfsRestOperationExceptionFormat() throws IOException {
}

@Test
public void testRequestRetryConfig() throws Exception {
testRetryLogic(0);
testRetryLogic(3);
public void testCustomTokenFetchRetryCount() throws Exception {
testWithDifferentCustomTokenFetchRetry(0);
testWithDifferentCustomTokenFetchRetry(3);
testWithDifferentCustomTokenFetchRetry(5);
}

public void testRetryLogic(int numOfRetries) throws Exception {
public void testWithDifferentCustomTokenFetchRetry(int numOfRetries) throws Exception {
AzureBlobFileSystem fs = this.getFileSystem();

Configuration config = new Configuration(this.getRawConfiguration());
Expand All @@ -93,7 +94,7 @@ public void testRetryLogic(int numOfRetries) throws Exception {
config.set("fs.azure.account.auth.type." + accountName, "Custom");
config.set("fs.azure.account.oauth.provider.type." + accountName, "org.apache.hadoop.fs"
+ ".azurebfs.oauth2.RetryTestTokenProvider");
config.set("fs.azure.io.retry.max.retries", Integer.toString(numOfRetries));
config.set("fs.azure.custom.token.fetch.retry.count", Integer.toString(numOfRetries));
// Stop filesystem creation as it will lead to calls to store.
config.set("fs.azure.createRemoteFileSystemDuringInitialization", "false");

Expand All @@ -110,7 +111,7 @@ public void testRetryLogic(int numOfRetries) throws Exception {
// Number of retries done should be as configured
Assert.assertTrue(
"Number of token fetch retries (" + RetryTestTokenProvider.reTryCount
+ ") done, does not match with max " + "retry count configured (" + numOfRetries
+ ") done, does not match with fs.azure.custom.token.fetch.retry.count configured (" + numOfRetries
+ ")", RetryTestTokenProvider.reTryCount == numOfRetries);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

import java.util.Random;

import org.junit.Assert;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;

/**
* Unit test TestExponentialRetryPolicy.
*/
public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {

private final int maxRetryCount = 30;
private final int noRetryCount = 0;
private final int retryCount = new Random().nextInt(maxRetryCount);
private final int retryCountBeyondMax = maxRetryCount + 1;


public TestExponentialRetryPolicy() throws Exception {
super();
}

@Test
public void testDifferentMaxIORetryCount() throws Exception {
AbfsConfiguration abfsConfig = getAbfsConfig();
abfsConfig.setMaxIoRetries(noRetryCount);
testMaxIOConfig(abfsConfig);
abfsConfig.setMaxIoRetries(retryCount);
testMaxIOConfig(abfsConfig);
abfsConfig.setMaxIoRetries(retryCountBeyondMax);
testMaxIOConfig(abfsConfig);
}

@Test
public void testDefaultMaxIORetryCount() throws Exception {
AbfsConfiguration abfsConfig = getAbfsConfig();
Assert.assertTrue(
String.format("default maxIORetry count is %s.", maxRetryCount),
abfsConfig.getMaxIoRetries() == maxRetryCount);
testMaxIOConfig(abfsConfig);
}

private AbfsConfiguration getAbfsConfig() throws Exception {
Configuration
config = new Configuration(this.getRawConfiguration());
return new AbfsConfiguration(config, "dummyAccountName");
}

private void testMaxIOConfig(AbfsConfiguration abfsConfig) {
ExponentialRetryPolicy retryPolicy = new ExponentialRetryPolicy(
abfsConfig.getMaxIoRetries());
int localRetryCount = 0;

while (localRetryCount < abfsConfig.getMaxIoRetries()) {
Assert.assertTrue(
"Retry should be allowed when retryCount less than max count configured.",
retryPolicy.shouldRetry(localRetryCount, -1));
localRetryCount++;
}

Assert.assertTrue(
"When all retries are exhausted, the retryCount will be same as max configured",
localRetryCount == abfsConfig.getMaxIoRetries());
}
}