Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@
.DS_Store
./doc-gen

__pycache__/
__pycache__/
./runme
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,19 @@ All tests pass on linux & mac. Most pass under mingw & MSVC.

# Changelog

# 0.6.0 TBA

**Breaking Changes**

- `[[nodiscard]]`` added to pipe_x functions. Most likely an error on your part
if the return value is ignored.

**Non-breaking changes**
- Fixed: threads ignoring return value of pipe_write and never terminating
- Changed: internal threads block SIGPIPE on a thread basis. Threads terminate
and close pipes as needed on pipe errors. This breaks the pipe chains which
prevents potential hangs with deep pipe chains.

# 0.5.0 2025-12-09

**Breaking Changes**
Expand Down
18 changes: 17 additions & 1 deletion src/cpp/subprocess.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,20 @@
#include "subprocess/pipe.hpp"
#include "subprocess/ProcessBuilder.hpp"
#include "subprocess/shell_utils.hpp"
#include "subprocess/environ.hpp"
#include "subprocess/environ.hpp"

#ifdef SUBPROCESS_AMALGAMATE_SOURCES
/* To regen:
(for file in *.cpp; do; echo '#include "'subprocess/$file'"'; done;) | sort
*/
#include "subprocess/CowData.cpp"
#include "subprocess/environ.cpp"
#include "subprocess/pipe.cpp"
#include "subprocess/PipeVar.cpp"
#include "subprocess/ProcessBuilder.cpp"
#include "subprocess/ProcessBuilder_posix.cpp"
#include "subprocess/ProcessBuilder_windows.cpp"
#include "subprocess/shell_utils.cpp"
#include "subprocess/utf8_to_utf16.cpp"

#endif
70 changes: 58 additions & 12 deletions src/cpp/subprocess/ProcessBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,25 @@ namespace subprocess {
message += std::strerror(errno_code);
throw OSError(message);
}
struct NoSigPipe {
#ifndef _WIN32
NoSigPipe() {
// get the current state
sigprocmask(SIG_BLOCK, NULL, &old_state);

sigset_t set = old_state;
sigaddset(&set, SIGPIPE);
sigprocmask(SIG_BLOCK, &set, NULL);
}

~NoSigPipe() {
sigprocmask(SIG_BLOCK, &old_state, NULL);
}

private:
sigset_t old_state;
#endif
};
}
double monotonic_seconds() {
static bool needs_init = true;
Expand Down Expand Up @@ -91,6 +110,7 @@ namespace subprocess {
};
std::thread pipe_thread(PipeHandle input, std::ostream* output) {
return std::thread([=]() {
details::NoSigPipe noSigPipe;
AutoClosePipe autoclose(input);
std::vector<char> buffer(2048);
while (true) {
Expand All @@ -102,58 +122,83 @@ namespace subprocess {
});
}

[[nodiscard]]
static ssize_t fwrite_fully(FILE* output, const void* buffer, size_t size) {
const uint8_t* cursor = reinterpret_cast<const uint8_t*>(buffer);
ssize_t total = 0;
while (total < size) {
auto transferred = fwrite(cursor, 1, size - total, output);
if (transferred == 0)
break;
cursor += transferred;
total += transferred;
}
return total;
}

std::thread pipe_thread(PipeHandle input, FILE* output) {
return std::thread([=]() {
details::NoSigPipe noSigPipe;
AutoClosePipe autoclose(input);
std::vector<char> buffer(2048);
while (true) {
ssize_t transfered = pipe_read(input, &buffer[0], buffer.size());
if (transfered <= 0)
ssize_t transferred = pipe_read(input, &buffer[0], buffer.size());
if (transferred <= 0)
break;
transferred = fwrite_fully(output, &buffer[0], transferred);
if (transferred <= 0)
break;
fwrite(&buffer[0], 1, transfered, output);
}
});
}

std::thread pipe_thread(FILE* input, PipeHandle output) {
return std::thread([=]() {
details::NoSigPipe noSigPipe;
AutoClosePipe autoclose(output);
std::vector<char> buffer(2048);
while (true) {
ssize_t transfered = fread(&buffer[0], 1, buffer.size(), input);
if (transfered <= 0)
ssize_t transferred = fread(&buffer[0], 1, buffer.size(), input);
if (transferred <= 0)
break;
transferred = pipe_write_fully(output, &buffer[0], transferred);
if (transferred <= 0)
break;
pipe_write(output, &buffer[0], transfered);
}
});
}
std::thread pipe_thread(std::string& input, PipeHandle output) {
return std::thread([input(move(input)), output]() {
details::NoSigPipe noSigPipe;
AutoClosePipe autoclose(output);

std::size_t pos = 0;
while (pos < input.size()) {
ssize_t transfered = pipe_write(output, input.c_str()+pos, input.size() - pos);
if (transfered <= 0)
ssize_t transferred = pipe_write_fully(output, input.c_str()+pos, input.size() - pos);
if (transferred <= 0)
break;
pos += transfered;
pos += transferred;
}
});
}
std::thread pipe_thread(std::istream* input, PipeHandle output) {
return std::thread([=]() {
details::NoSigPipe noSigPipe;
AutoClosePipe autoclose(output);
std::vector<char> buffer(2048);
while (true) {
input->read(&buffer[0], buffer.size());
ssize_t transfered = input->gcount();
ssize_t transferred = input->gcount();
if (input->bad())
break;
if (transfered <= 0) {
if (transferred <= 0) {
if (input->eof())
break;
continue;
}
pipe_write(output, &buffer[0], transfered);
transferred = pipe_write_fully(output, &buffer[0], transferred);
if (transferred <= 0)
break;
}
});
}
Expand Down Expand Up @@ -279,6 +324,7 @@ namespace subprocess {
Popen::~Popen() {
close();
}

void Popen::close() {
if (cin_thread.joinable())
cin_thread.join();
Expand Down
2 changes: 1 addition & 1 deletion src/cpp/subprocess/ProcessBuilder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ namespace subprocess {
/** equivalent to send_signal(SIGKILL) */
bool kill();

/** Destructs the object and initializes to basic state */
/** Destructs the object and initializes to base state */
void close();
/** Closes the cin pipe */
void close_cin() {
Expand Down
17 changes: 17 additions & 0 deletions src/cpp/subprocess/pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,23 @@ namespace subprocess {
return second + 1;
}

ssize_t pipe_write_fully(PipeHandle handle, const void* buffer, size_t size) {
ssize_t transferred = 0;
ssize_t total = 0;
const uint8_t* cursor = reinterpret_cast<const uint8_t*>(buffer);
while (total < size) {
ssize_t transferred = pipe_write(handle, cursor, size - total);
if (transferred < 0)
return -total - 1;
if (transferred == 0)
break;
cursor += transferred;
total += transferred;
}

return total;
}

PipeHandle pipe_file(const char* filename, const char* mode) {
using std::strchr;
#ifdef _WIN32
Expand Down
17 changes: 17 additions & 0 deletions src/cpp/subprocess/pipe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ namespace subprocess {
};

/** Peak into how many bytes available in pipe to read. */
[[nodiscard]]
ssize_t pipe_peak_bytes(PipeHandle pipe);

/** Closes a pipe handle.
Expand Down Expand Up @@ -76,6 +77,7 @@ namespace subprocess {
to make both ends to be inheritble at creation then you likely have
a bug.
*/
[[nodiscard]]
PipePair pipe_create(bool inheritable = false);

/** Set the pipe to be inheritable or not for subprocess.
Expand All @@ -90,13 +92,24 @@ namespace subprocess {
@returns -1 on error. if 0 it could be the end, or perhaps wait for
more data.
*/
[[nodiscard]]
ssize_t pipe_read(PipeHandle, void* buffer, size_t size);

/**
@returns -1 on error. if 0 it could be full, or perhaps wait for
more data.
*/
[[nodiscard]]
ssize_t pipe_write(PipeHandle, const void* buffer, size_t size);

/** Like pipe_write but keep writing while return value is > 0.

@returns bytes written on success, (-total_transferred - 1) on error,
if 0 it could be full,
*/
[[nodiscard]]
ssize_t pipe_write_fully(PipeHandle, const void* buffer, size_t size);

/** Sets the blocking bit.

The handle state is first queried as to only change the blocking bit.
Expand All @@ -120,6 +133,7 @@ namespace subprocess {
@return all data read from pipe as a string object. This works fine
with binary data.
*/
[[nodiscard]]
std::string pipe_read_all(PipeHandle handle);

/** Waits for the pipes to be change state.
Expand All @@ -133,12 +147,14 @@ namespace subprocess {
@param seconds
The timeout in seconds to wait for. -1 for indefinate
*/
[[nodiscard]]
int pipe_wait_for_read(
PipeHandle pipe,
double seconds
);

/** Will read up to size and not block until buffer is filled. */
[[nodiscard]]
ssize_t pipe_read_some(PipeHandle, void* buffer, size_t size);

/** Opens a file and returns the handle.
Expand All @@ -157,6 +173,7 @@ namespace subprocess {

@returns the handle to the opened file, or kBadPipeValue on error
*/
[[nodiscard]]
PipeHandle pipe_file(const char* filename, const char* mode);

#if 0
Expand Down
Loading