Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_MAX_RETRY_ATTEMPTS)
private int maxIoRetries;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_IO_PREFETCH_RETRIES,
MinValue = 0,
DefaultValue = DEFAULT_PREFETCH_MAX_RETRY_ATTEMPTS)
private int maxIoPrefetchRetries;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT,
MinValue = 0,
DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT)
Expand Down Expand Up @@ -1142,6 +1147,10 @@ public int getMaxIoRetries() {
return this.maxIoRetries;
}

public int getPrefetchMaxIoRetries() {
return this.maxIoPrefetchRetries;
}

public int getCustomTokenFetchRetryCount() {
return this.customTokenFetchRetryCount;
}
Expand Down Expand Up @@ -1977,6 +1986,11 @@ public void setMaxIoRetries(int maxIoRetries) {
this.maxIoRetries = maxIoRetries;
}

@VisibleForTesting
public void setMaxIoPrefetchRetries(int maxIoPrefetchRetries) {
this.maxIoPrefetchRetries = maxIoPrefetchRetries;
}

@VisibleForTesting
void setMaxBackoffIntervalMilliseconds(int maxBackoffInterval) {
this.maxBackoffInterval = maxBackoffInterval;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1923,6 +1923,8 @@ private AbfsClientContext populateAbfsClientContext() {
return new AbfsClientContextBuilder()
.withExponentialRetryPolicy(
new ExponentialRetryPolicy(abfsConfiguration))
.withPrefetchExponentialRetryPolicy(
ExponentialRetryPolicy.forPrefetch(abfsConfiguration))
.withStaticRetryPolicy(
new StaticRetryPolicy(abfsConfiguration))
.withTailLatencyRequestTimeoutRetryPolicy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public final class ConfigurationKeys {
public static final String AZURE_STATIC_RETRY_INTERVAL = "fs.azure.static.retry.interval";
public static final String AZURE_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval";
public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
public static final String AZURE_MAX_IO_PREFETCH_RETRIES = "fs.azure.io.prefetch.retry.max.retries";
public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public final class FileSystemConfigurations {
public static final int DEFAULT_STATIC_RETRY_INTERVAL = 1_000; // 1s
public static final int DEFAULT_BACKOFF_INTERVAL = 500; // 500ms
public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
public static final int DEFAULT_PREFETCH_MAX_RETRY_ATTEMPTS = 2;
public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public abstract class AbfsClient implements Closeable {
private final SharedKeyCredentials sharedKeyCredentials;
private ApiVersion xMsVersion = ApiVersion.getCurrentVersion();
private final ExponentialRetryPolicy exponentialRetryPolicy;
private final ExponentialRetryPolicy prefetchRetryPolicy;
private final StaticRetryPolicy staticRetryPolicy;
private final TailLatencyRequestTimeoutRetryPolicy tailLatencyRequestTimeoutRetryPolicy;
private final String filesystem;
Expand Down Expand Up @@ -212,6 +213,7 @@ private AbfsClient(final URL baseUrl,
this.filesystem = baseUrlString.substring(indexLastForwardSlash + 1);
this.abfsConfiguration = abfsConfiguration;
this.exponentialRetryPolicy = abfsClientContext.getExponentialRetryPolicy();
this.prefetchRetryPolicy = abfsClientContext.getPrefetchExponentialRetryPolicy();
this.staticRetryPolicy = abfsClientContext.getStaticRetryPolicy();
this.tailLatencyRequestTimeoutRetryPolicy = abfsClientContext.getTailLatencyRequestTimeoutRetryPolicy();
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
Expand Down Expand Up @@ -374,6 +376,10 @@ ExponentialRetryPolicy getExponentialRetryPolicy() {
return exponentialRetryPolicy;
}

ExponentialRetryPolicy getPrefetchRetryPolicy() {
return prefetchRetryPolicy;
}

StaticRetryPolicy getStaticRetryPolicy() {
return staticRetryPolicy;
}
Expand All @@ -397,6 +403,22 @@ && getAbfsConfiguration().getStaticRetryForConnectionTimeoutEnabled()) {
return getExponentialRetryPolicy();
}

/**
* Returns the retry policy based on tracing context and failure reason.
* @param tc the tracing context containing read type information.
* @param failureReason the reason for the failure to determine retry policy.
* @return retry policy to be used, prefetch policy if enabled and prefetch read, otherwise standard policy.
*/
public AbfsRetryPolicy getRetryPolicy(TracingContext tc, final String failureReason){
if (getAbfsConfiguration().isEnablePrefetchRequestPriority()
&& ReadType.PREFETCH_READ.equals(tc.getReadType())){
return getPrefetchRetryPolicy();
}
else {
return getRetryPolicy(failureReason);
}
}

SharedKeyCredentials getSharedKeyCredentials() {
return sharedKeyCredentials;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public class AbfsClientContext {

private final ExponentialRetryPolicy exponentialRetryPolicy;
private final ExponentialRetryPolicy prefetchExponentialRetryPolicy;
private final StaticRetryPolicy staticRetryPolicy;
private final TailLatencyRequestTimeoutRetryPolicy tailLatencyRequestTimeoutRetryPolicy;
private final AbfsPerfTracker abfsPerfTracker;
Expand All @@ -33,11 +34,13 @@ public class AbfsClientContext {

AbfsClientContext(
ExponentialRetryPolicy exponentialRetryPolicy,
ExponentialRetryPolicy prefetchExponentialRetryPolicy,
StaticRetryPolicy staticRetryPolicy,
TailLatencyRequestTimeoutRetryPolicy tailLatencyRequestTimeoutRetryPolicy,
AbfsPerfTracker abfsPerfTracker,
AbfsCounters abfsCounters, String fileSystemId) {
this.exponentialRetryPolicy = exponentialRetryPolicy;
this.prefetchExponentialRetryPolicy = prefetchExponentialRetryPolicy;
this.staticRetryPolicy = staticRetryPolicy;
this.tailLatencyRequestTimeoutRetryPolicy = tailLatencyRequestTimeoutRetryPolicy;
this.abfsPerfTracker = abfsPerfTracker;
Expand All @@ -49,6 +52,10 @@ public ExponentialRetryPolicy getExponentialRetryPolicy() {
return exponentialRetryPolicy;
}

public ExponentialRetryPolicy getPrefetchExponentialRetryPolicy() {
return prefetchExponentialRetryPolicy;
}

public StaticRetryPolicy getStaticRetryPolicy() {
return staticRetryPolicy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
public class AbfsClientContextBuilder {

private ExponentialRetryPolicy exponentialRetryPolicy;
private ExponentialRetryPolicy prefetchExponentialRetryPolicy;
private StaticRetryPolicy staticRetryPolicy;
private TailLatencyRequestTimeoutRetryPolicy tailLatencyRequestTimeoutRetryPolicy;
private AbfsPerfTracker abfsPerfTracker;
Expand All @@ -37,6 +38,12 @@ public AbfsClientContextBuilder withExponentialRetryPolicy(
return this;
}

public AbfsClientContextBuilder withPrefetchExponentialRetryPolicy(
final ExponentialRetryPolicy prefetchExponentialRetryPolicy) {
this.prefetchExponentialRetryPolicy = prefetchExponentialRetryPolicy;
return this;
}

public AbfsClientContextBuilder withStaticRetryPolicy(
final StaticRetryPolicy staticRetryPolicy) {
this.staticRetryPolicy = staticRetryPolicy;
Expand Down Expand Up @@ -74,6 +81,7 @@ public AbfsClientContext build() {
//validate the values
return new AbfsClientContext(
exponentialRetryPolicy,
prefetchExponentialRetryPolicy,
staticRetryPolicy,
tailLatencyRequestTimeoutRetryPolicy,
abfsPerfTracker,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,8 @@ protected int readInternal(final long position, final byte[] b, final int offset
nextSize = min((long) readAheadBlockSize, contentLength - nextOffset);
}

getReadBufferManager().setPrefetchRequestPriorityEnabled(client.getAbfsConfiguration().isEnablePrefetchRequestPriority());

// try reading from buffers first
receivedBytes = getReadBufferManager().getBlock(this, position, length, b);
bytesFromReadAhead += receivedBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ private boolean executeHttpOperation(final int retryCount,

int status = httpOperation.getStatusCode();
failureReason = RetryReason.getAbbreviation(null, status, httpOperation.getStorageErrorMessage());
retryPolicy = client.getRetryPolicy(failureReason);
retryPolicy = client.getRetryPolicy(tracingContext, failureReason);

if (retryPolicy.shouldRetry(retryCount, httpOperation.getStatusCode())) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,21 @@ public ExponentialRetryPolicy(AbfsConfiguration conf) {
conf.getBackoffIntervalMilliseconds());
}

/**
* Creates an {@link ExponentialRetryPolicy} configured with prefetch retry
* settings read from the given {@link AbfsConfiguration}.
*
* @param conf The {@link AbfsConfiguration} from which to retrieve prefetch retry configuration.
* @return a new {@link ExponentialRetryPolicy} for prefetch operations.
*/
public static ExponentialRetryPolicy forPrefetch(AbfsConfiguration conf) {
return new ExponentialRetryPolicy(
conf.getPrefetchMaxIoRetries(),
conf.getMinBackoffIntervalMilliseconds(),
conf.getMaxBackoffIntervalMilliseconds(),
conf.getBackoffIntervalMilliseconds());
}

/**
* Initializes a new instance of the {@link ExponentialRetryPolicy} class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public abstract class ReadBufferManager {
private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
private boolean isPrefetchRequestPriorityEnabled;

/**
* Initializes the ReadBufferManager singleton instance. Creates the read buffers and threads.
Expand Down Expand Up @@ -198,6 +199,15 @@ protected static void setReadAheadBlockSize(int readAheadBlockSize) {
blockSize = readAheadBlockSize;
}

/**
* Sets prefetch request priority according to the config set
*
* @param isEnabled true if its enabled, false otherwise
*/
protected void setPrefetchRequestPriorityEnabled(boolean isEnabled) {
isPrefetchRequestPriorityEnabled = isEnabled;
}

/**
* Gets the queue of read-ahead requests.
*
Expand Down Expand Up @@ -274,6 +284,15 @@ int getCompletedReadListSize() {
return completedReadList.size();
}

/**
* Checks if prefetch request priority is enabled.
*
* @return true if prefetch request priority is enabled, false otherwise
*/
boolean isPrefetchRequestPriorityEnabled() {
return isPrefetchRequestPriorityEnabled;
}

/**
* Simulates full buffer usage and adds a failed buffer for testing.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,9 @@ private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long
if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
// To prevent new read requests to fail due to old read-ahead attempts,
// return exception only from buffers that failed within last thresholdAgeMilliseconds
if ((currentTimeMillis() - (buf.getTimeStamp()) < getThresholdAgeMilliseconds())) {
if (!isPrefetchRequestPriorityEnabled() && (
currentTimeMillis() - (buf.getTimeStamp())
< getThresholdAgeMilliseconds())) {
throw buf.getErrException();
} else {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ private int getBlockFromCompletedQueue(final String eTag, final long position,
if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
// To prevent new read requests to fail due to old read-ahead attempts,
// return exception only from buffers that failed within last getThresholdAgeMilliseconds()
if ((currentTimeMillis() - (buf.getTimeStamp())
if (!isPrefetchRequestPriorityEnabled() && (currentTimeMillis() - (buf.getTimeStamp())
< getThresholdAgeMilliseconds())) {
throw buf.getErrException();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,11 @@ public static void addGeneralMockBehaviourToAbfsClient(final AbfsClient abfsClie
Mockito.doReturn(staticRetryPolicy).when(abfsClient).getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION);
Mockito.doReturn(exponentialRetryPolicy).when(abfsClient).getRetryPolicy(
AdditionalMatchers.not(eq(CONNECTION_TIMEOUT_ABBREVIATION)));
Mockito.doReturn(staticRetryPolicy).when(abfsClient).getRetryPolicy(
any(TracingContext.class), eq(CONNECTION_TIMEOUT_ABBREVIATION));
Mockito.doReturn(exponentialRetryPolicy).when(abfsClient).getRetryPolicy(
any(TracingContext.class),
AdditionalMatchers.not(eq(CONNECTION_TIMEOUT_ABBREVIATION)));

// Defining behavior of static retry policy
Mockito.doReturn(true).when(staticRetryPolicy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,10 +416,12 @@ public static AbfsClient createTestClientFromCurrentContext(

AbfsClientContext abfsClientContext =
new AbfsClientContextBuilder().withAbfsPerfTracker(tracker)
.withExponentialRetryPolicy(
new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries()))
.withAbfsCounters(abfsCounters)
.build();
.withExponentialRetryPolicy(
new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries()))
.withPrefetchExponentialRetryPolicy(
new ExponentialRetryPolicy(abfsConfig.getPrefetchMaxIoRetries()))
.withAbfsCounters(abfsCounters)
.build();

// Create test AbfsClient
AbfsClient testClient;
Expand Down Expand Up @@ -524,6 +526,8 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
new ExponentialRetryPolicy(1));
when(client.getRetryPolicy(any())).thenReturn(
new ExponentialRetryPolicy(1));
when(client.getRetryPolicy(any(TracingContext.class), any())).thenReturn(
new ExponentialRetryPolicy(1));

when(client.createDefaultUriQueryBuilder()).thenCallRealMethod();
when(client.createRequestUrl(any(), any())).thenCallRealMethod();
Expand Down
Loading
Loading