Report executor exit to framework schedulers.
This is an MVP to start the work of notifying schedulers on executor exit. The
next step would be sending this message reliably, and/or splitting
Event::FAILURE for slave failure and executor termination.

Review: https://reviews.apache.org/r/40429/
zhitaoli authored and adam-mesos committed Jan 7, 2016
1 parent 95406d6 commit 932df48
Showing 13 changed files with 109 additions and 16 deletions.
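For context on how a framework consumes this change: a scheduler acts on the new notification by overriding `executorLost`. Below is a minimal sketch, assuming the C++ driver API from include/mesos/scheduler.hpp; the class name `ExampleScheduler` is a placeholder, the other required Scheduler callbacks are omitted, and the reconciliation call is just one possible way to recover authoritative task state, since (as the diff notes) this callback is not reliably delivered.

#include <iostream>
#include <vector>

#include <mesos/scheduler.hpp>

class ExampleScheduler : public mesos::Scheduler
{
public:
  // ... other Scheduler callbacks (registered, resourceOffers,
  // statusUpdate, ...) omitted for brevity ...

  // Invoked when the slave reports that a custom executor terminated.
  // Delivery is best-effort, so treat it as a hint: update local
  // bookkeeping and rely on status updates / reconciliation for the
  // authoritative task state.
  virtual void executorLost(
      mesos::SchedulerDriver* driver,
      const mesos::ExecutorID& executorId,
      const mesos::SlaveID& slaveId,
      int status)
  {
    std::cout << "Executor '" << executorId.value() << "' on slave '"
              << slaveId.value() << "' exited with status " << status
              << std::endl;

    // Implicit reconciliation: the master re-sends the latest known state
    // of all tasks; tasks of the exited executor surface as TASK_LOST
    // through statusUpdate().
    driver->reconcileTasks(std::vector<mesos::TaskStatus>());
  }
};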
1 change: 1 addition & 0 deletions CHANGELOG
@@ -2,6 +2,7 @@ Release Notes - Mesos - Version 0.27.0 (WIP)
--------------------------------------------

** API Changes:
* [MESOS-313] - Report executor termination to framework schedulers.
* [MESOS-2315] - Removed deprecated CommandInfo::ContainerInfo.
* [MESOS-3988] - Implicit roles.

2 changes: 2 additions & 0 deletions docs/app-framework-development-guide.md
@@ -114,6 +114,8 @@ virtual void slaveLost(SchedulerDriver* driver,
* Invoked when an executor has exited/terminated. Note that any
* tasks running will have TASK_LOST status updates automagically
* generated.
*
* NOTE: This callback is not reliably delivered.
*/
virtual void executorLost(SchedulerDriver* driver,
const ExecutorID& executorId,
3 changes: 2 additions & 1 deletion docs/upgrades.md
@@ -12,6 +12,8 @@ This document serves as a guide for users who wish to upgrade an existing Mesos

* The Allocator API has changed due to the introduction of implicit roles. Custom allocator implementations will need to be updated. See [MESOS-4000](https://issues.apache.org/jira/browse/MESOS-4000) for more information.

* The `executorLost` callback in the Scheduler interface will now be called whenever the slave detects termination of a custom executor. This callback was never called in previous versions, so please make sure any framework schedulers can now safely handle this callback. Note that this callback may not be reliably delivered.

## Upgrading from 0.25.x to 0.26.x ##

**NOTE** The names of some TaskStatus::Reason enums have been changed. But the tag numbers remain unchanged, so it is backwards compatible. Frameworks using the new version might need to do some compile time adjustments:
@@ -27,7 +29,6 @@ On slaves, the affected `data` field was originally found via `executors[*].task

**NOTE** The `NetworkInfo` protobuf has been changed. The fields `protocol` and `ip_address` are now deprecated. The new field `ip_addresses` subsumes the information provided by them.


## Upgrading from 0.24.x to 0.25.x

**NOTE** The following endpoints will be deprecated in favor of new endpoints. Both versions will be available in 0.25 but the deprecated endpoints will be removed in a subsequent release.
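Since the upgrade note above stresses that the callback may not be reliably delivered, a framework should not depend on it alone. A possible belt-and-braces pattern, sketched below against the same C++ driver API: periodically reconcile the tasks the framework still believes are running. The `runningTasks` map is hypothetical framework bookkeeping, not part of the Mesos API.

#include <map>
#include <string>
#include <vector>

#include <mesos/mesos.hpp>
#include <mesos/scheduler.hpp>

// Explicit reconciliation: ask the master for the latest state of each
// task we still consider running. Tasks whose executor has exited come
// back in a terminal state (typically TASK_LOST), whether or not the
// executorLost callback was delivered.
void reconcileRunningTasks(
    mesos::SchedulerDriver* driver,
    const std::map<std::string, std::string>& runningTasks) // taskId -> slaveId
{
  std::vector<mesos::TaskStatus> statuses;

  for (const auto& entry : runningTasks) {
    mesos::TaskStatus status;
    status.mutable_task_id()->set_value(entry.first);
    status.mutable_slave_id()->set_value(entry.second);
    status.set_state(mesos::TASK_RUNNING); // Last state known to the framework.
    statuses.push_back(status);
  }

  driver->reconcileTasks(statuses);
}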
1 change: 1 addition & 0 deletions include/mesos/scheduler.hpp
@@ -149,6 +149,7 @@ class Scheduler
// Invoked when an executor has exited/terminated. Note that any
// tasks running will have TASK_LOST status updates automagically
// generated.
// NOTE: This callback is not reliably delivered.
virtual void executorLost(
SchedulerDriver* driver,
const ExecutorID& executorId,
2 changes: 2 additions & 0 deletions src/java/src/org/apache/mesos/Scheduler.java
@@ -177,6 +177,8 @@ void frameworkMessage(SchedulerDriver driver,
* tasks running will have TASK_LOST status updates automagically
* generated.
*
* NOTE: This callback is not reliably delivered.
*
* @param driver The driver that was used to run this scheduler.
* @param executorId The ID of the executor that was lost.
* @param slaveId The ID of the slave that launched the executor.
1 change: 1 addition & 0 deletions src/python/interface/src/mesos/interface/__init__.py
@@ -118,6 +118,7 @@ def executorLost(self, driver, executorId, slaveId, status):
"""
Invoked when an executor has exited/terminated. Note that any tasks
running will have TASK_LOST status updates automatically generated.
NOTE: This callback is not reliably delivered.
"""

def error(self, driver, message):
56 changes: 52 additions & 4 deletions src/sched/sched.cpp
@@ -204,6 +204,12 @@ class SchedulerProcess : public ProtobufProcess<SchedulerProcess>
&SchedulerProcess::lostSlave,
&LostSlaveMessage::slave_id);

install<ExitedExecutorMessage>(
&SchedulerProcess::lostExecutor,
&ExitedExecutorMessage::executor_id,
&ExitedExecutorMessage::slave_id,
&ExitedExecutorMessage::status);

install<ExecutorToFrameworkMessage>(
&SchedulerProcess::frameworkMessage,
&ExecutorToFrameworkMessage::slave_id,
@@ -583,10 +589,12 @@ class SchedulerProcess : public ProtobufProcess<SchedulerProcess>

if (event.failure().has_slave_id() &&
event.failure().has_executor_id()) {
// NOTE: We silently drop executor FAILURE messages
// because this matches the existing behavior of the
// scheduler driver: there is currently no install
// handler for ExitedExecutorMessage.
CHECK(event.failure().has_status());
lostExecutor(
from,
event.failure().executor_id(),
event.failure().slave_id(),
event.failure().status());
} else if (event.failure().has_slave_id()) {
lostSlave(from, event.failure().slave_id());
} else {
@@ -992,6 +1000,46 @@ class SchedulerProcess : public ProtobufProcess<SchedulerProcess>
VLOG(1) << "Scheduler::slaveLost took " << stopwatch.elapsed();
}

void lostExecutor(
const UPID& from,
const ExecutorID& executorId,
const SlaveID& slaveId,
int32_t status)
{
if (!running.load()) {
VLOG(1)
<< "Ignoring lost executor message because the driver is not running!";
return;
}

if (!connected) {
VLOG(1)
<< "Ignoring lost executor message because the driver is disconnected!";
return;
}

CHECK_SOME(master);
if (from != master.get().pid()) {
VLOG(1) << "Ignoring lost executor message because it was sent "
<< "from '" << from << "' instead of the leading master '"
<< master.get().pid() << "'";
return;
}

VLOG(1)
<< "Executor " << executorId << " on slave " << slaveId
<< " exited with status " << status;

Stopwatch stopwatch;
if (FLAGS_v >= 1) {
stopwatch.start();
}

scheduler->executorLost(driver, executorId, slaveId, status);

VLOG(1) << "Scheduler::executorLost took " << stopwatch.elapsed();
}

void frameworkMessage(
const SlaveID& slaveId,
const ExecutorID& executorId,
16 changes: 8 additions & 8 deletions src/tests/fault_tolerance_tests.cpp
@@ -286,17 +286,16 @@ TEST_F(FaultToleranceTest, ReregisterCompletedFrameworks)
Future<Nothing> executorTerminated =
FUTURE_DISPATCH(_, &Slave::executorTerminated);

Future<Nothing> executorLost;
EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _))
.WillOnce(FutureSatisfy(&executorLost));
// Induce an ExitedExecutorMessage from the slave.
containerizer.destroy(
frameworkId.get(), DEFAULT_EXECUTOR_INFO.executor_id());

AWAIT_READY(executorTerminated);

// Slave should consider the framework completed after it executes
// "executorTerminated".
Clock::pause();
Clock::settle();
Clock::resume();
AWAIT_READY(executorLost);

// Verify slave sees completed framework.
slaveState = process::http::get(slave.get(), "state");
@@ -1712,14 +1711,15 @@ TEST_F(FaultToleranceTest, FrameworkReregisterEmptyExecutor)
FUTURE_PROTOBUF(ExitedExecutorMessage(), slave.get(), master.get());

// Now kill the executor.
Future<Nothing> executorLost;
EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _))
.WillOnce(FutureSatisfy(&executorLost));
containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);

// Ensure the master correctly handles the exited executor
// with no tasks!
AWAIT_READY(executorExitedMessage);
Clock::pause();
Clock::settle();
Clock::resume();
AWAIT_READY(executorLost);

driver.stop();
driver.join();
6 changes: 6 additions & 0 deletions src/tests/gc_tests.cpp
@@ -547,6 +547,8 @@ TEST_F(GarbageCollectorIntegrationTest, ExitedExecutor)
EXPECT_CALL(sched, statusUpdate(_, _))
.Times(AtMost(1)); // Ignore TASK_LOST from killed executor.

EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _));

// Kill the executor and inform the slave.
containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);

@@ -646,6 +648,8 @@ TEST_F(GarbageCollectorIntegrationTest, DiskUsage)
EXPECT_CALL(sched, statusUpdate(_, _))
.Times(AtMost(1)); // Ignore TASK_LOST from killed executor.

EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, slaveId, _));

// Kill the executor and inform the slave.
containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);

@@ -790,6 +794,8 @@ TEST_F(GarbageCollectorIntegrationTest, Unschedule)
EXPECT_CALL(exec2, launchTask(_, _))
.WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));

EXPECT_CALL(sched, executorLost(&driver, exec1.id, _, _));

Clock::pause();

// Kill the first executor.
2 changes: 2 additions & 0 deletions src/tests/master_slave_reconciliation_tests.cpp
@@ -134,6 +134,8 @@ TEST_F(MasterSlaveReconciliationTest, SlaveReregisterTerminatedExecutor)
// Ensure the update was sent.
AWAIT_READY(statusUpdateMessage);

EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _));

// Now kill the executor.
containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);

2 changes: 2 additions & 0 deletions src/tests/master_tests.cpp
@@ -731,6 +731,8 @@ TEST_F(MasterTest, RecoverResources)
EXPECT_CALL(exec, shutdown(_))
.Times(AtMost(1));

EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _));

// Now kill the executor; the scheduler should get an offer with its resources.
containerizer.destroy(offer.framework_id(), executorInfo.executor_id());

16 changes: 13 additions & 3 deletions src/tests/scheduler_event_call_tests.cpp
@@ -584,18 +584,28 @@ TEST_F(SchedulerDriverEventTest, Failure)
AWAIT_READY(frameworkRegisteredMessage);
UPID frameworkPid = frameworkRegisteredMessage.get().to;

// Send a failure for an executor, this should be dropped
// to match the existing behavior of the scheduler driver.
// Send a failure for an executor, which should trigger the executorLost callback.
SlaveID slaveId;
slaveId.set_value("S");

ExecutorID executorId = DEFAULT_EXECUTOR_ID;

const int32_t status = 255;

Event event;
event.set_type(Event::FAILURE);
event.mutable_failure()->mutable_slave_id()->CopyFrom(slaveId);
event.mutable_failure()->mutable_executor_id()->set_value("E");
event.mutable_failure()->mutable_executor_id()->CopyFrom(executorId);
event.mutable_failure()->set_status(status);

Future<Nothing> executorLost;
EXPECT_CALL(sched, executorLost(&driver, executorId, slaveId, status))
.WillOnce(FutureSatisfy(&executorLost));

process::post(master.get(), frameworkPid, event);

AWAIT_READY(executorLost);

// Now, post a failure for a slave and expect a 'slaveLost'.
event.mutable_failure()->clear_executor_id();

17 changes: 17 additions & 0 deletions src/tests/slave_tests.cpp
@@ -293,6 +293,7 @@ TEST_F(SlaveTest, RemoveUnregisteredTerminatedExecutor)
Future<Nothing> schedule =
FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);

EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _));
// Now kill the executor.
containerizer.destroy(offers.get()[0].framework_id(), DEFAULT_EXECUTOR_ID);

@@ -1133,6 +1134,9 @@ TEST_F(SlaveTest, MetricsSlaveLaunchErrors)

EXPECT_CALL(sched, statusUpdate(&driver, _));

// The above injected containerizer failure also triggers executorLost.
EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _));

// Try to start a task
TaskInfo task = createTask(
offer.slave_id(),
@@ -1396,6 +1400,10 @@ TEST_F(SlaveTest, TerminatingSlaveDoesNotReregister)
// stay in TERMINATING for a while.
DROP_PROTOBUFS(ShutdownExecutorMessage(), slave.get(), _);

Future<Nothing> executorLost;
EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _))
.WillOnce(FutureSatisfy(&executorLost));

// Send a ShutdownMessage instead of calling Stop() directly
// to avoid blocking.
post(master.get(), slave.get(), ShutdownMessage());
@@ -1405,6 +1413,8 @@ TEST_F(SlaveTest, TerminatingSlaveDoesNotReregister)
Clock::settle();
Clock::resume();

AWAIT_READY(executorLost);

// Clean up.
driver.stop();
driver.join();
@@ -1491,6 +1501,10 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
EXPECT_CALL(exec, killTask(_, _))
.WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));

Future<Nothing> executorLost;
EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _))
.WillOnce(FutureSatisfy(&executorLost));

// Kill one of the tasks. The failed update should result in the
// second task going lost when the container is destroyed.
driver.killTask(tasks[0].task_id());
@@ -1504,6 +1518,8 @@ TEST_F(SlaveTest, TerminalTaskContainerizerUpdateFails)
EXPECT_EQ(TaskStatus::SOURCE_SLAVE, status4->source());
EXPECT_EQ(TaskStatus::REASON_CONTAINER_UPDATE_FAILED, status4->reason());

AWAIT_READY(executorLost);

driver.stop();
driver.join();

@@ -1607,6 +1623,7 @@ TEST_F(SlaveTest, TaskLaunchContainerizerUpdateFails)
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
EXPECT_CALL(sched, executorLost(&driver, DEFAULT_EXECUTOR_ID, _, _));

driver.start();
