From b79b2aca448691481cca54b5d608cb8232d4ee3b Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 20 Mar 2017 18:15:57 +0100 Subject: [PATCH] [FLINK-6134] Set UUID(0L, 0L) as default leader session id Before the default leader session id was null in the standalone case. However, in the ZooKeeper case null indicated that there was no active leader available. With this commit, the default leader id will be set to UUID(0L, 0L). This allows the uniform treatment of null denoting that there is no active leader across the standalone and the ZooKeeper case. With this change, the FlinkActors will now ignore all LeaderSessionMessages if the actors's leader id field is null. This indicates that the FlinkActor does not know the current leader. --- .../flink/client/program/ClientTest.java | 37 ++++--- .../MesosFlinkResourceManagerTest.java | 3 +- .../BackPressureStatsTrackerITCase.java | 3 +- .../StackTraceSampleCoordinatorITCase.java | 3 +- .../flink/runtime/akka/FlinkUntypedActor.java | 14 ++- .../flink/runtime/client/JobClientActor.java | 98 +++++++++++++------ .../FlinkResourceManager.java | 4 +- .../runtime/instance/AkkaActorGateway.java | 5 +- .../StandaloneLeaderElectionService.java | 3 +- .../StandaloneLeaderRetrievalService.java | 4 +- .../runtime/LeaderSessionMessageFilter.scala | 21 ++-- .../runtime/messages/JobClientMessages.scala | 9 +- .../runtime/messages/JobManagerMessages.scala | 2 +- .../runtime/taskmanager/TaskManager.scala | 5 +- .../runtime/client/JobClientActorTest.java | 29 +++--- .../jobmanager/JobManagerHARecoveryTest.java | 5 +- .../runtime/jobmanager/JobManagerTest.java | 41 ++++---- .../runtime/jobmaster/JobMasterTest.java | 4 +- ...LeaderElectionRetrievalTestingCluster.java | 8 +- .../StandaloneLeaderElectionTest.java | 5 +- .../TestingLeaderRetrievalService.java | 6 +- .../metrics/TaskManagerMetricsTest.java | 7 +- .../AkkaKvStateLocationLookupServiceTest.java | 24 +++-- .../JobLeaderIdServiceTest.java | 15 ++- .../ResourceManagerJobMasterTest.java | 13 ++- .../taskexecutor/TaskExecutorITCase.java | 2 +- .../taskexecutor/TaskExecutorTest.java | 24 +++-- .../TaskManagerRegistrationTest.java | 3 +- .../runtime/taskmanager/TaskManagerTest.java | 7 +- .../src/test/resources/log4j-test.properties | 2 +- .../JobManagerRegistrationTest.scala | 24 +++-- .../testingUtils/ScalaTestingUtils.scala | 3 +- .../runtime/testingUtils/TestingUtils.scala | 15 +-- .../JobManagerHAJobGraphRecoveryITCase.java | 88 ----------------- .../LocalFlinkMiniClusterITCase.java | 3 +- .../java/org/apache/flink/yarn/UtilsTest.java | 4 +- 36 files changed, 292 insertions(+), 251 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index 96785f471d542..75cb0e71a3a79 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -42,11 +42,13 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -54,6 +56,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; import java.net.URL; import java.util.Collections; import java.util.UUID; @@ -68,7 +71,7 @@ /** * Simple and maybe stupid test to check the {@link ClusterClient} class. */ -public class ClientTest { +public class ClientTest extends TestLogger { private PackagedProgram program; @@ -217,27 +220,21 @@ public void shouldSubmitToJobClient() { * This test verifies correct that the correct exception is thrown when the job submission fails. */ @Test - public void shouldSubmitToJobClientFails() { - try { - jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), JobManager.JOB_MANAGER_NAME()); + public void shouldSubmitToJobClientFails() throws IOException { + jobManagerSystem.actorOf(Props.create(FailureReturningActor.class), JobManager.JOB_MANAGER_NAME()); - ClusterClient out = new StandaloneClusterClient(config); - out.setDetached(true); + ClusterClient out = new StandaloneClusterClient(config); + out.setDetached(true); - try { - out.run(program.getPlanWithJars(), 1); - fail("This should fail with an exception"); - } - catch (ProgramInvocationException e) { - // bam! - } - catch (Exception e) { - fail("wrong exception " + e); - } + try { + out.run(program.getPlanWithJars(), 1); + fail("This should fail with an exception"); + } + catch (ProgramInvocationException e) { + // bam! } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + fail("wrong exception " + e); } } @@ -308,7 +305,7 @@ public void testGetExecutionPlan() { public static class SuccessReturningActor extends FlinkUntypedActor { - private UUID leaderSessionID = null; + private UUID leaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID; @Override public void handleMessage(Object message) { @@ -338,7 +335,7 @@ protected UUID getLeaderSessionID() { public static class FailureReturningActor extends FlinkUntypedActor { - private UUID leaderSessionID = null; + private UUID leaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID; @Override public void handleMessage(Object message) { diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java index dcf6a823c4c53..c854a1713ef87 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManagerTest.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.clusterframework.ContainerSpecification; import org.apache.flink.runtime.clusterframework.messages.*; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -208,7 +209,7 @@ public void initialize() { TestingMesosFlinkResourceManager.class, config, mesosConfig, workerStore, retrievalService, tmParams, containerSpecification, artifactResolver, LOG)); resourceManagerInstance = resourceManagerRef.underlyingActor(); - resourceManager = new AkkaActorGateway(resourceManagerRef, null); + resourceManager = new AkkaActorGateway(resourceManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID); verify(schedulerDriver).start(); resourceManagerInstance.connectionMonitor.expectMsgClass(ConnectionMonitor.Start.class); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java index 46f8be6f02cf3..e80c509cc57da 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.client.JobClient; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -138,7 +139,7 @@ public void testBackPressuredProducer() throws Exception { @Override protected void run() { try { - ActorGateway testActor = new AkkaActorGateway(getTestActor(), null); + ActorGateway testActor = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID); // Submit the job and wait until it is running JobClient.submitJobDetached( diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java index a44e212b18483..54633842b2b18 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -110,7 +111,7 @@ public void testTaskClearedWhileSampling() throws Exception { @Override protected void run() { try { - ActorGateway testActor = new AkkaActorGateway(getTestActor(), null); + ActorGateway testActor = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID); int maxAttempts = 10; int sleepTime = 100; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java index 3255778fe95d6..05ae50177069a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java @@ -84,10 +84,14 @@ private void handleLeaderSessionID(Object message) throws Exception { UUID expectedID = getLeaderSessionID(); UUID actualID = msg.leaderSessionID(); - if(expectedID == actualID || (expectedID != null && expectedID.equals(actualID))) { - handleMessage(msg.message()); + if (expectedID != null) { + if (expectedID.equals(actualID)) { + handleMessage(msg.message()); + } else { + handleDiscardedMessage(expectedID, msg); + } } else { - handleDiscardedMessage(expectedID, msg); + handleNoLeaderId(msg); } } else if (message instanceof RequiresLeaderSessionID) { throw new Exception("Received a message " + message + " without a leader session " + @@ -104,6 +108,10 @@ private void handleDiscardedMessage(UUID expectedLeaderSessionID, LeaderSessionM msg.leaderSessionID()); } + private void handleNoLeaderId(LeaderSessionMessage msg) { + LOG.warn("Discard message {} because there is currently no valid leader id known.", msg); + } + /** * This method contains the actor logic which defines how to react to incoming messages. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java index 1380e76d41fe7..368a2b6d3bda5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClientActor.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.client; import akka.actor.ActorRef; +import akka.actor.Cancellable; import akka.actor.PoisonPill; import akka.actor.Status; import akka.actor.Terminated; @@ -36,6 +37,7 @@ import org.apache.flink.util.Preconditions; import scala.concurrent.duration.FiniteDuration; +import java.util.Objects; import java.util.UUID; @@ -66,6 +68,10 @@ public abstract class JobClientActor extends FlinkUntypedActor implements Leader /** The client which the actor is responsible for */ protected ActorRef client; + private Cancellable connectionTimeout; + + private UUID connectionTimeoutId; + public JobClientActor( LeaderRetrievalService leaderRetrievalService, FiniteDuration timeout, @@ -73,6 +79,11 @@ public JobClientActor( this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService); this.timeout = Preconditions.checkNotNull(timeout); this.sysoutUpdates = sysoutUpdates; + this.jobManager = ActorRef.noSender(); + this.leaderSessionID = null; + + connectionTimeout = null; + connectionTimeoutId = null; } @Override @@ -146,6 +157,9 @@ public void onSuccess(ActorRef result) throws Throwable { getSelf().tell(decorateMessage(new JobManagerActorRef(result)), ActorRef.noSender()); } }, getContext().dispatcher()); + } else if (isClientConnected() && connectionTimeoutId == null) { + // msg.address == null means that the leader has lost its leadership + registerConnectionTimeout(); } } else if (message instanceof JobManagerActorRef) { // Resolved JobManager ActorRef @@ -184,36 +198,35 @@ else if (message instanceof Terminated) { jobManager.path()); disconnectFromJobManager(); - // we only issue a connection timeout if we have submitted a job before - // otherwise, we might have some more time to find another job manager - // Important: The ConnectionTimeout message is filtered out in case that we are - // notified about a new leader by setting the new leader session ID, because - // ConnectionTimeout extends RequiresLeaderSessionID if (isClientConnected()) { - getContext().system().scheduler().scheduleOnce( - timeout, - getSelf(), - decorateMessage(JobClientMessages.getConnectionTimeout()), - getContext().dispatcher(), - ActorRef.noSender()); + if (connectionTimeoutId == null) { + // only register a connection timeout if we haven't done this before + registerConnectionTimeout(); + } } } else { LOG.warn("Received 'Terminated' for unknown actor " + target); } } - else if (JobClientMessages.getConnectionTimeout().equals(message)) { - // check if we haven't found a job manager yet - if (!isJobManagerConnected()) { - final JobClientActorConnectionTimeoutException errorMessage = - new JobClientActorConnectionTimeoutException("Lost connection to the JobManager."); - final Object replyMessage = decorateMessage(new Status.Failure(errorMessage)); - if (isClientConnected()) { - client.tell( - replyMessage, - getSelf()); + else if (message instanceof JobClientMessages.ConnectionTimeout) { + JobClientMessages.ConnectionTimeout timeoutMessage = (JobClientMessages.ConnectionTimeout) message; + + if (Objects.equals(connectionTimeoutId, timeoutMessage.id())) { + // check if we haven't found a job manager yet + if (!isJobManagerConnected()) { + final JobClientActorConnectionTimeoutException errorMessage = + new JobClientActorConnectionTimeoutException("Lost connection to the JobManager."); + final Object replyMessage = decorateMessage(new Status.Failure(errorMessage)); + if (isClientConnected()) { + client.tell( + replyMessage, + getSelf()); + } + // Connection timeout reached, let's terminate + terminate(); } - // Connection timeout reached, let's terminate - terminate(); + } else { + LOG.debug("Received outdated connection timeout."); } } @@ -225,13 +238,10 @@ else if (!isJobManagerConnected() && getClientMessageClass().equals(message.getC message); // We want to submit/attach to a job, but we haven't found a job manager yet. // Let's give him another chance to find a job manager within the given timeout. - getContext().system().scheduler().scheduleOnce( - timeout, - getSelf(), - decorateMessage(JobClientMessages.getConnectionTimeout()), - getContext().dispatcher(), - ActorRef.noSender() - ); + if (connectionTimeoutId == null) { + // only register the connection timeout once + registerConnectionTimeout(); + } handleCustomMessage(message); } else { @@ -304,6 +314,8 @@ private void disconnectFromJobManager() { getContext().unwatch(jobManager); jobManager = ActorRef.noSender(); } + + leaderSessionID = null; } private void connectToJobManager(ActorRef jobManager) { @@ -316,6 +328,8 @@ private void connectToJobManager(ActorRef jobManager) { this.jobManager = jobManager; getContext().watch(jobManager); + + unregisterConnectionTimeout(); } protected void terminate() { @@ -333,4 +347,28 @@ protected boolean isClientConnected() { return client != ActorRef.noSender(); } + private void registerConnectionTimeout() { + if (connectionTimeout != null) { + connectionTimeout.cancel(); + } + + connectionTimeoutId = UUID.randomUUID(); + + connectionTimeout = getContext().system().scheduler().scheduleOnce( + timeout, + getSelf(), + decorateMessage(new JobClientMessages.ConnectionTimeout(connectionTimeoutId)), + getContext().dispatcher(), + ActorRef.noSender() + ); + } + + private void unregisterConnectionTimeout() { + if (connectionTimeout != null) { + connectionTimeout.cancel(); + connectionTimeout = null; + connectionTimeoutId = null; + } + } + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java index 911c1f6604a59..77dbad45a17e3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java @@ -410,9 +410,9 @@ private void newJobManagerLeaderAvailable(String leaderAddress, UUID leaderSessi // disconnect from the current leader (no-op if no leader yet) jobManagerLostLeadership(); - // a null leader address means that only a leader disconnect + // a null leader session id means that only a leader disconnect // happened, without a new leader yet - if (leaderAddress != null) { + if (leaderSessionID != null && leaderAddress != null) { // the leaderSessionID implicitly filters out success and failure messages // that come after leadership changed again this.leaderSessionID = leaderSessionID; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java index adeae03f7c18f..26b6176ff22bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.messages.LeaderSessionMessageDecorator; import org.apache.flink.runtime.messages.MessageDecorator; +import org.apache.flink.util.Preconditions; import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -48,8 +49,8 @@ public class AkkaActorGateway implements ActorGateway, Serializable { private final MessageDecorator decorator; public AkkaActorGateway(ActorRef actor, UUID leaderSessionID) { - this.actor = actor; - this.leaderSessionID = leaderSessionID; + this.actor = Preconditions.checkNotNull(actor); + this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID); // we want to wrap RequiresLeaderSessionID messages in a LeaderSessionMessage this.decorator = new LeaderSessionMessageDecorator(leaderSessionID); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java index 2d366163cec26..a956a5eeae685 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.util.Preconditions; import java.util.UUID; @@ -42,7 +43,7 @@ public void start(LeaderContender newContender) throws Exception { contender = Preconditions.checkNotNull(newContender); // directly grant leadership to the given contender - contender.grantLeadership(null); + contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java index 4ad4646274aaf..174e1064c384f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.leaderretrieval; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; + import java.util.UUID; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -54,7 +56,7 @@ public class StandaloneLeaderRetrievalService implements LeaderRetrievalService @Deprecated public StandaloneLeaderRetrievalService(String leaderAddress) { this.leaderAddress = checkNotNull(leaderAddress); - this.leaderId = null; + this.leaderId = HighAvailabilityServices.DEFAULT_LEADER_ID; } /** diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala index 1fb32cee3e463..1a6be25c1a806 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/LeaderSessionMessageFilter.scala @@ -32,10 +32,15 @@ trait LeaderSessionMessageFilter extends FlinkActor { abstract override def receive: Receive = { case leaderMessage @ LeaderSessionMessage(msgID, msg) => - if (leaderSessionID.equals(Option(msgID))) { - super.receive(msg) - } else { - handleDiscardedMessage(leaderSessionID, leaderMessage) + leaderSessionID match { + case Some(leaderId) => + if (leaderId.equals(msgID)) { + super.receive(msg) + } else { + handleDiscardedMessage(leaderId, leaderMessage) + } + case None => + handleNoLeaderId(leaderMessage) } case msg: RequiresLeaderSessionID => throw new Exception(s"Received a message $msg without a leader session ID, even though" + @@ -45,12 +50,16 @@ trait LeaderSessionMessageFilter extends FlinkActor { } private def handleDiscardedMessage( - expectedLeaderSessionID: Option[UUID], + expectedLeaderSessionID: UUID, msg: LeaderSessionMessage) : Unit = { log.warn(s"Discard message $msg because the expected leader session ID " + s"$expectedLeaderSessionID did not equal the received leader session ID " + - s"${Option(msg.leaderSessionID)}.") + s"${msg.leaderSessionID}.") + } + + private def handleNoLeaderId(msg: LeaderSessionMessage): Unit = { + log.warn(s"Discard message $msg because there is currently no valid leader id known.") } /** Wrap [[RequiresLeaderSessionID]] messages in a [[LeaderSessionMessage]] diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala index 1f29e322ff450..7268b0f90c6a8 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobClientMessages.scala @@ -66,10 +66,13 @@ object JobClientMessages { /** Message which is triggered when the JobClient registration at the JobManager times out */ case object RegistrationTimeout extends RequiresLeaderSessionID - /** Message which is triggered when the connection timeout has been reached. */ - case object ConnectionTimeout extends RequiresLeaderSessionID + /** + * Message which is triggered when the connection timeout has been reached. + * + * @param id Timeout id which identifies the concurrent timeouts + */ + case class ConnectionTimeout(id: UUID) def getSubmissionTimeout(): AnyRef = SubmissionTimeout def getRegistrationTimeout(): AnyRef = RegistrationTimeout - def getConnectionTimeout(): AnyRef = ConnectionTimeout } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index f3025ab6b78c9..4db2584bd1f87 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -45,7 +45,7 @@ object JobManagerMessages { * [[RequiresLeaderSessionID]] interface and have to be wrapped in a [[LeaderSessionMessage]], * which also contains the current leader session ID. * - * @param leaderSessionID Current leader session ID or null, if no leader session ID was set + * @param leaderSessionID Current leader session ID * @param message [[RequiresLeaderSessionID]] message to be wrapped in a [[LeaderSessionMessage]] */ case class LeaderSessionMessage(leaderSessionID: UUID, message: Any) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 25d53665afcf1..2e8a6fa19132a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1434,7 +1434,10 @@ class TaskManager( this.jobManagerAkkaURL = Option(newJobManagerAkkaURL) this.leaderSessionID = Option(leaderSessionID) - triggerTaskManagerRegistration() + if (this.leaderSessionID.isDefined) { + // only trigger the registration if we have obtained a valid leader id (!= null) + triggerTaskManagerRegistration() + } } /** Starts the TaskManager's registration process to connect to the JobManager. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java index 2e3384ffaa0db..0ec00df870917 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/JobClientActorTest.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; @@ -73,7 +74,7 @@ public static void teardown() { */ @Test(expected=JobClientActorSubmissionTimeoutException.class) public void testSubmissionTimeout() throws Exception { - FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); + FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); FiniteDuration timeout = jobClientActorTimeout.$times(2); UUID leaderSessionID = UUID.randomUUID(); @@ -112,7 +113,7 @@ public void testSubmissionTimeout() throws Exception { */ @Test(expected=JobClientActorRegistrationTimeoutException.class) public void testRegistrationTimeout() throws Exception { - FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); + FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); FiniteDuration timeout = jobClientActorTimeout.$times(2); UUID leaderSessionID = UUID.randomUUID(); @@ -142,17 +143,19 @@ public void testRegistrationTimeout() throws Exception { Await.result(jobExecutionResult, timeout); } - /** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException} + /** Tests that a {@link JobClientActorConnectionTimeoutException} * is thrown when the JobSubmissionClientActor wants to submit a job but has not connected to a JobManager. * * @throws Exception */ @Test(expected=JobClientActorConnectionTimeoutException.class) public void testConnectionTimeoutWithoutJobManagerForSubmission() throws Exception { - FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); + FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); FiniteDuration timeout = jobClientActorTimeout.$times(2); - TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + "localhost", + HighAvailabilityServices.DEFAULT_LEADER_ID); Props jobClientActorProps = JobSubmissionClientActor.createActorProps( testingLeaderRetrievalService, @@ -170,16 +173,18 @@ public void testConnectionTimeoutWithoutJobManagerForSubmission() throws Excepti Await.result(jobExecutionResult, timeout); } - /** Tests that a {@link org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException} + /** Tests that a {@link JobClientActorConnectionTimeoutException} * is thrown when the JobAttachmentClientActor attach to a job at the JobManager * but has not connected to a JobManager. */ @Test(expected=JobClientActorConnectionTimeoutException.class) public void testConnectionTimeoutWithoutJobManagerForRegistration() throws Exception { - FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); + FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); FiniteDuration timeout = jobClientActorTimeout.$times(2); - TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService( + "localhost", + HighAvailabilityServices.DEFAULT_LEADER_ID); Props jobClientActorProps = JobAttachmentClientActor.createActorProps( testingLeaderRetrievalService, @@ -203,7 +208,7 @@ public void testConnectionTimeoutWithoutJobManagerForRegistration() throws Excep */ @Test(expected=JobClientActorConnectionTimeoutException.class) public void testConnectionTimeoutAfterJobSubmission() throws Exception { - FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); + FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); FiniteDuration timeout = jobClientActorTimeout.$times(2); UUID leaderSessionID = UUID.randomUUID(); @@ -245,8 +250,8 @@ public void testConnectionTimeoutAfterJobSubmission() throws Exception { */ @Test(expected=JobClientActorConnectionTimeoutException.class) public void testConnectionTimeoutAfterJobRegistration() throws Exception { - FiniteDuration jobClientActorTimeout = new FiniteDuration(5, TimeUnit.SECONDS); - FiniteDuration timeout = jobClientActorTimeout.$times(2); + FiniteDuration jobClientActorTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); + FiniteDuration timeout = jobClientActorTimeout.$times(2L); UUID leaderSessionID = UUID.randomUUID(); @@ -287,7 +292,7 @@ public void testConnectionTimeoutAfterJobRegistration() throws Exception { */ @Test public void testGuaranteedAnswerIfJobClientDies() throws Exception { - FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS); + FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.SECONDS); UUID leaderSessionID = UUID.randomUUID(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 32358c0c85a29..2ebc36e1cb672 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.instance.InstanceManager; @@ -166,7 +167,9 @@ public void testJobRecoveryWhenLosingLeadership() throws Exception { CheckpointIDCounter checkpointCounter = new StandaloneCheckpointIDCounter(); CheckpointRecoveryFactory checkpointStateFactory = new MyCheckpointRecoveryFactory(checkpointStore, checkpointCounter); TestingLeaderElectionService myLeaderElectionService = new TestingLeaderElectionService(); - TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService myLeaderRetrievalService = new TestingLeaderRetrievalService( + "localhost", + HighAvailabilityServices.DEFAULT_LEADER_ID); InstanceManager instanceManager = new InstanceManager(); instanceManager.addInstanceListener(scheduler); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 727fc6507a4c6..3944752c635a6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -86,6 +87,7 @@ import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.StoppableInvokable; +import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -120,7 +122,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class JobManagerTest { +public class JobManagerTest extends TestLogger { @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -183,7 +185,7 @@ protected void run() { TestingUtils.TESTING_DURATION()); // we can set the leader session ID to None because we don't use this gateway to send messages - final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null); + final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID); // Submit the job and wait for all vertices to be running jobManagerGateway.tell( @@ -304,7 +306,7 @@ protected void run() { TestingUtils.TESTING_DURATION()); // we can set the leader session ID to None because we don't use this gateway to send messages - final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null); + final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID); // Submit the job and wait for all vertices to be running jobManagerGateway.tell( @@ -395,7 +397,7 @@ protected void run() { TestingUtils.TESTING_DURATION()); // we can set the leader session ID to None because we don't use this gateway to send messages - final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null); + final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID); // Submit the job and wait for all vertices to be running jobManagerGateway.tell( @@ -478,7 +480,7 @@ protected void run() { final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); // we can set the leader session ID to None because we don't use this gateway to send messages - final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null); + final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID); // Submit the job and wait for all vertices to be running jobManagerGateway.tell( @@ -530,7 +532,7 @@ protected void run() { final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); // we can set the leader session ID to None because we don't use this gateway to send messages - final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null); + final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID); // Submit the job and wait for all vertices to be running jobManagerGateway.tell( @@ -572,7 +574,6 @@ public void testKvStateMessages() throws Exception { Configuration config = new Configuration(); config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100ms"); - UUID leaderSessionId = null; ActorGateway jobManager = new AkkaActorGateway( JobManager.startJobManagerActors( config, @@ -581,7 +582,7 @@ public void testKvStateMessages() throws Exception { TestingUtils.defaultExecutor(), TestingJobManager.class, MemoryArchivist.class)._1(), - leaderSessionId); + HighAvailabilityServices.DEFAULT_LEADER_ID); LeaderRetrievalService leaderRetrievalService = new StandaloneLeaderRetrievalService( AkkaUtils.getAkkaURL(system, jobManager.actor())); @@ -794,8 +795,8 @@ public void testCancelWithSavepoint() throws Exception { TestingJobManager.class, TestingMemoryArchivist.class); - jobManager = new AkkaActorGateway(master._1(), null); - archiver = new AkkaActorGateway(master._2(), null); + jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID); + archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID); ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor( config, @@ -807,7 +808,7 @@ public void testCancelWithSavepoint() throws Exception { true, TestingTaskManager.class); - taskManager = new AkkaActorGateway(taskManagerRef, null); + taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID); // Wait until connected Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()); @@ -920,8 +921,8 @@ public void testCancelWithSavepointNoDirectoriesConfigured() throws Exception { TestingJobManager.class, TestingMemoryArchivist.class); - jobManager = new AkkaActorGateway(master._1(), null); - archiver = new AkkaActorGateway(master._2(), null); + jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID); + archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID); ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor( config, @@ -933,7 +934,7 @@ public void testCancelWithSavepointNoDirectoriesConfigured() throws Exception { true, TestingTaskManager.class); - taskManager = new AkkaActorGateway(taskManagerRef, null); + taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID); // Wait until connected Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()); @@ -1026,8 +1027,8 @@ public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception TestingJobManager.class, TestingMemoryArchivist.class); - jobManager = new AkkaActorGateway(master._1(), null); - archiver = new AkkaActorGateway(master._2(), null); + jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID); + archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID); ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor( config, @@ -1039,7 +1040,7 @@ public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception true, TestingTaskManager.class); - taskManager = new AkkaActorGateway(taskManagerRef, null); + taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID); // Wait until connected Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()); @@ -1126,8 +1127,8 @@ public void testSavepointRestoreSettings() throws Exception { TestingJobManager.class, TestingMemoryArchivist.class); - jobManager = new AkkaActorGateway(master._1(), null); - archiver = new AkkaActorGateway(master._2(), null); + jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID); + archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID); Configuration tmConfig = new Configuration(); tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4); @@ -1142,7 +1143,7 @@ public void testSavepointRestoreSettings() throws Exception { true, TestingTaskManager.class); - taskManager = new AkkaActorGateway(taskManagerRef, null); + taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID); // Wait until connected Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 567a8fc604433..43536b608656e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -64,7 +64,9 @@ public class JobMasterTest extends TestLogger { @Test public void testHeartbeatTimeoutWithTaskManager() throws Exception { final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); - final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(); + final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class)); final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java index c143fe2408a78..1cab0eabdaffa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionRetrievalTestingCluster.java @@ -53,8 +53,8 @@ public LeaderElectionRetrievalTestingCluster( this.userConfiguration = userConfiguration; this.useSingleActorSystem = singleActorSystem; - leaderElectionServices = new ArrayList(); - leaderRetrievalServices = new ArrayList(); + leaderElectionServices = new ArrayList<>(); + leaderRetrievalServices = new ArrayList<>(); } @Override @@ -78,7 +78,9 @@ public Option createLeaderElectionService() { @Override public LeaderRetrievalService createLeaderRetrievalService() { - leaderRetrievalServices.add(new TestingLeaderRetrievalService()); + leaderRetrievalServices.add(new TestingLeaderRetrievalService( + null, + null)); return leaderRetrievalServices.get(leaderRetrievalServices.size() - 1); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java index b04be6310e069..18b620fa45074 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -46,12 +47,12 @@ public void testStandaloneLeaderElectionRetrieval() throws Exception { contender.waitForLeader(1000l); assertTrue(contender.isLeader()); - assertEquals(null, contender.getLeaderSessionID()); + assertEquals(HighAvailabilityServices.DEFAULT_LEADER_ID, contender.getLeaderSessionID()); testingListener.waitForNewLeader(1000l); assertEquals(TEST_URL, testingListener.getAddress()); - assertEquals(null, testingListener.getLeaderSessionID()); + assertEquals(HighAvailabilityServices.DEFAULT_LEADER_ID, testingListener.getLeaderSessionID()); } finally { leaderElectionService.stop(); leaderRetrievalService.stop(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java index d6bcaafacf75b..887772af70541 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderRetrievalService.java @@ -35,10 +35,6 @@ public class TestingLeaderRetrievalService implements LeaderRetrievalService { private volatile LeaderRetrievalListener listener; - public TestingLeaderRetrievalService() { - this(null, null); - } - public TestingLeaderRetrievalService(String leaderAddress, UUID leaderSessionID) { this.leaderAddress = leaderAddress; this.leaderSessionID = leaderSessionID; @@ -48,7 +44,7 @@ public TestingLeaderRetrievalService(String leaderAddress, UUID leaderSessionID) public void start(LeaderRetrievalListener listener) throws Exception { this.listener = Preconditions.checkNotNull(listener); - if (leaderAddress != null) { + if (leaderSessionID != null && leaderAddress != null) { listener.notifyLeaderAddress(leaderAddress, leaderSessionID); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java index aed2b6f3a56d7..100c83d38e8ee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -109,7 +110,11 @@ protected void run() { expectMsgEquals(TaskManagerMessages.getRegisteredAtJobManagerMessage()); // trigger re-registration of TM; this should include a disconnect from the current JM - taskManager.tell(new TaskManagerMessages.JobManagerLeaderAddress(jobManager.path().toString(), null), jobManager); + taskManager.tell( + new TaskManagerMessages.JobManagerLeaderAddress( + jobManager.path().toString(), + HighAvailabilityServices.DEFAULT_LEADER_ID), + jobManager); // wait for re-registration to be completed taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java index e9950fbd7009b..34e31742264d5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/AkkaKvStateLocationLookupServiceTest.java @@ -26,12 +26,14 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.FlinkUntypedActor; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategy; import org.apache.flink.runtime.query.AkkaKvStateLocationLookupService.LookupRetryStrategyFactory; import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -51,7 +53,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class AkkaKvStateLocationLookupServiceTest { +public class AkkaKvStateLocationLookupServiceTest extends TestLogger { /** The default timeout. */ private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS); @@ -77,7 +79,9 @@ public static void tearDown() throws Exception { */ @Test public void testNoJobManagerRegistered() throws Exception { - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); Queue received = new LinkedBlockingQueue<>(); AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( @@ -108,7 +112,7 @@ public void testNoJobManagerRegistered() throws Exception { // // Leader registration => communicate with new leader // - UUID leaderSessionId = null; + UUID leaderSessionId = HighAvailabilityServices.DEFAULT_LEADER_ID; KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "tea"); ActorRef testActor = LookupResponseActor.create(received, leaderSessionId, expected); @@ -154,7 +158,9 @@ public void testNoJobManagerRegistered() throws Exception { */ @Test public void testLeaderSessionIdChange() throws Exception { - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + "localhost", + HighAvailabilityServices.DEFAULT_LEADER_ID); Queue received = new LinkedBlockingQueue<>(); AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( @@ -216,7 +222,9 @@ public LookupRetryStrategy createRetryStrategy() { } }; - final TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + final TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( leaderRetrievalService, @@ -268,7 +276,7 @@ public FiniteDuration getRetryDelay() { @Override public boolean tryRetry() { - leaderRetrievalService.notifyListener(testActorAddress, null); + leaderRetrievalService.notifyListener(testActorAddress, HighAvailabilityServices.DEFAULT_LEADER_ID); return true; } }); @@ -279,7 +287,9 @@ public boolean tryRetry() { @Test public void testUnexpectedResponseType() throws Exception { - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + "localhost", + HighAvailabilityServices.DEFAULT_LEADER_ID); Queue received = new LinkedBlockingQueue<>(); AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java index d5e99bd5b1a08..149cc10de7e75 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.util.TestLogger; @@ -64,7 +65,9 @@ public void testAddingJob() throws Exception { final String address = "foobar"; final UUID leaderId = UUID.randomUUID(); TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); @@ -98,7 +101,7 @@ public void testAddingJob() throws Exception { public void testRemovingJob() throws Exception { final JobID jobId = new JobID(); TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(null, null); highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); @@ -139,7 +142,9 @@ public void testRemovingJob() throws Exception { public void testInitialJobTimeout() throws Exception { final JobID jobId = new JobID(); TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); @@ -181,7 +186,9 @@ public void jobTimeoutAfterLostLeadership() throws Exception { final String address = "foobar"; final UUID leaderId = UUID.randomUUID(); TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); highAvailabilityServices.setJobMasterLeaderRetriever(jobId, leaderRetrievalService); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index fb166d41e49cf..9a68eca1c0e64 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; @@ -112,7 +113,9 @@ public void testRegisterJobMasterWithUnmatchedLeaderSessionId2() throws Exceptio String jobMasterAddress = "/jobMasterAddress1"; JobID jobID = mockJobMaster(jobMasterAddress); TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); - TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService( + "localhost", + HighAvailabilityServices.DEFAULT_LEADER_ID); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); @@ -136,7 +139,9 @@ public void testRegisterJobMasterFromInvalidAddress() throws Exception { String jobMasterAddress = "/jobMasterAddress1"; JobID jobID = mockJobMaster(jobMasterAddress); TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); - TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService( + "localhost", + HighAvailabilityServices.DEFAULT_LEADER_ID); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); @@ -160,7 +165,9 @@ public void testRegisterJobMasterWithFailureLeaderListener() throws Exception { String jobMasterAddress = "/jobMasterAddress1"; JobID jobID = mockJobMaster(jobMasterAddress); TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); - TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService( + "localhost", + HighAvailabilityServices.DEFAULT_LEADER_ID); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index e74ba29935181..f6c2dcef8ca36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -83,7 +83,7 @@ public void testSlotAllocation() throws Exception { final ResourceID taskManagerResourceId = new ResourceID("foobar"); final UUID rmLeaderId = UUID.randomUUID(); final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); - final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(); + final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(null, null); final String rmAddress = "rm"; final String jmAddress = "jm"; final UUID jmLeaderId = UUID.randomUUID(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 5702eebe15260..67196aaaaf009 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -119,8 +119,12 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception { final TestingSerialRpcService rpc = new TestingSerialRpcService(); final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); - final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(); - final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(); + final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); + final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); haServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService); haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); @@ -296,7 +300,9 @@ public void testTriggerRegistrationOnLeaderChange() throws Exception { rpc.registerGateway(address1, rmGateway1); rpc.registerGateway(address2, rmGateway2); - TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService( + null, + null); TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); haServices.setResourceManagerLeaderRetriever(testLeaderService); @@ -514,8 +520,12 @@ public void testJobLeaderDetection() throws Exception { final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation); final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); - final TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(); - final TestingLeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(); + final TestingLeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); + final TestingLeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService); haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService); @@ -735,7 +745,9 @@ public void testRejectAllocationRequestsForOutOfSyncSlots() throws Exception { ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class); rpc.registerGateway(address1, rmGateway1); - TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService( + "localhost", + HighAvailabilityServices.DEFAULT_LEADER_ID); TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); haServices.setResourceManagerLeaderRetriever(testLeaderService); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java index e234cba6ab1ce..f3b1d4a1db8eb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmanager.JobManager; @@ -567,7 +568,7 @@ public void testCheckForValidRegistrationSessionIDs() { final ActorRef taskManager = taskManagerGateway.actor(); final UUID falseLeaderSessionID = UUID.randomUUID(); - final UUID trueLeaderSessionID = null; + final UUID trueLeaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID; new Within(timeout) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index 730595cb6203f..a754cff3d3cd9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.instance.InstanceID; @@ -125,7 +126,7 @@ public class TaskManagerTest extends TestLogger { private static ActorSystem system; - final static UUID leaderSessionID = null; + final static UUID leaderSessionID = HighAvailabilityServices.DEFAULT_LEADER_ID; @BeforeClass public static void setup() { @@ -1165,8 +1166,8 @@ public void testTriggerStackTraceSampleMessage() throws Exception { // We need this to be a JM that answers to update messages for // robustness on Travis (if jobs need to be resubmitted in (4)). - ActorRef jm = system.actorOf(Props.create(new SimpleLookupJobManagerCreator(null))); - ActorGateway jobManagerActorGateway = new AkkaActorGateway(jm, null); + ActorRef jm = system.actorOf(Props.create(new SimpleLookupJobManagerCreator(leaderSessionID))); + ActorGateway jobManagerActorGateway = new AkkaActorGateway(jm, leaderSessionID); final ActorGateway testActorGateway = new AkkaActorGateway( getTestActor(), diff --git a/flink-runtime/src/test/resources/log4j-test.properties b/flink-runtime/src/test/resources/log4j-test.properties index 7ba163366a631..98f136aca541f 100644 --- a/flink-runtime/src/test/resources/log4j-test.properties +++ b/flink-runtime/src/test/resources/log4j-test.properties @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -log4j.rootLogger=OFF, console +log4j.rootLogger=INFO, console # ----------------------------------------------------------------------------- # Console (use 'console') diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala index dfcbf77a83529..76d95917461c9 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerRegistrationTest.scala @@ -27,12 +27,12 @@ import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.instance._ import org.apache.flink.runtime.jobmanager.JobManagerRegistrationTest.PlainForwardingActor import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage import org.apache.flink.runtime.messages.RegistrationMessages.{AcknowledgeRegistration, AlreadyRegistered, RegisterTaskManager} import org.apache.flink.runtime.taskmanager.TaskManagerLocation - import org.apache.flink.runtime.testutils.TestingResourceManager import org.apache.flink.runtime.util.LeaderRetrievalUtils import org.junit.Assert.{assertNotEquals, assertNotNull} @@ -87,7 +87,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { connectionInfo1, hardwareDescription, 1), - new AkkaActorGateway(tm1, null)) + new AkkaActorGateway(tm1, HighAvailabilityServices.DEFAULT_LEADER_ID)) val response = expectMsgType[LeaderSessionMessage] response match { @@ -104,7 +104,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { connectionInfo2, hardwareDescription, 1), - new AkkaActorGateway(tm2, null)) + new AkkaActorGateway(tm2, HighAvailabilityServices.DEFAULT_LEADER_ID)) val response = expectMsgType[LeaderSessionMessage] response match { @@ -123,7 +123,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { val jm = startTestingJobManager(_system) val rm = startTestingResourceManager(_system, jm.actor()) - val selfGateway = new AkkaActorGateway(testActor, null) + val selfGateway = new AkkaActorGateway(testActor, HighAvailabilityServices.DEFAULT_LEADER_ID) val resourceID = ResourceID.generate() val connectionInfo = new TaskManagerLocation(resourceID, InetAddress.getLocalHost, 1) @@ -155,17 +155,23 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { selfGateway) expectMsgType[LeaderSessionMessage] match { - case LeaderSessionMessage(null, AcknowledgeRegistration(_, _)) => + case LeaderSessionMessage( + HighAvailabilityServices.DEFAULT_LEADER_ID, + AcknowledgeRegistration(_, _)) => case m => fail("Wrong message type: " + m) } expectMsgType[LeaderSessionMessage] match { - case LeaderSessionMessage(null, AlreadyRegistered(_, _)) => + case LeaderSessionMessage( + HighAvailabilityServices.DEFAULT_LEADER_ID, + AlreadyRegistered(_, _)) => case m => fail("Wrong message type: " + m) } expectMsgType[LeaderSessionMessage] match { - case LeaderSessionMessage(null, AlreadyRegistered(_, _)) => + case LeaderSessionMessage( + HighAvailabilityServices.DEFAULT_LEADER_ID, + AlreadyRegistered(_, _)) => case m => fail("Wrong message type: " + m) } } @@ -182,7 +188,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { None, classOf[JobManager], classOf[MemoryArchivist]) - new AkkaActorGateway(jm, null) + new AkkaActorGateway(jm, HighAvailabilityServices.DEFAULT_LEADER_ID) } private def startTestingResourceManager(system: ActorSystem, jm: ActorRef): ActorGateway = { @@ -193,7 +199,7 @@ ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll { _system, LeaderRetrievalUtils.createLeaderRetrievalService(config, jm), classOf[TestingResourceManager]) - new AkkaActorGateway(rm, null) + new AkkaActorGateway(rm, HighAvailabilityServices.DEFAULT_LEADER_ID) } } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/ScalaTestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/ScalaTestingUtils.scala index d46dd71bf14d3..d4ca8f6394017 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/ScalaTestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/ScalaTestingUtils.scala @@ -19,6 +19,7 @@ package org.apache.flink.runtime.testingUtils import akka.actor.ActorRef +import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway} /** Mixing for testing utils @@ -32,6 +33,6 @@ trait ScalaTestingUtils { * @return [[ActorGateway]] encapsulating the given [[ActorRef]] */ implicit def actorRef2InstanceGateway(actor: ActorRef): ActorGateway = { - new AkkaActorGateway(actor, null) + new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID) } } diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala index 20260c703cd88..d6221f5138720 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala @@ -33,6 +33,7 @@ import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.client.JobClient import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.highavailability.HighAvailabilityServices import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway} import org.apache.flink.runtime.jobgraph.JobGraph import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist} @@ -45,7 +46,7 @@ import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMess import scala.concurrent.duration.TimeUnit import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContextExecutor, Await, ExecutionContext} +import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor} import scala.language.postfixOps /** @@ -326,7 +327,7 @@ object TestingUtils { Await.ready(notificationResult, TESTING_DURATION) } - new AkkaActorGateway(taskManager, null) + new AkkaActorGateway(taskManager, HighAvailabilityServices.DEFAULT_LEADER_ID) } /** Stops the given actor by sending it a Kill message @@ -456,7 +457,7 @@ object TestingUtils { jobManagerClass, classOf[MemoryArchivist]) - new AkkaActorGateway(actor, null) + new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID) } /** Creates a forwarding JobManager which sends all received message to the forwarding target. @@ -478,7 +479,7 @@ object TestingUtils { Props( classOf[ForwardingActor], forwardingTarget, - None), + Option(HighAvailabilityServices.DEFAULT_LEADER_ID)), name ) case None => @@ -486,11 +487,11 @@ object TestingUtils { Props( classOf[ForwardingActor], forwardingTarget, - None) + Option(HighAvailabilityServices.DEFAULT_LEADER_ID)) ) } - new AkkaActorGateway(actor, null) + new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID) } def submitJobAndWait( @@ -537,7 +538,7 @@ object TestingUtils { LeaderRetrievalUtils.createLeaderRetrievalService(configuration, jobManager), classOf[TestingResourceManager]) - new AkkaActorGateway(actor, null) + new AkkaActorGateway(actor, HighAvailabilityServices.DEFAULT_LEADER_ID) } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java index 236e92208ab14..e4d0f65f99e8b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java @@ -41,12 +41,10 @@ import org.apache.flink.runtime.leaderelection.TestingListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.messages.JobManagerMessages.JobStatusResponse; import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingCluster; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; @@ -70,7 +68,6 @@ import java.io.File; import java.io.IOException; import java.util.Collection; -import java.util.List; import java.util.Queue; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -167,91 +164,6 @@ public void testJobPersistencyWhenJobManagerShutdown() throws Exception { verifyRecoveryState(config); } - /** - * Tests that submissions to non-leaders are handled. - */ - @Test - public void testSubmitJobToNonLeader() throws Exception { - Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig( - ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath()); - - // Configure the cluster - config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2); - config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); - - TestingCluster flink = new TestingCluster(config, false, false); - - try { - final Deadline deadline = TestTimeOut.fromNow(); - - // Start the JobManager and TaskManager - flink.start(true); - - JobGraph jobGraph = createBlockingJobGraph(); - - List bothJobManagers = flink.getJobManagersAsJava(); - - ActorGateway leadingJobManager = flink.getLeaderGateway(deadline.timeLeft()); - - ActorGateway nonLeadingJobManager; - if (bothJobManagers.get(0).equals(leadingJobManager.actor())) { - nonLeadingJobManager = new AkkaActorGateway(bothJobManagers.get(1), null); - } - else { - nonLeadingJobManager = new AkkaActorGateway(bothJobManagers.get(0), null); - } - - log.info("Leading job manager: " + leadingJobManager); - log.info("Non-leading job manager: " + nonLeadingJobManager); - - // Submit the job - nonLeadingJobManager.tell(new SubmitJob(jobGraph, ListeningBehaviour.DETACHED)); - - log.info("Submitted job graph to " + nonLeadingJobManager); - - // Wait for the job to start. We are asking the *leading** JM here although we've - // submitted the job to the non-leading JM. This is the behaviour under test. - JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING, - leadingJobManager, deadline.timeLeft()); - - log.info("Wait that the non-leader removes the submitted job."); - - // Make sure that the **non-leading** JM has actually removed the job graph from its - // local state. - boolean success = false; - while (!success && deadline.hasTimeLeft()) { - JobStatusResponse jobStatusResponse = JobManagerActorTestUtils.requestJobStatus( - jobGraph.getJobID(), nonLeadingJobManager, deadline.timeLeft()); - - if (jobStatusResponse instanceof JobManagerMessages.JobNotFound) { - success = true; - } - else { - log.info(((JobManagerMessages.CurrentJobStatus)jobStatusResponse).status().toString()); - Thread.sleep(100); - } - } - - if (!success) { - fail("Non-leading JM was still holding reference to the job graph."); - } - - Future jobRemoved = leadingJobManager.ask( - new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), - deadline.timeLeft()); - - leadingJobManager.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID())); - - Await.ready(jobRemoved, deadline.timeLeft()); - } - finally { - flink.shutdown(); - } - - // Verify that everything is clean - verifyCleanRecoveryState(config); - } - /** * Tests that clients receive updates after recovery by a new leader. */ diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java index 440cfff9ad1a6..e38fab4f6f4f6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/minicluster/LocalFlinkMiniClusterITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.messages.JobManagerMessages; @@ -75,7 +76,7 @@ public void testLocalFlinkMiniClusterWithMultipleTaskManagers() { final ActorGateway jmGateway = miniCluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); new JavaTestKit(system) {{ - final ActorGateway selfGateway = new AkkaActorGateway(getRef(), null); + final ActorGateway selfGateway = new AkkaActorGateway(getRef(), HighAvailabilityServices.DEFAULT_LEADER_ID); new Within(TestingUtils.TESTING_DURATION()) { diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java index 8534ba8e9bd02..a09c5b261ae85 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -93,7 +93,9 @@ public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Except Configuration flinkConfig = new Configuration(); YarnConfiguration yarnConfig = new YarnConfiguration(); - TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); String applicationMasterHostName = "localhost"; String webInterfaceURL = "foobar"; ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters(