From 45ddafd6cba58864dd9293be72424f937b379f35 Mon Sep 17 00:00:00 2001 From: Bo WANG Date: Thu, 30 May 2019 17:06:09 +0800 Subject: [PATCH] [FLINK-12670][runtime] Implement FailureRateRestartBackoffTimeStrategy * use ManualClock in unit test and add configuration description This closes #8573. --- ...t_backoff_time_strategy_configuration.html | 26 +++ .../RestartBackoffTimeStrategyOptions.java | 56 +++++++ ...FailureRateRestartBackoffTimeStrategy.java | 149 ++++++++++++++++++ ...ureRateRestartBackoffTimeStrategyTest.java | 83 ++++++++++ 4 files changed, 314 insertions(+) create mode 100644 docs/_includes/generated/restart_backoff_time_strategy_configuration.html create mode 100644 flink-core/src/main/java/org/apache/flink/configuration/RestartBackoffTimeStrategyOptions.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureRateRestartBackoffTimeStrategy.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureRateRestartBackoffTimeStrategyTest.java diff --git a/docs/_includes/generated/restart_backoff_time_strategy_configuration.html b/docs/_includes/generated/restart_backoff_time_strategy_configuration.html new file mode 100644 index 0000000000000..0c5c2bc7e199d --- /dev/null +++ b/docs/_includes/generated/restart_backoff_time_strategy_configuration.html @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + + + + + + + + +
Key | Default | Description
restart-backoff-time-strategy.failure-rate.backoff-time | 0 | Backoff time in milliseconds between two consecutive restart attempts.
restart-backoff-time-strategy.failure-rate.failure-rate-interval | 60000 | Time interval in milliseconds for measuring failure rate.
restart-backoff-time-strategy.failure-rate.max-failures-per-interval | 1 | Maximum number of failures in given time interval before failing a job.
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/RestartBackoffTimeStrategyOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/RestartBackoffTimeStrategyOptions.java
new file mode 100644
index 0000000000000..18ebec6f368a9
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestartBackoffTimeStrategyOptions.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Configuration options for the RestartBackoffTimeStrategy.
+ */
+@PublicEvolving
+public class RestartBackoffTimeStrategyOptions {
+    /**
+     * Maximum number of failures in given time interval {@link #RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL}
+     * before failing a job in FailureRateRestartBackoffTimeStrategy. Integer option, default 1.
+     */
+    @PublicEvolving
+    public static final ConfigOption<Integer> RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL = ConfigOptions
+        .key("restart-backoff-time-strategy.failure-rate.max-failures-per-interval")
+        .defaultValue(1)
+        .withDescription("Maximum number of failures in given time interval before failing a job.");
+
+    /**
+     * Time interval in milliseconds over which the failure rate is measured against
+     * {@link #RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL} in FailureRateRestartBackoffTimeStrategy.
+     */
+    @PublicEvolving
+    public static final ConfigOption<Long> RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL = ConfigOptions
+        .key("restart-backoff-time-strategy.failure-rate.failure-rate-interval")
+        .defaultValue(60_000L)
+        .withDescription("Time interval in milliseconds for measuring failure rate.");
+
+    /**
+     * Backoff time in milliseconds between two consecutive restart attempts in FailureRateRestartBackoffTimeStrategy.
+     */
+    @PublicEvolving
+    public static final ConfigOption<Long> RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_FAILURE_RATE_BACKOFF_TIME = ConfigOptions
+        .key("restart-backoff-time-strategy.failure-rate.backoff-time")
+        .defaultValue(0L)
+        .withDescription("Backoff time in milliseconds between two consecutive restart attempts.");
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureRateRestartBackoffTimeStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureRateRestartBackoffTimeStrategy.java
new file mode 100644
index 0000000000000..8404ddaf1b28f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureRateRestartBackoffTimeStrategy.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.util.clock.Clock; +import org.apache.flink.runtime.util.clock.SystemClock; + +import java.util.ArrayDeque; +import java.util.Deque; + +import static org.apache.flink.configuration.RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_FAILURE_RATE_BACKOFF_TIME; +import static org.apache.flink.configuration.RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL; +import static org.apache.flink.configuration.RestartBackoffTimeStrategyOptions.RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Restart strategy which can restart when failure rate is not exceeded. 
+ */
+public class FailureRateRestartBackoffTimeStrategy implements RestartBackoffTimeStrategy {
+
+    /** Length in milliseconds of the window against which the oldest retained failure is compared. */
+    private final long failuresIntervalMS;
+
+    /** Value in milliseconds returned by {@link #getBackoffTime()}. */
+    private final long backoffTimeMS;
+
+    /** Number of failure timestamps retained; once that many are held, restarts depend on their age. */
+    private final int maxFailuresPerInterval;
+
+    /** Timestamps of the most recent failures, oldest first; never holds more than {@code maxFailuresPerInterval} entries. */
+    private final Deque<Long> failureTimestamps;
+
+    /** Pre-computed result of {@link #toString()}. */
+    private final String strategyString;
+
+    /** Time source used to timestamp failures and to evaluate {@link #canRestart()}. */
+    private final Clock clock;
+
+    /**
+     * Creates the strategy.
+     *
+     * @param clock time source; must not be null
+     * @param maxFailuresPerInterval number of failures tracked per interval; must be greater than 0
+     * @param failuresIntervalMS length of the measuring interval in milliseconds; must be greater than 0
+     * @param backoffTimeMS backoff time in milliseconds; must be at least 0
+     * @throws IllegalArgumentException if one of the numeric arguments is out of range
+     * @throws NullPointerException if {@code clock} is null
+     */
+    FailureRateRestartBackoffTimeStrategy(Clock clock, int maxFailuresPerInterval, long failuresIntervalMS, long backoffTimeMS) {
+
+        checkArgument(maxFailuresPerInterval > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
+        checkArgument(failuresIntervalMS > 0, "Failures interval must be greater than 0 ms.");
+        checkArgument(backoffTimeMS >= 0, "Backoff time must be at least 0 ms.");
+
+        this.failuresIntervalMS = failuresIntervalMS;
+        this.backoffTimeMS = backoffTimeMS;
+        this.maxFailuresPerInterval = maxFailuresPerInterval;
+        this.failureTimestamps = new ArrayDeque<>(maxFailuresPerInterval);
+        // Must run after the three numeric fields above are assigned, because it reads them.
+        this.strategyString = generateStrategyString();
+        this.clock = checkNotNull(clock);
+    }
+
+    /**
+     * Returns whether a restart is currently allowed.
+     *
+     * <p>While fewer than {@code maxFailuresPerInterval} failures are recorded this is always true. Once the
+     * queue is full, a restart is allowed only if the oldest recorded failure is more than
+     * {@code failuresIntervalMS} milliseconds in the past.
+     */
+    @Override
+    public boolean canRestart() {
+        if (!isFailureTimestampsQueueFull()) {
+            return true;
+        }
+
+        // The queue is full and maxFailuresPerInterval > 0, so peek() cannot return null here.
+        final long now = clock.absoluteTimeMillis();
+        final long earliestFailure = failureTimestamps.peek();
+
+        return (now - earliestFailure) > failuresIntervalMS;
+    }
+
+    /** Returns the configured backoff time in milliseconds. */
+    @Override
+    public long getBackoffTime() {
+        return backoffTimeMS;
+    }
+
+    /**
+     * Records the current clock time as a failure timestamp, dropping the oldest one first if the queue is full.
+     *
+     * @param cause the failure cause; not inspected by this strategy
+     */
+    @Override
+    public void notifyFailure(Throwable cause) {
+        if (isFailureTimestampsQueueFull()) {
+            failureTimestamps.remove();
+        }
+        failureTimestamps.add(clock.absoluteTimeMillis());
+    }
+
+    @Override
+    public String toString() {
+        return strategyString;
+    }
+
+    /** Returns true once {@code maxFailuresPerInterval} timestamps are held. */
+    private boolean isFailureTimestampsQueueFull() {
+        return failureTimestamps.size() >= maxFailuresPerInterval;
+    }
+
+    /** Builds the description returned by {@link #toString()}; the class-name prefix is written exactly once. */
+    private String generateStrategyString() {
+        StringBuilder str = new StringBuilder("FailureRateRestartBackoffTimeStrategy(");
+        str.append("failuresIntervalMS=");
+        str.append(failuresIntervalMS);
+        str.append(",backoffTimeMS=");
+        str.append(backoffTimeMS);
+        str.append(",maxFailuresPerInterval=");
+        str.append(maxFailuresPerInterval);
+        str.append(")");
+
+        return str.toString();
+    }
+
+    /**
+     * Creates a factory whose settings are read from the given configuration.
+     *
+     * @param configuration source of the max-failures, failure-rate-interval and backoff-time options
+     * @return a factory producing strategies with those settings
+     */
+    public static FailureRateRestartBackoffTimeStrategyFactory createFactory(final Configuration configuration) {
+        return new FailureRateRestartBackoffTimeStrategyFactory(
+            configuration.getInteger(RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL),
+            configuration.getLong(RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL),
+            configuration.getLong(RESTART_BACKOFF_TIME_STRATEGY_FAILURE_RATE_FAILURE_RATE_BACKOFF_TIME));
+    }
+
+    /**
+     * The factory for creating {@link FailureRateRestartBackoffTimeStrategy}.
+     */
+    public static class FailureRateRestartBackoffTimeStrategyFactory implements RestartBackoffTimeStrategy.Factory {
+
+        private final int maxFailuresPerInterval;
+
+        private final long failuresIntervalMS;
+
+        private final long backoffTimeMS;
+
+        /**
+         * Stores the settings handed to every strategy this factory creates. The values are not validated
+         * here; the strategy constructor checks them when {@link #create()} is called.
+         */
+        public FailureRateRestartBackoffTimeStrategyFactory(
+                int maxFailuresPerInterval,
+                long failuresIntervalMS,
+                long backoffTimeMS) {
+
+            this.maxFailuresPerInterval = maxFailuresPerInterval;
+            this.failuresIntervalMS = failuresIntervalMS;
+            this.backoffTimeMS = backoffTimeMS;
+        }
+
+        /** Creates a new strategy backed by {@code SystemClock.getInstance()}. */
+        @Override
+        public RestartBackoffTimeStrategy create() {
+            return new FailureRateRestartBackoffTimeStrategy(
+                SystemClock.getInstance(),
+                maxFailuresPerInterval,
+                failuresIntervalMS,
+                backoffTimeMS);
+        }
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureRateRestartBackoffTimeStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureRateRestartBackoffTimeStrategyTest.java
new file mode 100644
index 0000000000000..a891f1c9cc571
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureRateRestartBackoffTimeStrategyTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph.failover.flip1; + +import org.apache.flink.runtime.util.clock.ManualClock; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for {@link FailureRateRestartBackoffTimeStrategy}. 
+ */
+public class FailureRateRestartBackoffTimeStrategyTest extends TestLogger {
+
+    private final Exception failure = new Exception();
+
+    /**
+     * With one tracked failure and the clock moved past the interval after each failure,
+     * a restart stays possible before every failure and after the last one.
+     */
+    @Test
+    public void testManyFailuresWithinRate() {
+        final long windowMillis = 1L;
+        final int rounds = 3;
+        final ManualClock manualClock = new ManualClock();
+
+        final FailureRateRestartBackoffTimeStrategy strategy =
+            new FailureRateRestartBackoffTimeStrategy(manualClock, 1, windowMillis, 0);
+
+        for (int round = 0; round < rounds; round++) {
+            assertTrue(strategy.canRestart());
+            strategy.notifyFailure(failure);
+            // Step beyond the interval so the failure just recorded no longer blocks a restart.
+            manualClock.advanceTime(windowMillis + 1, TimeUnit.MILLISECONDS);
+        }
+
+        assertTrue(strategy.canRestart());
+    }
+
+    /**
+     * With the clock never advanced, recording as many failures as the configured maximum
+     * makes the strategy refuse further restarts.
+     */
+    @Test
+    public void testFailuresExceedingRate() {
+        final long windowMillis = 10_000L;
+        final int allowedFailures = 3;
+
+        final FailureRateRestartBackoffTimeStrategy strategy =
+            new FailureRateRestartBackoffTimeStrategy(new ManualClock(), allowedFailures, windowMillis, 0);
+
+        for (int recorded = 0; recorded < allowedFailures; recorded++) {
+            assertTrue(strategy.canRestart());
+            strategy.notifyFailure(failure);
+        }
+
+        assertFalse(strategy.canRestart());
+    }
+
+    /** The backoff time passed to the constructor is returned unchanged. */
+    @Test
+    public void testBackoffTime() {
+        final long expectedBackoffMillis = 10_000L;
+
+        final FailureRateRestartBackoffTimeStrategy strategy =
+            new FailureRateRestartBackoffTimeStrategy(new ManualClock(), 1, 1, expectedBackoffMillis);
+
+        assertEquals(expectedBackoffMillis, strategy.getBackoffTime());
+    }
+}