Skip to content

pipe.hpp

Alairion edited this page May 8, 2021 · 9 revisions

Pipe

Not Enough Standards' pipes utilities are defined in header <nes/pipe.hpp>

Description

A pipe is a raw byte stream between a reader and a writer. Named pipes are useful for inter-process communication, as they are accessible through their names.

Operations on pipes are blocking: when you try to read some bytes from a pipe, the calling thread is blocked until some data is available. And on the other side, write operations are also blocking, but not in every cases: on many systems, pipes have a buffer for write operations, so, they return immediately even is the reader is not waiting for data. You must be careful when using multiple pipes, and even more if you use multiple pipes in order to do bidirectional communication, because it can easily turn into deadlocks.

When an end of a pipe is closed, and the other end is blocked in an operation, then the other ends operation is cancelled and the stream indicates "end of file".

nes::basic_pipe_streambuf derives from std::basic_streambuf. It is the raw pipe device. You should use it only to implement specific behaviour on read and write operations. If you just want basic pipes then you should use nes::basic_pipe_istream and nes::basic_pipe_ostream.

nes::basic_pipe_istream is a std::basic_istream that represents the read end of a pipe. It can open the read end of a named pipe.

nes::basic_pipe_istream is a std::basic_ostream that represents the write end of a pipe. It can open the write end of a named pipe.

nes::make_anonymous_pipe() create a pair of nes::basic_pipe_istream and nes::basic_pipe_ostream that are linked together. Anonymous pipes can only be used in the creating process.

Synopsis

namespace nes
{

static constexpr const char pipe_root[] = /*implementation-defined*/;

template<typename CharT, typename Traits = std::char_traits<CharT>>
class basic_pipe_streambuf : public std::basic_streambuf<CharT, Traits>
{
public:
    using char_type    = CharT;
    using traits_type  = Traits;
    using int_type     = typename Traits::int_type;
    using pos_type     = typename Traits::pos_type;
    using off_type     = typename Traits::off_type;

public:
    static constexpr std::size_t buf_size{/*implementation-defined*/};

public:
    basic_pipe_streambuf() = default;

    explicit basic_pipe_streambuf(const std::string& name, std::ios_base::openmode mode);

    virtual ~basic_pipe_streambuf();

    basic_pipe_streambuf(const basic_pipe_streambuf&) = delete;
    basic_pipe_streambuf& operator=(const basic_pipe_streambuf&) = delete;

    basic_pipe_streambuf(basic_pipe_streambuf&& other) noexcept;
    basic_pipe_streambuf& operator=(basic_pipe_streambuf&& other) noexcept;
    
    void open(const std::string& name, std::ios_base::openmode mode);
    bool is_open() const noexcept;
    void close();

protected:
    virtual int sync() override;
    
    virtual int_type overflow(int_type c = traits_type::eof()) override;
    virtual std::streamsize xsputn(const char_type* s, std::streamsize count) override;
    
    virtual int_type underflow() override;
    virtual std::streamsize xsgetn(char_type* s, std::streamsize count) override;
};

template<typename CharT, typename Traits = std::char_traits<CharT>>
class basic_pipe_istream : public std::basic_istream<CharT, Traits>
{
public:
    using char_type    = CharT;
    using traits_type  = Traits;
    using int_type     = typename Traits::int_type;
    using pos_type     = typename Traits::pos_type;
    using off_type     = typename Traits::off_type;

public:
    basic_pipe_istream() = default;

    explicit basic_pipe_istream(const std::string& name, std::ios_base::openmode mode = std::ios_base::in);

    virtual ~basic_pipe_istream() = default;
    
    basic_pipe_istream(const basic_pipe_istream&) = delete;
    basic_pipe_istream& operator=(const basic_pipe_istream&) = delete;

    basic_pipe_istream(basic_pipe_istream&& other) noexcept;
    basic_pipe_istream& operator=(basic_pipe_istream&& other) noexcept;

    void open(const std::string& name, std::ios_base::openmode mode = std::ios_base::in);
    bool is_open() const noexcept;
    void close();

    basic_pipe_streambuf<char_type, traits_type>* rdbuf() const noexcept;
};

template<typename CharT, typename Traits = std::char_traits<CharT>>
class basic_pipe_ostream : public std::basic_ostream<CharT, Traits>
{
public:
    using char_type    = CharT;
    using traits_type  = Traits;
    using int_type     = typename Traits::int_type;
    using pos_type     = typename Traits::pos_type;
    using off_type     = typename Traits::off_type;

public:
    basic_pipe_ostream() = default;

    explicit basic_pipe_ostream(const std::string& name, std::ios_base::openmode mode = std::ios_base::out);

    virtual ~basic_pipe_ostream() = default;
    
    basic_pipe_ostream(const basic_pipe_ostream&) = delete;
    basic_pipe_ostream& operator=(const basic_pipe_ostream&) = delete;

    basic_pipe_ostream(basic_pipe_ostream&& other) noexcept;
    basic_pipe_ostream& operator=(basic_pipe_ostream&& other) noexcept;

    void open(const std::string& name, std::ios_base::openmode mode = std::ios_base::out);
    bool is_open() const noexcept;
    void close();

    basic_pipe_streambuf<char_type, traits_type>* rdbuf() const noexcept;
};

template<typename CharT = char, typename Traits = std::char_traits<CharT>>
std::pair<basic_pipe_istream<CharT, Traits>, basic_pipe_ostream<CharT, Traits>> make_anonymous_pipe();

using pipe_streambuf = basic_pipe_streambuf<char>;
using pipe_istream = basic_pipe_istream<char>;
using pipe_ostream = basic_pipe_ostream<char>;

}

Example

Here is an example in which we send data of different types to another thread.
The main thread is the writer, the other is the reader.
In this example, the pipe is used both as a binary and a text stream.

#include <iostream>
#include <thread>
#include <string>

#include <nes/pipe.hpp>

//Fixed size enum used to know the type of a block of data
enum class data_type : std::uint32_t
{
    uint32 = 1,
    float64,
    string
};

void a_thread(nes::basic_pipe_istream<char>& is) noexcept
{
    while(is)
    {
        //We block the thread until a new block of data is available.
        data_type type{};
        is.read(reinterpret_cast<char*>(&type), sizeof(data_type));
        
        //Check the type of the block of data
        if(type == data_type::uint32)
        {
            std::uint32_t value{};
            is.read(reinterpret_cast<char*>(&value), sizeof(std::uint32_t));

            std::cout << "Received an unsigned integer: " << value << std::endl;
        }
        else if(type == data_type::float64)
        {
            double value{};
            is.read(reinterpret_cast<char*>(&value), sizeof(double));

            std::cout << "Received a double: " << value << std::endl;
        }
        else if(type == data_type::string)
        {
            std::uint64_t size{};
            is.read(reinterpret_cast<char*>(&size), sizeof(std::uint64_t));
            std::string str{};
            str.resize(size);
            is.read(reinterpret_cast<char*>(std::data(str)), static_cast<std::streamsize>(size));

            std::cout << "Received a string: " << str << std::endl;
        }
    }
}

int main()
{
    //Create an anonymous pipe
    //"is" is the read end of the pipe (it is a "nes::pipe_istream")
    //"os" is the write end of the pipe (it is a "nes::pipe_ostream")
    auto&&[is, os] = nes::make_anonymous_pipe();

    std::thread thread{a_thread, std::ref(is)};

    for(std::uint32_t i{1}; i < 20; ++i)
    {
        //We send data to the other thread as blocks.
        //Each block start with a "data_type" which is a "std::uint32_t"
        //Depending on the type, the block of data has a variable size
        if((i % 3) == 0)
        {
            const data_type type{data_type::uint32};
            os.write(reinterpret_cast<const char*>(&type), sizeof(data_type));

            os.write(reinterpret_cast<const char*>(&i), sizeof(std::uint32_t));
        }
        else if((i % 3) == 1)
        {
            const data_type type{data_type::float64};
            os.write(reinterpret_cast<const char*>(&type), sizeof(data_type));

            const double value{1.0 / i};
            os.write(reinterpret_cast<const char*>(&value), sizeof(double));
        }
        else if((i % 3) == 2)
        {
            const data_type type{data_type::string};
            os.write(reinterpret_cast<const char*>(&type), sizeof(data_type));

            const std::string str{"Hello " + std::to_string(i) + "!"};
            const std::uint64_t size{std::size(str)};
            
            //Write the size of the string, and the string itself right after
            os.write(reinterpret_cast<const char*>(&size), sizeof(std::uint64_t));
            os.write(std::data(str), static_cast<std::streamsize>(size));
        }
    }

    //Close this end. The other end's operation is cancelled and the stream indicates "eof".
    os.close();

    if(thread.joinable())
        thread.join();
}

Output

Received a double: 1
Received a string: Hello 2!
Received an unsigned integer: 3
Received a double: 0.25
Received a string: Hello 5!
Received an unsigned integer: 6
Received a double: 0.142857
Received a string: Hello 8!
Received an unsigned integer: 9
Received a double: 0.1
Received a string: Hello 11!
Received an unsigned integer: 12
Received a double: 0.0769231
Received a string: Hello 14!
Received an unsigned integer: 15
Received a double: 0.0625
Received a string: Hello 17!
Received an unsigned integer: 18
Received a double: 0.0526316
Clone this wiki locally