Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spawn Configuration::Concurrency process managers #8269

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
105 changes: 67 additions & 38 deletions lib/base/process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "base/process.hpp"
#include "base/exception.hpp"
#include "base/convert.hpp"
#include "base/configuration.hpp"
#include "base/array.hpp"
#include "base/objectlock.hpp"
#include "base/utility.hpp"
Expand All @@ -11,8 +12,10 @@
#include "base/utility.hpp"
#include "base/scriptglobal.hpp"
#include "base/json.hpp"
#include <algorithm>
#include <boost/algorithm/string/join.hpp>
#include <boost/thread/once.hpp>
#include <cstddef>
#include <thread>
#include <iostream>

Expand All @@ -32,6 +35,21 @@ extern char **environ;
using namespace icinga;

#define IOTHREADS 4
#define MySpawner l_ProcessControl.Spawners[decltype(l_ProcessControl.Len)(this) / sizeof(void*) % l_ProcessControl.Len]

struct Spawner
{
std::mutex Mutex;
int FD = -1;
pid_t PID = -1;

void StartSpawnProcessHelper();
void ProcessHandler();
pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary::Ptr& extraEnvironment, bool adjustPriority, int fds[3]);
int ProcessKill(pid_t pid, int signum);
int ProcessWaitPID(pid_t pid, int *status);
Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& request);
};

static std::mutex l_ProcessMutex[IOTHREADS];
static std::map<Process::ProcessHandle, Process::Ptr> l_Processes[IOTHREADS];
Expand All @@ -41,9 +59,10 @@ static HANDLE l_Events[IOTHREADS];
static int l_EventFDs[IOTHREADS][2];
static std::map<Process::ConsoleHandle, Process::ProcessHandle> l_FDs[IOTHREADS];

static std::mutex l_ProcessControlMutex;
static int l_ProcessControlFD = -1;
static pid_t l_ProcessControlPID;
static struct {
Spawner* Spawners = nullptr;
size_t Len = 0;
} l_ProcessControl;
#endif /* _WIN32 */
static boost::once_flag l_ProcessOnceFlag = BOOST_ONCE_INIT;
static boost::once_flag l_SpawnHelperOnceFlag = BOOST_ONCE_INIT;
Expand Down Expand Up @@ -71,7 +90,7 @@ Process::~Process()
}

#ifndef _WIN32
static Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& request)
Value Spawner::ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& request)
{
struct cmsghdr *cmsg = CMSG_FIRSTHDR(msgh);

Expand Down Expand Up @@ -146,7 +165,7 @@ static Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& reques
if (pid == 0) {
// child process

(void)close(l_ProcessControlFD);
(void)close(FD);

if (setsid() < 0) {
perror("setsid() failed");
Expand Down Expand Up @@ -241,13 +260,13 @@ static Value ProcessWaitPIDImpl(struct msghdr *msgh, const Dictionary::Ptr& requ
return response;
}

static void ProcessHandler()
void Spawner::ProcessHandler()
{
sigset_t mask;
sigfillset(&mask);
sigprocmask(SIG_SETMASK, &mask, nullptr);

Utility::CloseAllFDs({0, 1, 2, l_ProcessControlFD});
Utility::CloseAllFDs({0, 1, 2, FD});

for (;;) {
size_t length;
Expand All @@ -266,7 +285,7 @@ static void ProcessHandler()
msg.msg_control = cbuf;
msg.msg_controllen = sizeof(cbuf);

int rc = recvmsg(l_ProcessControlFD, &msg, 0);
int rc = recvmsg(FD, &msg, 0);

if (rc <= 0) {
if (rc < 0 && (errno == EINTR || errno == EAGAIN))
Expand All @@ -279,7 +298,7 @@ static void ProcessHandler()

size_t count = 0;
while (count < length) {
rc = recv(l_ProcessControlFD, mbuf + count, length - count, 0);
rc = recv(FD, mbuf + count, length - count, 0);

if (rc <= 0) {
if (rc < 0 && (errno == EINTR || errno == EAGAIN))
Expand Down Expand Up @@ -317,7 +336,7 @@ static void ProcessHandler()

String jresponse = JsonEncode(response);

if (send(l_ProcessControlFD, jresponse.CStr(), jresponse.GetLength(), 0) < 0) {
if (send(FD, jresponse.CStr(), jresponse.GetLength(), 0) < 0) {
BOOST_THROW_EXCEPTION(posix_error()
<< boost::errinfo_api_function("send")
<< boost::errinfo_errno(errno));
Expand All @@ -327,13 +346,13 @@ static void ProcessHandler()
_exit(0);
}

static void StartSpawnProcessHelper()
void Spawner::StartSpawnProcessHelper()
{
if (l_ProcessControlFD != -1) {
(void)close(l_ProcessControlFD);
if (FD != -1) {
(void)close(FD);

int status;
(void)waitpid(l_ProcessControlPID, &status, 0);
(void)waitpid(PID, &status, 0);
}

int controlFDs[2];
Expand All @@ -354,7 +373,7 @@ static void StartSpawnProcessHelper()
if (pid == 0) {
(void)close(controlFDs[1]);

l_ProcessControlFD = controlFDs[0];
FD = controlFDs[0];

ProcessHandler();

Expand All @@ -363,11 +382,11 @@ static void StartSpawnProcessHelper()

(void)close(controlFDs[0]);

l_ProcessControlFD = controlFDs[1];
l_ProcessControlPID = pid;
FD = controlFDs[1];
PID = pid;
}

static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary::Ptr& extraEnvironment, bool adjustPriority, int fds[3])
pid_t Spawner::ProcessSpawn(const std::vector<String>& arguments, const Dictionary::Ptr& extraEnvironment, bool adjustPriority, int fds[3])
{
Dictionary::Ptr request = new Dictionary({
{ "command", "spawn" },
Expand All @@ -379,7 +398,7 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
String jrequest = JsonEncode(request);
size_t length = jrequest.GetLength();

std::unique_lock<std::mutex> lock(l_ProcessControlMutex);
std::unique_lock<std::mutex> lock(Mutex);

struct msghdr msg;
memset(&msg, 0, sizeof(msg));
Expand All @@ -405,14 +424,14 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
msg.msg_controllen = cmsg->cmsg_len;

do {
while (sendmsg(l_ProcessControlFD, &msg, 0) < 0) {
while (sendmsg(FD, &msg, 0) < 0) {
StartSpawnProcessHelper();
}
} while (send(l_ProcessControlFD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);
} while (send(FD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);

char buf[4096];

ssize_t rc = recv(l_ProcessControlFD, buf, sizeof(buf), 0);
ssize_t rc = recv(FD, buf, sizeof(buf), 0);

if (rc <= 0)
return -1;
Expand All @@ -427,7 +446,7 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
return response->Get("rc");
}

static int ProcessKill(pid_t pid, int signum)
int Spawner::ProcessKill(pid_t pid, int signum)
{
Dictionary::Ptr request = new Dictionary({
{ "command", "kill" },
Expand All @@ -438,17 +457,17 @@ static int ProcessKill(pid_t pid, int signum)
String jrequest = JsonEncode(request);
size_t length = jrequest.GetLength();

std::unique_lock<std::mutex> lock(l_ProcessControlMutex);
std::unique_lock<std::mutex> lock(Mutex);

do {
while (send(l_ProcessControlFD, &length, sizeof(length), 0) < 0) {
while (send(FD, &length, sizeof(length), 0) < 0) {
StartSpawnProcessHelper();
}
} while (send(l_ProcessControlFD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);
} while (send(FD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);

char buf[4096];

ssize_t rc = recv(l_ProcessControlFD, buf, sizeof(buf), 0);
ssize_t rc = recv(FD, buf, sizeof(buf), 0);

if (rc <= 0)
return -1;
Expand All @@ -459,7 +478,7 @@ static int ProcessKill(pid_t pid, int signum)
return response->Get("errno");
}

static int ProcessWaitPID(pid_t pid, int *status)
int Spawner::ProcessWaitPID(pid_t pid, int *status)
{
Dictionary::Ptr request = new Dictionary({
{ "command", "waitpid" },
Expand All @@ -469,17 +488,17 @@ static int ProcessWaitPID(pid_t pid, int *status)
String jrequest = JsonEncode(request);
size_t length = jrequest.GetLength();

std::unique_lock<std::mutex> lock(l_ProcessControlMutex);
std::unique_lock<std::mutex> lock(Mutex);

do {
while (send(l_ProcessControlFD, &length, sizeof(length), 0) < 0) {
while (send(FD, &length, sizeof(length), 0) < 0) {
StartSpawnProcessHelper();
}
} while (send(l_ProcessControlFD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);
} while (send(FD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);

char buf[4096];

ssize_t rc = recv(l_ProcessControlFD, buf, sizeof(buf), 0);
ssize_t rc = recv(FD, buf, sizeof(buf), 0);

if (rc <= 0)
return -1;
Expand All @@ -493,8 +512,18 @@ static int ProcessWaitPID(pid_t pid, int *status)

void Process::InitializeSpawnHelper()
{
if (l_ProcessControlFD == -1)
StartSpawnProcessHelper();
if (!l_ProcessControl.Spawners) {
auto len (std::max(1, Configuration::Concurrency));

l_ProcessControl.Spawners = new Spawner[len];
l_ProcessControl.Len = len;
}

for (Spawner *current = l_ProcessControl.Spawners, *stop = l_ProcessControl.Spawners + l_ProcessControl.Len; current < stop; ++current) {
if (current->FD == -1) {
current->StartSpawnProcessHelper();
}
}
}
#endif /* _WIN32 */

Expand Down Expand Up @@ -980,7 +1009,7 @@ void Process::Run(const std::function<void(const ProcessResult&)>& callback)
fds[1] = outfds[1];
fds[2] = outfds[1];

m_Process = ProcessSpawn(m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds);
m_Process = MySpawner.ProcessSpawn(m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds);
m_PID = m_Process;

if (m_PID == -1) {
Expand Down Expand Up @@ -1046,7 +1075,7 @@ bool Process::DoEvents()

m_OutputStream << "<Timeout exceeded.>";

int error = ProcessKill(m_Process, SIGTERM);
int error = MySpawner.ProcessKill(m_Process, SIGTERM);
if (error) {
Log(LogWarning, "Process")
<< "Couldn't terminate the process " << m_PID << " (" << PrettyPrintArguments(m_Arguments)
Expand All @@ -1070,7 +1099,7 @@ bool Process::DoEvents()
m_OutputStream << "<Timeout exceeded.>";
TerminateProcess(m_Process, 3);
#else /* _WIN32 */
int error = ProcessKill(-m_Process, SIGKILL);
int error = MySpawner.ProcessKill(-m_Process, SIGKILL);
if (error) {
Log(LogWarning, "Process")
<< "Couldn't kill the process group " << m_PID << " (" << PrettyPrintArguments(m_Arguments)
Expand Down Expand Up @@ -1124,7 +1153,7 @@ bool Process::DoEvents()
int status, exitcode;
if (could_not_kill || m_PID == -1) {
exitcode = 128;
} else if (ProcessWaitPID(m_Process, &status) != m_Process) {
} else if (MySpawner.ProcessWaitPID(m_Process, &status) != m_Process) {
exitcode = 128;

Log(LogWarning, "Process")
Expand Down