diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp index fe0627cb5fe..f2c8be45640 100644 --- a/src/docker/docker.cpp +++ b/src/docker/docker.cpp @@ -96,6 +96,30 @@ static Future checkError(const string& cmd, const Subprocess& s) } +Try Docker::validateVersion(const Version& minVersion) const +{ + // Validate the version (and that we can use Docker at all). + Future version = this->version(); + + if (!version.await(DOCKER_VERSION_WAIT_TIMEOUT)) { + return Error("Timed out getting docker version"); + } + + if (version.isFailed()) { + return Error("Failed to get docker version: " + version.failure()); + } + + if (version.get() < minVersion) { + string msg = "Insufficient version '" + stringify(version.get()) + + "' of Docker. Please upgrade to >=' " + + stringify(minVersion) + "'"; + return Error(msg); + } + + return Nothing(); +} + + Try Docker::create(const string& path, bool validate) { Docker* docker = new Docker(path); @@ -116,22 +140,10 @@ Try Docker::create(const string& path, bool validate) } #endif // __linux__ - // Validate the version (and that we can use Docker at all). - Future version = docker->version(); - - if (!version.await(DOCKER_VERSION_WAIT_TIMEOUT)) { - delete docker; - return Error("Timed out getting docker version"); - } - - if (version.isFailed()) { - delete docker; - return Error("Failed to get docker version: " + version.failure()); - } - - if (version.get() < Version(1, 0, 0)) { + Try validateVersion = docker->validateVersion(Version(1, 0, 0)); + if (validateVersion.isError()) { delete docker; - return Error("Insufficient version of Docker. Please upgrade to >= 1.0.0"); + return Error(validateVersion.error()); } return docker; diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp index fbae7bd382b..c143f1ea2fd 100644 --- a/src/docker/docker.hpp +++ b/src/docker/docker.hpp @@ -139,6 +139,14 @@ class Docker const std::string& image, bool force = false) const; + // Validate current docker version is not less than minVersion. + virtual Try validateVersion(const Version& minVersion) const; + + virtual std::string getPath() + { + return path; + } + protected: // Uses the specified path to the Docker CLI tool. Docker(const std::string& _path) : path(_path) {}; diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp index cdcd8ee7ad0..27ad1280230 100644 --- a/src/docker/executor.cpp +++ b/src/docker/executor.cpp @@ -30,6 +30,7 @@ #include #include +#include #include #include "common/status_utils.hpp" @@ -40,10 +41,13 @@ #include "logging/flags.hpp" #include "logging/logging.hpp" +#include "messages/messages.hpp" + using std::cerr; using std::cout; using std::endl; using std::string; +using std::vector; namespace mesos { namespace internal { @@ -70,8 +74,12 @@ class DockerExecutorProcess : public ProtobufProcess const string& containerName, const string& sandboxDirectory, const string& mappedDirectory, - const Duration& stopTimeout) + const Duration& stopTimeout, + const string& healthCheckDir) : killed(false), + killedByHealthCheck(false), + healthPid(-1), + healthCheckDir(healthCheckDir), docker(docker), containerName(containerName), sandboxDirectory(sandboxDirectory), @@ -163,12 +171,19 @@ class DockerExecutorProcess : public ProtobufProcess return Nothing(); })); + + inspect.onReady( + defer(self(), &Self::launchHealthCheck, containerName, task)); } void killTask(ExecutorDriver* driver, const TaskID& taskId) { cout << "Killing docker task" << endl; shutdown(driver); + if (healthPid != -1) { + // Cleanup health check process. + ::kill(healthPid, SIGKILL); + } } void frameworkMessage(ExecutorDriver* driver, const string& data) {} @@ -191,6 +206,40 @@ class DockerExecutorProcess : public ProtobufProcess void error(ExecutorDriver* driver, const string& message) {} +protected: + virtual void initialize() + { + install( + &Self::taskHealthUpdated, + &TaskHealthStatus::task_id, + &TaskHealthStatus::healthy, + &TaskHealthStatus::kill_task); + } + + void taskHealthUpdated( + const TaskID& taskID, + const bool& healthy, + const bool& initiateTaskKill) + { + if (driver.isNone()) { + return; + } + + cout << "Received task health update, healthy: " + << stringify(healthy) << endl; + + TaskStatus status; + status.mutable_task_id()->CopyFrom(taskID); + status.set_healthy(healthy); + status.set_state(TASK_RUNNING); + driver.get()->sendStatusUpdate(status); + + if (initiateTaskKill) { + killedByHealthCheck = true; + killTask(driver.get(), taskID); + } + } + private: void reaped( ExecutorDriver* _driver, @@ -228,6 +277,9 @@ class DockerExecutorProcess : public ProtobufProcess taskStatus.mutable_task_id()->CopyFrom(taskId); taskStatus.set_state(state); taskStatus.set_message(message); + if (killed && killedByHealthCheck) { + taskStatus.set_healthy(false); + } driver.get()->sendStatusUpdate(taskStatus); @@ -242,7 +294,104 @@ class DockerExecutorProcess : public ProtobufProcess })); } + void launchHealthCheck(const string& containerName, const TaskInfo& task) + { + if (!killed && task.has_health_check()) { + HealthCheck healthCheck = task.health_check(); + + // Wrap the original health check command in "docker exec". + if (healthCheck.has_command()) { + CommandInfo command = healthCheck.command(); + + // "docker exec" require docker version greater than 1.3.0. + Try validateVersion = + docker->validateVersion(Version(1, 3, 0)); + if (validateVersion.isError()) { + cerr << "Unable to launch health process: " + << validateVersion.error() << endl; + return; + } + + vector argv; + argv.push_back(docker->getPath()); + argv.push_back("exec"); + argv.push_back(containerName); + + if (command.shell()) { + if (!command.has_value()) { + cerr << "Unable to launch health process: " + << "Shell command is not specified." << endl; + return; + } + + argv.push_back("sh"); + argv.push_back("-c"); + argv.push_back("\""); + argv.push_back(command.value()); + argv.push_back("\""); + } else { + if (!command.has_value()) { + cerr << "Unable to launch health process: " + << "Executable path is not specified." << endl; + return; + } + + argv.push_back(command.value()); + foreach (const string& argument, command.arguments()) { + argv.push_back(argument); + } + } + + command.set_shell(true); + command.clear_arguments(); + command.set_value(strings::join(" ", argv)); + healthCheck.mutable_command()->CopyFrom(command); + } else { + cerr << "Unable to launch health process: " + << "Only command health check is supported now." << endl; + return; + } + + JSON::Object json = JSON::Protobuf(healthCheck); + + // Launch the subprocess using 'exec' style so that quotes can + // be properly handled. + vector argv; + string path = path::join(healthCheckDir, "mesos-health-check"); + argv.push_back(path); + argv.push_back("--executor=" + stringify(self())); + argv.push_back("--health_check_json=" + stringify(json)); + argv.push_back("--task_id=" + task.task_id().value()); + + string cmd = strings::join(" ", argv); + cout << "Launching health check process: " << cmd << endl; + + Try healthProcess = + process::subprocess( + path, + argv, + // Intentionally not sending STDIN to avoid health check + // commands that expect STDIN input to block. + Subprocess::PATH("/dev/null"), + Subprocess::FD(STDOUT_FILENO), + Subprocess::FD(STDERR_FILENO)); + + if (healthProcess.isError()) { + cerr << "Unable to launch health process: " + << healthProcess.error() << endl; + } else { + healthPid = healthProcess.get().pid(); + + cout << "Health check process launched at pid: " + << stringify(healthPid) << endl; + } + } + } + bool killed; + bool killedByHealthCheck; + pid_t healthPid; + string healthCheckDir; Owned docker; string containerName; string sandboxDirectory; @@ -263,14 +412,16 @@ class DockerExecutor : public Executor const string& container, const string& sandboxDirectory, const string& mappedDirectory, - const Duration& stopTimeout) + const Duration& stopTimeout, + const string& healthCheckDir) { process = Owned(new DockerExecutorProcess( docker, container, sandboxDirectory, mappedDirectory, - stopTimeout)); + stopTimeout, + healthCheckDir)); spawn(process.get()); } @@ -407,12 +558,18 @@ int main(int argc, char** argv) return EXIT_FAILURE; } + const Option envPath = os::getenv("MESOS_LAUNCHER_DIR"); + string path = + envPath.isSome() ? envPath.get() + : os::realpath(Path(argv[0]).dirname()).get(); + mesos::internal::docker::DockerExecutor executor( process::Owned(docker.get()), flags.container.get(), flags.sandbox_directory.get(), flags.mapped_directory.get(), - flags.stop_timeout.get()); + flags.stop_timeout.get(), + path); mesos::MesosExecutorDriver driver(&executor); return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE; diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp index cfb60177fe4..9e57245768b 100644 --- a/src/slave/containerizer/docker.cpp +++ b/src/slave/containerizer/docker.cpp @@ -129,18 +129,10 @@ Try DockerContainerizer::create( Shared docker(create.get()); if (flags.docker_mesos_image.isSome()) { - Future version = docker->version(); - if (!version.await(DOCKER_VERSION_WAIT_TIMEOUT)) { - return Error("Timed out waiting for docker version"); - } - - if (version.isFailed()) { - return Error(version.failure()); - } - - if (version.get() < Version(1, 5, 0)) { - string message = "Docker with mesos images requires docker 1.5+, found "; - message += stringify(version.get()); + Try validateResult = docker->validateVersion(Version(1, 5, 0)); + if (validateResult.isError()) { + string message = "Docker with mesos images requires docker 1.5+"; + message += validateResult.error(); return Error(message); } } diff --git a/src/tests/health_check_tests.cpp b/src/tests/health_check_tests.cpp index 157a56aa066..2405917751a 100644 --- a/src/tests/health_check_tests.cpp +++ b/src/tests/health_check_tests.cpp @@ -21,10 +21,14 @@ #include #include +#include #include +#include "docker/docker.hpp" + #include "slave/slave.hpp" +#include "slave/containerizer/docker.hpp" #include "slave/containerizer/fetcher.hpp" #include "tests/containerizer.hpp" @@ -35,6 +39,7 @@ using mesos::internal::master::Master; using mesos::internal::slave::Containerizer; +using mesos::internal::slave::DockerContainerizer; using mesos::internal::slave::Fetcher; using mesos::internal::slave::MesosContainerizer; using mesos::internal::slave::MesosContainerizerProcess; @@ -42,6 +47,7 @@ using mesos::internal::slave::Slave; using process::Clock; using process::Future; +using process::Owned; using process::PID; using testing::_; @@ -68,7 +74,8 @@ class HealthCheckTest : public MesosTest const Offer& offer, int gracePeriodSeconds = 0, const Option& consecutiveFailures = None(), - const Option >& env = None()) + const Option>& env = None(), + const Option& containerInfo = None()) { CommandInfo healthCommand; healthCommand.set_value(healthCmd); @@ -79,7 +86,8 @@ class HealthCheckTest : public MesosTest offer, gracePeriodSeconds, consecutiveFailures, - env); + env, + containerInfo); } vector populateTasks( @@ -88,7 +96,8 @@ class HealthCheckTest : public MesosTest const Offer& offer, int gracePeriodSeconds = 0, const Option& consecutiveFailures = None(), - const Option >& env = None()) + const Option>& env = None(), + const Option& containerInfo = None()) { TaskInfo task; task.set_name(""); @@ -109,6 +118,10 @@ class HealthCheckTest : public MesosTest task.mutable_command()->CopyFrom(command); + if (containerInfo.isSome()) { + task.mutable_container()->CopyFrom(containerInfo.get()); + } + HealthCheck healthCheck; if (env.isSome()) { @@ -238,6 +251,96 @@ TEST_F(HealthCheckTest, HealthyTask) } +// Testing a healthy task reporting one healthy status to scheduler for +// docker executor. +TEST_F(HealthCheckTest, ROOT_DOCKER_DockerHealthyTask) +{ + Owned docker(Docker::create(tests::flags.docker, false).get()); + Try validateResult = docker->validateVersion(Version(1, 3, 0)); + ASSERT_SOME(validateResult) + << "-------------------------------------------------------------\n" + << "We cannot run this test because of 'docker exec' command \n" + << "require docker version greater than '1.3.0'. You won't be \n" + << "able to use the docker exec method, but feel free to disable\n" + << "this test.\n" + << "-------------------------------------------------------------"; + + Try> master = StartMaster(); + ASSERT_SOME(master); + + slave::Flags flags = CreateSlaveFlags(); + + Fetcher fetcher; + + Try containerizer = + DockerContainerizer::create(flags, &fetcher); + CHECK_SOME(containerizer); + + Try> slave = StartSlave(containerizer.get()); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)) + .Times(1); + + Future> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + ContainerInfo containerInfo; + containerInfo.set_type(ContainerInfo::DOCKER); + + // TODO(tnachen): Use local image to test if possible. + ContainerInfo::DockerInfo dockerInfo; + dockerInfo.set_image("busybox"); + containerInfo.mutable_docker()->CopyFrom(dockerInfo); + + vector tasks = populateTasks( + "sleep 120", "exit 0", offers.get()[0], 0, None(), None(), containerInfo); + + Future statusRunning; + Future statusHealth; + + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusRunning)) + .WillOnce(FutureArg<1>(&statusHealth)); + + driver.launchTasks(offers.get()[0].id(), tasks); + + AWAIT_READY(statusRunning); + EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); + + AWAIT_READY(statusHealth); + EXPECT_EQ(TASK_RUNNING, statusHealth.get().state()); + EXPECT_TRUE(statusHealth.get().has_healthy()); + EXPECT_TRUE(statusHealth.get().healthy()); + + driver.stop(); + driver.join(); + + Shutdown(); + + Future> containers = + docker->ps(true, slave::DOCKER_NAME_PREFIX); + + AWAIT_READY(containers); + + // Cleanup all mesos launched containers. + foreach (const Docker::Container& container, containers.get()) { + AWAIT_READY_FOR(docker->rm(container.id, true), Seconds(30)); + } +} + + // Same as above, but use the non-shell version of the health command. TEST_F(HealthCheckTest, HealthyTaskNonShell) { @@ -395,6 +498,128 @@ TEST_F(HealthCheckTest, HealthStatusChange) } +// Testing health status change reporting to scheduler for docker executor. +TEST_F(HealthCheckTest, ROOT_DOCKER_DockerHealthStatusChange) +{ + Owned docker(Docker::create(tests::flags.docker, false).get()); + Try validateResult = docker->validateVersion(Version(1, 3, 0)); + ASSERT_SOME(validateResult) + << "-------------------------------------------------------------\n" + << "We cannot run this test because of 'docker exec' command \n" + << "require docker version greater than '1.3.0'. You won't be \n" + << "able to use the docker exec method, but feel free to disable\n" + << "this test.\n" + << "-------------------------------------------------------------"; + + Try> master = StartMaster(); + ASSERT_SOME(master); + + slave::Flags flags = CreateSlaveFlags(); + + Fetcher fetcher; + + Try containerizer = + DockerContainerizer::create(flags, &fetcher); + CHECK_SOME(containerizer); + + Try> slave = StartSlave(containerizer.get()); + ASSERT_SOME(slave); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)); + + Future> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + ContainerInfo containerInfo; + containerInfo.set_type(ContainerInfo::DOCKER); + + // TODO(tnachen): Use local image to test if possible. + ContainerInfo::DockerInfo dockerInfo; + dockerInfo.set_image("busybox"); + containerInfo.mutable_docker()->CopyFrom(dockerInfo); + + // Create a temporary file in host and then we could this file to make sure + // the health check command is run in docker container. + string tmpPath = path::join(os::getcwd(), "foobar"); + ASSERT_SOME(os::write(tmpPath, "bar")); + + // This command fails every other invocation. + // For all runs i in Nat0, the following case i % 2 applies: + // + // Case 0: + // - Attempt to remove the nonexistent temporary file. + // - Create the temporary file. + // - Exit with a non-zero status. + // + // Case 1: + // - Remove the temporary file. + string alt = "rm " + tmpPath + " || (mkdir -p " + os::getcwd() + + " && echo foo >" + tmpPath + " && exit 1)"; + + vector tasks = populateTasks( + "sleep 120", alt, offers.get()[0], 0, 3, None(), containerInfo); + + Future statusRunning; + Future statusHealth1; + Future statusHealth2; + Future statusHealth3; + + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&statusRunning)) + .WillOnce(FutureArg<1>(&statusHealth1)) + .WillOnce(FutureArg<1>(&statusHealth2)) + .WillOnce(FutureArg<1>(&statusHealth3)); + + driver.launchTasks(offers.get()[0].id(), tasks); + + AWAIT_READY(statusRunning); + EXPECT_EQ(TASK_RUNNING, statusRunning.get().state()); + + AWAIT_READY(statusHealth1); + EXPECT_EQ(TASK_RUNNING, statusHealth1.get().state()); + EXPECT_FALSE(statusHealth1.get().healthy()); + + AWAIT_READY(statusHealth2); + EXPECT_EQ(TASK_RUNNING, statusHealth2.get().state()); + EXPECT_TRUE(statusHealth2.get().healthy()); + + AWAIT_READY(statusHealth3); + EXPECT_EQ(TASK_RUNNING, statusHealth3.get().state()); + EXPECT_FALSE(statusHealth3.get().healthy()); + + // Check the temporary file created in host still exists and the content + // don't change. + ASSERT_SOME(os::read(tmpPath)); + EXPECT_EQ("bar", os::read(tmpPath).get()); + + driver.stop(); + driver.join(); + + Shutdown(); + + Future> containers = + docker->ps(true, slave::DOCKER_NAME_PREFIX); + + AWAIT_READY(containers); + + // Cleanup all mesos launched containers. + foreach (const Docker::Container& container, containers.get()) { + AWAIT_READY_FOR(docker->rm(container.id, true), Seconds(30)); + } +} + + // Testing killing task after number of consecutive failures. // Temporarily disabled due to MESOS-1613. TEST_F(HealthCheckTest, DISABLED_ConsecutiveFailures)