Skip to content

Commit

Permalink
[FLINK-13883] Remove unused AkkaOptions related to Akka's death watch
Browse files Browse the repository at this point in the history
We no longer use Akka's death watch. Hence, this commit removes the death
watch related configuration options AkkaOptions#WATCH_HEARTBEAT_INTERVAL,
AkkaOptions#WATCH_THRESHOLD and AkkaOptions#WATCH_HEARTBEAT_PAUSE.
  • Loading branch information
tillrohrmann committed Aug 28, 2019
1 parent b78e598 commit 9c4b2d8
Show file tree
Hide file tree
Showing 8 changed files with 9 additions and 114 deletions.
15 changes: 0 additions & 15 deletions docs/_includes/generated/akka_configuration.html
Expand Up @@ -122,20 +122,5 @@
<td style="word-wrap: break-word;">300.0</td>
<td>Threshold for the transport failure detector. Since Flink uses TCP, the detector is not necessary and, thus, the threshold is set to a high value.</td>
</tr>
<tr>
<td><h5>akka.watch.heartbeat.interval</h5></td>
<td style="word-wrap: break-word;">"10 s"</td>
<td>Heartbeat interval for Akka’s DeathWatch mechanism to detect dead TaskManagers. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should decrease this value or increase akka.watch.heartbeat.pause. A thorough description of Akka’s DeathWatch can be found <a href="http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector">here</a></td>
</tr>
<tr>
<td><h5>akka.watch.heartbeat.pause</h5></td>
<td style="word-wrap: break-word;">"60 s"</td>
<td>Acceptable heartbeat pause for Akka’s DeathWatch mechanism. A low value does not allow an irregular heartbeat. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should increase this value or decrease akka.watch.heartbeat.interval. Higher value increases the time to detect a dead TaskManager. A thorough description of Akka’s DeathWatch can be found <a href="http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector">here</a></td>
</tr>
<tr>
<td><h5>akka.watch.threshold</h5></td>
<td style="word-wrap: break-word;">12</td>
<td>Threshold for the DeathWatch failure detector. A low value is prone to false positives whereas a high value increases the time to detect a dead TaskManager. A thorough description of Akka’s DeathWatch can be found <a href="http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector">here</a></td>
</tr>
</tbody>
</table>
Expand Up @@ -118,8 +118,6 @@ public static void shutDownServices() throws Exception {

protected static Configuration getFlinkConfiguration() {
Configuration flinkConfig = new Configuration();
flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "5 s");
flinkConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1 s");
flinkConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "16m");
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 s");
flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
Expand Down
Expand Up @@ -21,8 +21,6 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.description.Description;

import static org.apache.flink.configuration.description.LinkElement.link;

/**
* Akka configuration options.
*/
Expand All @@ -38,34 +36,6 @@ public class AkkaOptions {
.withDescription("Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you" +
" should try to increase this value. Timeouts can be caused by slow machines or a congested network. The" +
" timeout value requires a time-unit specifier (ms/s/min/h/d).");

/**
* The Akka death watch heartbeat interval.
*/
public static final ConfigOption<String> WATCH_HEARTBEAT_INTERVAL = ConfigOptions
.key("akka.watch.heartbeat.interval")
.defaultValue(ASK_TIMEOUT.defaultValue())
.withDescription(Description.builder()
.text("Heartbeat interval for Akka’s DeathWatch mechanism to detect dead TaskManagers. If" +
" TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you" +
" should decrease this value or increase akka.watch.heartbeat.pause. A thorough description of" +
" Akka’s DeathWatch can be found %s",
link("http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector", "here")).build());

/**
* The maximum acceptable Akka death watch heartbeat pause.
*/
public static final ConfigOption<String> WATCH_HEARTBEAT_PAUSE = ConfigOptions
.key("akka.watch.heartbeat.pause")
.defaultValue("60 s")
.withDescription(Description.builder()
.text("Acceptable heartbeat pause for Akka’s DeathWatch mechanism. A low value does not allow an" +
" irregular heartbeat. If TaskManagers are wrongly marked dead because of lost or delayed" +
" heartbeat messages, then you should increase this value or decrease akka.watch.heartbeat.interval." +
" Higher value increases the time to detect a dead TaskManager. A thorough description of Akka’s" +
" DeathWatch can be found %s",
link("http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector", "here")
).build());
/**
* The Akka tcp connection timeout.
*/
Expand Down Expand Up @@ -114,18 +84,6 @@ public class AkkaOptions {
.withDescription("Threshold for the transport failure detector. Since Flink uses TCP, the detector is not" +
" necessary and, thus, the threshold is set to a high value.");

/**
* Detection threshold for the phi accrual watch failure detector.
*/
public static final ConfigOption<Integer> WATCH_THRESHOLD = ConfigOptions
.key("akka.watch.threshold")
.defaultValue(12)
.withDescription(Description.builder()
.text("Threshold for the DeathWatch failure detector. A low value is prone to false positives whereas" +
" a high value increases the time to detect a dead TaskManager. A thorough description of Akka’s" +
" DeathWatch can be found %s",
link("http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector", "here")).build());

/**
* Override SSL support for the Akka transport.
*/
Expand Down
Expand Up @@ -863,23 +863,23 @@ public final class ConfigConstants {
/**
* Heartbeat interval of watch failure detector.
*
* @deprecated Use {@link AkkaOptions#WATCH_HEARTBEAT_INTERVAL} instead.
* @deprecated This option is no longer used and has no effect on Flink.
*/
@Deprecated
public static final String AKKA_WATCH_HEARTBEAT_INTERVAL = "akka.watch.heartbeat.interval";

/**
* Allowed heartbeat pause for the watch failure detector.
*
* @deprecated Use {@link AkkaOptions#WATCH_HEARTBEAT_PAUSE} instead.
* @deprecated This option is no longer used and has no effect on Flink.
*/
@Deprecated
public static final String AKKA_WATCH_HEARTBEAT_PAUSE = "akka.watch.heartbeat.pause";

/**
* Detection threshold for the phi accrual watch failure detector.
*
* @deprecated Use {@link AkkaOptions#WATCH_THRESHOLD} instead.
* @deprecated This option is no longer used and has no effect on Flink.
*/
@Deprecated
public static final String AKKA_WATCH_THRESHOLD = "akka.watch.threshold";
Expand Down Expand Up @@ -1769,7 +1769,7 @@ public final class ConfigConstants {
public static final double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0;

/**
* @deprecated Use {@link AkkaOptions#WATCH_THRESHOLD} instead.
* @deprecated This default value is no longer used and has no effect on Flink.
*/
@Deprecated
public static final double DEFAULT_AKKA_WATCH_THRESHOLD = 12;
Expand Down
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.executiongraph.restart;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

Expand Down Expand Up @@ -93,24 +92,16 @@ public static RestartStrategyFactory createRestartStrategyFactory(Configuration
// support deprecated ConfigConstants values
final int numberExecutionRetries = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS,
ConfigConstants.DEFAULT_EXECUTION_RETRIES);
String pauseString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE);
String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY,
pauseString);
String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY);

long delay;

try {
delay = Duration.apply(delayString).toMillis();
} catch (NumberFormatException nfe) {
if (delayString.equals(pauseString)) {
throw new Exception("Invalid config value for " +
AkkaOptions.WATCH_HEARTBEAT_PAUSE.key() + ": " + pauseString +
". Value must be a valid duration (such as '10 s' or '1 min')");
} else {
throw new Exception("Invalid config value for " +
ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
". Value must be a valid duration (such as '100 milli' or '10 s')");
}
throw new Exception("Invalid config value for " +
ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
". Value must be a valid duration (such as '100 milli' or '10 s')");
}

if (numberExecutionRetries > 0 && delay >= 0) {
Expand Down
Expand Up @@ -417,19 +417,6 @@ object AkkaUtils {

val transportThreshold = configuration.getDouble(AkkaOptions.TRANSPORT_THRESHOLD)

val watchHeartbeatInterval = configuration.getString(
AkkaOptions.WATCH_HEARTBEAT_INTERVAL)

val watchHeartbeatPause = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_PAUSE)

validateHeartbeat(
AkkaOptions.WATCH_HEARTBEAT_PAUSE.key(),
watchHeartbeatPause,
AkkaOptions.WATCH_HEARTBEAT_INTERVAL.key(),
watchHeartbeatInterval)

val watchThreshold = configuration.getInteger(AkkaOptions.WATCH_THRESHOLD)

val akkaTCPTimeout = configuration.getString(AkkaOptions.TCP_TIMEOUT)

val akkaFramesize = configuration.getString(AkkaOptions.FRAMESIZE)
Expand Down Expand Up @@ -505,12 +492,6 @@ object AkkaUtils {
| threshold = $transportThreshold
| }
|
| watch-failure-detector{
| heartbeat-interval = $watchHeartbeatInterval
| acceptable-heartbeat-pause = $watchHeartbeatPause
| threshold = $watchThreshold
| }
|
| netty {
| tcp {
| transport-class = "akka.remote.transport.netty.NettyTransport"
Expand Down
Expand Up @@ -81,12 +81,7 @@ public static Configuration configureZooKeeperHA(
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, fsStateHandlePath + "/checkpoints");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery");

// Akka failure detection and execution retries
config.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "1000 ms");
config.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "6 s");
config.setInteger(AkkaOptions.WATCH_THRESHOLD, 9);
config.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
config.setString(HighAvailabilityOptions.HA_JOB_DELAY, "10 s");

return config;
}
Expand Down
Expand Up @@ -20,35 +20,22 @@ package org.apache.flink.runtime.akka

import java.net.{InetAddress, InetSocketAddress}

import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException, MetricOptions}
import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException}
import org.apache.flink.runtime.clusterframework.BootstrapTools.FixedThreadPoolExecutorConfiguration
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
import org.apache.flink.runtime.metrics.util.MetricUtils
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.AkkaProtocol
import org.apache.flink.util.NetUtils
import org.junit.Assert.assertEquals
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import org.slf4j.LoggerFactory

@RunWith(classOf[JUnitRunner])
class AkkaUtilsTest
extends FunSuite
with Matchers
with BeforeAndAfterAll {

test("getAkkaConfig should validate watch heartbeats") {
val configuration = new Configuration()
configuration.setString(
AkkaOptions.WATCH_HEARTBEAT_PAUSE.key(),
AkkaOptions.WATCH_HEARTBEAT_INTERVAL.defaultValue())
intercept[IllegalConfigurationException] {
AkkaUtils.getAkkaConfig(configuration, Some(("localhost", 31337)))
}
}

test("getAkkaConfig should validate transport heartbeats") {
val configuration = new Configuration()
configuration.setString(
Expand Down

0 comments on commit 9c4b2d8

Please sign in to comment.