diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index 1772f47256..176d6bf488 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -438,4 +438,46 @@ public static void enforceFeatureEnabledOrThrow(
               "If set to true (default), allow credential vending for external catalogs. Note this requires ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING to be true first.")
           .defaultValue(true)
           .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Integer> CLOUD_API_TIMEOUT_MILLIS =
+      PolarisConfiguration.<Integer>builder()
+          .key("CLOUD_API_TIMEOUT_MILLIS")
+          .description(
+              "Timeout in milliseconds for cloud provider API requests. "
+                  + "Prevents indefinite blocking when cloud provider endpoints are slow or unresponsive. "
+                  + "Used internally by storage integrations for credential vending and other cloud operations. "
+                  + "Currently only used by Azure storage integration (not yet implemented for AWS S3 or GCP).")
+          .defaultValue(15000)
+          .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Integer> CLOUD_API_RETRY_COUNT =
+      PolarisConfiguration.<Integer>builder()
+          .key("CLOUD_API_RETRY_COUNT")
+          .description(
+              "Number of retry attempts for cloud provider API requests, in addition to the initial attempt. "
+                  + "Uses exponential backoff with jitter to handle transient failures. "
+                  + "Currently only used by Azure storage integration (not yet implemented for AWS S3 or GCP).")
+          .defaultValue(3)
+          .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Integer> CLOUD_API_RETRY_DELAY_MILLIS =
+      PolarisConfiguration.<Integer>builder()
+          .key("CLOUD_API_RETRY_DELAY_MILLIS")
+          .description(
+              "Initial delay in milliseconds before first retry for cloud provider API requests. "
+                  + "Delay doubles with each retry (exponential backoff). "
+                  + "Currently only used by Azure storage integration (not yet implemented for AWS S3 or GCP).")
+          .defaultValue(2000)
+          .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Integer> CLOUD_API_RETRY_JITTER_MILLIS =
+      PolarisConfiguration.<Integer>builder()
+          .key("CLOUD_API_RETRY_JITTER_MILLIS")
+          .description(
+              "Jitter bound in milliseconds for retry delays of cloud provider API requests. "
+                  + "Helps prevent thundering herd when multiple requests fail simultaneously. "
+                  + "Applied as a fraction of CLOUD_API_RETRY_DELAY_MILLIS (clamped to [0, 1]), so the jitter scales with each backoff step. "
+                  + "Currently only used by Azure storage integration (not yet implemented for AWS S3 or GCP).")
+          .defaultValue(500)
+          .buildFeatureConfiguration();
 }
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java
index 0b189b3116..970acb9e22 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java
@@ -18,6 +18,10 @@
  */
 package org.apache.polaris.core.storage.azure;
 
+import static org.apache.polaris.core.config.FeatureConfiguration.CLOUD_API_RETRY_DELAY_MILLIS;
+import static org.apache.polaris.core.config.FeatureConfiguration.CLOUD_API_RETRY_JITTER_MILLIS;
+import static org.apache.polaris.core.config.FeatureConfiguration.CLOUD_API_RETRY_COUNT;
+import static org.apache.polaris.core.config.FeatureConfiguration.CLOUD_API_TIMEOUT_MILLIS;
 import static org.apache.polaris.core.config.FeatureConfiguration.STORAGE_CREDENTIAL_DURATION_SECONDS;
 
 import com.azure.core.credential.AccessToken;
@@ -39,6 +43,7 @@
 import com.azure.storage.file.datalake.sas.PathSasPermission;
 import com.google.common.annotations.VisibleForTesting;
 import jakarta.annotation.Nonnull;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.OffsetDateTime;
 import java.time.Period;
@@ -55,6 +60,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
 
 /** Azure credential vendor that supports generating SAS token */
 public class AzureCredentialsStorageIntegration
@@ -120,7 +126,7 @@ public StorageAccessConfig getSubscopedCreds(
         OffsetDateTime.ofInstant(
             start.plusSeconds(3600), ZoneOffset.UTC); // 1 hr to sync with AWS and GCP Access token
 
-    AccessToken accessToken = getAccessToken(config().getTenantId());
+    AccessToken accessToken = getAccessToken(realmConfig, config().getTenantId());
     // Get user delegation key.
     // Set the new generated user delegation key expiry to 7 days and minute 1 min
     // Azure strictly requires the end time to be <= 7 days from the current time, -1 min to avoid
@@ -312,16 +318,115 @@ private void validateAccountAndContainer(
         });
   }
 
-  private AccessToken getAccessToken(String tenantId) {
+  /**
+   * Fetches an Azure AD access token with timeout and retry logic to handle transient failures.
+   *
+   * <p>This access token is used internally to obtain a user delegation key from Azure Storage,
+   * which is then used to generate SAS tokens for client credential vending.
+   *
+   * <p>This method implements a defensive strategy against slow or failing cloud provider requests:
+   *
+   * <ul>
+   *   <li>Per-attempt timeout (configurable via CLOUD_API_TIMEOUT_MILLIS, default 15000ms)
+   *   <li>Exponential backoff retry (configurable count and initial delay via CLOUD_API_RETRY_COUNT
+   *       and CLOUD_API_RETRY_DELAY_MILLIS, defaults: 3 retries starting at 2000ms)
+   *   <li>Jitter to prevent thundering herd (configurable via CLOUD_API_RETRY_JITTER_MILLIS, default
+   *       500ms, applied as a fraction of the initial delay clamped to [0, 1])
+   * </ul>
+   *
+   * @param realmConfig the realm configuration to get timeout and retry settings
+   * @param tenantId the Azure tenant ID
+   * @return the access token
+   * @throws RuntimeException if token fetch fails after all retries or times out
+   */
+  private AccessToken getAccessToken(RealmConfig realmConfig, String tenantId) {
+    int timeoutMillis = realmConfig.getConfig(CLOUD_API_TIMEOUT_MILLIS);
+    int retryCount = realmConfig.getConfig(CLOUD_API_RETRY_COUNT);
+    int initialDelayMillis = realmConfig.getConfig(CLOUD_API_RETRY_DELAY_MILLIS);
+    int jitterMillis = realmConfig.getConfig(CLOUD_API_RETRY_JITTER_MILLIS);
+    // Retry.backoff(..).jitter(double) expects a factor in [0, 1] relative to the computed delay,
+    // not a duration, and rejects anything outside that range. Express the configured
+    // milliseconds as a fraction of the initial delay and clamp it.
+    double jitterFactor =
+        initialDelayMillis > 0
+            ? Math.min(1.0, Math.max(0.0, (double) jitterMillis / initialDelayMillis))
+            : 0.0;
+
     String scope = "https://storage.azure.com/.default";
     AccessToken accessToken =
         defaultAzureCredential
             .getToken(new TokenRequestContext().addScopes(scope).setTenantId(tenantId))
+            .timeout(Duration.ofMillis(timeoutMillis))
+            .doOnError(
+                error ->
+                    LOGGER.warn(
+                        "Error fetching Azure access token for tenant {}: {}",
+                        tenantId,
+                        error.getMessage()))
+            .retryWhen(
+                Retry.backoff(retryCount, Duration.ofMillis(initialDelayMillis))
+                    .jitter(jitterFactor)
+                    .filter(this::isRetriableAzureException)
+                    .doBeforeRetry(
+                        retrySignal ->
+                            LOGGER.info(
+                                "Retrying Azure token fetch for tenant {} (retry {}/{})",
+                                tenantId,
+                                retrySignal.totalRetries() + 1,
+                                retryCount))
+                    .onRetryExhaustedThrow(
+                        (retryBackoffSpec, retrySignal) ->
+                            new RuntimeException(
+                                String.format(
+                                    "Azure token fetch exhausted after %d retries for tenant %s",
+                                    retrySignal.totalRetries(), tenantId),
+                                retrySignal.failure())))
             .blockOptional()
             .orElse(null);
+
     if (accessToken == null) {
-      throw new RuntimeException("No access token fetched!");
+      throw new RuntimeException(
+          String.format("Failed to fetch Azure access token for tenant %s", tenantId));
     }
     return accessToken;
   }
+
+  /**
+   * Matches the HTTP status codes 429 and 503 only when they stand alone, so digits embedded in
+   * identifiers or timestamps within an error message do not trigger retries.
+   */
+  private static final java.util.regex.Pattern RETRIABLE_HTTP_STATUS =
+      java.util.regex.Pattern.compile("(?<![\\w.])(?:429|503)(?!\\w)");
+
+  /**
+   * Determines if an exception is retriable for Azure token requests.
+   *
+   * <p>Retries are attempted for:
+   *
+   * <ul>
+   *   <li>TimeoutException - per-attempt timeout exceeded
+   *   <li>AADSTS50058, AADSTS50078, AADSTS700084 - Microsoft Entra error codes in the message
+   *   <li>503 - Service unavailable
+   *   <li>429 - Too many requests (rate limited)
+   * </ul>
+   *
+   * @param throwable the exception to check
+   * @return true if the exception should trigger a retry
+   */
+  private boolean isRetriableAzureException(Throwable throwable) {
+    // Retry on timeout exceptions
+    if (throwable instanceof java.util.concurrent.TimeoutException) {
+      return true;
+    }
+    String message = throwable.getMessage();
+    if (message == null) {
+      return false;
+    }
+    // NOTE(review): Microsoft's AADSTS reference appears to describe these three codes as
+    // interactive sign-in conditions rather than transient faults - confirm the list.
+    return message.contains("AADSTS50058")
+        || message.contains("AADSTS50078")
+        || message.contains("AADSTS700084")
+        || RETRIABLE_HTTP_STATUS.matcher(message).find();
+  }
 }