Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,11 @@ public class AkkaOptions {
public static final ConfigOption<Boolean> JVM_EXIT_ON_FATAL_ERROR = ConfigOptions
.key("akka.jvm-exit-on-fatal-error")
.defaultValue(true);

/**
* Milliseconds a gate should be closed for after a remote connection was disconnected.
*/
public static final ConfigOption<Long> RETRY_GATE_CLOSED_FOR = ConfigOptions
.key("akka.retry-gate-closed-for")
.defaultValue(50L);
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ object AkkaUtils {
val akkaEnableSSLConfig = configuration.getBoolean(AkkaOptions.SSL_ENABLED) &&
SSLUtils.getSSLEnabled(configuration)

val retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR)

val akkaEnableSSL = if (akkaEnableSSLConfig) "on" else "off"

val akkaSSLKeyStore = configuration.getString(SecurityOptions.SSL_KEYSTORE)
Expand Down Expand Up @@ -355,6 +357,8 @@ object AkkaUtils {
| }
|
| log-remote-lifecycle-events = $logLifecycleEvents
|
| retry-gate-closed-for = ${retryGateClosedFor + " ms"}
| }
|}
""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ package org.apache.flink.api.scala.runtime.jobmanager

import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions}
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable}
import org.apache.flink.runtime.messages.Acknowledge
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered
import org.apache.flink.runtime.testingUtils.TestingMessages.DisableDisconnect
import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingCluster, TestingUtils}
import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
Expand All @@ -51,7 +51,7 @@ class JobManagerFailsITCase(_system: ActorSystem)
"A TaskManager" should {
"detect a lost connection to the JobManager and try to reconnect to it" in {

val num_slots = 13
val num_slots = 4
val cluster = startDeathwatchCluster(num_slots, 1)

try {
Expand Down Expand Up @@ -83,7 +83,7 @@ class JobManagerFailsITCase(_system: ActorSystem)
}

"go into a clean state in case of a JobManager failure" in {
val num_slots = 36
val num_slots = 4

val sender = new JobVertex("BlockingSender")
sender.setParallelism(num_slots)
Expand Down Expand Up @@ -135,6 +135,9 @@ class JobManagerFailsITCase(_system: ActorSystem)
val config = new Configuration()
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots)
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskmanagers)
config.setInteger(JobManagerOptions.PORT, 0)
config.setString(ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, "50 ms")
config.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, "100 ms")

val cluster = new TestingCluster(config, singleActorSystem = false)

Expand Down