diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index fafc30372b4a5..ef3a57e3f0abb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -146,6 +146,14 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_MAX_RETRY_ATTEMPTS) private int maxIoRetries; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_HTTP_CONNECTION_TIMEOUT, + DefaultValue = DEFAULT_HTTP_CONNECTION_TIMEOUT) + private int httpConnectionTimeout; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_HTTP_READ_TIMEOUT, + DefaultValue = DEFAULT_HTTP_READ_TIMEOUT) + private int httpReadTimeout; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT, MinValue = 0, DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT) @@ -641,6 +649,14 @@ public int getCustomTokenFetchRetryCount() { return this.customTokenFetchRetryCount; } + public int getHttpConnectionTimeout() { + return this.httpConnectionTimeout; + } + + public int getHttpReadTimeout() { + return this.httpReadTimeout; + } + public long getAzureBlockSize() { return this.azureBlockSize; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 9d3b2d5e82c6e..9f1ff30d7bd09 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -45,6 +45,8 @@ public final class ConfigurationKeys { public static final String AZURE_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval"; public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries"; public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count"; + public static final String AZURE_HTTP_CONNECTION_TIMEOUT = "fs.azure.http.connection.timeout"; + public static final String AZURE_HTTP_READ_TIMEOUT = "fs.azure.http.read.timeout"; // Retry strategy for getToken calls public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT = "fs.azure.oauth.token.fetch.retry.max.retries"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 63d62a33b1819..7fc43d0d1b53f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -43,6 +43,8 @@ public final class FileSystemConfigurations { public static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30; public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3; + public static final int DEFAULT_HTTP_CONNECTION_TIMEOUT = 500; // 500ms + public static final int DEFAULT_HTTP_READ_TIMEOUT = 30 * 1000; // 30 secs // Retry parameter defaults. public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 5; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index aa72ed64e6e5d..be8379746ee45 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -1290,6 +1290,14 @@ public void addCallback(ListenableFuture future, FutureCallback callba Futures.addCallback(future, callback, executorService); } + /** + * Gets the current active configuration instance. + * @return active AbfsConfiguration instance. + */ + public AbfsConfiguration getAbfsConfiguration() { + return abfsConfiguration; + } + @VisibleForTesting protected AccessTokenProvider getTokenProvider() { return tokenProvider; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java index 413bf3686898b..867775a7192c8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java @@ -49,9 +49,6 @@ public class AbfsHttpOperation implements AbfsPerfLoggable { private static final Logger LOG = LoggerFactory.getLogger(AbfsHttpOperation.class); - private static final int CONNECT_TIMEOUT = 30 * 1000; - private static final int READ_TIMEOUT = 30 * 1000; - private static final int CLEAN_UP_BUFFER_SIZE = 64 * 1024; private static final int ONE_THOUSAND = 1000; @@ -257,11 +254,17 @@ public String getMaskedEncodedUrl() { * * @param url The full URL including query string parameters. * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE). - * @param requestHeaders The HTTP request headers.READ_TIMEOUT + * @param requestHeaders The HTTP request headers. + * @param connectionTimeout timeout for connection establishment. + * @param readTimeout timeout for read streaming from server. * * @throws IOException if an error occurs. */ - public AbfsHttpOperation(final URL url, final String method, final List requestHeaders) + public AbfsHttpOperation(final URL url, + final String method, + final List requestHeaders, + final int connectionTimeout, + final int readTimeout) throws IOException { this.isTraceEnabled = LOG.isTraceEnabled(); this.url = url; @@ -276,8 +279,10 @@ public AbfsHttpOperation(final URL url, final String method, final List"); - AbfsHttpOperation op = new AbfsHttpOperation(url, method, requestHeaders); + AbfsHttpOperation op = new AbfsHttpOperation(url, method, requestHeaders, + DEFAULT_HTTP_CONNECTION_TIMEOUT, DEFAULT_HTTP_READ_TIMEOUT); byte[] requestBuffer = requestBody.toString().getBytes(StandardCharsets.UTF_8.toString()); op.sendRequest(requestBuffer, 0, requestBuffer.length); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java index 191d6e77ae09b..4e2c6c1dc6ae9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsPerfTracker.java @@ -38,6 +38,9 @@ import static org.assertj.core.api.Assertions.assertThat; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_CONNECTION_TIMEOUT; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT; + /** * Test the latency tracker for ABFS. * @@ -74,7 +77,9 @@ public void verifyDisablingOfTracker() throws Exception { try (AbfsPerfInfo tracker = new AbfsPerfInfo(abfsPerfTracker, "disablingCaller", "disablingCallee")) { - AbfsHttpOperation op = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + AbfsHttpOperation op = new AbfsHttpOperation(url, "GET", + new ArrayList<>(), DEFAULT_HTTP_CONNECTION_TIMEOUT, + DEFAULT_HTTP_READ_TIMEOUT); tracker.registerResult(op).registerSuccess(true); } @@ -92,7 +97,9 @@ public void verifyTrackingForSingletonLatencyRecords() throws Exception { assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull(); List> tasks = new ArrayList<>(); - AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", + new ArrayList<>(), DEFAULT_HTTP_CONNECTION_TIMEOUT, + DEFAULT_HTTP_READ_TIMEOUT); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -131,7 +138,9 @@ public void verifyTrackingForAggregateLatencyRecords() throws Exception { assertThat(latencyDetails).describedAs("AbfsPerfTracker should be empty").isNull(); List> tasks = new ArrayList<>(); - AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", + new ArrayList<>(), DEFAULT_HTTP_CONNECTION_TIMEOUT, + DEFAULT_HTTP_READ_TIMEOUT); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -170,7 +179,9 @@ public void verifyRecordingSingletonLatencyIsCheapWhenDisabled() throws Exceptio long aggregateLatency = 0; AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false); List> tasks = new ArrayList<>(); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", + new ArrayList<>(), DEFAULT_HTTP_CONNECTION_TIMEOUT, + DEFAULT_HTTP_READ_TIMEOUT); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -205,7 +216,9 @@ public void verifyRecordingAggregateLatencyIsCheapWhenDisabled() throws Exceptio long aggregateLatency = 0; AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, false); List> tasks = new ArrayList<>(); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", + new ArrayList<>(), DEFAULT_HTTP_CONNECTION_TIMEOUT, + DEFAULT_HTTP_READ_TIMEOUT); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -269,7 +282,9 @@ public void verifyRecordingSingletonLatencyIsCheapWhenEnabled() throws Exception long aggregateLatency = 0; AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true); List> tasks = new ArrayList<>(); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", + new ArrayList<>(), DEFAULT_HTTP_CONNECTION_TIMEOUT, + DEFAULT_HTTP_READ_TIMEOUT); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -303,7 +318,9 @@ public void verifyRecordingAggregateLatencyIsCheapWhenEnabled() throws Exception long aggregateLatency = 0; AbfsPerfTracker abfsPerfTracker = new AbfsPerfTracker(accountName, filesystemName, true); List> tasks = new ArrayList<>(); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList<>()); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", + new ArrayList<>(), DEFAULT_HTTP_CONNECTION_TIMEOUT, + DEFAULT_HTTP_READ_TIMEOUT); for (int i = 0; i < numTasks; i++) { tasks.add(() -> { @@ -363,7 +380,9 @@ public void verifyNoExceptionOnInvalidInput() throws Exception { Instant testInstant = Instant.now(); AbfsPerfTracker abfsPerfTrackerDisabled = new AbfsPerfTracker(accountName, filesystemName, false); AbfsPerfTracker abfsPerfTrackerEnabled = new AbfsPerfTracker(accountName, filesystemName, true); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList()); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", + new ArrayList(), DEFAULT_HTTP_CONNECTION_TIMEOUT, + DEFAULT_HTTP_READ_TIMEOUT); verifyNoException(abfsPerfTrackerDisabled); verifyNoException(abfsPerfTrackerEnabled); @@ -371,7 +390,9 @@ public void verifyNoExceptionOnInvalidInput() throws Exception { private void verifyNoException(AbfsPerfTracker abfsPerfTracker) throws Exception { Instant testInstant = Instant.now(); - final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", new ArrayList()); + final AbfsHttpOperation httpOperation = new AbfsHttpOperation(url, "GET", + new ArrayList(), DEFAULT_HTTP_CONNECTION_TIMEOUT, + DEFAULT_HTTP_READ_TIMEOUT); try ( AbfsPerfInfo tracker01 = new AbfsPerfInfo(abfsPerfTracker, null, null);