diff --git a/CMakeLists.txt b/CMakeLists.txt index 3d03e8029..fdd9dcef7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -114,6 +114,7 @@ ADD_LIBRARY(cocaine-core SHARED src/service/node.cpp src/service/node/actor.cpp src/service/node/app.cpp + src/service/node/app/stats.cpp src/service/node/balancing/load.cpp src/service/node/dispatch/client.cpp src/service/node/dispatch/worker.cpp @@ -124,13 +125,12 @@ ADD_LIBRARY(cocaine-core SHARED src/service/node/slave/error.cpp src/service/node/slave/fetcher.cpp src/service/node/slave/state/active.cpp - src/service/node/slave/state/broken.cpp src/service/node/slave/state/handshaking.cpp + src/service/node/slave/state/sealing.cpp src/service/node/slave/state/spawning.cpp src/service/node/slave/state/state.cpp - src/service/node/slave/state/sealing.cpp + src/service/node/slave/state/stopped.cpp src/service/node/slave/state/terminating.cpp - src/service/node/app/stats.cpp src/service/node/manifest.cpp src/service/node/profile.cpp diff --git a/debian/changelog b/debian/changelog index 8deeb97fa..623f09362 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +cocaine-core (0.12.2.1) unstable; urgency=low + + * Bug fix: collecting zombie processes during overseers destruction. + + -- Evgeny Safronov Wed, 08 Jul 2015 15:20:37 +0300 + cocaine-core (0.12.2.0) unstable; urgency=low * Optional HPACK like headers extension. Mainly aims to ease internal protocol evolution. 
diff --git a/include/cocaine/api/isolate.hpp b/include/cocaine/api/isolate.hpp index baefcfb71..965d8b8ea 100644 --- a/include/cocaine/api/isolate.hpp +++ b/include/cocaine/api/isolate.hpp @@ -28,6 +28,8 @@ #include +#include + namespace cocaine { namespace api { struct handle_t { @@ -97,7 +99,7 @@ struct isolate_t { spawn(const std::string& path, const string_map_t& args, const string_map_t& environment) = 0; protected: - isolate_t(context_t&, const std::string& /* name */, const dynamic_t& /* args */) { + isolate_t(context_t&, asio::io_service&, const std::string& /* name */, const dynamic_t& /* args */) { // Empty. } }; @@ -109,21 +111,21 @@ struct category_traits { struct factory_type: public basic_factory { virtual ptr_type - get(context_t& context, const std::string& name, const dynamic_t& args) = 0; + get(context_t& context, asio::io_service& io_context, const std::string& name, const dynamic_t& args) = 0; }; template struct default_factory: public factory_type { virtual ptr_type - get(context_t& context, const std::string& name, const dynamic_t& args) { + get(context_t& context, asio::io_service& io_context, const std::string& name, const dynamic_t& args) { ptr_type instance; instances.apply([&](std::map>& instances) { auto weak_ptr = instances[name]; if((instance = weak_ptr.lock()) == nullptr) { - instance = std::make_shared(context, name, args); + instance = std::make_shared(context, io_context, name, args); instances[name] = instance; } }); diff --git a/include/cocaine/detail/isolate/process.hpp b/include/cocaine/detail/isolate/process.hpp index 039b68dbe..09f82c504 100644 --- a/include/cocaine/detail/isolate/process.hpp +++ b/include/cocaine/detail/isolate/process.hpp @@ -35,18 +35,20 @@ class process_t: public api::isolate_t { context_t& m_context; + asio::io_service& io_context; const std::unique_ptr m_log; const std::string m_name; const boost::filesystem::path m_working_directory; + const uint m_kill_timeout; #ifdef COCAINE_ALLOW_CGROUPS cgroup* 
m_cgroup; #endif public: - process_t(context_t& context, const std::string& name, const dynamic_t& args); + process_t(context_t& context, asio::io_service& io_context, const std::string& name, const dynamic_t& args); virtual ~process_t(); diff --git a/include/cocaine/detail/service/node/slave.hpp b/include/cocaine/detail/service/node/slave.hpp index 5f8c7b44a..3b4cd0391 100644 --- a/include/cocaine/detail/service/node/slave.hpp +++ b/include/cocaine/detail/service/node/slave.hpp @@ -32,7 +32,7 @@ namespace cocaine { class client_rpc_dispatch_t; class active_t; -class broken_t; +class stopped_t; class channel_t; class handshaking_t; class spawning_t; @@ -110,7 +110,7 @@ class state_machine_t: public std::enable_shared_from_this { friend class active_t; - friend class broken_t; + friend class stopped_t; friend class handshaking_t; friend class spawning_t; friend class terminating_t; diff --git a/include/cocaine/detail/service/node/slave/state/spawning.hpp b/include/cocaine/detail/service/node/slave/state/spawning.hpp index 86ccbeb73..1b1f40bf2 100644 --- a/include/cocaine/detail/service/node/slave/state/spawning.hpp +++ b/include/cocaine/detail/service/node/slave/state/spawning.hpp @@ -17,6 +17,7 @@ class spawning_t: std::shared_ptr slave; asio::deadline_timer timer; + std::unique_ptr handle; public: explicit @@ -39,7 +40,7 @@ class spawning_t: private: void - on_spawn(std::unique_ptr& handle, std::chrono::high_resolution_clock::time_point start); + on_spawn(std::chrono::high_resolution_clock::time_point start); void on_timeout(const std::error_code& ec); diff --git a/include/cocaine/detail/service/node/slave/state/broken.hpp b/include/cocaine/detail/service/node/slave/state/stopped.hpp similarity index 82% rename from include/cocaine/detail/service/node/slave/state/broken.hpp rename to include/cocaine/detail/service/node/slave/state/stopped.hpp index ec49c40cc..ad2dd3812 100644 --- a/include/cocaine/detail/service/node/slave/state/broken.hpp +++ 
b/include/cocaine/detail/service/node/slave/state/stopped.hpp @@ -4,14 +4,14 @@ namespace cocaine { -class broken_t: +class stopped_t: public state_t { std::error_code ec; public: explicit - broken_t(std::error_code ec); + stopped_t(std::error_code ec); virtual void diff --git a/src/isolate/process.cpp b/src/isolate/process.cpp index ba9c93eb3..44de53d26 100644 --- a/src/isolate/process.cpp +++ b/src/isolate/process.cpp @@ -38,6 +38,8 @@ #include +namespace ph = std::placeholders; + using namespace cocaine; using namespace cocaine::isolate; @@ -45,26 +47,203 @@ namespace fs = boost::filesystem; namespace { +class process_terminator_t: + public std::enable_shared_from_this +{ +public: + const std::unique_ptr log; + +private: + pid_t pid; + + struct { + uint kill; + uint gc; + } timeout; + + asio::deadline_timer timer; + +public: + process_terminator_t(pid_t pid_, + uint kill_timeout, + std::unique_ptr log_, + asio::io_service& loop): + log(std::move(log_)), + pid(pid_), + timer(loop) + { + timeout.kill = kill_timeout; + timeout.gc = 5; + } + + ~process_terminator_t() { + COCAINE_LOG_TRACE(log, "process terminator is destroying"); + + if (pid) { + int status = 0; + + switch (::waitpid(pid, &status, WNOHANG)) { + case -1: { + // Some error occurred, check errno. + const int ec = errno; + + COCAINE_LOG_WARNING(log, "unable to properly collect the child: %d", ec); + } + break; + case 0: + // The child is not finished yet, kill it and collect in a blocking way as a last + // resort to prevent zombies. + if (::kill(pid, SIGKILL) == 0) { + if (::waitpid(pid, &status, 0) > 0) { + COCAINE_LOG_TRACE(log, "child has been killed: %d", status); + } else { + const int ec = errno; + + COCAINE_LOG_WARNING(log, "unable to properly collect the child: %d", ec); + } + } else { + // Unable to kill for some reason, check errno. 
+ const int ec = errno; + + COCAINE_LOG_WARNING(log, "unable to send kill signal to the child: %d", ec); + } + break; + default: + COCAINE_LOG_TRACE(log, "child has been collected: %d", status); + } + } + } + + void + start() { + int status = 0; + + // Attempt to collect the child in a non-blocking way. + switch (::waitpid(pid, &status, WNOHANG)) { + case -1: { + const int ec = errno; + + COCAINE_LOG_WARNING(log, "unable to collect the child: %d", ec); + break; + } + case 0: { + // The child is not finished yet, send SIGTERM and try to collect it later. + COCAINE_LOG_TRACE(log, "unable to terminate child right now (not ready), sending SIGTERM")( + "timeout", timeout.kill + ); + + // Ignore return code here. + ::kill(pid, SIGTERM); + + timer.expires_from_now(boost::posix_time::seconds(timeout.kill)); + timer.async_wait(std::bind(&process_terminator_t::on_kill_timer, shared_from_this(), ph::_1)); + break; + } + default: + COCAINE_LOG_TRACE(log, "child has been stopped: %d", status); + + pid = 0; + } + } + +private: + void + on_kill_timer(const std::error_code& ec) { + if(ec == asio::error::operation_aborted) { + COCAINE_LOG_TRACE(log, "process kill timer has called its completion handler: cancelled"); + return; + } else { + COCAINE_LOG_TRACE(log, "process kill timer has called its completion handler"); + } + + int status = 0; + + switch (::waitpid(pid, &status, WNOHANG)) { + case -1: { + const int ec = errno; + + COCAINE_LOG_WARNING(log, "unable to collect the child: %d", ec); + break; + } + case 0: { + COCAINE_LOG_TRACE(log, "killing the child, resuming after 5 sec"); + + // Ignore return code here too. 
+ ::kill(pid, SIGKILL); + + timer.expires_from_now(boost::posix_time::seconds(timeout.gc)); + timer.async_wait(std::bind(&process_terminator_t::on_gc_action, shared_from_this(), ph::_1)); + break; + } + default: + COCAINE_LOG_TRACE(log, "child has been terminated: %d", status); + + pid = 0; + } + } + + void + on_gc_action(const std::error_code& ec) { + if(ec == asio::error::operation_aborted) { + COCAINE_LOG_TRACE(log, "process GC timer has called its completion handler: cancelled"); + return; + } else { + COCAINE_LOG_TRACE(log, "process GC timer has called its completion handler"); + } + + int status = 0; + + switch (::waitpid(pid, &status, WNOHANG)) { + case -1: { + const int ec = errno; + + COCAINE_LOG_WARNING(log, "unable to collect the child: %d", ec); + break; + } + case 0: { + COCAINE_LOG_TRACE(log, "child has not been killed, resuming after 5 sec"); + + timer.expires_from_now(boost::posix_time::seconds(timeout.gc)); + timer.async_wait(std::bind(&process_terminator_t::on_gc_action, shared_from_this(), ph::_1)); + break; + } + default: + COCAINE_LOG_TRACE(log, "child has been killed: %d", status); + + pid = 0; + } + } +}; + struct process_handle_t: public api::handle_t { - process_handle_t(pid_t pid, int stdout): - m_pid(pid), +private: + std::shared_ptr terminator; + + const int m_stdout; + +public: + process_handle_t(pid_t pid, + int stdout, + uint kill_timeout, + std::unique_ptr log, + asio::io_service& loop): + terminator(std::make_shared(pid, kill_timeout, std::move(log), loop)), m_stdout(stdout) - { } + { + COCAINE_LOG_TRACE(terminator->log, "process handle has been created"); + } ~process_handle_t() { terminate(); + COCAINE_LOG_TRACE(terminator->log, "process handle has been destroyed"); } virtual void terminate() { - int status = 0; - - if(::waitpid(m_pid, &status, WNOHANG) == 0) { - ::kill(m_pid, SIGTERM); - } + terminator->start(); ::close(m_stdout); } @@ -74,10 +253,6 @@ struct process_handle_t: stdout() const { return m_stdout; } - -private: - 
const pid_t m_pid; - const int m_stdout; }; #ifdef COCAINE_ALLOW_CGROUPS @@ -154,12 +329,14 @@ cgroup_category() -> const std::error_category& { }} // namespace cocaine::error #endif -process_t::process_t(context_t& context, const std::string& name, const dynamic_t& args): - category_type(context, name, args), +process_t::process_t(context_t& context, asio::io_service& io_context, const std::string& name, const dynamic_t& args): + category_type(context, io_context, name, args), m_context(context), + io_context(io_context), m_log(context.log(name)), m_name(name), - m_working_directory(fs::path(args.as_object().at("spool", "/var/spool/cocaine").as_string()) / name) + m_working_directory(fs::path(args.as_object().at("spool", "/var/spool/cocaine").as_string()) / name), + m_kill_timeout(args.as_object().at("kill_timeout", 30u).as_uint()) { #ifdef COCAINE_ALLOW_CGROUPS int rv = 0; @@ -245,7 +422,13 @@ process_t::spawn(const std::string& path, const api::string_map_t& args, const a ::close(pipes[pid > 0]); if(pid > 0) { - return std::make_unique(pid, pipes[0]); + return std::make_unique( + pid, + pipes[0], + m_kill_timeout, + m_context.log(format("%s/process", m_name), {{ "pid", blackhole::attribute::value_t(pid) }}), + io_context + ); } // Child initialization diff --git a/src/runtime/runtime.cpp b/src/runtime/runtime.cpp index ced0f7c27..5e19b38d4 100644 --- a/src/runtime/runtime.cpp +++ b/src/runtime/runtime.cpp @@ -83,23 +83,9 @@ stacktrace(int signum, siginfo_t* COCAINE_UNUSED_(info), void* context) { std::_Exit(EXIT_FAILURE); } -asio::io_service* loop; - -void -terminate(int signum) { - static const std::map description = { - { SIGINT, "SIGINT" }, - { SIGQUIT, "SIGQUIT" }, - { SIGTERM, "SIGTERM" } - }; - - std::cout << "[Runtime] Caught " << description.at(signum) << ", exiting." 
<< std::endl; - - loop->stop(); -} - struct runtime_t { - runtime_t(): + runtime_t(const logging::logger_t& log_): + log(log_), m_signals(m_asio, SIGINT, SIGTERM, SIGQUIT) { m_signals.async_wait(std::bind(&runtime_t::on_signal, this, ph::_1, ph::_2)); @@ -113,15 +99,14 @@ struct runtime_t { m_alt_stack.ss_flags = 0; if(::sigaltstack(&m_alt_stack, nullptr) != 0) { - std::cerr << "ERROR: Unable to activate an alternative signal stack" << std::endl; + COCAINE_LOG_ERROR(log, "unable to activate an alternative signal stack"); } // Reroute the core-generating signals. - struct sigaction action, termaction; + struct sigaction action; std::memset(&action, 0, sizeof(action)); - std::memset(&termaction, 0, sizeof(termaction)); action.sa_sigaction = &stacktrace; action.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO; @@ -130,13 +115,6 @@ struct runtime_t { ::sigaction(SIGBUS, &action, nullptr); ::sigaction(SIGSEGV, &action, nullptr); - termaction.sa_handler = terminate; - termaction.sa_flags = 0; - - ::sigaction(SIGINT, &termaction, nullptr); - ::sigaction(SIGTERM, &termaction, nullptr); - ::sigaction(SIGQUIT, &termaction, nullptr); - // Block the deprecated signals. sigset_t sigset; @@ -145,15 +123,13 @@ struct runtime_t { sigaddset(&sigset, SIGPIPE); ::sigprocmask(SIG_BLOCK, &sigset, nullptr); - - loop = &m_asio; } ~runtime_t() { m_alt_stack.ss_flags = SS_DISABLE; if(::sigaltstack(&m_alt_stack, nullptr) != 0) { - std::cerr << "ERROR: Unable to deactivate an alternative signal stack" << std::endl; + COCAINE_LOG_ERROR(log, "unable to deactivate an alternative signal stack"); } delete[] static_cast(m_alt_stack.ss_sp); @@ -174,18 +150,20 @@ struct runtime_t { return; } - static const std::map signals = { + static const std::map description = { { SIGINT, "SIGINT" }, { SIGQUIT, "SIGQUIT" }, { SIGTERM, "SIGTERM" } }; - std::cout << "[Runtime] Caught " << signals.at(signum) << ", exiting." 
<< std::endl; + COCAINE_LOG_INFO(log, "caught %s, exitting", description.at(signum)); m_asio.stop(); } private: + const logging::logger_t& log; + asio::io_service m_asio; asio::signal_set m_signals; @@ -311,5 +289,5 @@ main(int argc, char* argv[]) { return EXIT_FAILURE; } - return runtime_t().run(); + return runtime_t(*logger).run(); } diff --git a/src/service/node/app.cpp b/src/service/node/app.cpp index a225f07ae..bb4f4ea90 100644 --- a/src/service/node/app.cpp +++ b/src/service/node/app.cpp @@ -164,10 +164,11 @@ class spooling_t: public: template - spooling_t(context_t& context, const manifest_t& manifest, const profile_t& profile, F cb) { + spooling_t(context_t& context, asio::io_service& loop, const manifest_t& manifest, const profile_t& profile, F cb) { isolate = context.get( profile.isolate.type, context, + loop, manifest.name, profile.isolate.args ); @@ -330,6 +331,7 @@ class cocaine::service::node::app_state_t: COCAINE_LOG_TRACE(log, "app is spooling"); state.reset(new state::spooling_t( context, + *loop, manifest(), profile, std::bind(&app_state_t::on_spool, shared_from_this(), ph::_1, ph::_2) diff --git a/src/service/node/slave.cpp b/src/service/node/slave.cpp index 3985161ae..a63f4090f 100644 --- a/src/service/node/slave.cpp +++ b/src/service/node/slave.cpp @@ -11,7 +11,7 @@ #include "cocaine/detail/service/node/slave/control.hpp" #include "cocaine/detail/service/node/slave/fetcher.hpp" #include "cocaine/detail/service/node/slave/state/active.hpp" -#include "cocaine/detail/service/node/slave/state/broken.hpp" +#include "cocaine/detail/service/node/slave/state/stopped.hpp" #include "cocaine/detail/service/node/slave/state/handshaking.hpp" #include "cocaine/detail/service/node/slave/state/spawning.hpp" #include "cocaine/detail/service/node/slave/state/state.hpp" @@ -223,7 +223,7 @@ state_machine_t::shutdown(std::error_code ec) { auto state = *this->state.synchronize(); state->cancel(); - migrate(std::make_shared(ec)); + migrate(std::make_shared(ec)); 
fetcher->close(); fetcher.reset(); diff --git a/src/service/node/slave/state/spawning.cpp b/src/service/node/slave/state/spawning.cpp index b58cac515..1aef53476 100644 --- a/src/service/node/slave/state/spawning.cpp +++ b/src/service/node/slave/state/spawning.cpp @@ -74,13 +74,14 @@ spawning_t::spawn(unsigned long timeout) { auto isolate = slave->context.context.get( slave->context.profile.isolate.type, slave->context.context, + slave->loop, slave->context.manifest.name, slave->context.profile.isolate.args ); COCAINE_LOG_TRACE(slave->log, "spawning"); - auto handle = isolate->spawn( + handle = isolate->spawn( slave->context.manifest.executable, args, slave->context.manifest.environment @@ -91,7 +92,7 @@ spawning_t::spawn(unsigned long timeout) { // NOTE: The callback must be called from the event loop thread, otherwise the behavior // is undefined. slave->loop.post(detail::move_handler(std::bind( - &spawning_t::on_spawn, shared_from_this(), std::move(handle), std::chrono::high_resolution_clock::now() + &spawning_t::on_spawn, shared_from_this(), std::chrono::high_resolution_clock::now() ))); timer.expires_from_now(boost::posix_time::milliseconds(timeout)); @@ -106,7 +107,7 @@ spawning_t::spawn(unsigned long timeout) { } void -spawning_t::on_spawn(std::unique_ptr& handle, std::chrono::high_resolution_clock::time_point start) { +spawning_t::on_spawn(std::chrono::high_resolution_clock::time_point start) { std::error_code ec; const size_t cancelled = timer.cancel(ec); if (ec || cancelled == 0) { diff --git a/src/service/node/slave/state/broken.cpp b/src/service/node/slave/state/stopped.cpp similarity index 62% rename from src/service/node/slave/state/broken.cpp rename to src/service/node/slave/state/stopped.cpp index 683f589ae..8da8ad16c 100644 --- a/src/service/node/slave/state/broken.cpp +++ b/src/service/node/slave/state/stopped.cpp @@ -1,18 +1,18 @@ -#include "cocaine/detail/service/node/slave/state/broken.hpp" +#include 
"cocaine/detail/service/node/slave/state/stopped.hpp" #include "cocaine/detail/service/node/slave/error.hpp" using namespace cocaine; -broken_t::broken_t(std::error_code ec): +stopped_t::stopped_t(std::error_code ec): ec(std::move(ec)) {} void -broken_t::cancel() {} +stopped_t::cancel() {} const char* -broken_t::name() const noexcept { +stopped_t::name() const noexcept { switch (ec.value()) { case 0: case error::committed_suicide: