Skip to content

Commit

Permalink
[FLINK-5971] [flip-6] Add timeout for registered jobs on the Resource…
Browse files Browse the repository at this point in the history
…Manager

This PR introduces a timeout for inactive jobs on the ResourceManager. A job is inactive
if no active leader is known for it. If a job times out, it is removed from the
ResourceManager. Additionally, this PR removes the dependency of
the JobLeaderIdService on the RunningJobsRegistry.

Fix YarnFlinkApplicationMasterRunner to use correct arguments for JobLeaderIdService

Fix race condition in JobLeaderIdListener#cancelTimeout

This closes #3488.
  • Loading branch information
tillrohrmann committed Mar 13, 2017
1 parent 04aee61 commit fcd264a
Show file tree
Hide file tree
Showing 14 changed files with 498 additions and 70 deletions.
Expand Up @@ -28,6 +28,13 @@
@PublicEvolving @PublicEvolving
public class AkkaOptions { public class AkkaOptions {


/**
* Timeout for Akka ask calls.
*
* <p>Stored under the key {@code akka.ask.timeout} as a string; the default value is
* {@code "10 s"}.
*/
public static final ConfigOption<String> AKKA_ASK_TIMEOUT = ConfigOptions
.key("akka.ask.timeout")
.defaultValue("10 s");

/** /**
* The Akka tcp connection timeout. * The Akka tcp connection timeout.
*/ */
Expand Down
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.configuration;

import org.apache.flink.annotation.PublicEvolving;

/**
* The set of configuration options relating to the ResourceManager.
*
* <p>This class only holds static option definitions and cannot be instantiated.
*/
@PublicEvolving
public class ResourceManagerOptions {

/**
* Timeout for jobs which do not have a job manager assigned as leader.
*
* <p>Stored under the key {@code resourcemanager.job.timeout} as a string; the default
* value is {@code "5 minutes"}.
*/
public static final ConfigOption<String> JOB_TIMEOUT = ConfigOptions
.key("resourcemanager.job.timeout")
.defaultValue("5 minutes");

// ---------------------------------------------------------------------------------------------

/** Not intended to be instantiated. */
private ResourceManagerOptions() {}
}
Expand Up @@ -36,11 +36,13 @@ public interface JobLeaderIdActions {
void jobLeaderLostLeadership(JobID jobId, UUID oldJobLeaderId); void jobLeaderLostLeadership(JobID jobId, UUID oldJobLeaderId);


/** /**
* Request to remove the job from the {@link JobLeaderIdService}. * Notify a job timeout. The job is identified by the given JobID. In order to check
* for the validity of the timeout the timeout id of the triggered timeout is provided.
* *
* @param jobId identifying the job to remove * @param jobId JobID which identifies the timed out job
* @param timeoutId Id of the calling timeout to differentiate valid from invalid timeouts
*/ */
void removeJob(JobID jobId); void notifyJobTimeout(JobID jobId, UUID timeoutId);


/** /**
* Callback to report occurring errors. * Callback to report occurring errors.
Expand Down
Expand Up @@ -19,24 +19,27 @@
package org.apache.flink.runtime.resourcemanager; package org.apache.flink.runtime.resourcemanager;


import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry.JobSchedulingStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.io.IOException; import javax.annotation.Nullable;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;


/** /**
* Service which retrieves for a registered job the current job leader id (the leader id of the * Service which retrieves for a registered job the current job leader id (the leader id of the
Expand All @@ -51,19 +54,23 @@ public class JobLeaderIdService {
/** High availability services to use by this service */ /** High availability services to use by this service */
private final HighAvailabilityServices highAvailabilityServices; private final HighAvailabilityServices highAvailabilityServices;


/** Registry to retrieve running jobs */ private final ScheduledExecutor scheduledExecutor;
private final RunningJobsRegistry runningJobsRegistry;
private final Time jobTimeout;


/** Map of currently monitored jobs */ /** Map of currently monitored jobs */
private final Map<JobID, JobLeaderIdListener> jobLeaderIdListeners; private final Map<JobID, JobLeaderIdListener> jobLeaderIdListeners;


/** Actions to call when the job leader changes */ /** Actions to call when the job leader changes */
private JobLeaderIdActions jobLeaderIdActions; private JobLeaderIdActions jobLeaderIdActions;


public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) throws Exception { public JobLeaderIdService(
this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); HighAvailabilityServices highAvailabilityServices,

ScheduledExecutor scheduledExecutor,
this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry(); Time jobTimeout) throws Exception {
this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices, "highAvailabilityServices");
this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor, "scheduledExecutor");
this.jobTimeout = Preconditions.checkNotNull(jobTimeout, "jobTimeout");


jobLeaderIdListeners = new HashMap<>(4); jobLeaderIdListeners = new HashMap<>(4);


Expand Down Expand Up @@ -142,8 +149,8 @@ public void addJob(JobID jobId) throws Exception {
if (!jobLeaderIdListeners.containsKey(jobId)) { if (!jobLeaderIdListeners.containsKey(jobId)) {
LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(jobId); LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(jobId);


JobLeaderIdListener jobidListener = new JobLeaderIdListener(jobId, jobLeaderIdActions, leaderRetrievalService); JobLeaderIdListener jobIdListener = new JobLeaderIdListener(jobId, jobLeaderIdActions, leaderRetrievalService);
jobLeaderIdListeners.put(jobId, jobidListener); jobLeaderIdListeners.put(jobId, jobIdListener);
} }
} }


Expand Down Expand Up @@ -183,6 +190,16 @@ public Future<UUID> getLeaderId(JobID jobId) throws Exception {
return listener.getLeaderIdFuture(); return listener.getLeaderIdFuture();
} }


/**
 * Checks whether the given timeout id is the one currently registered for the given job.
 *
 * @param jobId job whose listener is looked up
 * @param timeoutId timeout id to compare against the listener's current timeout id
 * @return {@code false} if no listener is registered for the job; otherwise whether the
 *         given id equals the listener's current timeout id (compared null-safely)
 */
public boolean isValidTimeout(JobID jobId, UUID timeoutId) {
	final JobLeaderIdListener listener = jobLeaderIdListeners.get(jobId);

	// unknown job ==> any timeout referring to it is stale
	if (listener == null) {
		return false;
	}

	return Objects.equals(timeoutId, listener.getTimeoutId());
}

// -------------------------------------------------------------------------------- // --------------------------------------------------------------------------------
// Static utility classes // Static utility classes
// -------------------------------------------------------------------------------- // --------------------------------------------------------------------------------
Expand All @@ -193,13 +210,23 @@ public Future<UUID> getLeaderId(JobID jobId) throws Exception {
* listener. * listener.
*/ */
private final class JobLeaderIdListener implements LeaderRetrievalListener { private final class JobLeaderIdListener implements LeaderRetrievalListener {
/** Lock held by activateTimeout() and cancelTimeout() while they update the timeout state */
private final Object timeoutLock = new Object();
private final JobID jobId; private final JobID jobId;
private final JobLeaderIdActions listenerJobLeaderIdActions; private final JobLeaderIdActions listenerJobLeaderIdActions;
private final LeaderRetrievalService leaderRetrievalService; private final LeaderRetrievalService leaderRetrievalService;


private volatile CompletableFuture<UUID> leaderIdFuture; private volatile CompletableFuture<UUID> leaderIdFuture;
private volatile boolean running = true; private volatile boolean running = true;


/** Handle of the currently scheduled timeout task; null if no timeout has been scheduled */
@Nullable
private volatile ScheduledFuture<?> timeoutFuture;

/** Id of the currently scheduled timeout; null if no timeout has been scheduled */
@Nullable
private volatile UUID timeoutId;


private JobLeaderIdListener( private JobLeaderIdListener(
JobID jobId, JobID jobId,
JobLeaderIdActions listenerJobLeaderIdActions, JobLeaderIdActions listenerJobLeaderIdActions,
Expand All @@ -210,6 +237,8 @@ private JobLeaderIdListener(


leaderIdFuture = new FlinkCompletableFuture<>(); leaderIdFuture = new FlinkCompletableFuture<>();


activateTimeout();

// start the leader service we're listening to // start the leader service we're listening to
leaderRetrievalService.start(this); leaderRetrievalService.start(this);
} }
Expand All @@ -218,9 +247,15 @@ public Future<UUID> getLeaderIdFuture() {
return leaderIdFuture; return leaderIdFuture;
} }


/**
* Returns the id of the currently scheduled timeout.
*
* @return the current timeout id, or null if no timeout is scheduled
*/
@Nullable
public UUID getTimeoutId() {
return timeoutId;
}

public void stop() throws Exception { public void stop() throws Exception {
running = false; running = false;
leaderRetrievalService.stop(); leaderRetrievalService.stop();
cancelTimeout();
leaderIdFuture.completeExceptionally(new Exception("Job leader id service has been stopped.")); leaderIdFuture.completeExceptionally(new Exception("Job leader id service has been stopped."));
} }


Expand All @@ -244,29 +279,22 @@ public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
leaderIdFuture.complete(leaderSessionId); leaderIdFuture.complete(leaderSessionId);
} }


try { if (previousJobLeaderId != null && !previousJobLeaderId.equals(leaderSessionId)) {
final JobSchedulingStatus jobStatus = runningJobsRegistry.getJobSchedulingStatus(jobId); // we had a previous job leader, so notify about his lost leadership
if (jobStatus == JobSchedulingStatus.PENDING || jobStatus == JobSchedulingStatus.RUNNING) { listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId);
if (leaderSessionId == null) {
// there is no new leader if (null == leaderSessionId) {
if (previousJobLeaderId != null) { // No current leader active ==> Set a timeout for the job
// we had a previous job leader, so notify about his lost leadership activateTimeout();
listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId);
} // check if we got stopped asynchronously
} else { if (!running) {
if (previousJobLeaderId != null && !leaderSessionId.equals(previousJobLeaderId)) { cancelTimeout();
// we had a previous leader and he's not the same as the new leader
listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId);
}
} }
} else {
// the job is no longer running so remove it
listenerJobLeaderIdActions.removeJob(jobId);
} }
} catch (IOException e) { } else if (null != leaderSessionId) {
// cannot tell whether the job is still running or not so just remove the listener // Cancel timeout because we've found an active leader for it
LOG.debug("Encountered an error while checking the job registry for running jobs.", e); cancelTimeout();
listenerJobLeaderIdActions.removeJob(jobId);
} }
} else { } else {
LOG.debug("A leader id change {}@{} has been detected after the listener has been stopped.", LOG.debug("A leader id change {}@{} has been detected after the listener has been stopped.",
Expand All @@ -283,5 +311,32 @@ public void handleError(Exception exception) {
JobLeaderIdListener.class.getSimpleName(), exception); JobLeaderIdListener.class.getSimpleName(), exception);
} }
} }

/**
 * Schedules a fresh timeout for this listener's job, replacing any timeout that is
 * currently pending.
 *
 * <p>A new random timeout id is generated and stored together with the scheduled future.
 * When the timeout fires, the job leader id actions are notified with the job id and that
 * timeout id.
 */
private void activateTimeout() {
	synchronized (timeoutLock) {
		// drop whatever timeout was scheduled before
		cancelTimeout();

		final UUID freshTimeoutId = UUID.randomUUID();

		final Runnable timeoutNotification = new Runnable() {
			@Override
			public void run() {
				listenerJobLeaderIdActions.notifyJobTimeout(jobId, freshTimeoutId);
			}
		};

		timeoutId = freshTimeoutId;
		timeoutFuture = scheduledExecutor.schedule(
			timeoutNotification,
			jobTimeout.toMilliseconds(),
			TimeUnit.MILLISECONDS);
	}
}

/**
* Cancels the currently scheduled timeout, if any, and clears the stored timeout state.
*
* <p>Afterwards both the timeout future and the timeout id are null.
*/
private void cancelTimeout() {
synchronized (timeoutLock) {
if (timeoutFuture != null) {
// request cancellation, interrupting the task if it is already running
timeoutFuture.cancel(true);
}

// reset the state so that no timeout id is reported as current anymore
timeoutFuture = null;
timeoutId = null;
}
}
} }
} }
Expand Up @@ -817,11 +817,13 @@ public void jobLeaderLostLeadership(final JobID jobId, final UUID oldJobLeaderId
} }


@Override @Override
public void removeJob(final JobID jobId) { public void notifyJobTimeout(final JobID jobId, final UUID timeoutId) {
runAsync(new Runnable() { runAsync(new Runnable() {
@Override @Override
public void run() { public void run() {
ResourceManager.this.removeJob(jobId); if (jobLeaderIdService.isValidTimeout(jobId, timeoutId)) {
removeJob(jobId);
}
} }
}); });
} }
Expand Down
Expand Up @@ -19,10 +19,9 @@
package org.apache.flink.runtime.resourcemanager; package org.apache.flink.runtime.resourcemanager;


import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException; import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
Expand All @@ -34,10 +33,15 @@ public class ResourceManagerConfiguration {


private final Time timeout; private final Time timeout;
private final Time heartbeatInterval; private final Time heartbeatInterval;
private final Time jobTimeout;


public ResourceManagerConfiguration(Time timeout, Time heartbeatInterval) { public ResourceManagerConfiguration(
this.timeout = Preconditions.checkNotNull(timeout); Time timeout,
this.heartbeatInterval = Preconditions.checkNotNull(heartbeatInterval); Time heartbeatInterval,
Time jobTimeout) {
this.timeout = Preconditions.checkNotNull(timeout, "timeout");
this.heartbeatInterval = Preconditions.checkNotNull(heartbeatInterval, "heartbeatInterval");
this.jobTimeout = Preconditions.checkNotNull(jobTimeout, "jobTimeout");
} }


public Time getTimeout() { public Time getTimeout() {
Expand All @@ -48,39 +52,45 @@ public Time getHeartbeatInterval() {
return heartbeatInterval; return heartbeatInterval;
} }


/**
* Returns the job timeout this configuration was created with.
*
* @return the job timeout; never null
*/
public Time getJobTimeout() {
return jobTimeout;
}

// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
// Static factory methods // Static factory methods
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------


public static ResourceManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException { public static ResourceManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException {
ConfigOption<String> timeoutOption = ConfigOptions final String strTimeout = configuration.getString(AkkaOptions.AKKA_ASK_TIMEOUT);
.key(ConfigConstants.AKKA_ASK_TIMEOUT)
.defaultValue(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);

final String strTimeout = configuration.getString(timeoutOption);
final Time timeout; final Time timeout;


try { try {
timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis()); timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis());
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
throw new ConfigurationException("Could not parse the resource manager's timeout " + throw new ConfigurationException("Could not parse the resource manager's timeout " +
"value " + timeoutOption + '.', e); "value " + AkkaOptions.AKKA_ASK_TIMEOUT + '.', e);
} }


ConfigOption<String> heartbeatIntervalOption = ConfigOptions final String strHeartbeatInterval = configuration.getString(AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL);
.key(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL)
.defaultValue(timeout.toString());

final String strHeartbeatInterval = configuration.getString(heartbeatIntervalOption);
final Time heartbeatInterval; final Time heartbeatInterval;


try { try {
heartbeatInterval = Time.milliseconds(Duration.apply(strHeartbeatInterval).toMillis()); heartbeatInterval = Time.milliseconds(Duration.apply(strHeartbeatInterval).toMillis());
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
throw new ConfigurationException("Could not parse the resource manager's heartbeat interval " + throw new ConfigurationException("Could not parse the resource manager's heartbeat interval " +
"value " + timeoutOption + '.', e); "value " + AkkaOptions.AKKA_WATCH_HEARTBEAT_INTERVAL + '.', e);
}

final String strJobTimeout = configuration.getString(ResourceManagerOptions.JOB_TIMEOUT);
final Time jobTimeout;

try {
jobTimeout = Time.milliseconds(Duration.apply(strJobTimeout).toMillis());
} catch (NumberFormatException e) {
throw new ConfigurationException("Could not parse the resource manager's job timeout " +
"value " + ResourceManagerOptions.JOB_TIMEOUT + '.', e);
} }


return new ResourceManagerConfiguration(timeout, heartbeatInterval); return new ResourceManagerConfiguration(timeout, heartbeatInterval, jobTimeout);
} }
} }

0 comments on commit fcd264a

Please sign in to comment.