Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -438,4 +438,46 @@ public static void enforceFeatureEnabledOrThrow(
"If set to true (default), allow credential vending for external catalogs. Note this requires ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING to be true first.")
.defaultValue(true)
.buildFeatureConfiguration();

public static final FeatureConfiguration<Integer> CLOUD_API_TIMEOUT_MILLIS =
PolarisConfiguration.<Integer>builder()
.key("CLOUD_API_TIMEOUT_MILLIS")
.description(
"Timeout in milliseconds for cloud provider API requests. "
+ "Prevents indefinite blocking when cloud provider endpoints are slow or unresponsive. "
+ "Used internally by storage integrations for credential vending and other cloud operations. "
+ "Currently only used by Azure storage integration (not yet implemented for AWS S3 or GCP).")
.defaultValue(15000)
.buildFeatureConfiguration();

public static final FeatureConfiguration<Integer> CLOUD_API_RETRY_COUNT =
PolarisConfiguration.<Integer>builder()
.key("CLOUD_API_RETRY_COUNT")
.description(
"Number of retry attempts for cloud provider API requests. "
+ "Uses exponential backoff with jitter to handle transient failures. "
+ "Currently only used by Azure storage integration (not yet implemented for AWS S3 or GCP).")
.defaultValue(3)
.buildFeatureConfiguration();

public static final FeatureConfiguration<Integer> CLOUD_API_RETRY_DELAY_MILLIS =
PolarisConfiguration.<Integer>builder()
.key("CLOUD_API_RETRY_DELAY_MILLIS")
.description(
"Initial delay in milliseconds before first retry for cloud provider API requests. "
+ "Delay doubles with each retry (exponential backoff). "
+ "Currently only used by Azure storage integration (not yet implemented for AWS S3 or GCP).")
.defaultValue(2000)
.buildFeatureConfiguration();

public static final FeatureConfiguration<Integer> CLOUD_API_RETRY_JITTER_MILLIS =

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose to use milliseconds instead of a 0-1 jitter factor for a few reasons:

User clarity - It's more intuitive for operators to specify "500 milliseconds of jitter" rather than understanding what "0.5 jitter factor" means (50% of the retry delay)
Concrete vs relative - Millis gives direct control over the maximum random delay added, while a factor requires understanding how it interacts with the exponential backoff delays
Consistency - All other time-based configs use concrete units (seconds/millis) rather than abstract factors
Predictability - With millis, the max jitter is always clear regardless of retry delay values

The small conversion cost (jitterMillis / 1000.0) is negligible compared to the benefits of making the config more operator friendly. Happy to change to 0-1 factor if you prefer that approach though!

PolarisConfiguration.<Integer>builder()
.key("CLOUD_API_RETRY_JITTER_MILLIS")
.description(
"Maximum jitter in milliseconds added to retry delays for cloud provider API requests. "
+ "Helps prevent thundering herd when multiple requests fail simultaneously. "
+ "Actual jitter is random between 0 and this value. "
+ "Currently only used by Azure storage integration (not yet implemented for AWS S3 or GCP).")
.defaultValue(500)
.buildFeatureConfiguration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.polaris.core.storage.azure;

import static org.apache.polaris.core.config.FeatureConfiguration.CLOUD_API_RETRY_DELAY_MILLIS;
import static org.apache.polaris.core.config.FeatureConfiguration.CLOUD_API_RETRY_JITTER_MILLIS;
import static org.apache.polaris.core.config.FeatureConfiguration.CLOUD_API_RETRY_COUNT;
import static org.apache.polaris.core.config.FeatureConfiguration.CLOUD_API_TIMEOUT_MILLIS;
import static org.apache.polaris.core.config.FeatureConfiguration.STORAGE_CREDENTIAL_DURATION_SECONDS;

import com.azure.core.credential.AccessToken;
Expand All @@ -39,6 +43,7 @@
import com.azure.storage.file.datalake.sas.PathSasPermission;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.Period;
Expand All @@ -55,6 +60,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

/** Azure credential vendor that supports generating SAS token */
public class AzureCredentialsStorageIntegration
Expand Down Expand Up @@ -120,7 +126,7 @@ public StorageAccessConfig getSubscopedCreds(
OffsetDateTime.ofInstant(
start.plusSeconds(3600), ZoneOffset.UTC); // 1 hr to sync with AWS and GCP Access token

AccessToken accessToken = getAccessToken(config().getTenantId());
AccessToken accessToken = getAccessToken(realmConfig, config().getTenantId());
// Get user delegation key.
// Set the new generated user delegation key expiry to 7 days and minute 1 min
// Azure strictly requires the end time to be <= 7 days from the current time, -1 min to avoid
Expand Down Expand Up @@ -312,16 +318,103 @@ private void validateAccountAndContainer(
});
}

private AccessToken getAccessToken(String tenantId) {
/**
* Fetches an Azure AD access token with timeout and retry logic to handle transient failures.
*
* <p>This access token is used internally to obtain a user delegation key from Azure Storage,
* which is then used to generate SAS tokens for client credential vending.
*
* <p>This method implements a defensive strategy against slow or failing cloud provider requests:
*
* <ul>
* <li>Per-attempt timeout (configurable via CLOUD_API_TIMEOUT_MILLIS, default 15000ms)
* <li>Exponential backoff retry (configurable count and initial delay via CLOUD_API_RETRY_COUNT
* and CLOUD_API_RETRY_DELAY_MILLIS, defaults: 3 attempts starting at 2000ms)
* <li>Jitter to prevent thundering herd (configurable via CLOUD_API_RETRY_JITTER_MILLIS, default 500ms)
* </ul>
*
* @param realmConfig the realm configuration to get timeout and retry settings
* @param tenantId the Azure tenant ID
* @return the access token
* @throws RuntimeException if token fetch fails after all retries or times out
*/
private AccessToken getAccessToken(RealmConfig realmConfig, String tenantId) {
int timeoutMillis = realmConfig.getConfig(CLOUD_API_TIMEOUT_MILLIS);
int retryCount = realmConfig.getConfig(CLOUD_API_RETRY_COUNT);
int initialDelayMillis = realmConfig.getConfig(CLOUD_API_RETRY_DELAY_MILLIS);
int jitterMillis = realmConfig.getConfig(CLOUD_API_RETRY_JITTER_MILLIS);
double jitter = jitterMillis / 1000.0; // Convert millis to fraction for jitter factor
Copy link
Contributor

@dimas-b dimas-b Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, I fully understand this logic... per javadoc of reactor.util.retry.RetryBackoffSpec.jitter() the factor applies to the "computed delay", which may not be 1000 ms 🤔 How can the user reason about what the CLOUD_API_RETRY_JITTER_MILLIS value of 750 (for example) means?

Would it not be simpler to use the 0.0-1.0 factor value in the config?


String scope = "https://storage.azure.com/.default";
AccessToken accessToken =
defaultAzureCredential
.getToken(new TokenRequestContext().addScopes(scope).setTenantId(tenantId))
.timeout(Duration.ofMillis(timeoutMillis))
.doOnError(
error ->
LOGGER.warn(
"Error fetching Azure access token for tenant {}: {}",
tenantId,
error.getMessage()))
.retryWhen(
Retry.backoff(retryCount, Duration.ofMillis(initialDelayMillis))
.jitter(jitter)
.filter(this::isRetriableAzureException)
.doBeforeRetry(
retrySignal ->
LOGGER.info(
"Retrying Azure token fetch for tenant {} (attempt {}/{})",
tenantId,
retrySignal.totalRetries() + 1,
retryCount))
.onRetryExhaustedThrow(
(retryBackoffSpec, retrySignal) ->
new RuntimeException(
String.format(
"Azure token fetch exhausted after %d attempts for tenant %s",
retrySignal.totalRetries(), tenantId),
retrySignal.failure())))
.blockOptional()
.orElse(null);

if (accessToken == null) {
throw new RuntimeException("No access token fetched!");
throw new RuntimeException(
String.format("Failed to fetch Azure access token for tenant %s", tenantId));
}
return accessToken;
}

/**
* Determines if an exception is retriable for Azure token requests.
*
* <p>Retries are attempted for:
*
* <ul>
* <li>TimeoutException - per-attempt timeout exceeded
* <li>AADSTS50058 - Token endpoint timeout
* <li>AADSTS50078 - Service temporarily unavailable
* <li>AADSTS700084 - Token refresh required
* <li>503 - Service unavailable
* <li>429 - Too many requests (rate limited)
* </ul>
*
* @param throwable the exception to check
* @return true if the exception should trigger a retry
*/
private boolean isRetriableAzureException(Throwable throwable) {
// Retry on timeout exceptions
if (throwable instanceof java.util.concurrent.TimeoutException) {
return true;
}
// Retry on common transient Azure credential exceptions
String message = throwable.getMessage();
if (message != null) {
return message.contains("AADSTS50058") // Token endpoint timeout
|| message.contains("AADSTS50078") // Service temporarily unavailable
|| message.contains("AADSTS700084") // Token refresh required
|| message.contains("503") // Service unavailable
|| message.contains("429"); // Too many requests
}
return false;
}
}
Loading