From 0588311df4ec50648d226f2da057faad8032467b Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Thu, 16 Oct 2025 18:55:44 +0100 Subject: [PATCH 1/6] work in progress --- include/boost/http_io/body_read_stream.hpp | 55 ++++++ .../boost/http_io/impl/body_read_stream.hpp | 187 ++++++++++++++++++ test/unit/Jamfile | 1 + test/unit/body_read_stream.cpp | 140 +++++++++++++ 4 files changed, 383 insertions(+) create mode 100644 include/boost/http_io/body_read_stream.hpp create mode 100644 include/boost/http_io/impl/body_read_stream.hpp create mode 100644 test/unit/body_read_stream.cpp diff --git a/include/boost/http_io/body_read_stream.hpp b/include/boost/http_io/body_read_stream.hpp new file mode 100644 index 0000000..d4e37f9 --- /dev/null +++ b/include/boost/http_io/body_read_stream.hpp @@ -0,0 +1,55 @@ +// +// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/vinniefalco/http_io +// + +#ifndef BOOST_HTTP_IO_BODY_READ_STREAM_HPP +#define BOOST_HTTP_IO_BODY_READ_STREAM_HPP + +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace http_io { + +template +class body_read_stream { + +public: + explicit + body_read_stream( + const rts::context& rts_ctx, + UnderlyingAsyncReadStream& und_stream, + http_proto::parser& pr); + + template< + class MutableBufferSequence, + BOOST_ASIO_COMPLETION_TOKEN_FOR( + void(system::error_code, std::size_t)) CompletionToken> + BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, + void(system::error_code, std::size_t)) + async_read_some( + const MutableBufferSequence& mb, + CompletionToken&& token); + +private: + const rts::context& rts_ctx_; + UnderlyingAsyncReadStream& und_stream_; + http_proto::parser& pr_; +}; + +} // http_io +} // boost + +#include + +#endif diff --git a/include/boost/http_io/impl/body_read_stream.hpp b/include/boost/http_io/impl/body_read_stream.hpp new file mode 100644 index 0000000..8175b0e --- /dev/null +++ b/include/boost/http_io/impl/body_read_stream.hpp @@ -0,0 +1,187 @@ +// +// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/vinniefalco/http_io +// + +#ifndef BOOST_HTTP_IO_IMPL_BODY_READ_STREAM_HPP +#define BOOST_HTTP_IO_IMPL_BODY_READ_STREAM_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace http_io { + +namespace detail { + +template +class body_read_stream_op : public asio::coroutine { + + UnderlyingAsyncReadStream& underlying_stream_; + const MutableBufferSequence& mb_; + http_proto::parser& pr_; + bool already_have_header_ = false; + bool some_ = false; + +public: + body_read_stream_op( + UnderlyingAsyncReadStream& s, + const MutableBufferSequence& mb, + http_proto::parser& pr, + bool some) noexcept + : underlying_stream_(s) + , mb_(mb) + , pr_(pr) + , some_(some) + { + } + + + template + void + operator()( + Self& self, + system::error_code ec = {}, + std::size_t bytes_transferred = 0) + { + BOOST_ASIO_CORO_REENTER(*this) + { + if (!pr_.is_complete()) + { + for (;;) + { + BOOST_ASIO_CORO_YIELD + { + BOOST_ASIO_HANDLER_LOCATION(( + __FILE__, __LINE__, + "async_read_some")); + underlying_stream_.async_read_some( + pr_.prepare(), + std::move(self)); + } + pr_.commit(bytes_transferred); + if (ec == asio::error::eof) + { + BOOST_ASSERT( + bytes_transferred == 0); + pr_.commit_eof(); + ec = {}; + } + else if (ec.failed()) + { + break; // genuine error + } + pr_.parse(ec); + if (ec.failed() && ec != http_proto::condition::need_more_input) + { + break; // genuine error. + } + if (already_have_header_) { + if (!ec.failed()) + { + BOOST_ASSERT( + pr_.is_complete()); + break; + } + if (some_) + { + ec = {}; + break; + } + } + if (!already_have_header_ && pr_.got_header()) + { + already_have_header_ = true; + ec = {}; // override possible need_more_input + pr_.parse(ec); // having parsed the header, callle parse again for the start of the body. + if (ec.failed() && ec != http_proto::condition::need_more_input) + { + break; // genuine error. + } + if (!ec.failed()) + { + BOOST_ASSERT( + pr_.is_complete()); + break; + } + } + } + } + + auto source_buf = pr_.pull_body(); + + std::size_t n = boost::asio::buffer_copy(mb_, source_buf); + + pr_.consume_body(n); + + ec = (n != 0) ? system::error_code{} : asio::stream_errc::eof; + + // for some reason this crashes - not sure yet why: + //asio::dispatch( + // asio::get_associated_executor(underlying_stream_), + // asio::prepend(std::move(self), ec, n)); + + self.complete(ec, n); // TODO - work out the byte count + } + } +}; + +} // detail + +//------------------------------------------------ + + // TODO: copy in Beast's stream traits to check if UnderlyingAsyncReadStream + // is an AsyncReadStream, and also static_assert that body_read_stream is too. + + + +template +body_read_stream::body_read_stream( + const rts::context& rts_ctx + , UnderlyingAsyncReadStream& und_stream + , http_proto::parser& pr) + : + rts_ctx_(rts_ctx) + , und_stream_(und_stream) + , pr_(pr) +{ +} + + +template +template< + class MutableBufferSequence, + BOOST_ASIO_COMPLETION_TOKEN_FOR( + void(system::error_code, std::size_t)) CompletionToken> +BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, + void(system::error_code, std::size_t)) +body_read_stream::async_read_some( + const MutableBufferSequence& mb + , CompletionToken&& token) +{ + return asio::async_compose< + CompletionToken, + void(system::error_code, std::size_t)>( + detail::body_read_stream_op< + MutableBufferSequence, UnderlyingAsyncReadStream>{und_stream_, mb, pr_, true}, + token, + asio::get_associated_executor(und_stream_) + ); +} + +} // http_io +} // boost + +#endif diff --git a/test/unit/Jamfile b/test/unit/Jamfile index c9066dc..c0b09d5 100644 --- a/test/unit/Jamfile +++ b/test/unit/Jamfile @@ -32,6 +32,7 @@ project ; local SOURCES = + body_read_stream.cpp buffer.cpp read.cpp sandbox.cpp diff --git a/test/unit/body_read_stream.cpp b/test/unit/body_read_stream.cpp new file mode 100644 index 0000000..fa8c23e --- /dev/null +++ b/test/unit/body_read_stream.cpp @@ -0,0 +1,140 @@ +// +// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/vinniefalco/http_io +// + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "test_suite.hpp" + +#include + +namespace boost { + +template +struct MockReadStream { + MockReadStream(Executor& ex, const std::string &data) : ex_(ex), mock_data_(data), sent_(0) + { + } + + Executor get_executor() const { return ex_; } + + //template auto async_write_some(asio::const_buffer buf, Token&& token) { + // return asio::async_initiate( // + // [&ex_](auto h, auto buf) { + // asio::dispatch(ex_, [=, h = std::move(h)]() mutable { + // std::move(h)({}, asio::buffer_size(buf)); + // }); + // }, + // token, buf); + //} + + template< + class MutableBufferSequence, + BOOST_ASIO_COMPLETION_TOKEN_FOR( + void(system::error_code, std::size_t)) CompletionToken> + BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, + void(system::error_code, std::size_t)) + async_read_some( + const MutableBufferSequence& buf + , CompletionToken&& token) + { + return asio::async_initiate( + [this]( + CompletionToken&& token + , const MutableBufferSequence& buf) + { + boost::string_view source_str{ + mock_data_.data() + sent_, + mock_data_.size() - sent_ }; + auto source_buf = asio::buffer(source_str); + + std::size_t chunk_size = rand() % mock_data_.size() + 1 + 100; + + std::size_t n = asio::buffer_copy(buf, source_buf, chunk_size); + + system::error_code ec = (n != 0) ? system::error_code{} : asio::stream_errc::eof; + + sent_ += n; + + std::move(token)(ec, n); + //asio::post(ex_, asio::prepend(std::move(token), ec, n)); + }, + token, buf); + } + + Executor& ex_; + std::string mock_data_; + std::size_t sent_; +}; + +namespace http_io { + +struct body_read_stream_test +{ + void + run() + { + std::string data = "HTTP/1.1 200 OK\r\n" + "Content-Type: text/html\r\n" + "Last-Modified: Thu, 09 Oct 2025 16:42:02 GMT\r\n" + "Cache-Control: max-age=86000\r\n" + "Date: Thu, 16 Oct 2025 15:09:10 GMT\r\n" + "Content-Length: 60\r\n" + "Connection: keep-alive\r\n" + "\r\n" + "Hello World\r\n"; + + asio::io_context ioc; + MockReadStream ms(ioc, data); + + std::array arr; + auto buf = asio::buffer(arr); + + rts::context rts_ctx; + http_proto::response_parser::config cfg; + cfg.body_limit = 1024 * 1024; + cfg.min_buffer = 1024 * 1024; + http_proto::install_parser_service(rts_ctx, cfg); + http_proto::response_parser pr(rts_ctx); + pr.reset(); + pr.start(); + body_read_stream brs(rts_ctx, ms, pr); + + brs.async_read_some(buf, + [this, &brs, &arr, &buf](system::error_code ec, std::size_t bytes_transferred) + { + std::cout << "Error: " << ec.failed() << " bytes " << bytes_transferred << std::endl; + std::cout << std::string(arr.data(), bytes_transferred) << std::endl; + + brs.async_read_some(buf, + [this, &brs, &arr, &buf](system::error_code ec, std::size_t bytes_transferred) + { + std::cout << "2nd Error: " << ec.failed() << " bytes " << bytes_transferred << std::endl; + std::cout << std::string(arr.data(), bytes_transferred) << std::endl; + }); + }); + ioc.run(); + } +}; + +TEST_SUITE( + body_read_stream_test, + "boost.http_io.body_read_stream"); + +} // http_io +} // boost From 84c829f28812fa861737ffc1b993590b51311df4 Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Mon, 20 Oct 2025 10:36:03 +0100 Subject: [PATCH 2/6] work in progress --- .../boost/http_io/impl/body_read_stream.hpp | 36 ++++++++++----- test/unit/body_read_stream.cpp | 44 ++++++++++++++----- 2 files changed, 58 insertions(+), 22 deletions(-) diff --git a/include/boost/http_io/impl/body_read_stream.hpp b/include/boost/http_io/impl/body_read_stream.hpp index 8175b0e..7e8651b 100644 --- a/include/boost/http_io/impl/body_read_stream.hpp +++ b/include/boost/http_io/impl/body_read_stream.hpp @@ -15,12 +15,15 @@ #include #include #include +#include #include #include #include #include #include +#include + namespace boost { namespace http_io { @@ -48,7 +51,6 @@ class body_read_stream_op : public asio::coroutine { { } - template void operator()( @@ -105,7 +107,7 @@ class body_read_stream_op : public asio::coroutine { { already_have_header_ = true; ec = {}; // override possible need_more_input - pr_.parse(ec); // having parsed the header, callle parse again for the start of the body. + pr_.parse(ec); // having parsed the header, call parse again for the start of the body. if (ec.failed() && ec != http_proto::condition::need_more_input) { break; // genuine error. @@ -119,21 +121,33 @@ class body_read_stream_op : public asio::coroutine { } } } + else { + BOOST_ASIO_CORO_YIELD + { + BOOST_ASIO_HANDLER_LOCATION(( + __FILE__, __LINE__, + "async_read_some")); + // The initiation function must return before the completion handler is called. + asio::dispatch( + asio::get_associated_executor(underlying_stream_), + std::move(self)); + } + } - auto source_buf = pr_.pull_body(); + std::size_t n = 0; - std::size_t n = boost::asio::buffer_copy(mb_, source_buf); + if (!ec.failed()) + { + auto source_buf = pr_.pull_body(); - pr_.consume_body(n); + n = boost::asio::buffer_copy(mb_, source_buf); - ec = (n != 0) ? system::error_code{} : asio::stream_errc::eof; + pr_.consume_body(n); - // for some reason this crashes - not sure yet why: - //asio::dispatch( - // asio::get_associated_executor(underlying_stream_), - // asio::prepend(std::move(self), ec, n)); + ec = (n != 0) ? system::error_code{} : asio::stream_errc::eof; + } - self.complete(ec, n); // TODO - work out the byte count + self.complete(ec, n); } } }; diff --git a/test/unit/body_read_stream.cpp b/test/unit/body_read_stream.cpp index fa8c23e..c8fed5d 100644 --- a/test/unit/body_read_stream.cpp +++ b/test/unit/body_read_stream.cpp @@ -15,14 +15,18 @@ #include #include #include +#include #include #include #include #include "test_suite.hpp" +#include #include +static int count = 0; + namespace boost { template @@ -63,16 +67,21 @@ struct MockReadStream { mock_data_.size() - sent_ }; auto source_buf = asio::buffer(source_str); - std::size_t chunk_size = rand() % mock_data_.size() + 1 + 100; + std::size_t chunk_size = std::max( + (std::size_t)(rand() % mock_data_.size()), + (std::size_t)1); std::size_t n = asio::buffer_copy(buf, source_buf, chunk_size); + count += n; + + std::cout << "Writing: (" << n << " gives " << count << ") : " << std::string(asio::buffers_begin(buf), asio::buffers_begin(buf) + n) << std::endl; + system::error_code ec = (n != 0) ? system::error_code{} : asio::stream_errc::eof; sent_ += n; - std::move(token)(ec, n); - //asio::post(ex_, asio::prepend(std::move(token), ec, n)); + asio::post(ex_, asio::prepend(std::move(token), ec, n)); }, token, buf); } @@ -100,9 +109,10 @@ struct body_read_stream_test "Hello World\r\n"; asio::io_context ioc; - MockReadStream ms(ioc, data); + auto strand = asio::make_strand(ioc); + MockReadStream ms(strand, data); - std::array arr; + std::array arr; auto buf = asio::buffer(arr); rts::context rts_ctx; @@ -113,19 +123,31 @@ struct body_read_stream_test http_proto::response_parser pr(rts_ctx); pr.reset(); pr.start(); - body_read_stream brs(rts_ctx, ms, pr); + body_read_stream brs(rts_ctx, ms, pr); brs.async_read_some(buf, [this, &brs, &arr, &buf](system::error_code ec, std::size_t bytes_transferred) { - std::cout << "Error: " << ec.failed() << " bytes " << bytes_transferred << std::endl; - std::cout << std::string(arr.data(), bytes_transferred) << std::endl; + if (ec.failed()) std::cerr << ec.message() << std::endl; + + BOOST_TEST_EQ(ec.failed(), false); + BOOST_TEST_EQ(bytes_transferred, 41); + { + std::string value(arr.data(), bytes_transferred); + BOOST_TEST_EQ(value, std::string("Hello W")); + } brs.async_read_some(buf, [this, &brs, &arr, &buf](system::error_code ec, std::size_t bytes_transferred) { - std::cout << "2nd Error: " << ec.failed() << " bytes " << bytes_transferred << std::endl; - std::cout << std::string(arr.data(), bytes_transferred) << std::endl; + if (ec.failed()) std::cerr << ec.message() << std::endl; + + BOOST_TEST_EQ(ec.failed(), false); + BOOST_TEST_EQ(bytes_transferred, 19); + { + std::string value(arr.data(), bytes_transferred); + BOOST_TEST_EQ(value, std::string("orld")); + } }); }); ioc.run(); @@ -134,7 +156,7 @@ struct body_read_stream_test TEST_SUITE( body_read_stream_test, - "boost.http_io.body_read_stream"); + "boost.http_io.body_read_stream.hello_world"); } // http_io } // boost From ca1f1f1df0a154d25e3d48d640831a3848753f5b Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Mon, 20 Oct 2025 10:56:01 +0100 Subject: [PATCH 3/6] work in progress --- include/boost/http_io/body_read_stream.hpp | 2 -- include/boost/http_io/impl/body_read_stream.hpp | 6 ++---- test/unit/body_read_stream.cpp | 8 +------- 3 files changed, 3 insertions(+), 13 deletions(-) diff --git a/include/boost/http_io/body_read_stream.hpp b/include/boost/http_io/body_read_stream.hpp index d4e37f9..6a0312f 100644 --- a/include/boost/http_io/body_read_stream.hpp +++ b/include/boost/http_io/body_read_stream.hpp @@ -27,7 +27,6 @@ class body_read_stream { public: explicit body_read_stream( - const rts::context& rts_ctx, UnderlyingAsyncReadStream& und_stream, http_proto::parser& pr); @@ -42,7 +41,6 @@ class body_read_stream { CompletionToken&& token); private: - const rts::context& rts_ctx_; UnderlyingAsyncReadStream& und_stream_; http_proto::parser& pr_; }; diff --git a/include/boost/http_io/impl/body_read_stream.hpp b/include/boost/http_io/impl/body_read_stream.hpp index 7e8651b..900f9ae 100644 --- a/include/boost/http_io/impl/body_read_stream.hpp +++ b/include/boost/http_io/impl/body_read_stream.hpp @@ -163,12 +163,10 @@ class body_read_stream_op : public asio::coroutine { template body_read_stream::body_read_stream( - const rts::context& rts_ctx - , UnderlyingAsyncReadStream& und_stream + UnderlyingAsyncReadStream& und_stream , http_proto::parser& pr) : - rts_ctx_(rts_ctx) - , und_stream_(und_stream) + und_stream_(und_stream) , pr_(pr) { } diff --git a/test/unit/body_read_stream.cpp b/test/unit/body_read_stream.cpp index c8fed5d..dab09fa 100644 --- a/test/unit/body_read_stream.cpp +++ b/test/unit/body_read_stream.cpp @@ -25,8 +25,6 @@ #include #include -static int count = 0; - namespace boost { template @@ -73,10 +71,6 @@ struct MockReadStream { std::size_t n = asio::buffer_copy(buf, source_buf, chunk_size); - count += n; - - std::cout << "Writing: (" << n << " gives " << count << ") : " << std::string(asio::buffers_begin(buf), asio::buffers_begin(buf) + n) << std::endl; - system::error_code ec = (n != 0) ? system::error_code{} : asio::stream_errc::eof; sent_ += n; @@ -123,7 +117,7 @@ struct body_read_stream_test http_proto::response_parser pr(rts_ctx); pr.reset(); pr.start(); - body_read_stream brs(rts_ctx, ms, pr); + body_read_stream brs(ms, pr); brs.async_read_some(buf, [this, &brs, &arr, &buf](system::error_code ec, std::size_t bytes_transferred) From 38cb4a98b08f80e1c70b7734233891e168088aa4 Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Mon, 20 Oct 2025 18:57:10 +0100 Subject: [PATCH 4/6] work in progress --- include/boost/http_io/body_read_stream.hpp | 202 +++++++++++++++++- .../boost/http_io/impl/body_read_stream.hpp | 159 +++++++------- 2 files changed, 266 insertions(+), 95 deletions(-) diff --git a/include/boost/http_io/body_read_stream.hpp b/include/boost/http_io/body_read_stream.hpp index 6a0312f..d810ee1 100644 --- a/include/boost/http_io/body_read_stream.hpp +++ b/include/boost/http_io/body_read_stream.hpp @@ -11,25 +11,142 @@ #define BOOST_HTTP_IO_BODY_READ_STREAM_HPP #include -#include -#include +#include #include #include -#include -#include namespace boost { namespace http_io { -template + /** A body reader for HTTP/1 messages. + + This type meets the requirements of asio's + AsyncReadStream, and is constructed with a reference to an + underlying AsyncReadStream. + + Any call to `async_read_some` initially triggers reads + from the underlying stream until all of the HTTP headers + have been read and processed. Thereafter, each subsequent + call to `async_read_some` triggers a call to the underlying + stream's `async_read_some` method, with the resulting body + data stored in the referenced MutableBufferSequence. + + All processing depends on a http_io::parser object owned + by the caller and referenced in the construction of this + object. + + @see + @ref response_parser, + @ref request_parser. + */ +template class body_read_stream { public: + + /** The type of the executor associated with the stream. + + This will be the type of executor used to invoke completion + handlers which do not have an explicit associated executor. + */ + typedef AsyncReadStream::executor_type executor_type; + + /** Get the executor associated with the object. + + This function may be used to obtain the executor object that the + stream uses to dispatch completion handlers without an assocaited + executor. + + @return A copy of the executor that stream will use to dispatch + handlers. + */ + executor_type get_executor() { + return us_.get_executor(); + } + + /** Constructor + + This constructor creates the stream by forwarding all arguments + to the underlying socket. The socket then needs to be open and + connected or accepted before data can be sent or received on it. + + @param us The underlying stream from which the HTTP message is read. + This object's executor is initialized to that of the + underlying stream. + + @param pr A http_proto::parser object which will perform the parsing + of the HTTP message and extraction of the body. This must + be initialized by the caller and ownership of the parser is + retained by the caller, which must guarantee that it remains + valid until the handler is called. + */ explicit body_read_stream( - UnderlyingAsyncReadStream& und_stream, + AsyncReadStream& us, http_proto::parser& pr); + /** Read some data asynchronously. + + This function is used to asynchronously read data from the stream. + + This call always returns immediately. The asynchronous operation + will continue until one of the following conditions is true: + + @li The HTTP headers are read in full from the underlying stream + and one or more bytes of the body are read from the stream and + stored in the buffer `mb`. + + @li An error occurs. + + The algorithm, known as a composed asynchronous operation, + is implemented in terms of calls to the underlying stream's `async_read_some` + function. The program must ensure that no other calls to @ref + `async_read_some` are performed until this operation completes. + + @param mb The buffers into which the data will be read. If the size + of the buffers is zero bytes, the operation always completes immediately + with no error. + Although the buffers object may be copied as necessary, ownership of the + underlying memory blocks is retained by the caller, which must guarantee + that they remain valid until the handler is called. + Where the mb buffer is not of sufficient size to hold the read data, the + remainder may be read by subsequent calls to this function. + + @param handler The completion handler to invoke when the operation + completes. The implementation takes ownership of the handler by + performing a decay-copy. The equivalent function signature of + the handler must be: + @code + void handler( + error_code error, // Result of operation. + std::size_t bytes_transferred // Number of bytes read. + ); + @endcode + If the handler has an associated immediate executor, + an immediate completion will be dispatched to it. + Otherwise, the handler will not be invoked from within + this function. Invocation of the handler will be performed + by dispatching to the immediate executor. If no + immediate executor is specified, this is equivalent + to using `net::post`. + + @note The `async_read_some` operation may not receive all of the requested + number of bytes. Consider using the function `net::async_read` if you need + to ensure that the requested amount of data is read before the asynchronous + operation completes. + + @par Per-Operation Cancellation + + This asynchronous operation supports cancellation for the following + net::cancellation_type values: + + @li @c net::cancellation_type::terminal + @li @c net::cancellation_type::partial + @li @c net::cancellation_type::total + + if they are also supported by the underlying stream's @c async_read_some + operation. + */ template< class MutableBufferSequence, BOOST_ASIO_COMPLETION_TOKEN_FOR( @@ -38,10 +155,79 @@ class body_read_stream { void(system::error_code, std::size_t)) async_read_some( const MutableBufferSequence& mb, - CompletionToken&& token); + CompletionToken&& handler); + + /** Read all remaining data asynchronously. + This function is used to asynchronously read data from the stream. + + This call always returns immediately. The asynchronous operation + will continue until one of the following conditions is true: + + @li The HTTP message is read in full from the underlying stream. + + @li An error occurs. + + The algorithm, known as a composed asynchronous operation, + is implemented in terms of calls to the underlying stream's `async_read_some` + function. The program must ensure that no other calls to @ref + `async_read_some` are performed until this operation completes. + + @param mb The buffers into which the body data will be read. If the size + of the buffers is zero bytes, the operation always completes immediately + with no error. + Although the buffers object may be copied as necessary, ownership of the + underlying memory blocks is retained by the caller, which must guarantee + that they remain valid until the handler is called. + Where the mb buffer is not of sufficient size to hold the read data, the + remainder may be read by subsequent calls to this function. + + @param handler The completion handler to invoke when the operation + completes. The implementation takes ownership of the handler by + performing a decay-copy. The equivalent function signature of + the handler must be: + @code + void handler( + error_code error, // Result of operation. + std::size_t bytes_transferred // Number of bytes read. + ); + @endcode + If the handler has an associated immediate executor, + an immediate completion will be dispatched to it. + Otherwise, the handler will not be invoked from within + this function. Invocation of the handler will be performed + by dispatching to the immediate executor. If no + immediate executor is specified, this is equivalent + to using `net::post`. + + @note The `async_read_some` operation may not receive all of the requested + number of bytes. Consider using the function `net::async_read` if you need + to ensure that the requested amount of data is read before the asynchronous + operation completes. + + @par Per-Operation Cancellation + + This asynchronous operation supports cancellation for the following + net::cancellation_type values: + + @li @c net::cancellation_type::terminal + @li @c net::cancellation_type::partial + @li @c net::cancellation_type::total + + if they are also supported by the underlying stream's @c async_read_some + operation. + */ + template< + class MutableBufferSequence, + BOOST_ASIO_COMPLETION_TOKEN_FOR( + void(system::error_code, std::size_t)) CompletionToken> + BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, + void(system::error_code, std::size_t)) + async_read( + const MutableBufferSequence& mb, + CompletionToken&& handler); private: - UnderlyingAsyncReadStream& und_stream_; + AsyncReadStream& us_; http_proto::parser& pr_; }; diff --git a/include/boost/http_io/impl/body_read_stream.hpp b/include/boost/http_io/impl/body_read_stream.hpp index 900f9ae..6d80971 100644 --- a/include/boost/http_io/impl/body_read_stream.hpp +++ b/include/boost/http_io/impl/body_read_stream.hpp @@ -10,41 +10,38 @@ #ifndef BOOST_HTTP_IO_IMPL_BODY_READ_STREAM_HPP #define BOOST_HTTP_IO_IMPL_BODY_READ_STREAM_HPP -#include -#include -#include -#include +#include #include #include -#include #include -#include -#include -#include +#include +#include +#include +#include +#include -#include namespace boost { namespace http_io { namespace detail { -template +template class body_read_stream_op : public asio::coroutine { - UnderlyingAsyncReadStream& underlying_stream_; + AsyncReadStream& us_; const MutableBufferSequence& mb_; http_proto::parser& pr_; - bool already_have_header_ = false; bool some_ = false; public: + body_read_stream_op( - UnderlyingAsyncReadStream& s, + AsyncReadStream& s, const MutableBufferSequence& mb, http_proto::parser& pr, bool some) noexcept - : underlying_stream_(s) + : us_(s) , mb_(mb) , pr_(pr) , some_(some) @@ -58,82 +55,49 @@ class body_read_stream_op : public asio::coroutine { system::error_code ec = {}, std::size_t bytes_transferred = 0) { + boost::ignore_unused(bytes_transferred); + BOOST_ASIO_CORO_REENTER(*this) { - if (!pr_.is_complete()) - { - for (;;) + if (!pr_.got_header()) { + BOOST_ASIO_CORO_YIELD { - BOOST_ASIO_CORO_YIELD - { - BOOST_ASIO_HANDLER_LOCATION(( - __FILE__, __LINE__, - "async_read_some")); - underlying_stream_.async_read_some( - pr_.prepare(), - std::move(self)); - } - pr_.commit(bytes_transferred); - if (ec == asio::error::eof) - { - BOOST_ASSERT( - bytes_transferred == 0); - pr_.commit_eof(); - ec = {}; - } - else if (ec.failed()) - { - break; // genuine error - } - pr_.parse(ec); - if (ec.failed() && ec != http_proto::condition::need_more_input) - { - break; // genuine error. - } - if (already_have_header_) { - if (!ec.failed()) - { - BOOST_ASSERT( - pr_.is_complete()); - break; - } - if (some_) - { - ec = {}; - break; - } - } - if (!already_have_header_ && pr_.got_header()) - { - already_have_header_ = true; - ec = {}; // override possible need_more_input - pr_.parse(ec); // having parsed the header, call parse again for the start of the body. - if (ec.failed() && ec != http_proto::condition::need_more_input) - { - break; // genuine error. - } - if (!ec.failed()) - { - BOOST_ASSERT( - pr_.is_complete()); - break; - } - } + BOOST_ASIO_HANDLER_LOCATION(( + __FILE__, __LINE__, + "async_read_header")); + http_io::async_read_header< + AsyncReadStream, + Self > ( + us_, + pr_, + std::move(self)); } + if (ec.failed()) goto upcall; } - else { - BOOST_ASIO_CORO_YIELD - { + + BOOST_ASIO_CORO_YIELD + { + if (some_) { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "async_read_some")); - // The initiation function must return before the completion handler is called. - asio::dispatch( - asio::get_associated_executor(underlying_stream_), + http_io::async_read_some( + us_, + pr_, + std::move(self)); + } + else { + BOOST_ASIO_HANDLER_LOCATION(( + __FILE__, __LINE__, + "async_read")); + http_io::async_read( + us_, + pr_, std::move(self)); } } + upcall: std::size_t n = 0; if (!ec.failed()) @@ -156,30 +120,30 @@ class body_read_stream_op : public asio::coroutine { //------------------------------------------------ - // TODO: copy in Beast's stream traits to check if UnderlyingAsyncReadStream + // TODO: copy in Beast's stream traits to check if AsyncReadStream // is an AsyncReadStream, and also static_assert that body_read_stream is too. -template -body_read_stream::body_read_stream( - UnderlyingAsyncReadStream& und_stream +template +body_read_stream::body_read_stream( + AsyncReadStream& und_stream , http_proto::parser& pr) : - und_stream_(und_stream) + us_(und_stream) , pr_(pr) { } -template +template template< class MutableBufferSequence, BOOST_ASIO_COMPLETION_TOKEN_FOR( void(system::error_code, std::size_t)) CompletionToken> BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void(system::error_code, std::size_t)) -body_read_stream::async_read_some( +body_read_stream::async_read_some( const MutableBufferSequence& mb , CompletionToken&& token) { @@ -187,9 +151,30 @@ body_read_stream::async_read_some( CompletionToken, void(system::error_code, std::size_t)>( detail::body_read_stream_op< - MutableBufferSequence, UnderlyingAsyncReadStream>{und_stream_, mb, pr_, true}, + MutableBufferSequence, AsyncReadStream>{us_, mb, pr_, true}, + token, + asio::get_associated_executor(us_) + ); +} + +template +template< + class MutableBufferSequence, + BOOST_ASIO_COMPLETION_TOKEN_FOR( + void(system::error_code, std::size_t)) CompletionToken> +BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, + void(system::error_code, std::size_t)) + body_read_stream::async_read( + const MutableBufferSequence& mb + , CompletionToken&& token) +{ + return asio::async_compose< + CompletionToken, + void(system::error_code, std::size_t)>( + detail::body_read_stream_op< + MutableBufferSequence, AsyncReadStream>{us_, mb, pr_, false}, token, - asio::get_associated_executor(und_stream_) + asio::get_associated_executor(us_) ); } From 8cd684fdc1675deb7520266d9e494367fa954335 Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Wed, 22 Oct 2025 10:39:49 +0100 Subject: [PATCH 5/6] renamed files --- include/boost/{http_io => beast2}/body_read_stream.hpp | 0 include/boost/{http_io => beast2}/impl/body_read_stream.hpp | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename include/boost/{http_io => beast2}/body_read_stream.hpp (100%) rename include/boost/{http_io => beast2}/impl/body_read_stream.hpp (100%) diff --git a/include/boost/http_io/body_read_stream.hpp b/include/boost/beast2/body_read_stream.hpp similarity index 100% rename from include/boost/http_io/body_read_stream.hpp rename to include/boost/beast2/body_read_stream.hpp diff --git a/include/boost/http_io/impl/body_read_stream.hpp b/include/boost/beast2/impl/body_read_stream.hpp similarity index 100% rename from include/boost/http_io/impl/body_read_stream.hpp rename to include/boost/beast2/impl/body_read_stream.hpp From 411ff7f072e9cd1746af82aaafafdb6bdf14cfa3 Mon Sep 17 00:00:00 2001 From: Mungo Gill Date: Wed, 22 Oct 2025 10:40:16 +0100 Subject: [PATCH 6/6] http_io->beast2 --- include/boost/beast2/body_read_stream.hpp | 8 +- .../boost/beast2/impl/body_read_stream.hpp | 16 +-- test/unit/body_read_stream.cpp | 118 +++++++++++------- 3 files changed, 84 insertions(+), 58 deletions(-) diff --git a/include/boost/beast2/body_read_stream.hpp b/include/boost/beast2/body_read_stream.hpp index d810ee1..808acd0 100644 --- a/include/boost/beast2/body_read_stream.hpp +++ b/include/boost/beast2/body_read_stream.hpp @@ -10,13 +10,13 @@ #ifndef BOOST_HTTP_IO_BODY_READ_STREAM_HPP #define BOOST_HTTP_IO_BODY_READ_STREAM_HPP -#include +#include #include #include #include namespace boost { -namespace http_io { +namespace beast2 { /** A body reader for HTTP/1 messages. @@ -231,9 +231,9 @@ class body_read_stream { http_proto::parser& pr_; }; -} // http_io +} // beast2 } // boost -#include +#include #endif diff --git a/include/boost/beast2/impl/body_read_stream.hpp b/include/boost/beast2/impl/body_read_stream.hpp index 6d80971..e342e3a 100644 --- a/include/boost/beast2/impl/body_read_stream.hpp +++ b/include/boost/beast2/impl/body_read_stream.hpp @@ -4,7 +4,7 @@ // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // -// Official repository: https://github.com/vinniefalco/http_io +// Official repository: https://github.com/vinniefalco/beast2 // #ifndef BOOST_HTTP_IO_IMPL_BODY_READ_STREAM_HPP @@ -15,14 +15,14 @@ #include #include #include -#include -#include +#include +#include #include #include namespace boost { -namespace http_io { +namespace beast2 { namespace detail { @@ -65,7 +65,7 @@ class body_read_stream_op : public asio::coroutine { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "async_read_header")); - http_io::async_read_header< + beast2::async_read_header< AsyncReadStream, Self > ( us_, @@ -81,7 +81,7 @@ class body_read_stream_op : public asio::coroutine { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "async_read_some")); - http_io::async_read_some( + beast2::async_read_some( us_, pr_, std::move(self)); @@ -90,7 +90,7 @@ class body_read_stream_op : public asio::coroutine { BOOST_ASIO_HANDLER_LOCATION(( __FILE__, __LINE__, "async_read")); - http_io::async_read( + beast2::async_read( us_, pr_, std::move(self)); @@ -178,7 +178,7 @@ BOOST_ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, ); } -} // http_io +} // beast2 } // boost #endif diff --git a/test/unit/body_read_stream.cpp b/test/unit/body_read_stream.cpp index dab09fa..d398617 100644 --- a/test/unit/body_read_stream.cpp +++ b/test/unit/body_read_stream.cpp @@ -7,7 +7,7 @@ // Official repository: https://github.com/vinniefalco/http_io // -#include +#include #include #include @@ -20,6 +20,9 @@ #include #include +#include +#include + #include "test_suite.hpp" #include @@ -29,9 +32,11 @@ namespace boost { template struct MockReadStream { - MockReadStream(Executor& ex, const std::string &data) : ex_(ex), mock_data_(data), sent_(0) + MockReadStream(Executor& ex, const std::string &data, std::size_t chunk) : ex_(ex), mock_data_(data), chunk_(chunk), sent_(0) { } + + typedef Executor executor_type; Executor get_executor() const { return ex_; } @@ -65,11 +70,11 @@ struct MockReadStream { mock_data_.size() - sent_ }; auto source_buf = asio::buffer(source_str); - std::size_t chunk_size = std::max( - (std::size_t)(rand() % mock_data_.size()), - (std::size_t)1); + //std::size_t chunk_size = std::max( + // (std::size_t)(rand() % mock_data_.size()), + // (std::size_t)1); - std::size_t n = asio::buffer_copy(buf, source_buf, chunk_size); + std::size_t n = asio::buffer_copy(buf, source_buf, chunk_); system::error_code ec = (n != 0) ? system::error_code{} : asio::stream_errc::eof; @@ -83,9 +88,10 @@ struct MockReadStream { Executor& ex_; std::string mock_data_; std::size_t sent_; + std::size_t chunk_; }; -namespace http_io { +namespace beast2 { struct body_read_stream_test { @@ -102,49 +108,69 @@ struct body_read_stream_test "\r\n" "Hello World\r\n"; - asio::io_context ioc; - auto strand = asio::make_strand(ioc); - MockReadStream ms(strand, data); - - std::array arr; - auto buf = asio::buffer(arr); - - rts::context rts_ctx; - http_proto::response_parser::config cfg; - cfg.body_limit = 1024 * 1024; - cfg.min_buffer = 1024 * 1024; - http_proto::install_parser_service(rts_ctx, cfg); - http_proto::response_parser pr(rts_ctx); - pr.reset(); - pr.start(); - body_read_stream brs(ms, pr); - - brs.async_read_some(buf, - [this, &brs, &arr, &buf](system::error_code ec, std::size_t bytes_transferred) - { - if (ec.failed()) std::cerr << ec.message() << std::endl; + std::string data2 = "HTTP/1.0 200 OK\r\n" + "Content-Type: text/html\r\n" + "Last-Modified: Thu, 09 Oct 2025 16:42:02 GMT\r\n" + "Cache-Control: max-age=86000\r\n" + "Date: Thu, 16 Oct 2025 15:09:10 GMT\r\n" + //"Content-Length: 60\r\n" + "Connection: keep-alive\r\n" + "\r\n" + ; + + std::cout << data2.size() << std::endl; + + for (std::size_t chunk = 1; chunk < 400; chunk++) + { + asio::io_context ioc; + auto strand = asio::make_strand(ioc); + MockReadStream ms(strand, data, chunk); + + std::array arr; + auto buf = asio::buffer(arr); + + rts::context rts_ctx; + http_proto::response_parser::config cfg; + cfg.body_limit = 1024 * 1024; + cfg.min_buffer = 1024 * 1024; + http_proto::install_parser_service(rts_ctx, cfg); + + http_proto::response_parser pr(rts_ctx); + pr.reset(); + pr.start(); + + body_read_stream brs(ms, pr); - BOOST_TEST_EQ(ec.failed(), false); - BOOST_TEST_EQ(bytes_transferred, 41); + brs.async_read_some(buf, + [this, &chunk, &brs, &arr, &buf](system::error_code ec, std::size_t bytes_transferred) { - std::string value(arr.data(), bytes_transferred); - BOOST_TEST_EQ(value, std::string("Hello W")); - } + if (ec.failed()) std::cerr << ec.message() << std::endl; - brs.async_read_some(buf, - [this, &brs, &arr, &buf](system::error_code ec, std::size_t bytes_transferred) + BOOST_TEST_EQ(ec.failed(), false); + BOOST_TEST_GE(bytes_transferred, 1); { - if (ec.failed()) std::cerr << ec.message() << std::endl; - - BOOST_TEST_EQ(ec.failed(), false); - BOOST_TEST_EQ(bytes_transferred, 19); - { - std::string value(arr.data(), bytes_transferred); - BOOST_TEST_EQ(value, std::string("orld")); - } - }); - }); - ioc.run(); + std::string value(arr.data(), bytes_transferred); + std::cout << chunk << ": " << value << std::endl; + //BOOST_TEST_EQ(value, std::string("Hello W")); + } + + if (!ec.failed()) { + brs.async_read_some(buf, + [this, &brs, &arr, &buf](system::error_code ec, std::size_t bytes_transferred) + { + if (ec.failed()) std::cerr << ec.message() << std::endl; + + BOOST_TEST_EQ(ec.failed(), false); + BOOST_TEST_GE(bytes_transferred, 1); + { + std::string value(arr.data(), bytes_transferred); + //BOOST_TEST_EQ(value, std::string("orld")); + } + }); + } + }); + ioc.run(); + } } };