Skip to content

Commit

Permalink
Added health check support for docker command tasks.
Browse files Browse the repository at this point in the history
  • Loading branch information
haosdent authored and adam-mesos committed Sep 19, 2015
1 parent c22ef05 commit 09e367c
Show file tree
Hide file tree
Showing 5 changed files with 428 additions and 34 deletions.
42 changes: 27 additions & 15 deletions src/docker/docker.cpp
Expand Up @@ -96,6 +96,30 @@ static Future<Nothing> checkError(const string& cmd, const Subprocess& s)
}


// Checks that the Docker CLI reports a version of at least `minVersion`.
//
// Returns an Error when the version query does not finish within
// DOCKER_VERSION_WAIT_TIMEOUT, when the query fails, or when the reported
// version is lower than `minVersion`. Returns Nothing() otherwise.
Try<Nothing> Docker::validateVersion(const Version& minVersion) const
{
  // Querying the version also tells us whether Docker is usable at all.
  Future<Version> version = this->version();

  if (!version.await(DOCKER_VERSION_WAIT_TIMEOUT)) {
    return Error("Timed out getting docker version");
  }

  if (version.isFailed()) {
    return Error("Failed to get docker version: " + version.failure());
  }

  if (version.get() < minVersion) {
    // Both versions are quoted the same way, e.g.
    // "Insufficient version '1.2.0' of Docker. Please upgrade to >= '1.3.0'".
    string msg = "Insufficient version '" + stringify(version.get()) +
                 "' of Docker. Please upgrade to >= '" +
                 stringify(minVersion) + "'";
    return Error(msg);
  }

  return Nothing();
}


Try<Docker*> Docker::create(const string& path, bool validate)
{
Docker* docker = new Docker(path);
Expand All @@ -116,22 +140,10 @@ Try<Docker*> Docker::create(const string& path, bool validate)
}
#endif // __linux__

// Validate the version (and that we can use Docker at all).
Future<Version> version = docker->version();

if (!version.await(DOCKER_VERSION_WAIT_TIMEOUT)) {
delete docker;
return Error("Timed out getting docker version");
}

if (version.isFailed()) {
delete docker;
return Error("Failed to get docker version: " + version.failure());
}

if (version.get() < Version(1, 0, 0)) {
Try<Nothing> validateVersion = docker->validateVersion(Version(1, 0, 0));
if (validateVersion.isError()) {
delete docker;
return Error("Insufficient version of Docker. Please upgrade to >= 1.0.0");
return Error(validateVersion.error());
}

return docker;
Expand Down
8 changes: 8 additions & 0 deletions src/docker/docker.hpp
Expand Up @@ -149,6 +149,14 @@ class Docker
const std::string& image,
bool force = false) const;

// Validate current docker version is not less than minVersion.
// Returns an Error if the version query times out, if it fails, or if
// the reported version is lower than minVersion; otherwise Nothing().
virtual Try<Nothing> validateVersion(const Version& minVersion) const;

// Returns a copy of the path to the Docker CLI tool that was passed to
// the constructor.
virtual std::string getPath()
{
return path;
}

protected:
// Uses the specified path to the Docker CLI tool.
Docker(const std::string& _path) : path(_path) {};
Expand Down
165 changes: 161 additions & 4 deletions src/docker/executor.cpp
Expand Up @@ -30,6 +30,7 @@
#include <process/owned.hpp>

#include <stout/flags.hpp>
#include <stout/protobuf.hpp>
#include <stout/os.hpp>

#include "common/status_utils.hpp"
Expand All @@ -40,10 +41,13 @@
#include "logging/flags.hpp"
#include "logging/logging.hpp"

#include "messages/messages.hpp"

using std::cerr;
using std::cout;
using std::endl;
using std::string;
using std::vector;

namespace mesos {
namespace internal {
Expand All @@ -70,8 +74,12 @@ class DockerExecutorProcess : public ProtobufProcess<DockerExecutorProcess>
const string& containerName,
const string& sandboxDirectory,
const string& mappedDirectory,
const Duration& stopTimeout)
const Duration& stopTimeout,
const string& healthCheckDir)
: killed(false),
killedByHealthCheck(false),
healthPid(-1),
healthCheckDir(healthCheckDir),
docker(docker),
containerName(containerName),
sandboxDirectory(sandboxDirectory),
Expand Down Expand Up @@ -168,12 +176,19 @@ class DockerExecutorProcess : public ProtobufProcess<DockerExecutorProcess>

return Nothing();
}));

inspect.onReady(
defer(self(), &Self::launchHealthCheck, containerName, task));
}

// Shuts the docker task down and then kills the health check process, if
// one was launched. `taskId` is not used by this method.
void killTask(ExecutorDriver* driver, const TaskID& taskId)
{
  cout << "Killing docker task" << endl;
  shutdown(driver);
  if (healthPid != -1) {
    // Cleanup health check process.
    ::kill(healthPid, SIGKILL);

    // Forget the pid so that a later killTask() call cannot signal an
    // unrelated process that has since been assigned the same pid.
    healthPid = -1;
  }
}

void frameworkMessage(ExecutorDriver* driver, const string& data) {}
Expand All @@ -196,6 +211,40 @@ class DockerExecutorProcess : public ProtobufProcess<DockerExecutorProcess>

void error(ExecutorDriver* driver, const string& message) {}

protected:
// Installs taskHealthUpdated() as the handler for TaskHealthStatus
// messages; the message's task_id, healthy and kill_task fields are
// passed as the handler's arguments, in that order.
virtual void initialize()
{
install<TaskHealthStatus>(
&Self::taskHealthUpdated,
&TaskHealthStatus::task_id,
&TaskHealthStatus::healthy,
&TaskHealthStatus::kill_task);
}

void taskHealthUpdated(
const TaskID& taskID,
const bool& healthy,
const bool& initiateTaskKill)
{
if (driver.isNone()) {
return;
}

cout << "Received task health update, healthy: "
<< stringify(healthy) << endl;

TaskStatus status;
status.mutable_task_id()->CopyFrom(taskID);
status.set_healthy(healthy);
status.set_state(TASK_RUNNING);
driver.get()->sendStatusUpdate(status);

if (initiateTaskKill) {
killedByHealthCheck = true;
killTask(driver.get(), taskID);
}
}

private:
void reaped(
ExecutorDriver* _driver,
Expand Down Expand Up @@ -233,6 +282,9 @@ class DockerExecutorProcess : public ProtobufProcess<DockerExecutorProcess>
taskStatus.mutable_task_id()->CopyFrom(taskId);
taskStatus.set_state(state);
taskStatus.set_message(message);
if (killed && killedByHealthCheck) {
taskStatus.set_healthy(false);
}

driver.get()->sendStatusUpdate(taskStatus);

Expand All @@ -247,7 +299,104 @@ class DockerExecutorProcess : public ProtobufProcess<DockerExecutorProcess>
}));
}

void launchHealthCheck(const string& containerName, const TaskInfo& task)
{
if (!killed && task.has_health_check()) {
HealthCheck healthCheck = task.health_check();

// Wrap the original health check command in "docker exec".
if (healthCheck.has_command()) {
CommandInfo command = healthCheck.command();

// "docker exec" require docker version greater than 1.3.0.
Try<Nothing> validateVersion =
docker->validateVersion(Version(1, 3, 0));
if (validateVersion.isError()) {
cerr << "Unable to launch health process: "
<< validateVersion.error() << endl;
return;
}

vector<string> argv;
argv.push_back(docker->getPath());
argv.push_back("exec");
argv.push_back(containerName);

if (command.shell()) {
if (!command.has_value()) {
cerr << "Unable to launch health process: "
<< "Shell command is not specified." << endl;
return;
}

argv.push_back("sh");
argv.push_back("-c");
argv.push_back("\"");
argv.push_back(command.value());
argv.push_back("\"");
} else {
if (!command.has_value()) {
cerr << "Unable to launch health process: "
<< "Executable path is not specified." << endl;
return;
}

argv.push_back(command.value());
foreach (const string& argument, command.arguments()) {
argv.push_back(argument);
}
}

command.set_shell(true);
command.clear_arguments();
command.set_value(strings::join(" ", argv));
healthCheck.mutable_command()->CopyFrom(command);
} else {
cerr << "Unable to launch health process: "
<< "Only command health check is supported now." << endl;
return;
}

JSON::Object json = JSON::Protobuf(healthCheck);

// Launch the subprocess using 'exec' style so that quotes can
// be properly handled.
vector<string> argv;
string path = path::join(healthCheckDir, "mesos-health-check");
argv.push_back(path);
argv.push_back("--executor=" + stringify(self()));
argv.push_back("--health_check_json=" + stringify(json));
argv.push_back("--task_id=" + task.task_id().value());

string cmd = strings::join(" ", argv);
cout << "Launching health check process: " << cmd << endl;

Try<Subprocess> healthProcess =
process::subprocess(
path,
argv,
// Intentionally not sending STDIN to avoid health check
// commands that expect STDIN input to block.
Subprocess::PATH("/dev/null"),
Subprocess::FD(STDOUT_FILENO),
Subprocess::FD(STDERR_FILENO));

if (healthProcess.isError()) {
cerr << "Unable to launch health process: "
<< healthProcess.error() << endl;
} else {
healthPid = healthProcess.get().pid();

cout << "Health check process launched at pid: "
<< stringify(healthPid) << endl;
}
}
}

bool killed;
bool killedByHealthCheck;
pid_t healthPid;
string healthCheckDir;
Owned<Docker> docker;
string containerName;
string sandboxDirectory;
Expand All @@ -268,14 +417,16 @@ class DockerExecutor : public Executor
const string& container,
const string& sandboxDirectory,
const string& mappedDirectory,
const Duration& stopTimeout)
const Duration& stopTimeout,
const string& healthCheckDir)
{
process = Owned<DockerExecutorProcess>(new DockerExecutorProcess(
docker,
container,
sandboxDirectory,
mappedDirectory,
stopTimeout));
stopTimeout,
healthCheckDir));

spawn(process.get());
}
Expand Down Expand Up @@ -412,12 +563,18 @@ int main(int argc, char** argv)
return EXIT_FAILURE;
}

const Option<string> envPath = os::getenv("MESOS_LAUNCHER_DIR");
string path =
envPath.isSome() ? envPath.get()
: os::realpath(Path(argv[0]).dirname()).get();

mesos::internal::docker::DockerExecutor executor(
process::Owned<Docker>(docker.get()),
flags.container.get(),
flags.sandbox_directory.get(),
flags.mapped_directory.get(),
flags.stop_timeout.get());
flags.stop_timeout.get(),
path);

mesos::MesosExecutorDriver driver(&executor);
return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
Expand Down
16 changes: 4 additions & 12 deletions src/slave/containerizer/docker.cpp
Expand Up @@ -129,18 +129,10 @@ Try<DockerContainerizer*> DockerContainerizer::create(
Shared<Docker> docker(create.get());

if (flags.docker_mesos_image.isSome()) {
Future<Version> version = docker->version();
if (!version.await(DOCKER_VERSION_WAIT_TIMEOUT)) {
return Error("Timed out waiting for docker version");
}

if (version.isFailed()) {
return Error(version.failure());
}

if (version.get() < Version(1, 5, 0)) {
string message = "Docker with mesos images requires docker 1.5+, found ";
message += stringify(version.get());
Try<Nothing> validateResult = docker->validateVersion(Version(1, 5, 0));
if (validateResult.isError()) {
string message = "Docker with mesos images requires docker 1.5+";
message += validateResult.error();
return Error(message);
}
}
Expand Down

0 comments on commit 09e367c

Please sign in to comment.