Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions bin/controller/CBlockingCallCancellerThread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,30 @@ namespace controller {

CBlockingCallCancellerThread::CBlockingCallCancellerThread(core::CThread::TThreadId potentiallyBlockedThreadId,
std::istream& monitorStream)
: m_PotentiallyBlockedThreadId(potentiallyBlockedThreadId),
m_MonitorStream(monitorStream), m_Shutdown(false) {
: m_PotentiallyBlockedThreadId{potentiallyBlockedThreadId},
m_MonitorStream{monitorStream}, m_Shutdown{false}, m_HasCancelledBlockingCall{false} {
}

const std::atomic_bool& CBlockingCallCancellerThread::hasCancelledBlockingCall() const {
return m_HasCancelledBlockingCall;
}

void CBlockingCallCancellerThread::run() {
char c;
while (m_MonitorStream >> c) {
if (m_Shutdown) {
if (m_Shutdown.load()) {
return;
}
}

m_HasCancelledBlockingCall.store(true);
if (core::CThread::cancelBlockedIo(m_PotentiallyBlockedThreadId) == false) {
LOG_WARN(<< "Failed to cancel blocked IO in thread " << m_PotentiallyBlockedThreadId);
}
}

void CBlockingCallCancellerThread::shutdown() {
m_Shutdown = true;
m_Shutdown.store(true);

// This is to wake up the stream reading in the run() method of this object.
// If this has an effect then the assumption is that the program is exiting
Expand Down
15 changes: 11 additions & 4 deletions bin/controller/CBlockingCallCancellerThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <core/CThread.h>

#include <atomic>
#include <iosfwd>

namespace ml {
Expand Down Expand Up @@ -39,12 +40,14 @@ class CBlockingCallCancellerThread : public core::CThread {
CBlockingCallCancellerThread(core::CThread::TThreadId potentiallyBlockedThreadId,
std::istream& monitorStream);

const std::atomic_bool& hasCancelledBlockingCall() const;

protected:
//! Called when the thread is started.
virtual void run();
void run() override;

//! Called when the thread is stopped.
virtual void shutdown();
void shutdown() override;

private:
//! Thread ID of the thread that this object will cancel blocking IO in
Expand All @@ -54,8 +57,12 @@ class CBlockingCallCancellerThread : public core::CThread {
//! Stream to monitor for end-of-file.
std::istream& m_MonitorStream;

//! Flag to indicate the thread should shut down
volatile bool m_Shutdown;
//! Flag to indicate the monitoring thread should shut down
std::atomic_bool m_Shutdown;

//! Flag to indicate that an attempt to cancel blocking calls in the
//! monitored thread has been made
std::atomic_bool m_HasCancelledBlockingCall;
};
}
}
Expand Down
49 changes: 27 additions & 22 deletions bin/controller/Main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
* you may not use this file except in compliance with the Elastic License.
*/
//! \brief
//! Controller to start other Ml processes.
//! Controller to start other ML processes.
//!
//! DESCRIPTION:\n
//! Starts other Ml processes based on commands sent to it
//! Starts other ML processes based on commands sent to it
//! through a named pipe.
//!
//! Each command has the following format:
Expand Down Expand Up @@ -60,12 +60,12 @@
#include <string.h>

int main(int argc, char** argv) {
const std::string& defaultNamedPipePath = ml::core::CNamedPipeFactory::defaultPath();
const std::string& progName = ml::core::CProgName::progName();
const std::string& defaultNamedPipePath{ml::core::CNamedPipeFactory::defaultPath()};
const std::string& progName{ml::core::CProgName::progName()};

// Read command line options
std::string jvmPidStr = ml::core::CStringUtils::typeToString(
ml::core::CProcess::instance().parentId());
std::string jvmPidStr{ml::core::CStringUtils::typeToString(
ml::core::CProcess::instance().parentId())};
std::string logPipe;
std::string commandPipe;
if (ml::controller::CCmdLineParser::parse(argc, argv, jvmPidStr, logPipe,
Expand All @@ -89,17 +89,22 @@ int main(int argc, char** argv) {
// 4) No plugin code ever runs
// This thread will detect the death of the parent process because this
// process's STDIN will be closed.
ml::controller::CBlockingCallCancellerThread cancellerThread(
ml::core::CThread::currentThreadId(), std::cin);
ml::controller::CBlockingCallCancellerThread cancellerThread{
ml::core::CThread::currentThreadId(), std::cin};
if (cancellerThread.start() == false) {
// This log message will probably never been seen as it will go to the
// real stderr of this process rather than the log pipe...
LOG_FATAL(<< "Could not start blocking call canceller thread");
return EXIT_FAILURE;
}

if (ml::core::CLogger::instance().reconfigureLogToNamedPipe(logPipe) == false) {
LOG_FATAL(<< "Could not reconfigure logging");
if (ml::core::CLogger::instance().reconfigureLogToNamedPipe(
logPipe, cancellerThread.hasCancelledBlockingCall()) == false) {
if (cancellerThread.hasCancelledBlockingCall().load()) {
LOG_INFO(<< "Parent process died - ML controller exiting");
} else {
LOG_FATAL(<< "Could not reconfigure logging");
}
cancellerThread.stop();
return EXIT_FAILURE;
}
Expand All @@ -113,41 +118,41 @@ int main(int argc, char** argv) {
// the controller is critical to the overall system. Also its resource
// requirements should always be very low.

ml::core::CNamedPipeFactory::TIStreamP commandStream =
ml::core::CNamedPipeFactory::openPipeStreamRead(commandPipe);
ml::core::CNamedPipeFactory::TIStreamP commandStream{ml::core::CNamedPipeFactory::openPipeStreamRead(
commandPipe, cancellerThread.hasCancelledBlockingCall())};
if (commandStream == nullptr) {
LOG_FATAL(<< "Could not open command pipe");
if (cancellerThread.hasCancelledBlockingCall().load()) {
LOG_INFO(<< "Parent process died - ML controller exiting");
} else {
LOG_FATAL(<< "Could not open command pipe");
}
cancellerThread.stop();
return EXIT_FAILURE;
}

// Change directory to the directory containing this program, because the
// permitted paths all assume the current working directory contains the
// permitted programs
const std::string& progDir = ml::core::CProgName::progDir();
const std::string& progDir{ml::core::CProgName::progDir()};
if (ml::core::COsFileFuncs::chdir(progDir.c_str()) == -1) {
LOG_FATAL(<< "Could not change directory to '" << progDir
<< "': " << ::strerror(errno));
cancellerThread.stop();
return EXIT_FAILURE;
}

ml::controller::CCommandProcessor::TStrVec permittedProcessPaths;
permittedProcessPaths.push_back("./autoconfig");
permittedProcessPaths.push_back("./autodetect");
permittedProcessPaths.push_back("./categorize");
permittedProcessPaths.push_back("./data_frame_analyzer");
permittedProcessPaths.push_back("./normalize");
ml::controller::CCommandProcessor::TStrVec permittedProcessPaths{
"./autoconfig", "./autodetect", "./categorize", "./data_frame_analyzer", "./normalize"};

ml::controller::CCommandProcessor processor(permittedProcessPaths);
ml::controller::CCommandProcessor processor{permittedProcessPaths};
processor.processCommands(*commandStream);

cancellerThread.stop();

// This message makes it easier to spot process crashes in a log file - if
// this isn't present in the log for a given PID and there's no other log
// message indicating early exit then the process has probably core dumped
LOG_INFO(<< "Ml controller exiting");
LOG_INFO(<< "ML controller exiting");

return EXIT_SUCCESS;
}
19 changes: 10 additions & 9 deletions bin/controller/unittest/CBlockingCallCancellerThreadTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ namespace {

class CEofThread : public ml::core::CThread {
public:
CEofThread(ml::core::CDualThreadStreamBuf& buf) : m_Buf(buf) {}
CEofThread(ml::core::CDualThreadStreamBuf& buf) : m_Buf{buf} {}

protected:
virtual void run() {
void run() override {
ml::core::CSleep::sleep(200);

m_Buf.signalEndOfFile();
}

virtual void shutdown() {}
void shutdown() override {}

private:
ml::core::CDualThreadStreamBuf& m_Buf;
Expand All @@ -39,10 +39,10 @@ class CEofThread : public ml::core::CThread {

BOOST_AUTO_TEST_CASE(testCancelBlock) {
ml::core::CDualThreadStreamBuf buf;
std::istream monStrm(&buf);
std::istream monStrm{&buf};

ml::controller::CBlockingCallCancellerThread cancellerThread(
ml::core::CThread::currentThreadId(), monStrm);
ml::controller::CBlockingCallCancellerThread cancellerThread{
ml::core::CThread::currentThreadId(), monStrm};
BOOST_TEST_REQUIRE(cancellerThread.start());

// The CBlockingCallCancellerThread should wake up the blocking open of the
Expand All @@ -52,11 +52,12 @@ BOOST_AUTO_TEST_CASE(testCancelBlock) {
// real program this would be STDIN, but in this test another thread is the
// source, and it runs out of data after 0.2 seconds.

CEofThread eofThread(buf);
CEofThread eofThread{buf};
BOOST_TEST_REQUIRE(eofThread.start());

ml::core::CNamedPipeFactory::TIStreamP pipeStrm = ml::core::CNamedPipeFactory::openPipeStreamRead(
ml::core::CNamedPipeFactory::defaultPath() + "test_pipe");
ml::core::CNamedPipeFactory::TIStreamP pipeStrm{ml::core::CNamedPipeFactory::openPipeStreamRead(
ml::core::CNamedPipeFactory::defaultPath() + "test_pipe",
cancellerThread.hasCancelledBlockingCall())};
BOOST_TEST_REQUIRE(pipeStrm == nullptr);

BOOST_TEST_REQUIRE(cancellerThread.stop());
Expand Down
6 changes: 6 additions & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@

//=== Regressions

== {es} version 7.8.1

=== Bug Fixes

* Better interrupt handling during named pipe connection. (See {ml-pull}1311[#1311].)

== {es} version 7.8.0

=== Enhancements
Expand Down
12 changes: 12 additions & 0 deletions include/core/CLogger.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include <boost/log/sources/severity_logger.hpp>

#include <atomic>
#include <functional>
#include <iosfwd>

Expand Down Expand Up @@ -90,12 +91,23 @@ class CORE_EXPORT CLogger : private CNonCopyable {
//! If both are supplied the named pipe takes precedence.
bool reconfigure(const std::string& pipeName, const std::string& propertiesFile);

//! As above, but with a flag to indicate named pipe connection attempts
//! should be cancelled.
bool reconfigure(const std::string& pipeName,
const std::string& propertiesFile,
const std::atomic_bool& isCancelled);

//! Reconfigure to use provided stream.
bool reconfigure(boost::shared_ptr<std::ostream> streamPtr);

//! Tell the logger to log to a named pipe rather than a file.
bool reconfigureLogToNamedPipe(const std::string& pipeName);

//! As above, but with a flag to indicate named pipe connection attempts
//! should be cancelled.
bool reconfigureLogToNamedPipe(const std::string& pipeName,
const std::atomic_bool& isCancelled);

//! Tell the logger to reconfigure itself by reading a specified
//! properties file, if the file exists.
bool reconfigureFromFile(const std::string& propertiesFile);
Expand Down
19 changes: 13 additions & 6 deletions include/core/CNamedPipeFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
#include <core/ImportExport.h>
#include <core/WindowsSafe.h>

#include <atomic>
#include <iosfwd>
#include <memory>
#include <string>

#include <stdio.h>
#include <stdio.h> // fdopen() is not C++ so need the C header

namespace ml {
namespace core {
Expand Down Expand Up @@ -69,20 +70,24 @@ class CORE_EXPORT CNamedPipeFactory : private CNonInstantiatable {
//! Initialise and open a named pipe for reading, returning a C++ stream
//! that can be used to read from it. Returns a NULL pointer on
//! failure.
static TIStreamP openPipeStreamRead(const std::string& fileName);
static TIStreamP openPipeStreamRead(const std::string& fileName,
const std::atomic_bool& isCancelled);

//! Initialise and open a named pipe for writing, returning a C++ stream
//! that can be used to write to it. Returns a NULL pointer on failure.
static TOStreamP openPipeStreamWrite(const std::string& fileName);
static TOStreamP openPipeStreamWrite(const std::string& fileName,
const std::atomic_bool& isCancelled);

//! Initialise and open a named pipe for writing, returning a C FILE
//! that can be used to read from it. Returns a NULL pointer on
//! failure.
static TFileP openPipeFileRead(const std::string& fileName);
static TFileP openPipeFileRead(const std::string& fileName,
const std::atomic_bool& isCancelled);

//! Initialise and open a named pipe for writing, returning a C FILE
//! that can be used to write to it. Returns a NULL pointer on failure.
static TFileP openPipeFileWrite(const std::string& fileName);
static TFileP openPipeFileWrite(const std::string& fileName,
const std::atomic_bool& isCancelled);

//! Does the supplied file name refer to a named pipe?
static bool isNamedPipe(const std::string& fileName);
Expand All @@ -107,7 +112,9 @@ class CORE_EXPORT CNamedPipeFactory : private CNonInstantiatable {
//! file descriptor that can be used to access it. This is the core
//! implementation of the higher level encapsulations that the public
//! interface provides.
static TPipeHandle initPipeHandle(const std::string& fileName, bool forWrite);
static TPipeHandle initPipeHandle(const std::string& fileName,
bool forWrite,
const std::atomic_bool& isCancelled);
};
}
}
Expand Down
7 changes: 5 additions & 2 deletions lib/api/CIoManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <core/CLogger.h>

#include <atomic>
#include <fstream>
#include <ios>
#include <iostream>
Expand All @@ -24,7 +25,8 @@ bool setUpIStream(const std::string& fileName,
return true;
}
if (isFileNamedPipe) {
stream = core::CNamedPipeFactory::openPipeStreamRead(fileName);
std::atomic_bool dummy{false};
stream = core::CNamedPipeFactory::openPipeStreamRead(fileName, dummy);
return stream != nullptr && !stream->bad();
}
std::ifstream* fileStream(nullptr);
Expand All @@ -40,7 +42,8 @@ bool setUpOStream(const std::string& fileName,
return true;
}
if (isFileNamedPipe) {
stream = core::CNamedPipeFactory::openPipeStreamWrite(fileName);
std::atomic_bool dummy{false};
stream = core::CNamedPipeFactory::openPipeStreamWrite(fileName, dummy);
return stream != nullptr && !stream->bad();
}
std::ofstream* fileStream(nullptr);
Expand Down
Loading