From 42495a43d4bfca57396eb9613d0f0db02f6d27bb Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 25 Oct 2017 12:39:49 +0200 Subject: [PATCH 1/2] [FLINK-7914] Introduce AkkaOptions.RETRY_GATE_CLOSED_FOR The AkkaOptions.RETRY_GATE_CLOSED_FOR option allows configuring how long a remote ActorSystem is gated in case of a connection loss. The default value is set to 50 ms. --- .../java/org/apache/flink/configuration/AkkaOptions.java | 7 +++++++ .../scala/org/apache/flink/runtime/akka/AkkaUtils.scala | 4 ++++ 2 files changed, 11 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java index 9bfc237546473..c88f32ad640f6 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java @@ -139,4 +139,11 @@ public class AkkaOptions { public static final ConfigOption JVM_EXIT_ON_FATAL_ERROR = ConfigOptions .key("akka.jvm-exit-on-fatal-error") .defaultValue(true); + + /** + * Milliseconds a gate should be closed for after a remote connection was disconnected. 
+ */ + public static final ConfigOption RETRY_GATE_CLOSED_FOR = ConfigOptions + .key("akka.retry-gate-closed-for") + .defaultValue(50L); } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 05ffc8772e7f7..024c13740355d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -304,6 +304,8 @@ object AkkaUtils { val akkaEnableSSLConfig = configuration.getBoolean(AkkaOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(configuration) + val retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR) + val akkaEnableSSL = if (akkaEnableSSLConfig) "on" else "off" val akkaSSLKeyStore = configuration.getString(SecurityOptions.SSL_KEYSTORE) @@ -355,6 +357,8 @@ object AkkaUtils { | } | | log-remote-lifecycle-events = $logLifecycleEvents + | + | retry-gate-closed-for = ${retryGateClosedFor + " ms"} | } |} """.stripMargin From a25a6dd32cac49256caa269012117f8a8e0b66a7 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 25 Oct 2017 13:24:41 +0200 Subject: [PATCH 2/2] [hotfix] Speed up JobManagerFailsITCase by decreasing timeout --- .../runtime/jobmanager/JobManagerFailsITCase.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala index e5f26c54e9676..44f14a090384c 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala @@ -20,15 +20,15 @@ package org.apache.flink.api.scala.runtime.jobmanager import akka.actor.ActorSystem import 
akka.testkit.{ImplicitSender, TestKit} -import org.apache.flink.configuration.{ConfigConstants, Configuration} +import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions} import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex} -import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable} import org.apache.flink.runtime.messages.Acknowledge import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils} +import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} @@ -51,7 +51,7 @@ class JobManagerFailsITCase(_system: ActorSystem) "A TaskManager" should { "detect a lost connection to the JobManager and try to reconnect to it" in { - val num_slots = 13 + val num_slots = 4 val cluster = startDeathwatchCluster(num_slots, 1) try { @@ -83,7 +83,7 @@ class JobManagerFailsITCase(_system: ActorSystem) } "go into a clean state in case of a JobManager failure" in { - val num_slots = 36 + val num_slots = 4 val sender = new JobVertex("BlockingSender") sender.setParallelism(num_slots) @@ -135,6 +135,9 @@ class JobManagerFailsITCase(_system: ActorSystem) val config = new Configuration() config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots) config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers) + config.setInteger(JobManagerOptions.PORT, 0) + config.setString(ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, "50 ms") + 
config.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, "100 ms") val cluster = new TestingCluster(config, singleActorSystem = false)