@@ -146,6 +146,14 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_MAX_RETRY_ATTEMPTS)
private int maxIoRetries;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_HTTP_CONNECTION_TIMEOUT,
DefaultValue = DEFAULT_HTTP_CONNECTION_TIMEOUT)
private int httpConnectionTimeout;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_HTTP_READ_TIMEOUT,
DefaultValue = DEFAULT_HTTP_READ_TIMEOUT)
private int httpReadTimeout;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT,
MinValue = 0,
DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT)
@@ -641,6 +649,14 @@ public int getCustomTokenFetchRetryCount() {
return this.customTokenFetchRetryCount;
}

public int getHttpConnectionTimeout() {
return this.httpConnectionTimeout;
}

public int getHttpReadTimeout() {
return this.httpReadTimeout;
}

public long getAzureBlockSize() {
return this.azureBlockSize;
}
@@ -45,6 +45,8 @@ public final class ConfigurationKeys {
public static final String AZURE_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval";
public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count";
public static final String AZURE_HTTP_CONNECTION_TIMEOUT = "fs.azure.http.connection.timeout";
public static final String AZURE_HTTP_READ_TIMEOUT = "fs.azure.http.read.timeout";

// Retry strategy for getToken calls
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT = "fs.azure.oauth.token.fetch.retry.max.retries";
@@ -43,6 +43,8 @@ public final class FileSystemConfigurations {
public static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s
public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3;
public static final int DEFAULT_HTTP_CONNECTION_TIMEOUT = 500; // 500ms
public static final int DEFAULT_HTTP_READ_TIMEOUT = 30 * 1000; // 30 secs

// Retry parameter defaults.
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 5;
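
For orientation, a minimal sketch (not part of this patch) of how a client would override the two new timeouts once these defaults are in place. The keys are the ones defined in ConfigurationKeys, values are plain milliseconds, and the account URI and numbers below are placeholders:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AbfsTimeoutOverrideExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Keys introduced by this change; values are in milliseconds.
    conf.setInt("fs.azure.http.connection.timeout", 1000);   // connection establishment
    conf.setInt("fs.azure.http.read.timeout", 60 * 1000);    // reads on an open connection
    // Placeholder account URI.
    FileSystem fs = FileSystem.get(
        new URI("abfs://container@account.dfs.core.windows.net"), conf);
    fs.getFileStatus(new Path("/"));
  }
}

Anything left unset falls back to the defaults above (500 ms and 30 s) through the @IntegerConfigurationValidatorAnnotation defaults on AbfsConfiguration.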
@@ -1290,6 +1290,14 @@ public <V> void addCallback(ListenableFuture<V> future, FutureCallback<V> callback)
Futures.addCallback(future, callback, executorService);
}

/**
* Gets the current active configuration instance.
* @return active AbfsConfiguration instance.
*/
public AbfsConfiguration getAbfsConfiguration() {
return abfsConfiguration;
}

@VisibleForTesting
protected AccessTokenProvider getTokenProvider() {
return tokenProvider;
@@ -49,9 +49,6 @@
public class AbfsHttpOperation implements AbfsPerfLoggable {
private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class);

private static final int CONNECT_TIMEOUT = 30 * 1000;
private static final int READ_TIMEOUT = 30 * 1000;

private static final int CLEAN_UP_BUFFER_SIZE = 64 * 1024;

private static final int ONE_THOUSAND = 1000;
@@ -257,11 +254,17 @@ public String getMaskedEncodedUrl() {
*
* @param url The full URL including query string parameters.
* @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
* @param requestHeaders The HTTP request headers.READ_TIMEOUT
* @param requestHeaders The HTTP request headers.
* @param connectionTimeout timeout, in milliseconds, for establishing the connection.
* @param readTimeout timeout, in milliseconds, for reads from the open connection.
*
* @throws IOException if an error occurs.
*/
public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders)
public AbfsHttpOperation(final URL url,
final String method,
final List<AbfsHttpHeader> requestHeaders,
final int connectionTimeout,
final int readTimeout)
throws IOException {
this.isTraceEnabled = LOG.isTraceEnabled();
this.url = url;
@@ -276,8 +279,10 @@ public AbfsHttpOperation(final URL url, final String method, final List<AbfsHttpHeader> requestHeaders)
}
}

this.connection.setConnectTimeout(CONNECT_TIMEOUT);
this.connection.setReadTimeout(READ_TIMEOUT);
this.connection.setConnectTimeout(connectionTimeout);
this.connection.setReadTimeout(readTimeout);
LOG.trace("HttpUrlConnection setup: connTimeout={}, readTimeout={}",
connectionTimeout, readTimeout);

this.connection.setRequestMethod(method);

@@ -244,7 +244,9 @@ private boolean executeHttpOperation(final int retryCount,
AbfsHttpOperation httpOperation = null;
try {
// initialize the HTTP request and open the connection
httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
httpOperation = new AbfsHttpOperation(url, method, requestHeaders,
client.getAbfsConfiguration().getHttpConnectionTimeout(),
client.getAbfsConfiguration().getHttpReadTimeout());
incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1);
tracingContext.constructHeader(httpOperation);

@@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.Random;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
@@ -31,7 +32,12 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
import org.apache.hadoop.fs.contract.ContractTestUtils;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_HTTP_CONNECTION_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_HTTP_READ_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
@@ -45,6 +51,9 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
private static final int TEST_OFFSET = 100;
private static final int TEST_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
private static final int TEST_DEFAULT_READ_BUFFER_SIZE = 1023900;
private static final int TEST_STABLE_DEFAULT_CONNECTION_TIMEOUT_MS = 500;
private static final int TEST_STABLE_DEFAULT_READ_TIMEOUT_MS = 30000;
private static final int TEST_UNSTABLE_READ_TIMEOUT_MS = 1;

public ITestAzureBlobFileSystemE2E() throws Exception {
super();
@@ -229,4 +238,43 @@ private void testWriteOneByteToFile(Path testFilePath) throws Exception {
FileStatus fileStatus = fs.getFileStatus(testFilePath);
assertEquals(1, fileStatus.getLen());
}

@Test
public void testHttpConnectionTimeout() throws Exception {
// Connection failures were not seen even with a 1 ms connection timeout;
// on repeated TPC-DS runs with the cluster and account in the same region,
// 10 ms proved stable, and 500 ms is stable across regions.
testHttpTimeouts(TEST_STABLE_DEFAULT_CONNECTION_TIMEOUT_MS,
TEST_STABLE_DEFAULT_READ_TIMEOUT_MS);
}

@Test(expected = InvalidAbfsRestOperationException.class)
public void testHttpReadTimeout() throws Exception {
// Small read timeout is bound to make the request fail.
testHttpTimeouts(TEST_STABLE_DEFAULT_CONNECTION_TIMEOUT_MS,
TEST_UNSTABLE_READ_TIMEOUT_MS);
}

public void testHttpTimeouts(int connectionTimeoutMs, int readTimeoutMs)
throws Exception {
Configuration conf = this.getRawConfiguration();
// apply the connection and read timeouts under test
conf.setInt(AZURE_HTTP_CONNECTION_TIMEOUT, connectionTimeoutMs);
conf.setInt(AZURE_HTTP_READ_TIMEOUT, readTimeoutMs);
// Reduce retry count to reduce test run time
conf.setInt(AZURE_MAX_IO_RETRIES, 1);
final AzureBlobFileSystem fs = getFileSystem(conf);
Assertions.assertThat(
fs.getAbfsStore().getAbfsConfiguration().getHttpConnectionTimeout())
.describedAs("HTTP connection time should be picked from config")
.isEqualTo(connectionTimeoutMs);
Assertions.assertThat(
fs.getAbfsStore().getAbfsConfiguration().getHttpReadTimeout())
.describedAs("HTTP Read time should be picked from config")
.isEqualTo(readTimeoutMs);
Path testPath = path(methodName.getMethodName());
ContractTestUtils.createFile(fs, testPath, false, new byte[0]);
}

}
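
The comment in testHttpConnectionTimeout above observes that very small connect timeouts rarely trip when the cluster and account share a region, presumably because the TCP handshake completes within a millisecond there, while a 1 ms read timeout fails reliably since the service cannot return response bytes that quickly. If a deterministic connect-timeout failure were ever wanted, one common trick (a sketch only, not part of this patch, and dependent on the local network) is to point the connection at a non-routable address so the handshake can never complete:

import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URL;

public class ForceConnectTimeout {
  public static void main(String[] args) throws Exception {
    // 10.255.255.1 is a private address that is normally unrouted; the SYN is
    // typically dropped, so connection establishment hangs until the timeout.
    URL blackHole = new URL("http://10.255.255.1/");
    HttpURLConnection conn = (HttpURLConnection) blackHole.openConnection();
    conn.setConnectTimeout(500);  // ms; keep small so a test stays fast
    try {
      conn.connect();
      System.err.println("Unexpectedly connected");
    } catch (SocketTimeoutException expected) {
      System.out.println("Connect timeout fired as expected");
    }
  }
}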
@@ -34,6 +34,8 @@
import org.apache.hadoop.fs.azurebfs.utils.Base64;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_AHEAD_RANGE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE;
@@ -143,6 +145,8 @@ public void testConfigServiceImplAnnotatedFieldsInitialized() throws Exception {
assertEquals(MAX_AZURE_BLOCK_SIZE, abfsConfiguration.getAzureBlockSize());
assertEquals(AZURE_BLOCK_LOCATION_HOST_DEFAULT, abfsConfiguration.getAzureBlockLocationHost());
assertEquals(DEFAULT_READ_AHEAD_RANGE, abfsConfiguration.getReadAheadRange());
assertEquals(DEFAULT_HTTP_CONNECTION_TIMEOUT, abfsConfiguration.getHttpConnectionTimeout());
assertEquals(DEFAULT_HTTP_READ_TIMEOUT, abfsConfiguration.getHttpReadTimeout());
}

@Test
@@ -39,6 +39,9 @@
import org.apache.hadoop.fs.azurebfs.utils.SASGenerator;
import org.apache.hadoop.security.AccessControlException;

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;

/**
* A mock SAS token provider implementation
*/
@@ -103,7 +106,8 @@ private byte[] getUserDelegationKey(String accountName, String appID, String app
requestBody.append(ske);
requestBody.append("</Expiry></KeyInfo>");

AbfsHttpOperation op = new AbfsHttpOperation(url, method, requestHeaders);
AbfsHttpOperation op = new AbfsHttpOperation(url, method, requestHeaders,
DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT);

byte[] requestBuffer = requestBody.toString().getBytes(StandardCharsets.UTF_8.toString());
op.sendRequest(requestBuffer, 0, requestBuffer.length);
@@ -38,6 +38,9 @@

import static org.assertj.core.api.Assertions.assertThat;

import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;

/**
* Test the latency tracker for ABFS.
*
@@ -74,7 +77,9 @@ public void verifyDisablingOfTracker() throws Exception {

try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "disablingCaller",
"disablingCallee")) {
AbfsHttpOperation op = new AbfsHttpOperation(url, "GET", new ArrayList<>());
AbfsHttpOperation op = new AbfsHttpOperation(url, "GET",
new ArrayList<>(), DEFAULT_HTTP_CONNECTION_TIMEOUT,
DEFAULT_HTTP_READ_TIMEOUT);
tracker.registerResult(op).registerSuccess(true);
}

@@ -92,7 +97,9 @@ public void verifyTrackingForSingletonLatencyRecords() throws Exception {
assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull();

List<Callable<Integer>> tasks = new ArrayList<>();
AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET",
new ArrayList<>(), DEFAULT_HTTP_CONNECTION_TIMEOUT,
DEFAULT_HTTP_READ_TIMEOUT);

for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
@@ -131,7 +138,9 @@ public void verifyTrackingForAggregateLatencyRecords() throws Exception {
assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull();

List<Callable<Integer>> tasks = new ArrayList<>();
AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET",
new ArrayList<>(), DEFAULT_HTTP_CONNECTION_TIMEOUT,
DEFAULT_HTTP_READ_TIMEOUT);

for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
@@ -170,7 +179,9 @@ public void verifyRecordingSingletonLatencyIsCheapWhenDisabled() throws Exception
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET",
new ArrayList<>(), DEFAULT_HTTP_CONNECTION_TIMEOUT,
DEFAULT_HTTP_READ_TIMEOUT);

for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
@@ -205,7 +216,9 @@ public void verifyRecordingAggregateLatencyIsCheapWhenDisabled() throws Exception
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET",
new ArrayList<>(), DEFAULT_HTTP_CONNECTION_TIMEOUT,
DEFAULT_HTTP_READ_TIMEOUT);

for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
@@ -269,7 +282,9 @@ public void verifyRecordingSingletonLatencyIsCheapWhenEnabled() throws Exception
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET",
new ArrayList<>(), DEFAULT_HTTP_CONNECTION_TIMEOUT,
DEFAULT_HTTP_READ_TIMEOUT);

for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
@@ -303,7 +318,9 @@ public void verifyRecordingAggregateLatencyIsCheapWhenEnabled() throws Exception
long aggregateLatency = 0;
AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true);
List<Callable<Long>> tasks = new ArrayList<>();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>());
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET",
new ArrayList<>(), DEFAULT_HTTP_CONNECTION_TIMEOUT,
DEFAULT_HTTP_READ_TIMEOUT);

for (int i = 0; i < numTasks; i++) {
tasks.add(() -> {
@@ -363,15 +380,19 @@ public void verifyNoExceptionOnInvalidInput() throws Exception {
Instant testInstant = Instant.now();
AbfsPerfTracker abfsPerfTrackerDisabled = new AbfsPerfTracker(accountName, filesystemName, false);
AbfsPerfTracker abfsPerfTrackerEnabled = new AbfsPerfTracker(accountName, filesystemName, true);
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<AbfsHttpHeader>());
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET",
new ArrayList<AbfsHttpHeader>(), DEFAULT_HTTP_CONNECTION_TIMEOUT,
DEFAULT_HTTP_READ_TIMEOUT);

verifyNoException(abfsPerfTrackerDisabled);
verifyNoException(abfsPerfTrackerEnabled);
}

private void verifyNoException(AbfsPerfTracker abfsPerfTracker) throws Exception {
Instant testInstant = Instant.now();
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<AbfsHttpHeader>());
final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET",
new ArrayList<AbfsHttpHeader>(), DEFAULT_HTTP_CONNECTION_TIMEOUT,
DEFAULT_HTTP_READ_TIMEOUT);

try (
AbfsPerfInfo tracker01 = new AbfsPerfInfo(abfsPerfTracker, null, null);