Skip to content

Commit

Permalink
[FLINK-6341] [jm] Add test case to guard against RM registration loop
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Apr 28, 2017
1 parent 2383839 commit 591841f
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 19 deletions.
Expand Up @@ -72,6 +72,16 @@ public class JobManagerOptions {
.defaultValue(16)
.withDeprecatedKeys("job-manager.max-attempts-history-size");

/**
* This option specifies the interval in order to trigger a resource manager reconnection if the connection
* to the resource manager has been lost.
*
* This option is only intended for internal use.
*/
public static final ConfigOption<Long> RESOURCE_MANAGER_RECONNECT_INTERVAL =
key("jobmanager.resourcemanager.reconnect-interval")
.defaultValue(2000L);

// ------------------------------------------------------------------------
// JobManager web UI
// ------------------------------------------------------------------------
Expand Down
Expand Up @@ -22,36 +22,38 @@
import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
import org.apache.flink.util.Preconditions;

import java.util.UUID;
import java.io.Serializable;

/**
* This message signals that the ResourceManager should reconnect to the JobManager. It is processed
* by the JobManager if it fails to register resources with the ResourceManager. The JobManager wants
* the ResourceManager to go through the reconciliation phase to sync up with the JobManager bookkeeping.
* This is done by forcing the ResourceManager to reconnect.
*/
public class ReconnectResourceManager implements RequiresLeaderSessionID, java.io.Serializable {
public class ReconnectResourceManager implements RequiresLeaderSessionID, Serializable {
private static final long serialVersionUID = 1L;

private final ActorRef resourceManager;

private final UUID currentConnID;
private final long connectionId;

public ReconnectResourceManager(ActorRef resourceManager, UUID currentConnID) {
public ReconnectResourceManager(ActorRef resourceManager, long connectionId) {
this.resourceManager = Preconditions.checkNotNull(resourceManager);
this.currentConnID = Preconditions.checkNotNull(currentConnID);
this.connectionId = Preconditions.checkNotNull(connectionId);
}

public ActorRef resourceManager() {
return resourceManager;
}

public UUID connID() {
return currentConnID;
public long getConnectionId() {
return connectionId;
}

@Override
public String toString() {
return "ReconnectResourceManager " + resourceManager.path();
return "ReconnectResourceManager(" +
resourceManager.path() + ", " +
connectionId + ')';
}
}
Expand Up @@ -178,10 +178,14 @@ class JobManager(
/** The resource manager actor responsible for allocating and managing task manager resources. */
var currentResourceManager: Option[ActorRef] = None

var currentRMConnID: UUID = null
var currentResourceManagerConnectionId: Long = 0

val taskManagerMap = mutable.Map[ActorRef, InstanceID]()

val triggerResourceManagerReconnectInterval = new FiniteDuration(
flinkConfiguration.getLong(JobManagerOptions.RESOURCE_MANAGER_RECONNECT_INTERVAL),
TimeUnit.MILLISECONDS)

/**
* Run when the job manager is started. Simply logs an informational message.
* The method also starts the leader election service.
Expand Down Expand Up @@ -339,7 +343,7 @@ class JobManager(

// ditch current resource manager (if any)
currentResourceManager = Option(msg.resourceManager())
currentRMConnID = UUID.randomUUID()
currentResourceManagerConnectionId += 1

val taskManagerResources = instanceManager.getAllRegisteredInstances.asScala.map(
instance => instance.getTaskManagerID).toList.asJava
Expand All @@ -358,24 +362,25 @@ class JobManager(
def reconnectRepeatedly(): Unit = {
msg.resourceManager() ! decorateMessage(new TriggerRegistrationAtJobManager(self))
// try again after some delay
context.system.scheduler.scheduleOnce(2 seconds) {
context.system.scheduler.scheduleOnce(triggerResourceManagerReconnectInterval) {
self ! decorateMessage(msg)
}(context.dispatcher)
}

currentResourceManager match {
case Some(rm) if rm.equals(msg.resourceManager()) && currentRMConnID.equals(msg.connID()) =>
case Some(rm) if rm.equals(msg.resourceManager()) &&
currentResourceManagerConnectionId == msg.getConnectionId =>
// we should ditch the current resource manager
log.debug(s"Disconnecting resource manager $rm and forcing a reconnect.")
currentResourceManager = None
reconnectRepeatedly()
case Some(rm) =>
// we have registered with another ResourceManager in the meantime, stop sending
// TriggerRegistrationAtJobManager messages to the old ResourceManager
case None =>
log.warn(s"No resource manager ${msg.resourceManager()} connected. " +
s"Telling old ResourceManager to register again.")
reconnectRepeatedly()
case _ =>
// we have established a new connection to a ResourceManager in the meantime, stop sending
// TriggerRegistrationAtJobManager messages to the old ResourceManager
}

case msg @ RegisterTaskManager(
Expand All @@ -399,7 +404,10 @@ class JobManager(
case _ =>
log.warn("Failure while asking ResourceManager for RegisterResource. Retrying", t)
}
self ! decorateMessage(new ReconnectResourceManager(rm, currentRMConnID))
self ! decorateMessage(
new ReconnectResourceManager(
rm,
currentResourceManagerConnectionId))
}(context.dispatcher)

case None =>
Expand Down
Expand Up @@ -18,18 +18,22 @@

package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.*;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import com.typesafe.config.Config;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
Expand All @@ -40,6 +44,7 @@
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
Expand All @@ -64,6 +69,7 @@
import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
Expand All @@ -73,6 +79,7 @@
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
Expand All @@ -89,11 +96,15 @@
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentCaptor;
import scala.Option;
import scala.Some;
import scala.Tuple2;
Expand All @@ -107,6 +118,7 @@
import java.net.InetAddress;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
import static org.apache.flink.runtime.messages.JobManagerMessages.JobResultSuccess;
Expand All @@ -121,6 +133,9 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.*;

public class JobManagerTest extends TestLogger {

Expand Down Expand Up @@ -1261,4 +1276,88 @@ public void testSavepointRestoreSettings() throws Exception {
}
}
}

/**
* This tests makes sure that triggering a reconnection from the ResourceManager will stop after a new
* ResourceManager has connected. Furthermore it makes sure that there is not endless loop of reconnection
* commands (see FLINK-6341).
*/
@Test
public void testResourceManagerConnection() throws TimeoutException, InterruptedException {
FiniteDuration testTimeout = new FiniteDuration(30L, TimeUnit.SECONDS);
final long reconnectionInterval = 200L;

final Configuration configuration = new Configuration();
configuration.setLong(JobManagerOptions.RESOURCE_MANAGER_RECONNECT_INTERVAL, reconnectionInterval);


final ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(configuration);

try {
final ActorGateway jmGateway = TestingUtils.createJobManager(
actorSystem,
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
configuration);

final TestProbe probe = TestProbe.apply(actorSystem);
final AkkaActorGateway rmGateway = new AkkaActorGateway(probe.ref(), HighAvailabilityServices.DEFAULT_LEADER_ID);

// wait for the JobManager to become the leader
Future<?> leaderFuture = jmGateway.ask(TestingJobManagerMessages.getNotifyWhenLeader(), testTimeout);
Await.ready(leaderFuture, testTimeout);

jmGateway.tell(new RegisterResourceManager(probe.ref()), rmGateway);

JobManagerMessages.LeaderSessionMessage leaderSessionMessage = probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);

assertEquals(HighAvailabilityServices.DEFAULT_LEADER_ID, leaderSessionMessage.leaderSessionID());
assertTrue(leaderSessionMessage.message() instanceof RegisterResourceManagerSuccessful);

jmGateway.tell(
new RegistrationMessages.RegisterTaskManager(
ResourceID.generate(),
mock(TaskManagerLocation.class),
new HardwareDescription(1, 1L, 1L, 1L),
1));
leaderSessionMessage = probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);

assertTrue(leaderSessionMessage.message() instanceof NotifyResourceStarted);

// fail the NotifyResourceStarted so that we trigger the reconnection process on the JobManager's side
probe.lastSender().tell(new Status.Failure(new Exception("Test exception")), ActorRef.noSender());

Deadline reconnectionDeadline = new FiniteDuration(5L * reconnectionInterval, TimeUnit.MILLISECONDS).fromNow();
boolean registered = false;

while (reconnectionDeadline.hasTimeLeft()) {
try {
leaderSessionMessage = probe.expectMsgClass(reconnectionDeadline.timeLeft(), JobManagerMessages.LeaderSessionMessage.class);
} catch (AssertionError ignored) {
// expected timeout after the reconnectionDeadline has been exceeded
continue;
}

if (leaderSessionMessage.message() instanceof TriggerRegistrationAtJobManager) {
if (registered) {
fail("A successful registration should not be followed by another TriggerRegistrationAtJobManager message.");
}

jmGateway.tell(new RegisterResourceManager(probe.ref()), rmGateway);
} else if (leaderSessionMessage.message() instanceof RegisterResourceManagerSuccessful) {
// now we should no longer receive TriggerRegistrationAtJobManager messages
registered = true;
} else {
fail("Received unknown message: " + leaderSessionMessage.message() + '.');
}
}

assertTrue(registered);

} finally {
// cleanup the actor system and with it all of the started actors if not already terminated
actorSystem.shutdown();
actorSystem.awaitTermination();
}
}
}

0 comments on commit 591841f

Please sign in to comment.