From a8d025487756f0b5e49636009c69fa04720b9b71 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 17 Jun 2020 13:18:00 +0100 Subject: [PATCH] [7.x][ML] Better interrupt handling during named pipe connection This change fixes two related issues with named pipe connection when the controller process first starts up: 1. If the ES JVM dies before the controller connects its logging named pipe then since elastic/elasticsearch#56491 the resulting errors from the controller process will be seen in the ES stderr file. There is a change to the logging to make it clearer that the controller didn't fail, but exited due to the ES JVM disappearing. 2. Interrupted system calls while connecting the named pipes could cause the pipes to unnecessarily fail to connect. There is a change to retry the calls on getting an EINTR error unless the interrupt was caused by the functionality of controller that kills off the connection attempts if the ES JVM dies (i.e. the scenario described in point 1). Backport of #1311 --- .../CBlockingCallCancellerThread.cc | 13 +- bin/controller/CBlockingCallCancellerThread.h | 15 ++- bin/controller/Main.cc | 48 ++++---- .../CBlockingCallCancellerThreadTest.cc | 19 +-- docs/CHANGELOG.asciidoc | 6 + include/core/CLogger.h | 12 ++ include/core/CNamedPipeFactory.h | 19 ++- lib/api/CIoManager.cc | 7 +- lib/core/CLogger.cc | 17 ++- lib/core/CNamedPipeFactory.cc | 116 ++++++++++++------ lib/core/CNamedPipeFactory_Windows.cc | 89 +++++++++----- lib/core/CThread.cc | 2 +- lib/core/unittest/CNamedPipeFactoryTest.cc | 102 ++++++++------- lib/seccomp/unittest/CSystemCallFilterTest.cc | 31 ++--- 14 files changed, 316 insertions(+), 180 deletions(-) diff --git a/bin/controller/CBlockingCallCancellerThread.cc b/bin/controller/CBlockingCallCancellerThread.cc index 0b7861a903..ef183c42de 100644 --- a/bin/controller/CBlockingCallCancellerThread.cc +++ b/bin/controller/CBlockingCallCancellerThread.cc @@ -14,25 +14,30 @@ namespace controller { CBlockingCallCancellerThread::CBlockingCallCancellerThread(core::CThread::TThreadId potentiallyBlockedThreadId, std::istream& monitorStream) - : m_PotentiallyBlockedThreadId(potentiallyBlockedThreadId), - m_MonitorStream(monitorStream), m_Shutdown(false) { + : m_PotentiallyBlockedThreadId{potentiallyBlockedThreadId}, + m_MonitorStream{monitorStream}, m_Shutdown{false}, m_HasCancelledBlockingCall{false} { +} + +const std::atomic_bool& CBlockingCallCancellerThread::hasCancelledBlockingCall() const { + return m_HasCancelledBlockingCall; } void CBlockingCallCancellerThread::run() { char c; while (m_MonitorStream >> c) { - if (m_Shutdown) { + if (m_Shutdown.load()) { return; } } + m_HasCancelledBlockingCall.store(true); if (core::CThread::cancelBlockedIo(m_PotentiallyBlockedThreadId) == false) { LOG_WARN(<< "Failed to cancel blocked IO in thread " << m_PotentiallyBlockedThreadId); } } void CBlockingCallCancellerThread::shutdown() { - m_Shutdown = true; + m_Shutdown.store(true); // This is to wake up the stream reading in the run() method of this object. // If this has an effect then the assumption is that the program is exiting diff --git a/bin/controller/CBlockingCallCancellerThread.h b/bin/controller/CBlockingCallCancellerThread.h index b0c7d8e226..66fc709268 100644 --- a/bin/controller/CBlockingCallCancellerThread.h +++ b/bin/controller/CBlockingCallCancellerThread.h @@ -8,6 +8,7 @@ #include +#include #include namespace ml { @@ -39,12 +40,14 @@ class CBlockingCallCancellerThread : public core::CThread { CBlockingCallCancellerThread(core::CThread::TThreadId potentiallyBlockedThreadId, std::istream& monitorStream); + const std::atomic_bool& hasCancelledBlockingCall() const; + protected: //! Called when the thread is started. - virtual void run(); + void run() override; //! Called when the thread is stopped. - virtual void shutdown(); + void shutdown() override; private: //! Thread ID of the thread that this object will cancel blocking IO in @@ -54,8 +57,12 @@ class CBlockingCallCancellerThread : public core::CThread { //! Stream to monitor for end-of-file. std::istream& m_MonitorStream; - //! Flag to indicate the thread should shut down - volatile bool m_Shutdown; + //! Flag to indicate the monitoring thread should shut down + std::atomic_bool m_Shutdown; + + //! Flag to indicate that an attempt to cancel blocking calls in the + //! monitored thread has been made + std::atomic_bool m_HasCancelledBlockingCall; }; } } diff --git a/bin/controller/Main.cc b/bin/controller/Main.cc index bbadb5b468..e54ed9841f 100644 --- a/bin/controller/Main.cc +++ b/bin/controller/Main.cc @@ -4,10 +4,10 @@ * you may not use this file except in compliance with the Elastic License. */ //! \brief -//! Controller to start other Ml processes. +//! Controller to start other ML processes. //! //! DESCRIPTION:\n -//! Starts other Ml processes based on commands sent to it +//! Starts other ML processes based on commands sent to it //! through a named pipe. //! //! Each command has the following format: @@ -59,12 +59,12 @@ #include int main(int argc, char** argv) { - const std::string& defaultNamedPipePath = ml::core::CNamedPipeFactory::defaultPath(); - const std::string& progName = ml::core::CProgName::progName(); + const std::string& defaultNamedPipePath{ml::core::CNamedPipeFactory::defaultPath()}; + const std::string& progName{ml::core::CProgName::progName()}; // Read command line options - std::string jvmPidStr = ml::core::CStringUtils::typeToString( - ml::core::CProcess::instance().parentId()); + std::string jvmPidStr{ml::core::CStringUtils::typeToString( + ml::core::CProcess::instance().parentId())}; std::string logPipe; std::string commandPipe; if (ml::controller::CCmdLineParser::parse(argc, argv, jvmPidStr, logPipe, @@ -88,8 +88,8 @@ int main(int argc, char** argv) { // 4) No plugin code ever runs // This thread will detect the death of the parent process because this // process's STDIN will be closed. - ml::controller::CBlockingCallCancellerThread cancellerThread( - ml::core::CThread::currentThreadId(), std::cin); + ml::controller::CBlockingCallCancellerThread cancellerThread{ + ml::core::CThread::currentThreadId(), std::cin}; if (cancellerThread.start() == false) { // This log message will probably never been seen as it will go to the // real stderr of this process rather than the log pipe... @@ -97,8 +97,13 @@ int main(int argc, char** argv) { return EXIT_FAILURE; } - if (ml::core::CLogger::instance().reconfigureLogToNamedPipe(logPipe) == false) { - LOG_FATAL(<< "Could not reconfigure logging"); + if (ml::core::CLogger::instance().reconfigureLogToNamedPipe( + logPipe, cancellerThread.hasCancelledBlockingCall()) == false) { + if (cancellerThread.hasCancelledBlockingCall().load()) { + LOG_INFO(<< "Parent process died - ML controller exiting"); + } else { + LOG_FATAL(<< "Could not reconfigure logging"); + } cancellerThread.stop(); return EXIT_FAILURE; } @@ -112,10 +117,14 @@ int main(int argc, char** argv) { // the controller is critical to the overall system. Also its resource // requirements should always be very low. - ml::core::CNamedPipeFactory::TIStreamP commandStream = - ml::core::CNamedPipeFactory::openPipeStreamRead(commandPipe); + ml::core::CNamedPipeFactory::TIStreamP commandStream{ml::core::CNamedPipeFactory::openPipeStreamRead( + commandPipe, cancellerThread.hasCancelledBlockingCall())}; if (commandStream == nullptr) { - LOG_FATAL(<< "Could not open command pipe"); + if (cancellerThread.hasCancelledBlockingCall().load()) { + LOG_INFO(<< "Parent process died - ML controller exiting"); + } else { + LOG_FATAL(<< "Could not open command pipe"); + } cancellerThread.stop(); return EXIT_FAILURE; } @@ -123,7 +132,7 @@ int main(int argc, char** argv) { // Change directory to the directory containing this program, because the // permitted paths all assume the current working directory contains the // permitted programs - const std::string& progDir = ml::core::CProgName::progDir(); + const std::string& progDir{ml::core::CProgName::progDir()}; if (ml::core::COsFileFuncs::chdir(progDir.c_str()) == -1) { LOG_FATAL(<< "Could not change directory to '" << progDir << "': " << ::strerror(errno)); @@ -131,13 +140,10 @@ int main(int argc, char** argv) { return EXIT_FAILURE; } - ml::controller::CCommandProcessor::TStrVec permittedProcessPaths; - permittedProcessPaths.push_back("./autodetect"); - permittedProcessPaths.push_back("./categorize"); - permittedProcessPaths.push_back("./data_frame_analyzer"); - permittedProcessPaths.push_back("./normalize"); + ml::controller::CCommandProcessor::TStrVec permittedProcessPaths{ + "./autodetect", "./categorize", "./data_frame_analyzer", "./normalize"}; - ml::controller::CCommandProcessor processor(permittedProcessPaths); + ml::controller::CCommandProcessor processor{permittedProcessPaths}; processor.processCommands(*commandStream); cancellerThread.stop(); @@ -145,7 +151,7 @@ int main(int argc, char** argv) { // This message makes it easier to spot process crashes in a log file - if // this isn't present in the log for a given PID and there's no other log // message indicating early exit then the process has probably core dumped - LOG_INFO(<< "Ml controller exiting"); + LOG_INFO(<< "ML controller exiting"); return EXIT_SUCCESS; } diff --git a/bin/controller/unittest/CBlockingCallCancellerThreadTest.cc b/bin/controller/unittest/CBlockingCallCancellerThreadTest.cc index 1588cf2afc..fef7bbbf1c 100644 --- a/bin/controller/unittest/CBlockingCallCancellerThreadTest.cc +++ b/bin/controller/unittest/CBlockingCallCancellerThreadTest.cc @@ -21,16 +21,16 @@ namespace { class CEofThread : public ml::core::CThread { public: - CEofThread(ml::core::CDualThreadStreamBuf& buf) : m_Buf(buf) {} + CEofThread(ml::core::CDualThreadStreamBuf& buf) : m_Buf{buf} {} protected: - virtual void run() { + void run() override { ml::core::CSleep::sleep(200); m_Buf.signalEndOfFile(); } - virtual void shutdown() {} + void shutdown() override {} private: ml::core::CDualThreadStreamBuf& m_Buf; @@ -39,10 +39,10 @@ class CEofThread : public ml::core::CThread { BOOST_AUTO_TEST_CASE(testCancelBlock) { ml::core::CDualThreadStreamBuf buf; - std::istream monStrm(&buf); + std::istream monStrm{&buf}; - ml::controller::CBlockingCallCancellerThread cancellerThread( - ml::core::CThread::currentThreadId(), monStrm); + ml::controller::CBlockingCallCancellerThread cancellerThread{ + ml::core::CThread::currentThreadId(), monStrm}; BOOST_TEST_REQUIRE(cancellerThread.start()); // The CBlockingCallCancellerThread should wake up the blocking open of the @@ -52,11 +52,12 @@ BOOST_AUTO_TEST_CASE(testCancelBlock) { // real program this would be STDIN, but in this test another thread is the // source, and it runs out of data after 0.2 seconds. - CEofThread eofThread(buf); + CEofThread eofThread{buf}; BOOST_TEST_REQUIRE(eofThread.start()); - ml::core::CNamedPipeFactory::TIStreamP pipeStrm = ml::core::CNamedPipeFactory::openPipeStreamRead( - ml::core::CNamedPipeFactory::defaultPath() + "test_pipe"); + ml::core::CNamedPipeFactory::TIStreamP pipeStrm{ml::core::CNamedPipeFactory::openPipeStreamRead( + ml::core::CNamedPipeFactory::defaultPath() + "test_pipe", + cancellerThread.hasCancelledBlockingCall())}; BOOST_TEST_REQUIRE(pipeStrm == nullptr); BOOST_TEST_REQUIRE(cancellerThread.stop()); diff --git a/docs/CHANGELOG.asciidoc b/docs/CHANGELOG.asciidoc index 659bf94875..5c3dcedf1d 100644 --- a/docs/CHANGELOG.asciidoc +++ b/docs/CHANGELOG.asciidoc @@ -56,6 +56,12 @@ * Fix numerical issues leading to blow up of the model plot bounds. (See {ml-pull}1268[#1268].) +== {es} version 7.8.1 + +=== Bug Fixes + +* Better interrupt handling during named pipe connection. (See {ml-pull}1311[#1311].) + == {es} version 7.8.0 === Enhancements diff --git a/include/core/CLogger.h b/include/core/CLogger.h index 9613000f00..44ea626e8d 100644 --- a/include/core/CLogger.h +++ b/include/core/CLogger.h @@ -13,6 +13,7 @@ #include +#include #include #include @@ -90,12 +91,23 @@ class CORE_EXPORT CLogger : private CNonCopyable { //! If both are supplied the named pipe takes precedence. bool reconfigure(const std::string& pipeName, const std::string& propertiesFile); + //! As above, but with a flag to indicate named pipe connection attempts + //! should be cancelled. + bool reconfigure(const std::string& pipeName, + const std::string& propertiesFile, + const std::atomic_bool& isCancelled); + //! Reconfigure to use provided stream. bool reconfigure(boost::shared_ptr streamPtr); //! Tell the logger to log to a named pipe rather than a file. bool reconfigureLogToNamedPipe(const std::string& pipeName); + //! As above, but with a flag to indicate named pipe connection attempts + //! should be cancelled. + bool reconfigureLogToNamedPipe(const std::string& pipeName, + const std::atomic_bool& isCancelled); + //! Tell the logger to reconfigure itself by reading a specified //! properties file, if the file exists. bool reconfigureFromFile(const std::string& propertiesFile); diff --git a/include/core/CNamedPipeFactory.h b/include/core/CNamedPipeFactory.h index 9fc8b01fba..8113d1a47a 100644 --- a/include/core/CNamedPipeFactory.h +++ b/include/core/CNamedPipeFactory.h @@ -10,11 +10,12 @@ #include #include +#include #include #include #include -#include +#include // fdopen() is not C++ so need the C header namespace ml { namespace core { @@ -69,20 +70,24 @@ class CORE_EXPORT CNamedPipeFactory : private CNonInstantiatable { //! Initialise and open a named pipe for reading, returning a C++ stream //! that can be used to read from it. Returns a NULL pointer on //! failure. - static TIStreamP openPipeStreamRead(const std::string& fileName); + static TIStreamP openPipeStreamRead(const std::string& fileName, + const std::atomic_bool& isCancelled); //! Initialise and open a named pipe for writing, returning a C++ stream //! that can be used to write to it. Returns a NULL pointer on failure. - static TOStreamP openPipeStreamWrite(const std::string& fileName); + static TOStreamP openPipeStreamWrite(const std::string& fileName, + const std::atomic_bool& isCancelled); //! Initialise and open a named pipe for writing, returning a C FILE //! that can be used to read from it. Returns a NULL pointer on //! failure. - static TFileP openPipeFileRead(const std::string& fileName); + static TFileP openPipeFileRead(const std::string& fileName, + const std::atomic_bool& isCancelled); //! Initialise and open a named pipe for writing, returning a C FILE //! that can be used to write to it. Returns a NULL pointer on failure. - static TFileP openPipeFileWrite(const std::string& fileName); + static TFileP openPipeFileWrite(const std::string& fileName, + const std::atomic_bool& isCancelled); //! Does the supplied file name refer to a named pipe? static bool isNamedPipe(const std::string& fileName); @@ -107,7 +112,9 @@ class CORE_EXPORT CNamedPipeFactory : private CNonInstantiatable { //! file descriptor that can be used to access it. This is the core //! implementation of the higher level encapsulations that the public //! interface provides. - static TPipeHandle initPipeHandle(const std::string& fileName, bool forWrite); + static TPipeHandle initPipeHandle(const std::string& fileName, + bool forWrite, + const std::atomic_bool& isCancelled); }; } } diff --git a/lib/api/CIoManager.cc b/lib/api/CIoManager.cc index 62d4cb376d..6409c26787 100644 --- a/lib/api/CIoManager.cc +++ b/lib/api/CIoManager.cc @@ -7,6 +7,7 @@ #include +#include #include #include #include @@ -24,7 +25,8 @@ bool setUpIStream(const std::string& fileName, return true; } if (isFileNamedPipe) { - stream = core::CNamedPipeFactory::openPipeStreamRead(fileName); + std::atomic_bool dummy{false}; + stream = core::CNamedPipeFactory::openPipeStreamRead(fileName, dummy); return stream != nullptr && !stream->bad(); } std::ifstream* fileStream(nullptr); @@ -40,7 +42,8 @@ bool setUpOStream(const std::string& fileName, return true; } if (isFileNamedPipe) { - stream = core::CNamedPipeFactory::openPipeStreamWrite(fileName); + std::atomic_bool dummy{false}; + stream = core::CNamedPipeFactory::openPipeStreamWrite(fileName, dummy); return stream != nullptr && !stream->bad(); } std::ofstream* fileStream(nullptr); diff --git a/lib/core/CLogger.cc b/lib/core/CLogger.cc index 3ac4f36624..a0182b4f9a 100644 --- a/lib/core/CLogger.cc +++ b/lib/core/CLogger.cc @@ -266,6 +266,13 @@ const std::string& CLogger::levelToString(ELevel level) { } bool CLogger::reconfigure(const std::string& pipeName, const std::string& propertiesFile) { + std::atomic_bool dummy{false}; + return this->reconfigure(pipeName, propertiesFile, dummy); +} + +bool CLogger::reconfigure(const std::string& pipeName, + const std::string& propertiesFile, + const std::atomic_bool& isCancelled) { if (pipeName.empty()) { if (propertiesFile.empty()) { // Both empty is OK - it just means we keep logging to stderr @@ -273,7 +280,7 @@ bool CLogger::reconfigure(const std::string& pipeName, const std::string& proper } return this->reconfigureFromFile(propertiesFile); } - return this->reconfigureLogToNamedPipe(pipeName); + return this->reconfigureLogToNamedPipe(pipeName, isCancelled); } bool CLogger::reconfigure(boost::shared_ptr streamPtr) { @@ -287,12 +294,18 @@ bool CLogger::reconfigure(boost::shared_ptr streamPtr) { } bool CLogger::reconfigureLogToNamedPipe(const std::string& pipeName) { + std::atomic_bool dummy{false}; + return this->reconfigureLogToNamedPipe(pipeName, dummy); +} + +bool CLogger::reconfigureLogToNamedPipe(const std::string& pipeName, + const std::atomic_bool& isCancelled) { if (m_Reconfigured) { LOG_ERROR(<< "Cannot log to a named pipe after logger reconfiguration"); return false; } - m_PipeFile = CNamedPipeFactory::openPipeFileWrite(pipeName); + m_PipeFile = CNamedPipeFactory::openPipeFileWrite(pipeName, isCancelled); if (m_PipeFile == nullptr) { LOG_ERROR(<< "Cannot log to named pipe " << pipeName << " as it could not be opened for writing"); diff --git a/lib/core/CNamedPipeFactory.cc b/lib/core/CNamedPipeFactory.cc index fbed87e8a8..ec554a960f 100644 --- a/lib/core/CNamedPipeFactory.cc +++ b/lib/core/CNamedPipeFactory.cc @@ -44,7 +44,7 @@ bool ignoreSigPipe() { return ::sigaction(SIGPIPE, &sa, nullptr) == 0; } -const bool SIGPIPE_IGNORED(ignoreSigPipe()); +const bool SIGPIPE_IGNORED{ignoreSigPipe()}; //! \brief //! Replacement for boost::iostreams::file_descriptor_sink that retries on EINTR. @@ -78,12 +78,12 @@ class CRetryingFileDescriptorSink : private boost::iostreams::file_descriptor { //! in the event of an interrupted system call. The method signature is //! defined by Boost's Sink concept. std::streamsize write(const char* s, std::streamsize n) { - std::streamsize totalBytesWritten = 0; + std::streamsize totalBytesWritten{0}; while (n > 0) { - ssize_t ret = ::write(this->handle(), s, static_cast(n)); + ssize_t ret{::write(this->handle(), s, static_cast(n))}; if (ret == -1) { if (errno != EINTR) { - std::string reason("Failed writing to named pipe: "); + std::string reason{"Failed writing to named pipe: "}; reason += ::strerror(errno); LOG_ERROR(<< reason); // We don't usually throw exceptions, but Boost.Iostreams @@ -107,42 +107,50 @@ namespace core { // Initialise static const char CNamedPipeFactory::TEST_CHAR('\n'); -CNamedPipeFactory::TIStreamP CNamedPipeFactory::openPipeStreamRead(const std::string& fileName) { - TPipeHandle fd = CNamedPipeFactory::initPipeHandle(fileName, false); +CNamedPipeFactory::TIStreamP +CNamedPipeFactory::openPipeStreamRead(const std::string& fileName, + const std::atomic_bool& isCancelled) { + TPipeHandle fd{CNamedPipeFactory::initPipeHandle(fileName, false, isCancelled)}; if (fd == -1) { - return TIStreamP(); + return TIStreamP{}; } using TFileDescriptorSourceStream = boost::iostreams::stream; - return TIStreamP(new TFileDescriptorSourceStream( - boost::iostreams::file_descriptor_source(fd, boost::iostreams::close_handle))); + return TIStreamP{new TFileDescriptorSourceStream( + boost::iostreams::file_descriptor_source(fd, boost::iostreams::close_handle))}; } -CNamedPipeFactory::TOStreamP CNamedPipeFactory::openPipeStreamWrite(const std::string& fileName) { - TPipeHandle fd = CNamedPipeFactory::initPipeHandle(fileName, true); +CNamedPipeFactory::TOStreamP +CNamedPipeFactory::openPipeStreamWrite(const std::string& fileName, + const std::atomic_bool& isCancelled) { + TPipeHandle fd{CNamedPipeFactory::initPipeHandle(fileName, true, isCancelled)}; if (fd == -1) { - return TOStreamP(); + return TOStreamP{}; } using TRetryingFileDescriptorSinkStream = boost::iostreams::stream; - return TOStreamP(new TRetryingFileDescriptorSinkStream( - CRetryingFileDescriptorSink(fd, boost::iostreams::close_handle))); + return TOStreamP{new TRetryingFileDescriptorSinkStream( + CRetryingFileDescriptorSink(fd, boost::iostreams::close_handle))}; } -CNamedPipeFactory::TFileP CNamedPipeFactory::openPipeFileRead(const std::string& fileName) { - TPipeHandle fd = CNamedPipeFactory::initPipeHandle(fileName, false); +CNamedPipeFactory::TFileP +CNamedPipeFactory::openPipeFileRead(const std::string& fileName, + const std::atomic_bool& isCancelled) { + TPipeHandle fd{CNamedPipeFactory::initPipeHandle(fileName, false, isCancelled)}; if (fd == -1) { - return TFileP(); + return TFileP{}; } - return TFileP(::fdopen(fd, "r"), safeFClose); + return TFileP{::fdopen(fd, "r"), safeFClose}; } -CNamedPipeFactory::TFileP CNamedPipeFactory::openPipeFileWrite(const std::string& fileName) { - TPipeHandle fd = CNamedPipeFactory::initPipeHandle(fileName, true); +CNamedPipeFactory::TFileP +CNamedPipeFactory::openPipeFileWrite(const std::string& fileName, + const std::atomic_bool& isCancelled) { + TPipeHandle fd{CNamedPipeFactory::initPipeHandle(fileName, true, isCancelled)}; if (fd == -1) { - return TFileP(); + return TFileP{}; } - return TFileP(::fdopen(fd, "w"), safeFClose); + return TFileP{::fdopen(fd, "w"), safeFClose}; } bool CNamedPipeFactory::isNamedPipe(const std::string& fileName) { @@ -161,12 +169,12 @@ std::string CNamedPipeFactory::defaultPath() { // $TMPDIR is generally set on Mac OS X (to something like // /var/folders/k5/5sqcdlps5sg3cvlp783gcz740000h0/T/) and not set on other // platforms. - const char* tmpDir(::getenv("TMPDIR")); + const char* tmpDir{::getenv("TMPDIR")}; // Make sure path ends with a slash so it's ready to have a file name // appended. (_PATH_VARTMP already has this on all platforms I've seen, // but a user-defined $TMPDIR might not.) - std::string path((tmpDir == nullptr) ? _PATH_VARTMP : tmpDir); + std::string path{(tmpDir == nullptr) ? _PATH_VARTMP : tmpDir}; if (path[path.length() - 1] != '/') { path += '/'; } @@ -181,13 +189,27 @@ void CNamedPipeFactory::logDeferredWarnings() { } CNamedPipeFactory::TPipeHandle -CNamedPipeFactory::initPipeHandle(const std::string& fileName, bool forWrite) { - bool madeFifo(false); +CNamedPipeFactory::initPipeHandle(const std::string& fileName, + bool forWrite, + const std::atomic_bool& isCancelled) { + bool madeFifo{false}; + + auto retrySyscallOnInterruptUnlessCancelled = [&isCancelled](std::function fn) { + int ret{-1}; + if (isCancelled.load() == false) { + do { + ret = fn(); + } while (ret == -1 && errno == EINTR && isCancelled.load() == false); + } + return ret; + }; // If the name already exists, ensure it refers directly (i.e. not via a // symlink) to a named pipe COsFileFuncs::TStat statbuf; - if (COsFileFuncs::lstat(fileName.c_str(), &statbuf) == 0) { + if (retrySyscallOnInterruptUnlessCancelled([&fileName, &statbuf]() { + return COsFileFuncs::lstat(fileName.c_str(), &statbuf); + }) == 0) { if ((statbuf.st_mode & S_IFMT) != S_IFIFO) { LOG_ERROR(<< "Unable to create named pipe " << fileName << " - a file " @@ -201,36 +223,52 @@ CNamedPipeFactory::initPipeHandle(const std::string& fileName, bool forWrite) { return -1; } } else { - if (errno != ENOENT) { + if (errno != ENOENT && isCancelled.load() == false) { LOG_WARN(<< "lstat of named pipe " << fileName << " failed with unexpected error " << ::strerror(errno)); } // The file didn't exist, so create a new FIFO for it, with permissions // for the current user only - if (::mkfifo(fileName.c_str(), S_IRUSR | S_IWUSR) == -1) { - LOG_ERROR(<< "Unable to create named pipe " << fileName << ": " - << ::strerror(errno)); + if (retrySyscallOnInterruptUnlessCancelled([&fileName]() { + return ::mkfifo(fileName.c_str(), S_IRUSR | S_IWUSR); + }) == -1) { + if (isCancelled.load() == false) { + LOG_ERROR(<< "Unable to create named pipe " << fileName << ": " + << ::strerror(errno)); + } return -1; } madeFifo = true; } + if (isCancelled.load()) { + return -1; + } + // The open call here will block if there is no other connection to the // named pipe - int fd = COsFileFuncs::open(fileName.c_str(), forWrite ? COsFileFuncs::WRONLY - : COsFileFuncs::RDONLY); + int fd{retrySyscallOnInterruptUnlessCancelled([&fileName, forWrite]() { + return COsFileFuncs::open(fileName.c_str(), forWrite ? COsFileFuncs::WRONLY + : COsFileFuncs::RDONLY); + })}; if (fd == -1) { - LOG_ERROR(<< "Unable to open named pipe " << fileName - << (forWrite ? " for writing: " : " for reading: ") - << ::strerror(errno)); + if (isCancelled.load() == false) { + LOG_ERROR(<< "Unable to open named pipe " << fileName + << (forWrite ? " for writing: " : " for reading: ") + << ::strerror(errno)); + } } else { // Write a test character to the pipe - this is really only necessary on // Windows, but doing it on *nix too will mean the inability of the Java // code to tolerate the test character will be discovered sooner. - if (forWrite && COsFileFuncs::write(fd, &TEST_CHAR, sizeof(TEST_CHAR)) <= 0) { - LOG_ERROR(<< "Unable to test named pipe " << fileName << ": " - << ::strerror(errno)); + if (forWrite && retrySyscallOnInterruptUnlessCancelled([fd]() { + return COsFileFuncs::write(fd, &TEST_CHAR, sizeof(TEST_CHAR)); + }) <= 0) { + if (isCancelled.load() == false) { + LOG_ERROR(<< "Unable to test named pipe " << fileName << ": " + << ::strerror(errno)); + } COsFileFuncs::close(fd); fd = -1; } diff --git a/lib/core/CNamedPipeFactory_Windows.cc b/lib/core/CNamedPipeFactory_Windows.cc index 0486493c94..72749d20c8 100644 --- a/lib/core/CNamedPipeFactory_Windows.cc +++ b/lib/core/CNamedPipeFactory_Windows.cc @@ -19,59 +19,67 @@ namespace { //! fclose() doesn't check for NULL pointers, so wrap it for use as a shared_ptr //! deleter void safeFClose(FILE* file) { - if (file != 0) { + if (file != nullptr) { ::fclose(file); } } //! On Windows ALL named pipes are under this path -const std::string PIPE_PREFIX("\\\\.\\pipe\\"); +const std::string PIPE_PREFIX{"\\\\.\\pipe\\"}; } namespace ml { namespace core { // Initialise static -const char CNamedPipeFactory::TEST_CHAR('\n'); +const char CNamedPipeFactory::TEST_CHAR{'\n'}; -CNamedPipeFactory::TIStreamP CNamedPipeFactory::openPipeStreamRead(const std::string& fileName) { - TPipeHandle handle = CNamedPipeFactory::initPipeHandle(fileName, false); +CNamedPipeFactory::TIStreamP +CNamedPipeFactory::openPipeStreamRead(const std::string& fileName, + const std::atomic_bool& isCancelled) { + TPipeHandle handle{CNamedPipeFactory::initPipeHandle(fileName, false, isCancelled)}; if (handle == INVALID_HANDLE_VALUE) { - return TIStreamP(); + return TIStreamP{}; } using TFileDescriptorSourceStream = boost::iostreams::stream; - return TIStreamP(new TFileDescriptorSourceStream(boost::iostreams::file_descriptor_source( - handle, boost::iostreams::close_handle))); + return TIStreamP{new TFileDescriptorSourceStream(boost::iostreams::file_descriptor_source( + handle, boost::iostreams::close_handle))}; } -CNamedPipeFactory::TOStreamP CNamedPipeFactory::openPipeStreamWrite(const std::string& fileName) { - TPipeHandle handle = CNamedPipeFactory::initPipeHandle(fileName, true); +CNamedPipeFactory::TOStreamP +CNamedPipeFactory::openPipeStreamWrite(const std::string& fileName, + const std::atomic_bool& isCancelled) { + TPipeHandle handle{CNamedPipeFactory::initPipeHandle(fileName, true, isCancelled)}; if (handle == INVALID_HANDLE_VALUE) { - return TOStreamP(); + return TOStreamP{}; } using TFileDescriptorSinkStream = boost::iostreams::stream; - return TOStreamP(new TFileDescriptorSinkStream( - boost::iostreams::file_descriptor_sink(handle, boost::iostreams::close_handle))); + return TOStreamP{new TFileDescriptorSinkStream( + boost::iostreams::file_descriptor_sink(handle, boost::iostreams::close_handle))}; } -CNamedPipeFactory::TFileP CNamedPipeFactory::openPipeFileRead(const std::string& fileName) { - TPipeHandle handle = CNamedPipeFactory::initPipeHandle(fileName, false); +CNamedPipeFactory::TFileP +CNamedPipeFactory::openPipeFileRead(const std::string& fileName, + const std::atomic_bool& isCancelled) { + TPipeHandle handle{CNamedPipeFactory::initPipeHandle(fileName, false, isCancelled)}; if (handle == INVALID_HANDLE_VALUE) { - return TFileP(); + return TFileP{}; } - return TFileP(::fdopen(::_open_osfhandle(reinterpret_cast(handle), _O_RDONLY), "rb"), - safeFClose); + return TFileP{::fdopen(::_open_osfhandle(reinterpret_cast(handle), _O_RDONLY), "rb"), + safeFClose}; } -CNamedPipeFactory::TFileP CNamedPipeFactory::openPipeFileWrite(const std::string& fileName) { - TPipeHandle handle = CNamedPipeFactory::initPipeHandle(fileName, true); +CNamedPipeFactory::TFileP +CNamedPipeFactory::openPipeFileWrite(const std::string& fileName, + const std::atomic_bool& isCancelled) { + TPipeHandle handle{CNamedPipeFactory::initPipeHandle(fileName, true, isCancelled)}; if (handle == INVALID_HANDLE_VALUE) { - return TFileP(); + return TFileP{}; } - return TFileP(::fdopen(::_open_osfhandle(reinterpret_cast(handle), 0), "wb"), - safeFClose); + return TFileP{::fdopen(::_open_osfhandle(reinterpret_cast(handle), 0), "wb"), + safeFClose}; } bool CNamedPipeFactory::isNamedPipe(const std::string& fileName) { @@ -88,21 +96,26 @@ void CNamedPipeFactory::logDeferredWarnings() { } CNamedPipeFactory::TPipeHandle -CNamedPipeFactory::initPipeHandle(const std::string& fileName, bool forWrite) { +CNamedPipeFactory::initPipeHandle(const std::string& fileName, + bool forWrite, + const std::atomic_bool& isCancelled) { // Size of named pipe buffer - static const DWORD BUFFER_SIZE(4096); + static const DWORD BUFFER_SIZE{4096}; // If the name already exists, ensure it refers to a named pipe - HANDLE handle(CreateNamedPipe(fileName.c_str(), + HANDLE handle{CreateNamedPipe(fileName.c_str(), // Input pipes are opened as duplex so we can // write a test byte to them to work around // the Java security manager problem forWrite ? PIPE_ACCESS_OUTBOUND : PIPE_ACCESS_DUPLEX, PIPE_TYPE_BYTE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS, 1, forWrite ? BUFFER_SIZE : 1, forWrite ? 1 : BUFFER_SIZE, - NMPWAIT_USE_DEFAULT_WAIT, 0)); + NMPWAIT_USE_DEFAULT_WAIT, 0)}; if (handle == INVALID_HANDLE_VALUE) { - LOG_ERROR(<< "Unable to create named pipe " << fileName << ": " << CWindowsError()); + if (isCancelled.load() == false) { + LOG_ERROR(<< "Unable to create named pipe " << fileName << ": " + << CWindowsError()); + } return INVALID_HANDLE_VALUE; } @@ -128,8 +141,8 @@ CNamedPipeFactory::initPipeHandle(const std::string& fileName, bool forWrite) { // tolerate a test character appearing at the beginning of the data it // receives. We use a newline character, as the named pipes carry ND-JSON // and it's easy to make them tolerate blank lines. - bool sufferedShortLivedConnection(false); - DWORD attempt(0); + bool sufferedShortLivedConnection{false}; + DWORD attempt{0}; do { ++attempt; // This call will block if there is no other connection to the named @@ -137,10 +150,12 @@ CNamedPipeFactory::initPipeHandle(const std::string& fileName, bool forWrite) { if (ConnectNamedPipe(handle, 0) == FALSE) { // ERROR_PIPE_CONNECTED means the pipe was already connected so // there was no need to connect it again - not a problem - DWORD errCode(GetLastError()); + DWORD errCode{GetLastError()}; if (errCode != ERROR_PIPE_CONNECTED) { - LOG_ERROR(<< "Unable to connect named pipe " << fileName << ": " - << CWindowsError(errCode)); + if (isCancelled.load() == false) { + LOG_ERROR(<< "Unable to connect named pipe " << fileName + << ": " << CWindowsError(errCode)); + } // Close the pipe (even though it was successfully opened) so // that the net effect of this failed call is nothing CloseHandle(handle); @@ -154,10 +169,16 @@ CNamedPipeFactory::initPipeHandle(const std::string& fileName, bool forWrite) { // Check that the other end of the pipe has not disconnected (which // relies on the Java side of all connections tolerating an initial // blank line) - DWORD bytesWritten(0); + DWORD bytesWritten{0}; if (WriteFile(handle, &TEST_CHAR, sizeof(TEST_CHAR), &bytesWritten, 0) == FALSE || bytesWritten == 0) { DisconnectNamedPipe(handle); + if (isCancelled.load()) { + // Close the pipe (even though it was successfully opened) so + // that the net effect of this failed call is nothing + CloseHandle(handle); + return INVALID_HANDLE_VALUE; + } sufferedShortLivedConnection = true; } else { sufferedShortLivedConnection = false; diff --git a/lib/core/CThread.cc b/lib/core/CThread.cc index ff47aa9aa2..9675201980 100644 --- a/lib/core/CThread.cc +++ b/lib/core/CThread.cc @@ -27,7 +27,7 @@ void noOpHandler(int /*sig*/) { //! Use SIGIO for waking up blocking calls. The same handler will be used in //! all threads, so there's an assumption here that having some other sort of //! handling for SIGIO is not important to any other thread in the process. -//! That will be true of Ml code. If a 3rd party library relied on SIGIO +//! That will be true of ML code. If a 3rd party library relied on SIGIO //! handling then we could change the signal we use in this class to another //! (maybe SIGURG). However, it's bad practice for reusable libraries to //! unconditionally install signal handlers, so unlikely to be a problem. diff --git a/lib/core/unittest/CNamedPipeFactoryTest.cc b/lib/core/unittest/CNamedPipeFactoryTest.cc index f9e0588859..0b09c0d832 100644 --- a/lib/core/unittest/CNamedPipeFactoryTest.cc +++ b/lib/core/unittest/CNamedPipeFactoryTest.cc @@ -15,7 +15,9 @@ #include -#include +#include +#include + #ifndef Windows #include #endif @@ -24,48 +26,55 @@ BOOST_AUTO_TEST_SUITE(CNamedPipeFactoryTest) namespace { -const uint32_t SLEEP_TIME_MS = 100; -const uint32_t PAUSE_TIME_MS = 10; -const size_t MAX_ATTEMPTS = 100; -const size_t TEST_SIZE = 10000; -const char TEST_CHAR = 'a'; +const std::uint32_t SLEEP_TIME_MS{100}; +const std::uint32_t PAUSE_TIME_MS{10}; +const std::size_t MAX_ATTEMPTS{100}; +const std::size_t TEST_SIZE{10000}; +const char TEST_CHAR{'a'}; #ifdef Windows -const char* const TEST_PIPE_NAME = "\\\\.\\pipe\\testpipe"; +const char* const TEST_PIPE_NAME{"\\\\.\\pipe\\testpipe"}; #else -const char* const TEST_PIPE_NAME = "testfiles/testpipe"; +const char* const TEST_PIPE_NAME{"testfiles/testpipe"}; #endif class CThreadBlockCanceller : public ml::core::CThread { public: CThreadBlockCanceller(ml::core::CThread::TThreadId threadId) - : m_ThreadId(threadId) {} + : m_ThreadId{threadId}, m_HasCancelledBlockingCall{false} {} + + const std::atomic_bool& hasCancelledBlockingCall() { + return m_HasCancelledBlockingCall; + } protected: - virtual void run() { + void run() override { // Wait for the file to exist ml::core::CSleep::sleep(SLEEP_TIME_MS); // Cancel the open() or read() operation on the file + m_HasCancelledBlockingCall.store(true); BOOST_TEST_REQUIRE(ml::core::CThread::cancelBlockedIo(m_ThreadId)); } - virtual void shutdown() {} + void shutdown() override {} private: ml::core::CThread::TThreadId m_ThreadId; + std::atomic_bool m_HasCancelledBlockingCall; }; } BOOST_AUTO_TEST_CASE(testServerIsCppReader) { - ml::test::CThreadDataWriter threadWriter(SLEEP_TIME_MS, TEST_PIPE_NAME, - TEST_CHAR, TEST_SIZE); + ml::test::CThreadDataWriter threadWriter{SLEEP_TIME_MS, TEST_PIPE_NAME, + TEST_CHAR, TEST_SIZE}; BOOST_TEST_REQUIRE(threadWriter.start()); - ml::core::CNamedPipeFactory::TIStreamP strm = - ml::core::CNamedPipeFactory::openPipeStreamRead(TEST_PIPE_NAME); + std::atomic_bool dummy{false}; + ml::core::CNamedPipeFactory::TIStreamP strm{ + ml::core::CNamedPipeFactory::openPipeStreamRead(TEST_PIPE_NAME, dummy)}; BOOST_TEST_REQUIRE(strm); - static const std::streamsize BUF_SIZE = 512; + static const std::streamsize BUF_SIZE{512}; std::string readData; char buffer[BUF_SIZE]; do { @@ -85,24 +94,25 @@ BOOST_AUTO_TEST_CASE(testServerIsCppReader) { } BOOST_AUTO_TEST_CASE(testServerIsCReader) { - ml::test::CThreadDataWriter threadWriter(SLEEP_TIME_MS, TEST_PIPE_NAME, - TEST_CHAR, TEST_SIZE); + ml::test::CThreadDataWriter threadWriter{SLEEP_TIME_MS, TEST_PIPE_NAME, + TEST_CHAR, TEST_SIZE}; BOOST_TEST_REQUIRE(threadWriter.start()); - ml::core::CNamedPipeFactory::TFileP file = - ml::core::CNamedPipeFactory::openPipeFileRead(TEST_PIPE_NAME); + std::atomic_bool dummy{false}; + ml::core::CNamedPipeFactory::TFileP file{ + ml::core::CNamedPipeFactory::openPipeFileRead(TEST_PIPE_NAME, dummy)}; BOOST_TEST_REQUIRE(file); - static const size_t BUF_SIZE = 512; + static const std::size_t BUF_SIZE{512}; std::string readData; char buffer[BUF_SIZE]; do { - size_t charsRead = ::fread(buffer, sizeof(char), BUF_SIZE, file.get()); - BOOST_TEST_REQUIRE(!::ferror(file.get())); + std::size_t charsRead{std::fread(buffer, sizeof(char), BUF_SIZE, file.get())}; + BOOST_TEST_REQUIRE(!std::ferror(file.get())); if (charsRead > 0) { readData.append(buffer, charsRead); } - } while (!::feof(file.get())); + } while (!std::feof(file.get())); BOOST_REQUIRE_EQUAL(TEST_SIZE, readData.length()); BOOST_REQUIRE_EQUAL(std::string(TEST_SIZE, TEST_CHAR), readData); @@ -113,15 +123,16 @@ BOOST_AUTO_TEST_CASE(testServerIsCReader) { } BOOST_AUTO_TEST_CASE(testServerIsCppWriter) { - ml::test::CThreadDataReader threadReader(PAUSE_TIME_MS, MAX_ATTEMPTS, TEST_PIPE_NAME); + ml::test::CThreadDataReader threadReader{PAUSE_TIME_MS, MAX_ATTEMPTS, TEST_PIPE_NAME}; BOOST_TEST_REQUIRE(threadReader.start()); - ml::core::CNamedPipeFactory::TOStreamP strm = - ml::core::CNamedPipeFactory::openPipeStreamWrite(TEST_PIPE_NAME); + std::atomic_bool dummy{false}; + ml::core::CNamedPipeFactory::TOStreamP strm{ + ml::core::CNamedPipeFactory::openPipeStreamWrite(TEST_PIPE_NAME, dummy)}; BOOST_TEST_REQUIRE(strm); - size_t charsLeft(TEST_SIZE); - size_t blockSize(7); + std::size_t charsLeft{TEST_SIZE}; + std::size_t blockSize{7}; while (charsLeft > 0) { if (blockSize > charsLeft) { blockSize = charsLeft; @@ -142,21 +153,22 @@ BOOST_AUTO_TEST_CASE(testServerIsCppWriter) { } BOOST_AUTO_TEST_CASE(testServerIsCWriter) { - ml::test::CThreadDataReader threadReader(PAUSE_TIME_MS, MAX_ATTEMPTS, TEST_PIPE_NAME); + ml::test::CThreadDataReader threadReader{PAUSE_TIME_MS, MAX_ATTEMPTS, TEST_PIPE_NAME}; BOOST_TEST_REQUIRE(threadReader.start()); - ml::core::CNamedPipeFactory::TFileP file = - ml::core::CNamedPipeFactory::openPipeFileWrite(TEST_PIPE_NAME); + std::atomic_bool dummy{false}; + ml::core::CNamedPipeFactory::TFileP file{ + ml::core::CNamedPipeFactory::openPipeFileWrite(TEST_PIPE_NAME, dummy)}; BOOST_TEST_REQUIRE(file); - size_t charsLeft(TEST_SIZE); - size_t blockSize(7); + std::size_t charsLeft{TEST_SIZE}; + std::size_t blockSize{7}; while (charsLeft > 0) { if (blockSize > charsLeft) { blockSize = charsLeft; } - BOOST_TEST_REQUIRE( - ::fputs(std::string(blockSize, TEST_CHAR).c_str(), file.get()) >= 0); + BOOST_TEST_REQUIRE(std::fputs(std::string(blockSize, TEST_CHAR).c_str(), + file.get()) >= 0); charsLeft -= blockSize; } @@ -171,19 +183,20 @@ BOOST_AUTO_TEST_CASE(testServerIsCWriter) { } BOOST_AUTO_TEST_CASE(testCancelBlock) { - CThreadBlockCanceller cancellerThread(ml::core::CThread::currentThreadId()); + CThreadBlockCanceller cancellerThread{ml::core::CThread::currentThreadId()}; BOOST_TEST_REQUIRE(cancellerThread.start()); - ml::core::CNamedPipeFactory::TOStreamP strm = - ml::core::CNamedPipeFactory::openPipeStreamWrite(TEST_PIPE_NAME); + ml::core::CNamedPipeFactory::TOStreamP strm{ml::core::CNamedPipeFactory::openPipeStreamWrite( + TEST_PIPE_NAME, cancellerThread.hasCancelledBlockingCall())}; BOOST_TEST_REQUIRE(strm == nullptr); BOOST_TEST_REQUIRE(cancellerThread.stop()); } BOOST_AUTO_TEST_CASE(testErrorIfRegularFile) { - ml::core::CNamedPipeFactory::TIStreamP strm = - ml::core::CNamedPipeFactory::openPipeStreamRead("Main.cc"); + std::atomic_bool dummy{false}; + ml::core::CNamedPipeFactory::TIStreamP strm{ + ml::core::CNamedPipeFactory::openPipeStreamRead("Main.cc", dummy)}; BOOST_TEST_REQUIRE(strm == nullptr); } @@ -196,7 +209,7 @@ BOOST_AUTO_TEST_CASE(testErrorIfSymlink) { // Suppress the error about no assertions in this case BOOST_REQUIRE(BOOST_IS_DEFINED(Windows)); #else - static const char* const TEST_SYMLINK_NAME = "test_symlink"; + static const char* const TEST_SYMLINK_NAME{"test_symlink"}; // Remove any files left behind by a previous failed test, but don't check // the return codes as these calls will usually fail @@ -206,8 +219,9 @@ BOOST_AUTO_TEST_CASE(testErrorIfSymlink) { BOOST_REQUIRE_EQUAL(0, ::mkfifo(TEST_PIPE_NAME, S_IRUSR | S_IWUSR)); BOOST_REQUIRE_EQUAL(0, ::symlink(TEST_PIPE_NAME, TEST_SYMLINK_NAME)); - ml::core::CNamedPipeFactory::TIStreamP strm = - ml::core::CNamedPipeFactory::openPipeStreamRead(TEST_SYMLINK_NAME); + std::atomic_bool dummy{false}; + ml::core::CNamedPipeFactory::TIStreamP strm{ + ml::core::CNamedPipeFactory::openPipeStreamRead(TEST_SYMLINK_NAME, dummy)}; BOOST_TEST_REQUIRE(strm == nullptr); BOOST_REQUIRE_EQUAL(0, ::unlink(TEST_SYMLINK_NAME)); diff --git a/lib/seccomp/unittest/CSystemCallFilterTest.cc b/lib/seccomp/unittest/CSystemCallFilterTest.cc index f5d7453cfc..f1b5ddf938 100644 --- a/lib/seccomp/unittest/CSystemCallFilterTest.cc +++ b/lib/seccomp/unittest/CSystemCallFilterTest.cc @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -27,10 +28,10 @@ BOOST_AUTO_TEST_SUITE(CSystemCallFilterTest) namespace { -const uint32_t SLEEP_TIME_MS = 100; -const size_t TEST_SIZE = 10000; -const size_t MAX_ATTEMPTS = 20; -const char TEST_CHAR = 'a'; +const std::uint32_t SLEEP_TIME_MS{100}; +const std::size_t TEST_SIZE{10000}; +const std::size_t MAX_ATTEMPTS{20}; +const char TEST_CHAR{'a'}; // CTestTmpDir::tmpDir() fails to get the current user after the system call // filter is installed, so cache the value early const std::string TMP_DIR{ml::test::CTestTmpDir::tmpDir()}; @@ -48,14 +49,15 @@ bool systemCall() { void openPipeAndRead(const std::string& filename) { - ml::test::CThreadDataWriter threadWriter(SLEEP_TIME_MS, filename, TEST_CHAR, TEST_SIZE); + ml::test::CThreadDataWriter threadWriter{SLEEP_TIME_MS, filename, TEST_CHAR, TEST_SIZE}; BOOST_TEST_REQUIRE(threadWriter.start()); - ml::core::CNamedPipeFactory::TIStreamP strm = - ml::core::CNamedPipeFactory::openPipeStreamRead(filename); + std::atomic_bool dummy{false}; + ml::core::CNamedPipeFactory::TIStreamP strm{ + ml::core::CNamedPipeFactory::openPipeStreamRead(filename, dummy)}; BOOST_TEST_REQUIRE(strm); - static const std::streamsize BUF_SIZE = 512; + static const std::streamsize BUF_SIZE{512}; std::string readData; readData.reserve(TEST_SIZE); char buffer[BUF_SIZE]; @@ -76,15 +78,16 @@ void openPipeAndRead(const std::string& filename) { } void openPipeAndWrite(const std::string& filename) { - ml::test::CThreadDataReader threadReader(SLEEP_TIME_MS, MAX_ATTEMPTS, filename); + ml::test::CThreadDataReader threadReader{SLEEP_TIME_MS, MAX_ATTEMPTS, filename}; BOOST_TEST_REQUIRE(threadReader.start()); - ml::core::CNamedPipeFactory::TOStreamP strm = - ml::core::CNamedPipeFactory::openPipeStreamWrite(filename); + std::atomic_bool dummy{false}; + ml::core::CNamedPipeFactory::TOStreamP strm{ + ml::core::CNamedPipeFactory::openPipeStreamWrite(filename, dummy)}; BOOST_TEST_REQUIRE(strm); - size_t charsLeft(TEST_SIZE); - size_t blockSize(7); + std::size_t charsLeft{TEST_SIZE}; + std::size_t blockSize{7}; while (charsLeft > 0) { if (blockSize > charsLeft) { blockSize = charsLeft; @@ -106,7 +109,7 @@ void openPipeAndWrite(const std::string& filename) { void makeAndRemoveDirectory(const std::string& dirname) { - boost::filesystem::path temporaryFolder(dirname); + boost::filesystem::path temporaryFolder{dirname}; temporaryFolder /= "test-directory"; boost::system::error_code errorCode;