Skip to content

Commit

Permalink
Spawn Configuration::Concurrency process managers
Browse files Browse the repository at this point in the history
... and not just one to increase checks/time.
  • Loading branch information
Al2Klimov committed Aug 16, 2023
1 parent 0d58029 commit 8a46574
Showing 1 changed file with 67 additions and 38 deletions.
105 changes: 67 additions & 38 deletions lib/base/process.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "base/process.hpp"
#include "base/exception.hpp"
#include "base/convert.hpp"
#include "base/configuration.hpp"
#include "base/array.hpp"
#include "base/objectlock.hpp"
#include "base/utility.hpp"
Expand All @@ -11,8 +12,10 @@
#include "base/utility.hpp"
#include "base/scriptglobal.hpp"
#include "base/json.hpp"
#include <algorithm>
#include <boost/algorithm/string/join.hpp>
#include <boost/thread/once.hpp>
#include <cstddef>
#include <thread>
#include <iostream>

Expand All @@ -32,6 +35,21 @@ extern char **environ;
using namespace icinga;

#define IOTHREADS 4
#define MySpawner l_ProcessControl.Spawners[decltype(l_ProcessControl.Len)(this) / sizeof(void*) % l_ProcessControl.Len]

struct Spawner
{
std::mutex Mutex;
int FD = -1;
pid_t PID = -1;

void StartSpawnProcessHelper();
void ProcessHandler();
pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary::Ptr& extraEnvironment, bool adjustPriority, int fds[3]);
int ProcessKill(pid_t pid, int signum);
int ProcessWaitPID(pid_t pid, int *status);
Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& request);
};

static std::mutex l_ProcessMutex[IOTHREADS];
static std::map<Process::ProcessHandle, Process::Ptr> l_Processes[IOTHREADS];
Expand All @@ -41,9 +59,10 @@ static HANDLE l_Events[IOTHREADS];
static int l_EventFDs[IOTHREADS][2];
static std::map<Process::ConsoleHandle, Process::ProcessHandle> l_FDs[IOTHREADS];

static std::mutex l_ProcessControlMutex;
static int l_ProcessControlFD = -1;
static pid_t l_ProcessControlPID;
static struct {
Spawner* Spawners = nullptr;
size_t Len = 0;
} l_ProcessControl;
#endif /* _WIN32 */
static boost::once_flag l_ProcessOnceFlag = BOOST_ONCE_INIT;
static boost::once_flag l_SpawnHelperOnceFlag = BOOST_ONCE_INIT;
Expand Down Expand Up @@ -71,7 +90,7 @@ Process::~Process()
}

#ifndef _WIN32
static Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& request)
Value Spawner::ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& request)
{
struct cmsghdr *cmsg = CMSG_FIRSTHDR(msgh);

Expand Down Expand Up @@ -146,7 +165,7 @@ static Value ProcessSpawnImpl(struct msghdr *msgh, const Dictionary::Ptr& reques
if (pid == 0) {
// child process

(void)close(l_ProcessControlFD);
(void)close(FD);

if (setsid() < 0) {
perror("setsid() failed");
Expand Down Expand Up @@ -241,13 +260,13 @@ static Value ProcessWaitPIDImpl(struct msghdr *msgh, const Dictionary::Ptr& requ
return response;
}

static void ProcessHandler()
void Spawner::ProcessHandler()
{
sigset_t mask;
sigfillset(&mask);
sigprocmask(SIG_SETMASK, &mask, nullptr);

Utility::CloseAllFDs({0, 1, 2, l_ProcessControlFD});
Utility::CloseAllFDs({0, 1, 2, FD});

for (;;) {
size_t length;
Expand All @@ -266,7 +285,7 @@ static void ProcessHandler()
msg.msg_control = cbuf;
msg.msg_controllen = sizeof(cbuf);

int rc = recvmsg(l_ProcessControlFD, &msg, 0);
int rc = recvmsg(FD, &msg, 0);

if (rc <= 0) {
if (rc < 0 && (errno == EINTR || errno == EAGAIN))
Expand All @@ -279,7 +298,7 @@ static void ProcessHandler()

size_t count = 0;
while (count < length) {
rc = recv(l_ProcessControlFD, mbuf + count, length - count, 0);
rc = recv(FD, mbuf + count, length - count, 0);

if (rc <= 0) {
if (rc < 0 && (errno == EINTR || errno == EAGAIN))
Expand Down Expand Up @@ -317,7 +336,7 @@ static void ProcessHandler()

String jresponse = JsonEncode(response);

if (send(l_ProcessControlFD, jresponse.CStr(), jresponse.GetLength(), 0) < 0) {
if (send(FD, jresponse.CStr(), jresponse.GetLength(), 0) < 0) {
BOOST_THROW_EXCEPTION(posix_error()
<< boost::errinfo_api_function("send")
<< boost::errinfo_errno(errno));
Expand All @@ -327,13 +346,13 @@ static void ProcessHandler()
_exit(0);
}

static void StartSpawnProcessHelper()
void Spawner::StartSpawnProcessHelper()
{
if (l_ProcessControlFD != -1) {
(void)close(l_ProcessControlFD);
if (FD != -1) {
(void)close(FD);

int status;
(void)waitpid(l_ProcessControlPID, &status, 0);
(void)waitpid(PID, &status, 0);
}

int controlFDs[2];
Expand All @@ -354,7 +373,7 @@ static void StartSpawnProcessHelper()
if (pid == 0) {
(void)close(controlFDs[1]);

l_ProcessControlFD = controlFDs[0];
FD = controlFDs[0];

ProcessHandler();

Expand All @@ -363,11 +382,11 @@ static void StartSpawnProcessHelper()

(void)close(controlFDs[0]);

l_ProcessControlFD = controlFDs[1];
l_ProcessControlPID = pid;
FD = controlFDs[1];
PID = pid;
}

static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary::Ptr& extraEnvironment, bool adjustPriority, int fds[3])
pid_t Spawner::ProcessSpawn(const std::vector<String>& arguments, const Dictionary::Ptr& extraEnvironment, bool adjustPriority, int fds[3])
{
Dictionary::Ptr request = new Dictionary({
{ "command", "spawn" },
Expand All @@ -379,7 +398,7 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
String jrequest = JsonEncode(request);
size_t length = jrequest.GetLength();

std::unique_lock<std::mutex> lock(l_ProcessControlMutex);
std::unique_lock<std::mutex> lock(Mutex);

struct msghdr msg;
memset(&msg, 0, sizeof(msg));
Expand All @@ -405,14 +424,14 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
msg.msg_controllen = cmsg->cmsg_len;

do {
while (sendmsg(l_ProcessControlFD, &msg, 0) < 0) {
while (sendmsg(FD, &msg, 0) < 0) {
StartSpawnProcessHelper();
}
} while (send(l_ProcessControlFD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);
} while (send(FD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);

char buf[4096];

ssize_t rc = recv(l_ProcessControlFD, buf, sizeof(buf), 0);
ssize_t rc = recv(FD, buf, sizeof(buf), 0);

if (rc <= 0)
return -1;
Expand All @@ -427,7 +446,7 @@ static pid_t ProcessSpawn(const std::vector<String>& arguments, const Dictionary
return response->Get("rc");
}

static int ProcessKill(pid_t pid, int signum)
int Spawner::ProcessKill(pid_t pid, int signum)
{
Dictionary::Ptr request = new Dictionary({
{ "command", "kill" },
Expand All @@ -438,17 +457,17 @@ static int ProcessKill(pid_t pid, int signum)
String jrequest = JsonEncode(request);
size_t length = jrequest.GetLength();

std::unique_lock<std::mutex> lock(l_ProcessControlMutex);
std::unique_lock<std::mutex> lock(Mutex);

do {
while (send(l_ProcessControlFD, &length, sizeof(length), 0) < 0) {
while (send(FD, &length, sizeof(length), 0) < 0) {
StartSpawnProcessHelper();
}
} while (send(l_ProcessControlFD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);
} while (send(FD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);

char buf[4096];

ssize_t rc = recv(l_ProcessControlFD, buf, sizeof(buf), 0);
ssize_t rc = recv(FD, buf, sizeof(buf), 0);

if (rc <= 0)
return -1;
Expand All @@ -459,7 +478,7 @@ static int ProcessKill(pid_t pid, int signum)
return response->Get("errno");
}

static int ProcessWaitPID(pid_t pid, int *status)
int Spawner::ProcessWaitPID(pid_t pid, int *status)
{
Dictionary::Ptr request = new Dictionary({
{ "command", "waitpid" },
Expand All @@ -469,17 +488,17 @@ static int ProcessWaitPID(pid_t pid, int *status)
String jrequest = JsonEncode(request);
size_t length = jrequest.GetLength();

std::unique_lock<std::mutex> lock(l_ProcessControlMutex);
std::unique_lock<std::mutex> lock(Mutex);

do {
while (send(l_ProcessControlFD, &length, sizeof(length), 0) < 0) {
while (send(FD, &length, sizeof(length), 0) < 0) {
StartSpawnProcessHelper();
}
} while (send(l_ProcessControlFD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);
} while (send(FD, jrequest.CStr(), jrequest.GetLength(), 0) < 0);

char buf[4096];

ssize_t rc = recv(l_ProcessControlFD, buf, sizeof(buf), 0);
ssize_t rc = recv(FD, buf, sizeof(buf), 0);

if (rc <= 0)
return -1;
Expand All @@ -493,8 +512,18 @@ static int ProcessWaitPID(pid_t pid, int *status)

void Process::InitializeSpawnHelper()
{
if (l_ProcessControlFD == -1)
StartSpawnProcessHelper();
if (!l_ProcessControl.Spawners) {
auto len (std::max(1, Configuration::Concurrency));

l_ProcessControl.Spawners = new Spawner[len];
l_ProcessControl.Len = len;
}

for (Spawner *current = l_ProcessControl.Spawners, *stop = l_ProcessControl.Spawners + l_ProcessControl.Len; current < stop; ++current) {
if (current->FD == -1) {
current->StartSpawnProcessHelper();
}
}
}
#endif /* _WIN32 */

Expand Down Expand Up @@ -980,7 +1009,7 @@ void Process::Run(const std::function<void(const ProcessResult&)>& callback)
fds[1] = outfds[1];
fds[2] = outfds[1];

m_Process = ProcessSpawn(m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds);
m_Process = MySpawner.ProcessSpawn(m_Arguments, m_ExtraEnvironment, m_AdjustPriority, fds);
m_PID = m_Process;

if (m_PID == -1) {
Expand Down Expand Up @@ -1046,7 +1075,7 @@ bool Process::DoEvents()

m_OutputStream << "<Timeout exceeded.>";

int error = ProcessKill(m_Process, SIGTERM);
int error = MySpawner.ProcessKill(m_Process, SIGTERM);
if (error) {
Log(LogWarning, "Process")
<< "Couldn't terminate the process " << m_PID << " (" << PrettyPrintArguments(m_Arguments)
Expand All @@ -1070,7 +1099,7 @@ bool Process::DoEvents()
m_OutputStream << "<Timeout exceeded.>";
TerminateProcess(m_Process, 3);
#else /* _WIN32 */
int error = ProcessKill(-m_Process, SIGKILL);
int error = MySpawner.ProcessKill(-m_Process, SIGKILL);
if (error) {
Log(LogWarning, "Process")
<< "Couldn't kill the process group " << m_PID << " (" << PrettyPrintArguments(m_Arguments)
Expand Down Expand Up @@ -1124,7 +1153,7 @@ bool Process::DoEvents()
int status, exitcode;
if (could_not_kill || m_PID == -1) {
exitcode = 128;
} else if (ProcessWaitPID(m_Process, &status) != m_Process) {
} else if (MySpawner.ProcessWaitPID(m_Process, &status) != m_Process) {
exitcode = 128;

Log(LogWarning, "Process")
Expand Down

0 comments on commit 8a46574

Please sign in to comment.