From 4c0a331ab355c09ff2b609ddf4f1145273e9a626 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Tue, 19 May 2026 17:57:45 +0700 Subject: [PATCH 1/3] Core: Add pluggable BackoffStrategy SPI for Tasks retries Retry backoff was a single hardcoded exponential-plus-jitter formula inlined in Tasks.runTaskWithRetry, with no way to substitute another algorithm. This makes the backoff a pluggable SPI (org.apache.iceberg.util.BackoffStrategy) while keeping the existing behavior as the default. ExponentialBackoffStrategy reproduces the previous formula and jitter exactly. Tasks.Builder.exponentialBackoff(...) now installs that default and a new backoffStrategy(...) overrides it; a null strategy is a no-op, so behavior is byte-for-byte unchanged unless a strategy is configured. A strategy is selected via one property, retry.strategy-impl, loaded by BackoffStrategies via the standard DynConstructors no-arg + initialize(Map) pattern. Every production .exponentialBackoff(...) site now routes through the SPI and is selectable from its local properties (commits, REST handlers, metadata refresh, status checks, in-memory/DynamoDB locks, Hive Metastore lock via Hadoop conf, REST scan poll, Spark cleanup via FileIO.properties()). OAuth2 token refresh has no usable properties map and stays on the default, still SPI-routed. Adds TestBackoffStrategies, TestBackoffStrategyCommit, and TestTasks coverage. A follow-up will add an AIMD strategy on top of this SPI. 
Part of #7528 Co-Authored-By: Claude Opus 4.7 --- .../aws/dynamodb/DynamoDbLockManager.java | 1 + .../iceberg/BaseMetastoreOperations.java | 2 + .../iceberg/BaseMetastoreTableOperations.java | 3 + .../apache/iceberg/BaseReplaceSortOrder.java | 2 + .../org/apache/iceberg/BaseTransaction.java | 3 + .../org/apache/iceberg/PropertiesUpdate.java | 2 + .../org/apache/iceberg/RemoveSnapshots.java | 2 + .../java/org/apache/iceberg/SetLocation.java | 2 + .../apache/iceberg/SetSnapshotOperation.java | 2 + .../org/apache/iceberg/SetStatistics.java | 2 + .../org/apache/iceberg/SnapshotProducer.java | 2 + .../apache/iceberg/rest/CatalogHandlers.java | 5 + .../apache/iceberg/rest/RESTTableScan.java | 2 + .../iceberg/util/BackoffStrategies.java | 91 ++++++++++ .../apache/iceberg/util/BackoffStrategy.java | 60 +++++++ .../util/ExponentialBackoffStrategy.java | 50 ++++++ .../org/apache/iceberg/util/LockManagers.java | 7 + .../java/org/apache/iceberg/util/Tasks.java | 27 ++- .../iceberg/view/BaseViewOperations.java | 3 + .../apache/iceberg/view/PropertiesUpdate.java | 2 + .../apache/iceberg/view/SetViewLocation.java | 2 + .../iceberg/view/ViewVersionReplace.java | 2 + .../iceberg/TestBackoffStrategyCommit.java | 92 +++++++++++ .../iceberg/util/TestBackoffStrategies.java | 155 ++++++++++++++++++ .../org/apache/iceberg/util/TestTasks.java | 119 ++++++++++++++ docs/docs/configuration.md | 1 + .../apache/iceberg/hive/MetastoreLock.java | 12 ++ .../spark/source/SparkCleanupUtil.java | 12 ++ .../spark/source/SparkCleanupUtil.java | 12 ++ .../spark/source/SparkCleanupUtil.java | 12 ++ 30 files changed, 682 insertions(+), 7 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/util/BackoffStrategies.java create mode 100644 core/src/main/java/org/apache/iceberg/util/BackoffStrategy.java create mode 100644 core/src/main/java/org/apache/iceberg/util/ExponentialBackoffStrategy.java create mode 100644 core/src/test/java/org/apache/iceberg/TestBackoffStrategyCommit.java create 
mode 100644 core/src/test/java/org/apache/iceberg/util/TestBackoffStrategies.java diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbLockManager.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbLockManager.java index 80bbfcbee798..1b3d5ca8862b 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbLockManager.java +++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbLockManager.java @@ -178,6 +178,7 @@ public boolean acquire(String entityId, String ownerId) { .throwFailureWhenFinished() .retry(Integer.MAX_VALUE - 1) .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1) + .backoffStrategy(backoffStrategy()) .onlyRetryOn( ConditionalCheckFailedException.class, ProvisionedThroughputExceededException.class, diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java index 0635b56a7fba..60d88ed1ce2a 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; @@ -122,6 +123,7 @@ protected CommitStatus checkCommitStatusStrict( .retry(maxAttempts) .suppressFailureWhenFinished() .exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0) + .backoffStrategy(BackoffStrategies.from(properties)) .onFailure( (location, checkException) -> LOG.error("Cannot check if commit to {} exists.", tableOrViewName, checkException)) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index f1223705c11d..357497f2438f 100644 --- 
a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -34,6 +34,7 @@ import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Objects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.LocationUtil; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; @@ -194,6 +195,8 @@ protected void refreshFromMetadataLocation( Tasks.foreach(newLocation) .retry(numRetries) .exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */) + .backoffStrategy( + BackoffStrategies.from(currentMetadata != null ? currentMetadata.properties() : null)) .throwFailureWhenFinished() .stopRetryOn(NotFoundException.class) // overridden if shouldRetry is non-null .shouldRetryTest(shouldRetry) diff --git a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java index 2311c1b017d9..6cec75c59632 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java @@ -29,6 +29,7 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.expressions.Term; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.Tasks; public class BaseReplaceSortOrder implements ReplaceSortOrder { @@ -56,6 +57,7 @@ public void commit() { base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) + .backoffStrategy(BackoffStrategies.from(base.properties())) .onlyRetryOn(CommitFailedException.class) .run( taskOps -> { diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java 
b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 9884ac297079..8c559f55921b 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -48,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; @@ -309,6 +310,7 @@ private void commitReplaceTransaction(boolean orCreate) { PropertyUtil.propertyAsInt( props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) + .backoffStrategy(BackoffStrategies.from(props)) .onlyRetryOn(CommitFailedException.class) .run( underlyingOps -> { @@ -364,6 +366,7 @@ private void commitSimpleTransaction() { base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) + .backoffStrategy(BackoffStrategies.from(base.properties())) .onlyRetryOn(CommitFailedException.class) .run( underlyingOps -> { diff --git a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java index 9389aec50c0a..4c322e7c696d 100644 --- a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java @@ -33,6 +33,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.Tasks; class PropertiesUpdate implements UpdateProperties { @@ 
-106,6 +107,7 @@ public void commit() { base.propertyTryAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), base.propertyTryAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) + .backoffStrategy(BackoffStrategies.from(base.properties())) .onlyRetryOn(CommitFailedException.class) .run( taskOps -> { diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 8e320b3d69bd..7e0bf4aed4a5 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -49,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; @@ -361,6 +362,7 @@ public void commit() { base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) + .backoffStrategy(BackoffStrategies.from(base.properties())) .onlyRetryOn(CommitFailedException.class) .run( item -> { diff --git a/core/src/main/java/org/apache/iceberg/SetLocation.java b/core/src/main/java/org/apache/iceberg/SetLocation.java index 148e4b8bc8be..c77ac62038c0 100644 --- a/core/src/main/java/org/apache/iceberg/SetLocation.java +++ b/core/src/main/java/org/apache/iceberg/SetLocation.java @@ -28,6 +28,7 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.Tasks; public class SetLocation 
implements UpdateLocation { @@ -60,6 +61,7 @@ public void commit() { base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) + .backoffStrategy(BackoffStrategies.from(base.properties())) .onlyRetryOn(CommitFailedException.class) .run(taskOps -> taskOps.commit(base, base.updateLocation(newLocation))); } diff --git a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java index 0f80b4e1f233..9a8e66befc4c 100644 --- a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java +++ b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java @@ -31,6 +31,7 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.SnapshotUtil; import org.apache.iceberg.util.Tasks; @@ -114,6 +115,7 @@ public void commit() { base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) + .backoffStrategy(BackoffStrategies.from(base.properties())) .onlyRetryOn(CommitFailedException.class) .run( taskOps -> { diff --git a/core/src/main/java/org/apache/iceberg/SetStatistics.java b/core/src/main/java/org/apache/iceberg/SetStatistics.java index 01e06fa16bca..9bf3d780bfb4 100644 --- a/core/src/main/java/org/apache/iceberg/SetStatistics.java +++ b/core/src/main/java/org/apache/iceberg/SetStatistics.java @@ -32,6 +32,7 @@ import java.util.Optional; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.BackoffStrategies; import 
org.apache.iceberg.util.Tasks; public class SetStatistics implements UpdateStatistics { @@ -70,6 +71,7 @@ public void commit() { ops.current() .propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) + .backoffStrategy(BackoffStrategies.from(ops.current().properties())) .onlyRetryOn(CommitFailedException.class) .run( taskOps -> { diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 6ba10e8049f6..8506d90b533b 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -72,6 +72,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.math.IntMath; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.Exceptions; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; @@ -468,6 +469,7 @@ public void commit() { base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) + .backoffStrategy(BackoffStrategies.from(base.properties())) .onlyRetryOn(CommitFailedException.class) .countAttempts(commitMetrics().attempts()) .run( diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index 3a1e62260aae..63fad76499e9 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -101,6 +101,7 @@ import org.apache.iceberg.rest.responses.LoadViewResponse; import org.apache.iceberg.rest.responses.PlanTableScanResponse; import 
org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.view.BaseView; @@ -620,6 +621,8 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, 2.0 /* exponential */) + .backoffStrategy( + BackoffStrategies.from(ops.current() != null ? ops.current().properties() : null)) .onlyRetryOn(CommitFailedException.class) .run( taskOps -> { @@ -782,6 +785,8 @@ static ViewMetadata commit(ViewOperations ops, UpdateTableRequest request) { COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, 2.0 /* exponential */) + .backoffStrategy( + BackoffStrategies.from(ops.current() != null ? ops.current().properties() : null)) .onlyRetryOn(CommitFailedException.class) .run( taskOps -> { diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java index 9fa273ca169f..cc6eacdf353c 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java @@ -52,6 +52,7 @@ import org.apache.iceberg.rest.responses.FetchPlanningResultResponse; import org.apache.iceberg.rest.responses.PlanTableScanResponse; import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; @@ -262,6 +263,7 @@ private CloseableIterable fetchPlanningResult() { try { Tasks.foreach(planId) .exponentialBackoff(MIN_SLEEP_MS, MAX_SLEEP_MS, maxWaitTimeMs, SCALE_FACTOR) + .backoffStrategy(BackoffStrategies.from(catalogProperties)) .retry(MAX_RETRIES) .onlyRetryOn(NotCompleteException.class) .onFailure( diff --git 
a/core/src/main/java/org/apache/iceberg/util/BackoffStrategies.java b/core/src/main/java/org/apache/iceberg/util/BackoffStrategies.java new file mode 100644 index 000000000000..45160933fc3d --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/BackoffStrategies.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import java.util.Map; +import org.apache.iceberg.common.DynConstructors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Resolves and instantiates pluggable {@link BackoffStrategy} implementations. */ +public class BackoffStrategies { + private static final Logger LOG = LoggerFactory.getLogger(BackoffStrategies.class); + + /** + * Property naming the {@link BackoffStrategy} implementation to use for retries. The value is a + * fully-qualified class name with a public no-arg constructor. The same key is honored at every + * retry site that has a properties map (commit, lock, status check, scan, file cleanup); OAuth2 + * token refresh always uses the default. When unset, the built-in exponential backoff is used. 
+ */ + public static final String STRATEGY_IMPL = "retry.strategy-impl"; + + private BackoffStrategies() {} + + /** + * Resolves the configured strategy from the given properties. + * + * @param properties properties that may contain {@link #STRATEGY_IMPL} + * @return the configured strategy, or {@code null} when {@link #STRATEGY_IMPL} is not set (the + * caller then keeps the built-in exponential backoff) + */ + public static BackoffStrategy from(Map properties) { + String impl = properties == null ? null : properties.get(STRATEGY_IMPL); + if (impl == null) { + return null; + } + + return loadBackoffStrategy(impl, properties); + } + + /** + * Loads and initializes a {@link BackoffStrategy} by class name. + * + * @param impl fully-qualified class name of a {@link BackoffStrategy} with a no-arg constructor + * @param properties properties passed to {@link BackoffStrategy#initialize(Map)} + * @return an initialized strategy instance + */ + public static BackoffStrategy loadBackoffStrategy(String impl, Map properties) { + LOG.info("Loading custom BackoffStrategy implementation: {}", impl); + DynConstructors.Ctor ctor; + try { + ctor = + DynConstructors.builder(BackoffStrategy.class) + .loader(BackoffStrategies.class.getClassLoader()) + .impl(impl) + .buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize BackoffStrategy, missing no-arg constructor: %s", impl), + e); + } + + BackoffStrategy strategy; + try { + strategy = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format( + "Cannot initialize BackoffStrategy, %s does not implement BackoffStrategy.", impl), + e); + } + + strategy.initialize(properties); + return strategy; + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/BackoffStrategy.java b/core/src/main/java/org/apache/iceberg/util/BackoffStrategy.java new file mode 100644 index 000000000000..51d4fd9091dd --- /dev/null +++ 
b/core/src/main/java/org/apache/iceberg/util/BackoffStrategy.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import java.util.Map; + +/** + * Strategy that decides how long to wait between retry attempts in {@link Tasks}. + * + *

{@link Tasks} always retries through a {@code BackoffStrategy}. When none is supplied, the + * built-in exponential backoff (configured via {@link Tasks.Builder#exponentialBackoff}) is used. A + * custom strategy can be selected by setting the {@link BackoffStrategies#STRATEGY_IMPL} property + * to the fully-qualified name of a class implementing this interface with a public no-arg + * constructor. + * + *

The total retry duration ({@code commit.retry.total-timeout-ms} and similar) and the maximum + * number of attempts remain {@link Tasks} concerns; a strategy only computes the per-attempt wait. + * + *

Implementations must be thread-safe: the parallel {@link Tasks} execution path shares a single + * strategy instance across worker threads, and a single instance may be reused across many retried + * items. + */ +public interface BackoffStrategy { + + /** + * Returns the time in milliseconds to wait before the given retry attempt. + * + * @param attempt the 1-based attempt number that is about to be retried; callers always pass a + * value {@code >= 1} (the first retry, after the initial failure, is attempt 1), and + * strategies need not handle {@code 0} or negative inputs + * @return the wait time in milliseconds, including any jitter the strategy chooses to apply + */ + long computeBackoff(int attempt); + + /** + * Initializes the strategy from catalog or table properties. + * + *

Called once immediately after construction when the strategy is loaded reflectively by + * {@link BackoffStrategies}. The default implementation does nothing. + * + * @param properties the properties the strategy was selected from + */ + default void initialize(Map properties) {} +} diff --git a/core/src/main/java/org/apache/iceberg/util/ExponentialBackoffStrategy.java b/core/src/main/java/org/apache/iceberg/util/ExponentialBackoffStrategy.java new file mode 100644 index 000000000000..8740f78b1b49 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/util/ExponentialBackoffStrategy.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * The default {@link BackoffStrategy}: exponential backoff with up to 10% additive jitter. + * + *

This reproduces the historical, hardcoded {@link Tasks} backoff exactly. It is used whenever + * no custom strategy is configured, so existing callers see byte-for-byte identical wait times. + * + *

Immutable and therefore thread-safe. + */ +class ExponentialBackoffStrategy implements BackoffStrategy { + private final long minSleepTimeMs; + private final long maxSleepTimeMs; + private final double scaleFactor; + + ExponentialBackoffStrategy(long minSleepTimeMs, long maxSleepTimeMs, double scaleFactor) { + this.minSleepTimeMs = minSleepTimeMs; + this.maxSleepTimeMs = maxSleepTimeMs; + this.scaleFactor = scaleFactor; + } + + @Override + public long computeBackoff(int attempt) { + int delayMs = + (int) + Math.min(minSleepTimeMs * Math.pow(scaleFactor, attempt - 1), (double) maxSleepTimeMs); + int jitter = ThreadLocalRandom.current().nextInt(Math.max(1, (int) (delayMs * 0.1))); + return (long) delayMs + jitter; + } +} diff --git a/core/src/main/java/org/apache/iceberg/util/LockManagers.java b/core/src/main/java/org/apache/iceberg/util/LockManagers.java index 561d0a8975dd..bdcaf5c51ce3 100644 --- a/core/src/main/java/org/apache/iceberg/util/LockManagers.java +++ b/core/src/main/java/org/apache/iceberg/util/LockManagers.java @@ -85,6 +85,7 @@ public abstract static class BaseLockManager implements LockManager { private long heartbeatIntervalMs; private long heartbeatTimeoutMs; private int heartbeatThreads; + private BackoffStrategy backoffStrategy; public long heartbeatTimeoutMs() { return heartbeatTimeoutMs; @@ -106,6 +107,10 @@ public int heartbeatThreads() { return heartbeatThreads; } + public BackoffStrategy backoffStrategy() { + return backoffStrategy; + } + /** * Returns the shared scheduler for lock heartbeats. 
* @@ -158,6 +163,7 @@ public void initialize(Map properties) { properties, CatalogProperties.LOCK_HEARTBEAT_THREADS, CatalogProperties.LOCK_HEARTBEAT_THREADS_DEFAULT); + this.backoffStrategy = BackoffStrategies.from(properties); } @Override @@ -243,6 +249,7 @@ public boolean acquire(String entityId, String ownerId) { .onlyRetryOn(IllegalStateException.class) .throwFailureWhenFinished() .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1) + .backoffStrategy(backoffStrategy()) .run(id -> acquireOnce(id, ownerId)); return true; } catch (IllegalStateException e) { diff --git a/core/src/main/java/org/apache/iceberg/util/Tasks.java b/core/src/main/java/org/apache/iceberg/util/Tasks.java index 29100c6cffb2..c04cdbb68196 100644 --- a/core/src/main/java/org/apache/iceberg/util/Tasks.java +++ b/core/src/main/java/org/apache/iceberg/util/Tasks.java @@ -29,7 +29,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; @@ -87,6 +86,7 @@ public static class Builder { private long maxSleepTimeMs = 600000; // 10 minutes private long maxDurationMs = 600000; // 10 minutes private double scaleFactor = 2.0; // exponential + private BackoffStrategy backoffStrategy = null; private Counter attemptsCounter; public Builder(Iterable items) { @@ -192,6 +192,20 @@ public Builder exponentialBackoff( return this; } + /** + * Use a custom {@link BackoffStrategy} to compute the wait between retries. + * + *

When set to a non-null strategy, it replaces the built-in exponential backoff configured + * via {@link #exponentialBackoff}. A {@code null} argument is ignored and the built-in + * exponential backoff is kept, so callers can pass {@link + * BackoffStrategies#from(java.util.Map)} directly. The total retry duration and maximum number + * of attempts are unaffected. + */ + public Builder backoffStrategy(BackoffStrategy strategy) { + this.backoffStrategy = strategy; + return this; + } + public boolean run(Task task) { return run(task, RuntimeException.class); } @@ -402,6 +416,10 @@ public void run() { @SuppressWarnings("checkstyle:CyclomaticComplexity") private void runTaskWithRetry(Task task, I item) throws E { long start = System.currentTimeMillis(); + BackoffStrategy strategy = + backoffStrategy != null + ? backoffStrategy + : new ExponentialBackoffStrategy(minSleepTimeMs, maxSleepTimeMs, scaleFactor); int attempt = 0; while (true) { attempt += 1; @@ -449,12 +467,7 @@ private void runTaskWithRetry(Task task, I item) thr } } - int delayMs = - (int) - Math.min( - minSleepTimeMs * Math.pow(scaleFactor, attempt - 1), (double) maxSleepTimeMs); - int jitter = ThreadLocalRandom.current().nextInt(Math.max(1, (int) (delayMs * 0.1))); - int sleepTimeMs = delayMs + jitter; + long sleepTimeMs = strategy.computeBackoff(attempt); LOG.warn( "Retrying task after failure: sleepTimeMs={} {}", sleepTimeMs, e.getMessage(), e); diff --git a/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java index 0bab053bcb99..1888c33f3a34 100644 --- a/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java +++ b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java @@ -32,6 +32,7 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.util.BackoffStrategies; import 
org.apache.iceberg.util.LocationUtil; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; @@ -199,6 +200,8 @@ protected void refreshFromMetadataLocation( Tasks.foreach(newLocation) .retry(numRetries) .exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */) + .backoffStrategy( + BackoffStrategies.from(currentMetadata != null ? currentMetadata.properties() : null)) .throwFailureWhenFinished() .stopRetryOn(NotFoundException.class) // overridden if shouldRetry is non-null .shouldRetryTest(shouldRetry) diff --git a/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java index 48bcfc3a6805..fde01b49f09e 100644 --- a/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java @@ -33,6 +33,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; @@ -72,6 +73,7 @@ public void commit() { PropertyUtil.propertyAsInt( base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) + .backoffStrategy(BackoffStrategies.from(base.properties())) .onlyRetryOn(CommitFailedException.class) .run(taskOps -> taskOps.commit(base, internalApply())); } diff --git a/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java b/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java index 481118c85991..c399b16a5811 100644 --- a/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java +++ b/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java @@ -30,6 +30,7 @@ import org.apache.iceberg.UpdateLocation; import org.apache.iceberg.exceptions.CommitFailedException; 
import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; @@ -62,6 +63,7 @@ public void commit() { PropertyUtil.propertyAsInt( base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) + .backoffStrategy(BackoffStrategies.from(base.properties())) .onlyRetryOn(CommitFailedException.class) .run( taskOps -> diff --git a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java index 8b3d087940a5..8836d5fd30e0 100644 --- a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java +++ b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java @@ -34,6 +34,7 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.Tasks; @@ -99,6 +100,7 @@ public void commit() { PropertyUtil.propertyAsInt( base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), 2.0 /* exponential */) + .backoffStrategy(BackoffStrategies.from(base.properties())) .onlyRetryOn(CommitFailedException.class) .run(taskOps -> taskOps.commit(base, internalApply())); } diff --git a/core/src/test/java/org/apache/iceberg/TestBackoffStrategyCommit.java b/core/src/test/java/org/apache/iceberg/TestBackoffStrategyCommit.java new file mode 100644 index 000000000000..17f903cfda05 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestBackoffStrategyCommit.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.iceberg.util.BackoffStrategies; +import org.apache.iceberg.util.BackoffStrategy; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestBackoffStrategyCommit extends TestBase { + + @TestTemplate + public void commitUsesConfiguredBackoffStrategy() { + CountingBackoffStrategy.reset(); + + table + .updateProperties() + .set(BackoffStrategies.STRATEGY_IMPL, CountingBackoffStrategy.class.getName()) + .commit(); + + TestTables.TestTableOperations ops = table.ops(); + ops.failCommits(2); + + table.newFastAppend().appendFile(FILE_B).commit(); + + assertThat(CountingBackoffStrategy.initialized).isTrue(); + // computeBackoff is invoked once before each of the two retried commit attempts + assertThat(CountingBackoffStrategy.COMPUTE_CALLS.get()).isEqualTo(2); + } + + @TestTemplate + public void commitWithoutPropertyDoesNotUseCustomStrategy() { + CountingBackoffStrategy.reset(); + + // keep the retry waits tiny so the default exponential backoff does not slow the test + table + 
.updateProperties() + .set(TableProperties.COMMIT_MIN_RETRY_WAIT_MS, "0") + .set(TableProperties.COMMIT_MAX_RETRY_WAIT_MS, "0") + .commit(); + + TestTables.TestTableOperations ops = table.ops(); + ops.failCommits(2); + + table.newFastAppend().appendFile(FILE_B).commit(); + + assertThat(CountingBackoffStrategy.COMPUTE_CALLS.get()).isZero(); + } + + /** Records how many times it is consulted; loaded reflectively via a no-arg constructor. */ + public static class CountingBackoffStrategy implements BackoffStrategy { + static final AtomicInteger COMPUTE_CALLS = new AtomicInteger(0); + static volatile boolean initialized = false; + + static void reset() { + COMPUTE_CALLS.set(0); + initialized = false; + } + + @Override + public void initialize(Map properties) { + initialized = true; + } + + @Override + public long computeBackoff(int attempt) { + COMPUTE_CALLS.incrementAndGet(); + return 0L; + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/util/TestBackoffStrategies.java b/core/src/test/java/org/apache/iceberg/util/TestBackoffStrategies.java new file mode 100644 index 000000000000..889fd8b3e011 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/util/TestBackoffStrategies.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.util; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; + +public class TestBackoffStrategies { + + @Test + public void defaultExponentialBackoffMatchesLegacyFormula() { + long min = 100; + long max = 60_000; + double scale = 2.0; + ExponentialBackoffStrategy backoff = new ExponentialBackoffStrategy(min, max, scale); + + for (int attempt = 1; attempt <= 12; attempt++) { + int delayMs = (int) Math.min(min * Math.pow(scale, attempt - 1), (double) max); + long jitterBound = Math.max(1, (int) (delayMs * 0.1)); + long wait = backoff.computeBackoff(attempt); + assertThat(wait) + .isGreaterThanOrEqualTo((long) delayMs) + .isLessThan((long) delayMs + jitterBound); + } + } + + @Test + public void defaultExponentialBackoffRespectsMaxCap() { + long max = 1000; + ExponentialBackoffStrategy backoff = new ExponentialBackoffStrategy(100, max, 2.0); + + long wait = backoff.computeBackoff(50); // attempt large enough to exceed the cap + long jitterBound = Math.max(1, (int) (max * 0.1)); + assertThat(wait).isGreaterThanOrEqualTo(max).isLessThan(max + jitterBound); + } + + @Test + public void fromReturnsNullWhenKeyAbsent() { + assertThat(BackoffStrategies.from(null)).isNull(); + assertThat(BackoffStrategies.from(ImmutableMap.of())).isNull(); + assertThat(BackoffStrategies.from(ImmutableMap.of("other", "value"))).isNull(); + } + + @Test + public void fromLoadsAndInitializesConfiguredStrategy() { + Map properties = + ImmutableMap.of( + BackoffStrategies.STRATEGY_IMPL, + FixedBackoffStrategy.class.getName(), + "delay-ms", + "42"); + + BackoffStrategy strategy = BackoffStrategies.from(properties); + + 
assertThat(strategy).isInstanceOf(FixedBackoffStrategy.class); + assertThat(strategy.computeBackoff(1)).isEqualTo(42L); + assertThat(strategy.computeBackoff(9)).isEqualTo(42L); + } + + @Test + public void loadBackoffStrategyRejectsMissingNoArgConstructor() { + assertThatThrownBy( + () -> + BackoffStrategies.loadBackoffStrategy( + NoNoArgStrategy.class.getName(), ImmutableMap.of())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("missing no-arg constructor"); + } + + @Test + public void loadBackoffStrategyRejectsNonStrategyClass() { + assertThatThrownBy( + () -> BackoffStrategies.loadBackoffStrategy(String.class.getName(), ImmutableMap.of())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("does not implement BackoffStrategy"); + } + + @Test + public void loadBackoffStrategyRejectsUnknownClass() { + assertThatThrownBy( + () -> + BackoffStrategies.loadBackoffStrategy( + "org.apache.iceberg.util.NotAClass", ImmutableMap.of())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("BackoffStrategy"); + } + + @Test + public void loadBackoffStrategyPropagatesInitializeFailures() { + assertThatThrownBy( + () -> + BackoffStrategies.loadBackoffStrategy( + ThrowingInitStrategy.class.getName(), ImmutableMap.of())) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("init failed"); + } + + /** Test strategy with a public no-arg constructor that records its initialize properties. */ + public static class FixedBackoffStrategy implements BackoffStrategy { + private long delayMs = 0; + + @Override + public void initialize(Map properties) { + this.delayMs = Long.parseLong(properties.getOrDefault("delay-ms", "0")); + } + + @Override + public long computeBackoff(int attempt) { + return delayMs; + } + } + + /** Test strategy without a no-arg constructor. 
*/ + public static class NoNoArgStrategy implements BackoffStrategy { + public NoNoArgStrategy(String required) {} + + @Override + public long computeBackoff(int attempt) { + return 0; + } + } + + /** Test strategy whose initialize() throws, to verify the loader propagates the failure. */ + public static class ThrowingInitStrategy implements BackoffStrategy { + @Override + public void initialize(Map properties) { + throw new IllegalStateException("init failed"); + } + + @Override + public long computeBackoff(int attempt) { + return 0; + } + } +} diff --git a/core/src/test/java/org/apache/iceberg/util/TestTasks.java b/core/src/test/java/org/apache/iceberg/util/TestTasks.java index 2ca69c66bb6c..fcaacedc3176 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestTasks.java +++ b/core/src/test/java/org/apache/iceberg/util/TestTasks.java @@ -20,9 +20,20 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import org.apache.iceberg.metrics.Counter; import org.apache.iceberg.metrics.DefaultMetricsContext; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.junit.jupiter.api.Test; public class TestTasks { @@ -57,4 +68,112 @@ public void attemptCounterIsIncreasedWithoutRetries() { assertThat(counter.value()).isOne(); } + + @Test + public void customBackoffStrategyOverridesExponentialBackoff() { + List attempts = Collections.synchronizedList(Lists.newArrayList()); + BackoffStrategy recording = + attempt -> { + attempts.add(attempt); + return 0L; + }; + + int retries = 3; + Counter counter = 
new DefaultMetricsContext().counter("counter"); + + Tasks.foreach(IntStream.range(0, 1)) + .countAttempts(counter) + .retry(retries) + .backoffStrategy(recording) + .onlyRetryOn(RuntimeException.class) + .run( + x -> { + if (counter.value() <= retries) { + throw new RuntimeException(); + } + }); + + // computeBackoff is called once before each of the 3 retries, with 1-based attempt numbers + assertThat(attempts).containsExactly(1, 2, 3); + } + + @Test + public void nullBackoffStrategyKeepsExponentialDefault() { + int retries = 3; + Counter counter = new DefaultMetricsContext().counter("counter"); + + Tasks.foreach(IntStream.range(0, 1)) + .countAttempts(counter) + .exponentialBackoff(0, 0, 5000, 0) + .retry(retries) + .backoffStrategy(null) // mirrors call sites passing BackoffStrategies.from(...) == null + .onlyRetryOn(RuntimeException.class) + .run( + x -> { + if (counter.value() <= retries) { + throw new RuntimeException(); + } + }); + + assertThat(counter.value()).isEqualTo(retries + 1); + } + + @Test + public void parallelTasksShareStrategyAcrossWorkers() throws InterruptedException { + int items = 20; + Set threadsSeen = ConcurrentHashMap.newKeySet(); + AtomicInteger calls = new AtomicInteger(); + BackoffStrategy recording = + attempt -> { + threadsSeen.add(Thread.currentThread()); + calls.incrementAndGet(); + return 0L; + }; + ConcurrentMap perItemAttempts = Maps.newConcurrentMap(); + ExecutorService svc = Executors.newFixedThreadPool(4); + try { + Tasks.range(items) + .executeWith(svc) + .retry(2) + .backoffStrategy(recording) + .onlyRetryOn(RuntimeException.class) + .run( + i -> { + int attemptCount = + perItemAttempts.computeIfAbsent(i, k -> new AtomicInteger()).incrementAndGet(); + if (attemptCount == 1) { + throw new RuntimeException("first attempt fails"); + } + }); + } finally { + svc.shutdownNow(); + svc.awaitTermination(5, TimeUnit.SECONDS); + } + + assertThat(calls.get()).isEqualTo(items); + assertThat(threadsSeen).hasSizeGreaterThan(1); + } + + 
@Test + public void backoffStrategyReturnValueDrivesSleepDuration() { + long perRetryMs = 150L; + int retries = 3; + Counter counter = new DefaultMetricsContext().counter("counter"); + + long startMs = System.currentTimeMillis(); + Tasks.foreach(IntStream.range(0, 1)) + .countAttempts(counter) + .retry(retries) + .backoffStrategy(attempt -> perRetryMs) + .onlyRetryOn(RuntimeException.class) + .run( + x -> { + if (counter.value() <= retries) { + throw new RuntimeException(); + } + }); + long elapsedMs = System.currentTimeMillis() - startMs; + + assertThat(elapsedMs).isGreaterThanOrEqualTo((long) (retries * perRetryMs * 0.9)); + } } diff --git a/docs/docs/configuration.md b/docs/docs/configuration.md index 17bf1f8ac0a1..6bf32cdc3388 100644 --- a/docs/docs/configuration.md +++ b/docs/docs/configuration.md @@ -110,6 +110,7 @@ See the [Encryption](encryption.md) document for additional details. | commit.retry.min-wait-ms | 100 | Minimum time in milliseconds to wait before retrying a commit | | commit.retry.max-wait-ms | 60000 (1 min) | Maximum time in milliseconds to wait before retrying a commit | | commit.retry.total-timeout-ms | 1800000 (30 min) | Total retry timeout period in milliseconds for a commit | +| retry.strategy-impl | (exponential backoff) | Fully-qualified name of a custom `org.apache.iceberg.util.BackoffStrategy` used to compute the wait between retries. Global to every `Tasks` retry site (commit, lock acquisition, status checks, REST scan polling, FileIO cleanup); there is no per-site override. Read from whichever properties map is local to the call site (table/view properties, catalog properties, FileIO properties, or Hadoop `Configuration`). 
When unset, the built-in exponential backoff with jitter is used | | commit.status-check.num-retries | 3 | Number of times to check whether a commit succeeded after a connection is lost before failing due to an unknown commit state | | commit.status-check.min-wait-ms | 1000 (1s) | Minimum time in milliseconds to wait before retrying a status-check | | commit.status-check.max-wait-ms | 60000 (1 min) | Maximum time in milliseconds to wait before retrying a status-check | diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java index c77cfb896c8a..8893a7053408 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java @@ -45,8 +45,11 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.iceberg.util.BackoffStrategies; +import org.apache.iceberg.util.BackoffStrategy; import org.apache.iceberg.util.Tasks; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -91,6 +94,7 @@ class MetastoreLock implements HiveLock { private final long lockHeartbeatIntervalTime; private final ScheduledExecutorService exitingScheduledExecutorService; private final String agentInfo; + private final BackoffStrategy backoffStrategy; private Optional hmsLockId = Optional.empty(); private ReentrantLock jvmLock = null; @@ -126,6 +130,12 @@ class MetastoreLock implements HiveLock { this.agentInfo = "Iceberg-" + UUID.randomUUID(); + String backoffImpl = 
conf.get(BackoffStrategies.STRATEGY_IMPL); + this.backoffStrategy = + backoffImpl == null + ? null + : BackoffStrategies.loadBackoffStrategy(backoffImpl, ImmutableMap.of()); + this.exitingScheduledExecutorService = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder() @@ -204,6 +214,7 @@ private long acquireLock() throws LockException { Tasks.foreach(lockInfo.lockId) .retry(Integer.MAX_VALUE - 100) .exponentialBackoff(lockCheckMinWaitTime, lockCheckMaxWaitTime, lockAcquireTimeout, 1.5) + .backoffStrategy(backoffStrategy) .throwFailureWhenFinished() .onlyRetryOn(WaitingForLockException.class) .run( @@ -293,6 +304,7 @@ private LockInfo createLock() throws LockException { .retry(Integer.MAX_VALUE - 100) .exponentialBackoff( lockCreationMinWaitTime, lockCreationMaxWaitTime, lockCreationTimeout, 2.0) + .backoffStrategy(backoffStrategy) .shouldRetryTest( e -> !interrupted.get() diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java index 8d5972ac7acf..8f921720c3a9 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.ContentFile; @@ -27,6 +28,7 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; import org.apache.spark.TaskContext; @@ -91,6 +93,15 @@ public static void deleteFiles(String context, FileIO io, List ioProperties(FileIO io) { + try { + return 
io.properties(); + } catch (UnsupportedOperationException e) { + // FileIO implementations are not required to expose their configuration + return null; + } + } + private static void delete(String context, FileIO io, List paths) { AtomicInteger deletedFilesCount = new AtomicInteger(0); @@ -105,6 +116,7 @@ private static void delete(String context, FileIO io, List paths) { DELETE_MAX_RETRY_WAIT_MS, DELETE_TOTAL_RETRY_TIME_MS, 2 /* exponential */) + .backoffStrategy(BackoffStrategies.from(ioProperties(io))) .run( path -> { io.deleteFile(path); diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java index 8d5972ac7acf..8f921720c3a9 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.ContentFile; @@ -27,6 +28,7 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; import org.apache.spark.TaskContext; @@ -91,6 +93,15 @@ public static void deleteFiles(String context, FileIO io, List ioProperties(FileIO io) { + try { + return io.properties(); + } catch (UnsupportedOperationException e) { + // FileIO implementations are not required to expose their configuration + return null; + } + } + private static void delete(String context, FileIO io, List paths) { AtomicInteger deletedFilesCount = new AtomicInteger(0); @@ -105,6 +116,7 @@ private static void delete(String context, 
FileIO io, List paths) { DELETE_MAX_RETRY_WAIT_MS, DELETE_TOTAL_RETRY_TIME_MS, 2 /* exponential */) + .backoffStrategy(BackoffStrategies.from(ioProperties(io))) .run( path -> { io.deleteFile(path); diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java index 8d5972ac7acf..8f921720c3a9 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.ContentFile; @@ -27,6 +28,7 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.Tasks; import org.apache.iceberg.util.ThreadPools; import org.apache.spark.TaskContext; @@ -91,6 +93,15 @@ public static void deleteFiles(String context, FileIO io, List ioProperties(FileIO io) { + try { + return io.properties(); + } catch (UnsupportedOperationException e) { + // FileIO implementations are not required to expose their configuration + return null; + } + } + private static void delete(String context, FileIO io, List paths) { AtomicInteger deletedFilesCount = new AtomicInteger(0); @@ -105,6 +116,7 @@ private static void delete(String context, FileIO io, List paths) { DELETE_MAX_RETRY_WAIT_MS, DELETE_TOTAL_RETRY_TIME_MS, 2 /* exponential */) + .backoffStrategy(BackoffStrategies.from(ioProperties(io))) .run( path -> { io.deleteFile(path); From ad5a65cfb30a2c2d877b1ff77941fbead3e39d03 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Wed, 20 May 2026 11:21:30 +0700 Subject: [PATCH 2/3] Core: 
Split exponentialBackoff into totalTimeoutMs and backoffStrategy Tasks.Builder.exponentialBackoff(min, max, totalTimeout, scale) was carrying two unrelated concerns: the per-attempt formula parameters (min/max/scale) and the total retry budget (totalTimeout). With the BackoffStrategy SPI now mediating the per-attempt wait, every wired call site read as ".exponentialBackoff(...).backoffStrategy(...)" where the first call's formula args became dead weight whenever the second call's strategy actually won. The internal field maxDurationMs and method parameter backoffMaxRetryTimeMs also diverged from the operator-facing property commit.retry.total-timeout-ms. Splits the method into Tasks.Builder.totalTimeoutMs(long) (the retry budget) and Tasks.Builder.backoffStrategy(BackoffStrategy) (the per-attempt wait formula). The internal field is renamed maxDurationMs to totalTimeoutMs to match the operator-facing name. A new BackoffStrategies.from(Map, long, long, double) overload preserves operator-tunable formula defaults: it returns the custom strategy when retry.strategy-impl is set, otherwise a fresh ExponentialBackoffStrategy with the provided min/max/scale. Builder.backoffStrategy(null) is now a true no-op, the Builder field is initialized to a non-null default, and the in-Tasks fallback construction path is gone. Every production call site is migrated. MetastoreLock's two sites with different scales (1.5 for lock-acquire, 2.0 for lock-creation) are preserved via two separate strategy fields. OAuth2Util, which has no properties map at its call site, passes null to the new helper. Revapi accepts the removal against the 1.11.0 baseline with a justification that the behavior is fully recoverable via the new two-call form. 
Part of #7528 Co-Authored-By: Claude Opus 4.7 (1M context) --- .palantir/revapi.yml | 9 +++ .../aws/dynamodb/DynamoDbLockManager.java | 2 +- .../iceberg/BaseMetastoreOperations.java | 4 +- .../iceberg/BaseMetastoreTableOperations.java | 5 +- .../apache/iceberg/BaseReplaceSortOrder.java | 14 +++-- .../org/apache/iceberg/BaseTransaction.java | 32 ++++++----- .../org/apache/iceberg/PropertiesUpdate.java | 14 +++-- .../org/apache/iceberg/RemoveSnapshots.java | 14 +++-- .../java/org/apache/iceberg/SetLocation.java | 14 +++-- .../apache/iceberg/SetSnapshotOperation.java | 14 +++-- .../org/apache/iceberg/SetStatistics.java | 16 ++++-- .../org/apache/iceberg/SnapshotProducer.java | 14 +++-- .../apache/iceberg/rest/CatalogHandlers.java | 24 ++++---- .../apache/iceberg/rest/RESTTableScan.java | 5 +- .../apache/iceberg/rest/auth/OAuth2Util.java | 13 +++-- .../iceberg/util/BackoffStrategies.java | 24 ++++++++ .../org/apache/iceberg/util/LockManagers.java | 5 +- .../java/org/apache/iceberg/util/Tasks.java | 56 +++++++++---------- .../iceberg/view/BaseViewOperations.java | 5 +- .../apache/iceberg/view/PropertiesUpdate.java | 18 +++--- .../apache/iceberg/view/SetViewLocation.java | 18 +++--- .../iceberg/view/ViewVersionReplace.java | 18 +++--- .../iceberg/util/TestBackoffStrategies.java | 33 +++++++++++ .../org/apache/iceberg/util/TestTasks.java | 10 ++-- .../apache/iceberg/hive/MetastoreLock.java | 22 +++++--- .../spark/source/SparkCleanupUtil.java | 10 ++-- .../spark/source/SparkCleanupUtil.java | 10 ++-- .../spark/source/SparkCleanupUtil.java | 10 ++-- 28 files changed, 267 insertions(+), 166 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index cd4afe6fdc8d..bd86495711e6 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1655,3 +1655,12 @@ acceptedBreaks: \ boolean)" justification: "IncrementalScanEvent should only be constructed by Iceberg code.\ \ Hence the change of constructor params shouldn't affect users" + "1.11.0": + 
org.apache.iceberg:iceberg-core: + - code: "java.method.removed" + old: "method org.apache.iceberg.util.Tasks.Builder org.apache.iceberg.util.Tasks.Builder::exponentialBackoff(long,\ + \ long, long, double)" + justification: "Split into Tasks.Builder.totalTimeoutMs(long) for the retry\ + \ budget and Tasks.Builder.backoffStrategy(BackoffStrategy) for the per-attempt\ + \ wait formula. The previous 4-arg method conflated two unrelated concerns\ + \ and obscured operator-facing terminology (commit.retry.total-timeout-ms)." diff --git a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbLockManager.java b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbLockManager.java index 1b3d5ca8862b..277bc59a8470 100644 --- a/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbLockManager.java +++ b/aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbLockManager.java @@ -177,7 +177,7 @@ public boolean acquire(String entityId, String ownerId) { Tasks.foreach(entityId) .throwFailureWhenFinished() .retry(Integer.MAX_VALUE - 1) - .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1) + .totalTimeoutMs(acquireTimeoutMs()) .backoffStrategy(backoffStrategy()) .onlyRetryOn( ConditionalCheckFailedException.class, diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java index 60d88ed1ce2a..89eeeda5c2ad 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreOperations.java @@ -122,8 +122,8 @@ protected CommitStatus checkCommitStatusStrict( Tasks.foreach(newMetadataLocation) .retry(maxAttempts) .suppressFailureWhenFinished() - .exponentialBackoff(minWaitMs, maxWaitMs, totalRetryMs, 2.0) - .backoffStrategy(BackoffStrategies.from(properties)) + .totalTimeoutMs(totalRetryMs) + .backoffStrategy(BackoffStrategies.from(properties, minWaitMs, maxWaitMs, 2.0)) 
.onFailure( (location, checkException) -> LOG.error("Cannot check if commit to {} exists.", tableOrViewName, checkException)) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java index 357497f2438f..897673638d9d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreTableOperations.java @@ -194,9 +194,10 @@ protected void refreshFromMetadataLocation( AtomicReference newMetadata = new AtomicReference<>(); Tasks.foreach(newLocation) .retry(numRetries) - .exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */) + .totalTimeoutMs(600000) .backoffStrategy( - BackoffStrategies.from(currentMetadata != null ? currentMetadata.properties() : null)) + BackoffStrategies.from( + currentMetadata != null ? currentMetadata.properties() : null, 100, 5000, 4.0)) .throwFailureWhenFinished() .stopRetryOn(NotFoundException.class) // overridden if shouldRetry is non-null .shouldRetryTest(shouldRetry) diff --git a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java index 6cec75c59632..1a6b2b87d577 100644 --- a/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java +++ b/core/src/main/java/org/apache/iceberg/BaseReplaceSortOrder.java @@ -52,12 +52,14 @@ public SortOrder apply() { public void commit() { Tasks.foreach(ops) .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( - base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* exponential */) - .backoffStrategy(BackoffStrategies.from(base.properties())) + .totalTimeoutMs( + 
base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT)) + .backoffStrategy( + BackoffStrategies.from( + base.properties(), + base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + 2.0)) .onlyRetryOn(CommitFailedException.class) .run( taskOps -> { diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 8c559f55921b..9d9a9939f9f3 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -302,15 +302,17 @@ private void commitReplaceTransaction(boolean orCreate) { try { Tasks.foreach(ops) .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( + .totalTimeoutMs( PropertyUtil.propertyAsInt( - props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - PropertyUtil.propertyAsInt( - props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - PropertyUtil.propertyAsInt( - props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* exponential */) - .backoffStrategy(BackoffStrategies.from(props)) + props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT)) + .backoffStrategy( + BackoffStrategies.from( + props, + PropertyUtil.propertyAsInt( + props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + PropertyUtil.propertyAsInt( + props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + 2.0)) .onlyRetryOn(CommitFailedException.class) .run( underlyingOps -> { @@ -361,12 +363,14 @@ private void commitSimpleTransaction() { try { Tasks.foreach(ops) .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( - base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - 
base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* exponential */) - .backoffStrategy(BackoffStrategies.from(base.properties())) + .totalTimeoutMs( + base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT)) + .backoffStrategy( + BackoffStrategies.from( + base.properties(), + base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + 2.0)) .onlyRetryOn(CommitFailedException.class) .run( underlyingOps -> { diff --git a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java index 4c322e7c696d..5479f56d0fd5 100644 --- a/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java @@ -102,12 +102,14 @@ public void commit() { // If existing table commit properties in base are corrupted, allow rectification Tasks.foreach(ops) .retry(base.propertyTryAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( - base.propertyTryAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - base.propertyTryAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - base.propertyTryAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* exponential */) - .backoffStrategy(BackoffStrategies.from(base.properties())) + .totalTimeoutMs( + base.propertyTryAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT)) + .backoffStrategy( + BackoffStrategies.from( + base.properties(), + base.propertyTryAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + base.propertyTryAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + 2.0)) .onlyRetryOn(CommitFailedException.class) .run( taskOps -> { diff --git 
a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index 7e0bf4aed4a5..86280f18c156 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -357,12 +357,14 @@ private Set unreferencedSnapshotsToRetain(Collection refs) { public void commit() { Tasks.foreach(ops) .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( - base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* exponential */) - .backoffStrategy(BackoffStrategies.from(base.properties())) + .totalTimeoutMs( + base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT)) + .backoffStrategy( + BackoffStrategies.from( + base.properties(), + base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + 2.0)) .onlyRetryOn(CommitFailedException.class) .run( item -> { diff --git a/core/src/main/java/org/apache/iceberg/SetLocation.java b/core/src/main/java/org/apache/iceberg/SetLocation.java index c77ac62038c0..77b761e7e0d5 100644 --- a/core/src/main/java/org/apache/iceberg/SetLocation.java +++ b/core/src/main/java/org/apache/iceberg/SetLocation.java @@ -56,12 +56,14 @@ public void commit() { TableMetadata base = ops.refresh(); Tasks.foreach(ops) .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( - base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* 
exponential */) - .backoffStrategy(BackoffStrategies.from(base.properties())) + .totalTimeoutMs( + base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT)) + .backoffStrategy( + BackoffStrategies.from( + base.properties(), + base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + 2.0)) .onlyRetryOn(CommitFailedException.class) .run(taskOps -> taskOps.commit(base, base.updateLocation(newLocation))); } diff --git a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java index 9a8e66befc4c..07310cdf7288 100644 --- a/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java +++ b/core/src/main/java/org/apache/iceberg/SetSnapshotOperation.java @@ -110,12 +110,14 @@ public Snapshot apply() { public void commit() { Tasks.foreach(ops) .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( - base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* exponential */) - .backoffStrategy(BackoffStrategies.from(base.properties())) + .totalTimeoutMs( + base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT)) + .backoffStrategy( + BackoffStrategies.from( + base.properties(), + base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + 2.0)) .onlyRetryOn(CommitFailedException.class) .run( taskOps -> { diff --git a/core/src/main/java/org/apache/iceberg/SetStatistics.java b/core/src/main/java/org/apache/iceberg/SetStatistics.java index 9bf3d780bfb4..2955e6fbc66e 100644 --- 
a/core/src/main/java/org/apache/iceberg/SetStatistics.java +++ b/core/src/main/java/org/apache/iceberg/SetStatistics.java @@ -65,13 +65,17 @@ public List apply() { public void commit() { Tasks.foreach(ops) .retry(ops.current().propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( - ops.current().propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - ops.current().propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + .totalTimeoutMs( ops.current() - .propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* exponential */) - .backoffStrategy(BackoffStrategies.from(ops.current().properties())) + .propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT)) + .backoffStrategy( + BackoffStrategies.from( + ops.current().properties(), + ops.current() + .propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + ops.current() + .propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + 2.0)) .onlyRetryOn(CommitFailedException.class) .run( taskOps -> { diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 8506d90b533b..638e0be828a3 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -464,12 +464,14 @@ public void commit() { try { Tasks.foreach(ops) .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( - base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* exponential */) - .backoffStrategy(BackoffStrategies.from(base.properties())) + .totalTimeoutMs( + 
base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT)) + .backoffStrategy( + BackoffStrategies.from( + base.properties(), + base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + 2.0)) .onlyRetryOn(CommitFailedException.class) .countAttempts(commitMetrics().attempts()) .run( diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index 63fad76499e9..0852f7c0d3d6 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -616,13 +616,13 @@ static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { try { Tasks.foreach(ops) .retry(COMMIT_NUM_RETRIES_DEFAULT) - .exponentialBackoff( - COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, - COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, - COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, - 2.0 /* exponential */) + .totalTimeoutMs(COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT) .backoffStrategy( - BackoffStrategies.from(ops.current() != null ? ops.current().properties() : null)) + BackoffStrategies.from( + ops.current() != null ? ops.current().properties() : null, + COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, + COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, + 2.0)) .onlyRetryOn(CommitFailedException.class) .run( taskOps -> { @@ -780,13 +780,13 @@ static ViewMetadata commit(ViewOperations ops, UpdateTableRequest request) { try { Tasks.foreach(ops) .retry(COMMIT_NUM_RETRIES_DEFAULT) - .exponentialBackoff( - COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, - COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, - COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, - 2.0 /* exponential */) + .totalTimeoutMs(COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT) .backoffStrategy( - BackoffStrategies.from(ops.current() != null ? ops.current().properties() : null)) + BackoffStrategies.from( + ops.current() != null ? 
ops.current().properties() : null, + COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, + COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, + 2.0)) .onlyRetryOn(CommitFailedException.class) .run( taskOps -> { diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java index cc6eacdf353c..faa29cad17f9 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java @@ -262,8 +262,9 @@ private CloseableIterable fetchPlanningResult() { AtomicReference result = new AtomicReference<>(); try { Tasks.foreach(planId) - .exponentialBackoff(MIN_SLEEP_MS, MAX_SLEEP_MS, maxWaitTimeMs, SCALE_FACTOR) - .backoffStrategy(BackoffStrategies.from(catalogProperties)) + .totalTimeoutMs(maxWaitTimeMs) + .backoffStrategy( + BackoffStrategies.from(catalogProperties, MIN_SLEEP_MS, MAX_SLEEP_MS, SCALE_FACTOR)) .retry(MAX_RETRIES) .onlyRetryOn(NotCompleteException.class) .onFailure( diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java index 7a244bff70f6..42927b6b0b9e 100644 --- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java +++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java @@ -49,6 +49,7 @@ import org.apache.iceberg.rest.RESTClient; import org.apache.iceberg.rest.RESTUtil; import org.apache.iceberg.rest.responses.OAuthTokenResponse; +import org.apache.iceberg.util.BackoffStrategies; import org.apache.iceberg.util.JsonUtil; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.Tasks; @@ -512,11 +513,13 @@ public Pair refresh(RESTClient client) { LOG.warn("Failed to refresh token", err); } }) - .exponentialBackoff( - COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, - COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, - COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT, - 2.0 /* exponential */) + .totalTimeoutMs(COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT) + .backoffStrategy( + 
BackoffStrategies.from( + null, + COMMIT_MIN_RETRY_WAIT_MS_DEFAULT, + COMMIT_MAX_RETRY_WAIT_MS_DEFAULT, + 2.0)) .run(holder -> holder.set(refreshCurrentToken(client))); if (!isSuccessful || ref.get() == null) { diff --git a/core/src/main/java/org/apache/iceberg/util/BackoffStrategies.java b/core/src/main/java/org/apache/iceberg/util/BackoffStrategies.java index 45160933fc3d..6dc2b849179d 100644 --- a/core/src/main/java/org/apache/iceberg/util/BackoffStrategies.java +++ b/core/src/main/java/org/apache/iceberg/util/BackoffStrategies.java @@ -53,6 +53,30 @@ public static BackoffStrategy from(Map properties) { return loadBackoffStrategy(impl, properties); } + /** + * Resolves the configured strategy from the given properties, falling back to a default {@link + * ExponentialBackoffStrategy} configured from the provided min/max/scale parameters when {@link + * #STRATEGY_IMPL} is not set. Never returns {@code null}. + * + * @param properties properties that may contain {@link #STRATEGY_IMPL}; may be {@code null} + * @param defaultMinSleepTimeMs min sleep for the default exponential fallback + * @param defaultMaxSleepTimeMs max sleep for the default exponential fallback + * @param defaultScaleFactor scale factor for the default exponential fallback + * @return the configured custom strategy if {@link #STRATEGY_IMPL} is set, otherwise a fresh + * {@link ExponentialBackoffStrategy} with the given parameters + */ + public static BackoffStrategy from( + Map properties, + long defaultMinSleepTimeMs, + long defaultMaxSleepTimeMs, + double defaultScaleFactor) { + BackoffStrategy custom = from(properties); + return custom != null + ? custom + : new ExponentialBackoffStrategy( + defaultMinSleepTimeMs, defaultMaxSleepTimeMs, defaultScaleFactor); + } + /** * Loads and initializes a {@link BackoffStrategy} by class name. 
* diff --git a/core/src/main/java/org/apache/iceberg/util/LockManagers.java b/core/src/main/java/org/apache/iceberg/util/LockManagers.java index bdcaf5c51ce3..741e6e250b02 100644 --- a/core/src/main/java/org/apache/iceberg/util/LockManagers.java +++ b/core/src/main/java/org/apache/iceberg/util/LockManagers.java @@ -163,7 +163,8 @@ public void initialize(Map properties) { properties, CatalogProperties.LOCK_HEARTBEAT_THREADS, CatalogProperties.LOCK_HEARTBEAT_THREADS_DEFAULT); - this.backoffStrategy = BackoffStrategies.from(properties); + this.backoffStrategy = + BackoffStrategies.from(properties, acquireIntervalMs, acquireIntervalMs, 1.0); } @Override @@ -248,7 +249,7 @@ public boolean acquire(String entityId, String ownerId) { .retry(Integer.MAX_VALUE - 1) .onlyRetryOn(IllegalStateException.class) .throwFailureWhenFinished() - .exponentialBackoff(acquireIntervalMs(), acquireIntervalMs(), acquireTimeoutMs(), 1) + .totalTimeoutMs(acquireTimeoutMs()) .backoffStrategy(backoffStrategy()) .run(id -> acquireOnce(id, ownerId)); return true; diff --git a/core/src/main/java/org/apache/iceberg/util/Tasks.java b/core/src/main/java/org/apache/iceberg/util/Tasks.java index c04cdbb68196..9e91d5c209db 100644 --- a/core/src/main/java/org/apache/iceberg/util/Tasks.java +++ b/core/src/main/java/org/apache/iceberg/util/Tasks.java @@ -77,16 +77,20 @@ public static class Builder { private boolean stopAbortsOnFailure = false; // retry settings + private static final long DEFAULT_MIN_SLEEP_TIME_MS = 1000; // 1 second + private static final long DEFAULT_MAX_SLEEP_TIME_MS = 600000; // 10 minutes + private static final long DEFAULT_TOTAL_TIMEOUT_MS = 600000; // 10 minutes + private static final double DEFAULT_SCALE_FACTOR = 2.0; + private final List> stopRetryExceptions = Lists.newArrayList(UnrecoverableException.class); private List> onlyRetryExceptions = null; private Predicate shouldRetryPredicate = null; private int maxAttempts = 1; // not all operations can be retried - private long 
minSleepTimeMs = 1000; // 1 second - private long maxSleepTimeMs = 600000; // 10 minutes - private long maxDurationMs = 600000; // 10 minutes - private double scaleFactor = 2.0; // exponential - private BackoffStrategy backoffStrategy = null; + private long totalTimeoutMs = DEFAULT_TOTAL_TIMEOUT_MS; + private BackoffStrategy backoffStrategy = + new ExponentialBackoffStrategy( + DEFAULT_MIN_SLEEP_TIME_MS, DEFAULT_MAX_SLEEP_TIME_MS, DEFAULT_SCALE_FACTOR); private Counter attemptsCounter; public Builder(Iterable items) { @@ -180,29 +184,27 @@ public Builder countAttempts(Counter counter) { return this; } - public Builder exponentialBackoff( - long backoffMinSleepTimeMs, - long backoffMaxSleepTimeMs, - long backoffMaxRetryTimeMs, - double backoffScaleFactor) { - this.minSleepTimeMs = backoffMinSleepTimeMs; - this.maxSleepTimeMs = backoffMaxSleepTimeMs; - this.maxDurationMs = backoffMaxRetryTimeMs; - this.scaleFactor = backoffScaleFactor; + /** + * Sets the total retry timeout, after which retries stop even if {@link #retry(int)} attempts + * remain. Corresponds to the {@code commit.retry.total-timeout-ms} family of properties. + */ + public Builder totalTimeoutMs(long ms) { + this.totalTimeoutMs = ms; return this; } /** * Use a custom {@link BackoffStrategy} to compute the wait between retries. * - *

<p>When set to a non-null strategy, it replaces the built-in exponential backoff configured - * via {@link #exponentialBackoff}. A {@code null} argument is ignored and the built-in - * exponential backoff is kept, so callers can pass {@link - * BackoffStrategies#from(java.util.Map)} directly. The total retry duration and maximum number - * of attempts are unaffected. + * <p>
A non-null strategy replaces the built-in exponential backoff default. A {@code null} + * argument is ignored, so callers can pass {@link BackoffStrategies#from(java.util.Map)} + * directly without a separate null check. The total retry duration and maximum number of + * attempts are unaffected. */ public Builder backoffStrategy(BackoffStrategy strategy) { - this.backoffStrategy = strategy; + if (strategy != null) { + this.backoffStrategy = strategy; + } return this; } @@ -416,10 +418,6 @@ public void run() { @SuppressWarnings("checkstyle:CyclomaticComplexity") private void runTaskWithRetry(Task task, I item) throws E { long start = System.currentTimeMillis(); - BackoffStrategy strategy = - backoffStrategy != null - ? backoffStrategy - : new ExponentialBackoffStrategy(minSleepTimeMs, maxSleepTimeMs, scaleFactor); int attempt = 0; while (true) { attempt += 1; @@ -432,10 +430,10 @@ private void runTaskWithRetry(Task task, I item) thr break; } catch (Exception e) { - long durationMs = System.currentTimeMillis() - start; - if (attempt >= maxAttempts || (durationMs > maxDurationMs && attempt > 1)) { - if (durationMs > maxDurationMs) { - LOG.info("Stopping retries after {} ms", durationMs); + long elapsedMs = System.currentTimeMillis() - start; + if (attempt >= maxAttempts || (elapsedMs > totalTimeoutMs && attempt > 1)) { + if (elapsedMs > totalTimeoutMs) { + LOG.info("Stopping retries after {} ms", elapsedMs); } throw e; } @@ -467,7 +465,7 @@ private void runTaskWithRetry(Task task, I item) thr } } - long sleepTimeMs = strategy.computeBackoff(attempt); + long sleepTimeMs = backoffStrategy.computeBackoff(attempt); LOG.warn( "Retrying task after failure: sleepTimeMs={} {}", sleepTimeMs, e.getMessage(), e); diff --git a/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java index 1888c33f3a34..9975269a64fd 100644 --- a/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java +++ 
b/core/src/main/java/org/apache/iceberg/view/BaseViewOperations.java @@ -199,9 +199,10 @@ protected void refreshFromMetadataLocation( AtomicReference newMetadata = new AtomicReference<>(); Tasks.foreach(newLocation) .retry(numRetries) - .exponentialBackoff(100, 5000, 600000, 4.0 /* 100, 400, 1600, ... */) + .totalTimeoutMs(600000) .backoffStrategy( - BackoffStrategies.from(currentMetadata != null ? currentMetadata.properties() : null)) + BackoffStrategies.from( + currentMetadata != null ? currentMetadata.properties() : null, 100, 5000, 4.0)) .throwFailureWhenFinished() .stopRetryOn(NotFoundException.class) // overridden if shouldRetry is non-null .shouldRetryTest(shouldRetry) diff --git a/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java index fde01b49f09e..ca98f177b332 100644 --- a/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java +++ b/core/src/main/java/org/apache/iceberg/view/PropertiesUpdate.java @@ -65,15 +65,17 @@ public void commit() { .retry( PropertyUtil.propertyAsInt( base.properties(), COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( + .totalTimeoutMs( PropertyUtil.propertyAsInt( - base.properties(), COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - PropertyUtil.propertyAsInt( - base.properties(), COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - PropertyUtil.propertyAsInt( - base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* exponential */) - .backoffStrategy(BackoffStrategies.from(base.properties())) + base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT)) + .backoffStrategy( + BackoffStrategies.from( + base.properties(), + PropertyUtil.propertyAsInt( + base.properties(), COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + PropertyUtil.propertyAsInt( + base.properties(), COMMIT_MAX_RETRY_WAIT_MS, 
COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + 2.0)) .onlyRetryOn(CommitFailedException.class) .run(taskOps -> taskOps.commit(base, internalApply())); } diff --git a/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java b/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java index c399b16a5811..88085efdf04c 100644 --- a/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java +++ b/core/src/main/java/org/apache/iceberg/view/SetViewLocation.java @@ -55,15 +55,17 @@ public void commit() { .retry( PropertyUtil.propertyAsInt( base.properties(), COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( + .totalTimeoutMs( PropertyUtil.propertyAsInt( - base.properties(), COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - PropertyUtil.propertyAsInt( - base.properties(), COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - PropertyUtil.propertyAsInt( - base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* exponential */) - .backoffStrategy(BackoffStrategies.from(base.properties())) + base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT)) + .backoffStrategy( + BackoffStrategies.from( + base.properties(), + PropertyUtil.propertyAsInt( + base.properties(), COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + PropertyUtil.propertyAsInt( + base.properties(), COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + 2.0)) .onlyRetryOn(CommitFailedException.class) .run( taskOps -> diff --git a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java index 8836d5fd30e0..cc23ecac81c4 100644 --- a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java +++ b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java @@ -92,15 +92,17 @@ public void commit() { .retry( PropertyUtil.propertyAsInt( base.properties(), COMMIT_NUM_RETRIES, 
COMMIT_NUM_RETRIES_DEFAULT)) - .exponentialBackoff( + .totalTimeoutMs( PropertyUtil.propertyAsInt( - base.properties(), COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), - PropertyUtil.propertyAsInt( - base.properties(), COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), - PropertyUtil.propertyAsInt( - base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT), - 2.0 /* exponential */) - .backoffStrategy(BackoffStrategies.from(base.properties())) + base.properties(), COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT)) + .backoffStrategy( + BackoffStrategies.from( + base.properties(), + PropertyUtil.propertyAsInt( + base.properties(), COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT), + PropertyUtil.propertyAsInt( + base.properties(), COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT), + 2.0)) .onlyRetryOn(CommitFailedException.class) .run(taskOps -> taskOps.commit(base, internalApply())); } diff --git a/core/src/test/java/org/apache/iceberg/util/TestBackoffStrategies.java b/core/src/test/java/org/apache/iceberg/util/TestBackoffStrategies.java index 889fd8b3e011..df6eaff4c85a 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestBackoffStrategies.java +++ b/core/src/test/java/org/apache/iceberg/util/TestBackoffStrategies.java @@ -115,6 +115,39 @@ public void loadBackoffStrategyPropagatesInitializeFailures() { .hasMessageContaining("init failed"); } + @Test + public void fromWithDefaultsReturnsExponentialWhenKeyAbsent() { + BackoffStrategy strategy = BackoffStrategies.from(ImmutableMap.of(), 100, 1000, 2.0); + + assertThat(strategy).isInstanceOf(ExponentialBackoffStrategy.class); + long wait = strategy.computeBackoff(1); + assertThat(wait).isGreaterThanOrEqualTo(100L).isLessThan(110L); + } + + @Test + public void fromWithDefaultsReturnsCustomWhenKeySet() { + Map properties = + ImmutableMap.of( + BackoffStrategies.STRATEGY_IMPL, + FixedBackoffStrategy.class.getName(), + "delay-ms", + 
"42"); + + BackoffStrategy strategy = BackoffStrategies.from(properties, 100, 1000, 2.0); + + assertThat(strategy).isInstanceOf(FixedBackoffStrategy.class); + assertThat(strategy.computeBackoff(1)).isEqualTo(42L); + } + + @Test + public void fromWithDefaultsAcceptsNullProperties() { + BackoffStrategy strategy = BackoffStrategies.from(null, 100, 1000, 2.0); + + assertThat(strategy).isInstanceOf(ExponentialBackoffStrategy.class); + long wait = strategy.computeBackoff(1); + assertThat(wait).isGreaterThanOrEqualTo(100L).isLessThan(110L); + } + /** Test strategy with a public no-arg constructor that records its initialize properties. */ public static class FixedBackoffStrategy implements BackoffStrategy { private long delayMs = 0; diff --git a/core/src/test/java/org/apache/iceberg/util/TestTasks.java b/core/src/test/java/org/apache/iceberg/util/TestTasks.java index fcaacedc3176..c40ebefda9af 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestTasks.java +++ b/core/src/test/java/org/apache/iceberg/util/TestTasks.java @@ -46,7 +46,8 @@ public void attemptCounterIsIncreasedOnRetries() { Tasks.foreach(IntStream.range(0, 10)) .countAttempts(counter) - .exponentialBackoff(0, 0, 5000, 0) + .totalTimeoutMs(5000) + .backoffStrategy(attempt -> 0L) .retry(retries) .onlyRetryOn(RuntimeException.class) .run( @@ -70,7 +71,7 @@ public void attemptCounterIsIncreasedWithoutRetries() { } @Test - public void customBackoffStrategyOverridesExponentialBackoff() { + public void customBackoffStrategyIsUsed() { List attempts = Collections.synchronizedList(Lists.newArrayList()); BackoffStrategy recording = attempt -> { @@ -98,14 +99,15 @@ public void customBackoffStrategyOverridesExponentialBackoff() { } @Test - public void nullBackoffStrategyKeepsExponentialDefault() { + public void nullBackoffStrategyIsNoOp() { int retries = 3; Counter counter = new DefaultMetricsContext().counter("counter"); Tasks.foreach(IntStream.range(0, 1)) .countAttempts(counter) - .exponentialBackoff(0, 0, 5000, 
0) + .totalTimeoutMs(5000) .retry(retries) + .backoffStrategy(attempt -> 0L) // set a non-null strategy first .backoffStrategy(null) // mirrors call sites passing BackoffStrategies.from(...) == null .onlyRetryOn(RuntimeException.class) .run( diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java index 8893a7053408..871342aeb7af 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/MetastoreLock.java @@ -94,7 +94,8 @@ class MetastoreLock implements HiveLock { private final long lockHeartbeatIntervalTime; private final ScheduledExecutorService exitingScheduledExecutorService; private final String agentInfo; - private final BackoffStrategy backoffStrategy; + private final BackoffStrategy acquireBackoff; + private final BackoffStrategy creationBackoff; private Optional hmsLockId = Optional.empty(); private ReentrantLock jvmLock = null; @@ -131,10 +132,18 @@ class MetastoreLock implements HiveLock { this.agentInfo = "Iceberg-" + UUID.randomUUID(); String backoffImpl = conf.get(BackoffStrategies.STRATEGY_IMPL); - this.backoffStrategy = + BackoffStrategy customStrategy = backoffImpl == null ? null : BackoffStrategies.loadBackoffStrategy(backoffImpl, ImmutableMap.of()); + this.acquireBackoff = + customStrategy != null + ? customStrategy + : BackoffStrategies.from(null, lockCheckMinWaitTime, lockCheckMaxWaitTime, 1.5); + this.creationBackoff = + customStrategy != null + ? customStrategy + : BackoffStrategies.from(null, lockCreationMinWaitTime, lockCreationMaxWaitTime, 2.0); this.exitingScheduledExecutorService = Executors.newSingleThreadScheduledExecutor( @@ -213,8 +222,8 @@ private long acquireLock() throws LockException { // boundary issues. 
Tasks.foreach(lockInfo.lockId) .retry(Integer.MAX_VALUE - 100) - .exponentialBackoff(lockCheckMinWaitTime, lockCheckMaxWaitTime, lockAcquireTimeout, 1.5) - .backoffStrategy(backoffStrategy) + .totalTimeoutMs(lockAcquireTimeout) + .backoffStrategy(acquireBackoff) .throwFailureWhenFinished() .onlyRetryOn(WaitingForLockException.class) .run( @@ -302,9 +311,8 @@ private LockInfo createLock() throws LockException { AtomicBoolean interrupted = new AtomicBoolean(false); Tasks.foreach(lockRequest) .retry(Integer.MAX_VALUE - 100) - .exponentialBackoff( - lockCreationMinWaitTime, lockCreationMaxWaitTime, lockCreationTimeout, 2.0) - .backoffStrategy(backoffStrategy) + .totalTimeoutMs(lockCreationTimeout) + .backoffStrategy(creationBackoff) .shouldRetryTest( e -> !interrupted.get() diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java index 8f921720c3a9..30ad23eab15e 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java @@ -111,12 +111,10 @@ private static void delete(String context, FileIO io, List paths) { .suppressFailureWhenFinished() .onFailure((path, exc) -> LOG.warn("Failed to delete {} ({})", path, context, exc)) .retry(DELETE_NUM_RETRIES) - .exponentialBackoff( - DELETE_MIN_RETRY_WAIT_MS, - DELETE_MAX_RETRY_WAIT_MS, - DELETE_TOTAL_RETRY_TIME_MS, - 2 /* exponential */) - .backoffStrategy(BackoffStrategies.from(ioProperties(io))) + .totalTimeoutMs(DELETE_TOTAL_RETRY_TIME_MS) + .backoffStrategy( + BackoffStrategies.from( + ioProperties(io), DELETE_MIN_RETRY_WAIT_MS, DELETE_MAX_RETRY_WAIT_MS, 2)) .run( path -> { io.deleteFile(path); diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java index 8f921720c3a9..30ad23eab15e 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java @@ -111,12 +111,10 @@ private static void delete(String context, FileIO io, List paths) { .suppressFailureWhenFinished() .onFailure((path, exc) -> LOG.warn("Failed to delete {} ({})", path, context, exc)) .retry(DELETE_NUM_RETRIES) - .exponentialBackoff( - DELETE_MIN_RETRY_WAIT_MS, - DELETE_MAX_RETRY_WAIT_MS, - DELETE_TOTAL_RETRY_TIME_MS, - 2 /* exponential */) - .backoffStrategy(BackoffStrategies.from(ioProperties(io))) + .totalTimeoutMs(DELETE_TOTAL_RETRY_TIME_MS) + .backoffStrategy( + BackoffStrategies.from( + ioProperties(io), DELETE_MIN_RETRY_WAIT_MS, DELETE_MAX_RETRY_WAIT_MS, 2)) .run( path -> { io.deleteFile(path); diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java index 8f921720c3a9..30ad23eab15e 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCleanupUtil.java @@ -111,12 +111,10 @@ private static void delete(String context, FileIO io, List paths) { .suppressFailureWhenFinished() .onFailure((path, exc) -> LOG.warn("Failed to delete {} ({})", path, context, exc)) .retry(DELETE_NUM_RETRIES) - .exponentialBackoff( - DELETE_MIN_RETRY_WAIT_MS, - DELETE_MAX_RETRY_WAIT_MS, - DELETE_TOTAL_RETRY_TIME_MS, - 2 /* exponential */) - .backoffStrategy(BackoffStrategies.from(ioProperties(io))) + .totalTimeoutMs(DELETE_TOTAL_RETRY_TIME_MS) + .backoffStrategy( + BackoffStrategies.from( + ioProperties(io), DELETE_MIN_RETRY_WAIT_MS, DELETE_MAX_RETRY_WAIT_MS, 2)) .run( path -> { io.deleteFile(path); From 
f1fc5f109694b14495d2cb9720380c2383194e5f Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Wed, 20 May 2026 11:31:55 +0700 Subject: [PATCH 3/3] Core: Fix stale BackoffStrategy javadoc link MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The class-level javadoc referenced Tasks.Builder#exponentialBackoff, which was removed in the previous commit when the method was split into totalTimeoutMs and backoffStrategy. The broken link caused :iceberg-core:javadoc to fail on CI. Updates the javadoc to point at BackoffStrategies.from(Map, long, long, double) — the helper call sites use to resolve the strategy — and at the ExponentialBackoffStrategy default. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../java/org/apache/iceberg/util/BackoffStrategy.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/util/BackoffStrategy.java b/core/src/main/java/org/apache/iceberg/util/BackoffStrategy.java index 51d4fd9091dd..075fdd54a12d 100644 --- a/core/src/main/java/org/apache/iceberg/util/BackoffStrategy.java +++ b/core/src/main/java/org/apache/iceberg/util/BackoffStrategy.java @@ -24,10 +24,10 @@ * Strategy that decides how long to wait between retry attempts in {@link Tasks}. * *

<p>{@link Tasks} always retries through a {@code BackoffStrategy}. When none is supplied, the - * built-in exponential backoff (configured via {@link Tasks.Builder#exponentialBackoff}) is used. A - * custom strategy can be selected by setting the {@link BackoffStrategies#STRATEGY_IMPL} property - * to the fully-qualified name of a class implementing this interface with a public no-arg - * constructor. + * built-in {@link ExponentialBackoffStrategy} is used. Call sites typically resolve the strategy + * via {@link BackoffStrategies#from(java.util.Map, long, long, double)} so operators can supply a + * custom implementation by setting the {@link BackoffStrategies#STRATEGY_IMPL} property to the + * fully-qualified name of a class implementing this interface with a public no-arg constructor. * *

<p>The total retry duration ({@code commit.retry.total-timeout-ms} and similar) and the maximum * number of attempts remain {@link Tasks} concerns; a strategy only computes the per-attempt wait.