Skip to content

Commit

Permalink
[FLINK-33702][core] Add the IncrementalDelayRetryStrategy implementat…
Browse files Browse the repository at this point in the history
…ion of RetryStrategy
  • Loading branch information
xiangyuf committed Dec 4, 2023
1 parent 591e886 commit 4a40f07
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util.concurrent;

import org.apache.flink.util.Preconditions;

import java.time.Duration;

/** An implementation of {@link RetryStrategy} that retries at an incremental delay with a cap. */
public class IncrementalDelayRetryStrategy implements RetryStrategy {
private final int remainingRetries;
private final Duration currentRetryDelay;
private final Duration increment;
private final Duration maxRetryDelay;

/**
* @param remainingRetries number of times to retry
* @param currentRetryDelay the current delay between retries
* @param increment the delay increment between retries
* @param maxRetryDelay the max delay between retries
*/
public IncrementalDelayRetryStrategy(
int remainingRetries,
Duration currentRetryDelay,
Duration increment,
Duration maxRetryDelay) {
Preconditions.checkArgument(
remainingRetries >= 0, "The number of retries must be greater or equal to 0.");
this.remainingRetries = remainingRetries;
Preconditions.checkArgument(
currentRetryDelay.toMillis() >= 0, "The currentRetryDelay must be positive");
this.currentRetryDelay = currentRetryDelay;
Preconditions.checkArgument(
increment.toMillis() >= 0, "The delay increment must be greater or equal to 0.");
this.increment = increment;
Preconditions.checkArgument(
maxRetryDelay.toMillis() >= 0, "The maxRetryDelay must be positive");
this.maxRetryDelay = maxRetryDelay;
}

@Override
public int getNumRemainingRetries() {
return remainingRetries;
}

@Override
public Duration getRetryDelay() {
return currentRetryDelay;
}

@Override
public RetryStrategy getNextRetryStrategy() {
int nextRemainingRetries = remainingRetries - 1;
Preconditions.checkState(
nextRemainingRetries >= 0, "The number of remaining retries must not be negative");
long nextRetryDelayMillis =
Math.min(currentRetryDelay.plus(increment).toMillis(), maxRetryDelay.toMillis());
return new IncrementalDelayRetryStrategy(
nextRemainingRetries,
Duration.ofMillis(nextRetryDelayMillis),
increment,
maxRetryDelay);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,5 @@ public void testRetryFailure() {
new FixedRetryStrategy(0, Duration.ofMillis(5L))
.getNextRetryStrategy())
.isInstanceOf(IllegalStateException.class);
;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util.concurrent;

import org.apache.flink.util.TestLogger;

import org.junit.jupiter.api.Test;

import java.time.Duration;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link IncrementalDelayRetryStrategy}. */
public class IncrementalDelayRetryStrategyTest extends TestLogger {

@Test
public void testGettersNotCapped() throws Exception {
RetryStrategy retryStrategy =
new IncrementalDelayRetryStrategy(
10, Duration.ofMillis(5L), Duration.ofMillis(4L), Duration.ofMillis(20L));
assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(10);
assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(5L));

RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy();
assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(9);
assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(9L));
}

@Test
public void testGettersHitCapped() throws Exception {
RetryStrategy retryStrategy =
new IncrementalDelayRetryStrategy(
5, Duration.ofMillis(15L), Duration.ofMillis(10L), Duration.ofMillis(20L));
assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(5);
assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(15L));

RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy();
assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(4);
assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(20L));
}

@Test
public void testGettersAtCap() throws Exception {
RetryStrategy retryStrategy =
new IncrementalDelayRetryStrategy(
5, Duration.ofMillis(20L), Duration.ofMillis(5L), Duration.ofMillis(20L));
assertThat(retryStrategy.getNumRemainingRetries()).isEqualTo(5);
assertThat(retryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(20L));

RetryStrategy nextRetryStrategy = retryStrategy.getNextRetryStrategy();
assertThat(nextRetryStrategy.getNumRemainingRetries()).isEqualTo(4);
assertThat(nextRetryStrategy.getRetryDelay()).isEqualTo(Duration.ofMillis(20L));
}

/** Tests that getting a next RetryStrategy below zero remaining retries fails. */
@Test
public void testRetryFailure() {
assertThatThrownBy(
() ->
new IncrementalDelayRetryStrategy(
0,
Duration.ofMillis(20L),
Duration.ofMillis(5L),
Duration.ofMillis(20L))
.getNextRetryStrategy())
.isInstanceOf(IllegalStateException.class);
}
}

0 comments on commit 4a40f07

Please sign in to comment.