diff --git a/include/boost/http_proto/serializer.hpp b/include/boost/http_proto/serializer.hpp index b968430d..a9ef83a1 100644 --- a/include/boost/http_proto/serializer.hpp +++ b/include/boost/http_proto/serializer.hpp @@ -434,19 +434,21 @@ class serializer::stream return *this; } - /** Returns `true` if the stream is open + /** Return true if the stream is open */ BOOST_HTTP_PROTO_DECL bool is_open() const noexcept; - /** Returns the available capacity + /** Return the available capacity + + @throw std::logic_error if `!is_open()`. */ BOOST_HTTP_PROTO_DECL std::size_t - capacity() const noexcept; + capacity() const; - /** Prepares a buffer for writing + /** Prepare a buffer for writing Use @ref commit to make the written data available to the serializer. @@ -454,12 +456,14 @@ class serializer::stream @return An object of type @ref mutable_buffers_type that satisfies MutableBufferSequence requirements, the underlying memory is owned by the serializer. + + @throw std::logic_error if `!is_open()`. */ BOOST_HTTP_PROTO_DECL mutable_buffers_type - prepare() noexcept; + prepare(); - /** Commits data to the serializer + /** Commit data to the serializer @param n Number of bytes to commit. @@ -470,12 +474,19 @@ class serializer::stream void commit(std::size_t n); - /** Closes the stream + /** Close the stream if open */ BOOST_HTTP_PROTO_DECL void close(); + /** Destructor + + Closes the stream if open. + */ + BOOST_HTTP_PROTO_DECL + ~stream(); + private: friend class serializer; @@ -494,7 +505,7 @@ class serializer::stream class serializer::const_buf_gen_base { public: - // Returns the next non-empty buffer, + // Return the next non-empty buffer, // or an empty buffer if none remain. virtual buffers::const_buffer @@ -510,7 +521,7 @@ class serializer::const_buf_gen_base std::size_t count() const = 0; - // Returns true when there is no buffer or + // Return true when there is no buffer or // the remaining buffers are empty virtual bool @@ -526,6 +537,7 @@ class serializer::const_buf_gen ConstBufferSequence cbs_; it_t current_; + public: using const_buffer = buffers::const_buffer; @@ -556,8 +568,10 @@ class serializer::const_buf_gen current_, buffers::end(cbs_), std::size_t{}, - [](std::size_t sum, const_buffer cb) { - return sum + cb.size(); }); + [](std::size_t sum, const_buffer cb) + { + return sum + cb.size(); + }); } std::size_t @@ -566,8 +580,10 @@ class serializer::const_buf_gen return std::count_if( current_, buffers::end(cbs_), - [](const_buffer cb) { - return cb.size() != 0; }); + [](const_buffer cb) + { + return cb.size() != 0; + }); } bool @@ -576,8 +592,10 @@ class serializer::const_buf_gen return std::all_of( current_, buffers::end(cbs_), - [](const_buffer cb) { - return cb.size() == 0; }); + [](const_buffer cb) + { + return cb.size() == 0; + }); } }; diff --git a/src/detail/filter.cpp b/src/detail/filter.cpp index 9550f7d7..26ede36c 100644 --- a/src/detail/filter.cpp +++ b/src/detail/filter.cpp @@ -23,26 +23,25 @@ filter:: process( buffers::mutable_buffer_subspan out, buffers::const_buffer_pair in, - bool more, - bool partial_flush) -> results + bool more) -> results { results rv; - auto flush = filter::flush::none; + bool p_more = true; for(;;) { - if(!more && flush != filter::flush::finish && in[1].size() == 0) + if(!more && p_more && in[1].size() == 0) { if(buffers::size(out) < min_out_buffer()) { rv.out_short = true; return rv; } - flush = filter::flush::finish; + p_more = false; } auto ob = buffers::front(out); auto ib = buffers::front(in); - auto rs = do_process(ob, ib, flush); + auto rs = do_process(ob, ib, p_more); rv.in_bytes += rs.in_bytes; rv.out_bytes += rs.out_bytes; @@ -66,14 +65,7 @@ process( return rv; if(buffers::size(in) == 0 && rs.out_bytes < ob.size()) - { - if(partial_flush && rv.out_bytes == 0) - { - flush = filter::flush::partial; - continue; - } return rv; - } } } diff --git a/src/detail/filter.hpp b/src/detail/filter.hpp index 30bed1c1..20262647 100644 --- a/src/detail/filter.hpp +++ b/src/detail/filter.hpp @@ -64,17 +64,9 @@ class filter process( buffers::mutable_buffer_subspan out, buffers::const_buffer_pair in, - bool more, - bool partial_flush = false); + bool more); protected: - enum class flush - { - none, - partial, - finish - }; - virtual std::size_t min_out_buffer() const noexcept @@ -87,7 +79,7 @@ class filter do_process( buffers::mutable_buffer, buffers::const_buffer, - flush) noexcept = 0; + bool) noexcept = 0; }; } // detail diff --git a/src/detail/zlib_filter_base.hpp b/src/detail/zlib_filter_base.hpp index 37156212..71564872 100644 --- a/src/detail/zlib_filter_base.hpp +++ b/src/detail/zlib_filter_base.hpp @@ -37,24 +37,6 @@ class zlib_filter_base : public filter protected: rts::zlib::stream strm_; - static - rts::zlib::flush - translate(filter::flush flush) noexcept - { - switch(flush) - { - case filter::flush::none: - return rts::zlib::flush::no_flush; - - case filter::flush::partial: - return rts::zlib::flush::block; - - case filter::flush::finish: - default: - return rts::zlib::flush::finish; - } - } - static unsigned int saturate_cast(std::size_t n) noexcept diff --git a/src/parser.cpp b/src/parser.cpp index 7b7f6460..8b9ba8dd 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -322,7 +322,7 @@ class zlib_filter do_process( buffers::mutable_buffer out, buffers::const_buffer in, - filter::flush flush) noexcept override + bool more) noexcept override { strm_.next_out = static_cast(out.data()); strm_.avail_out = saturate_cast(out.size()); @@ -330,7 +330,9 @@ class zlib_filter strm_.avail_in = saturate_cast(in.size()); auto rs = static_cast( - svc_.inflate(strm_, translate(flush))); + svc_.inflate( + strm_, + more ? rts::zlib::no_flush : rts::zlib::finish)); results rv; rv.out_bytes = saturate_cast(out.size()) - strm_.avail_out; @@ -374,7 +376,7 @@ class brotli_filter do_process( buffers::mutable_buffer out, buffers::const_buffer in, - filter::flush flush) noexcept override + bool more) noexcept override { auto* next_in = reinterpret_cast(in.data()); auto available_in = in.size(); @@ -394,7 +396,7 @@ class brotli_filter rv.out_bytes = out.size() - available_out; rv.finished = svc_.is_finished(state_); - if(rs == rts::brotli::decoder_result::needs_more_input && flush == filter::flush::finish) + if(!more && rs == rts::brotli::decoder_result::needs_more_input) rv.ec = BOOST_HTTP_PROTO_ERR(error::bad_payload); if(rs == rts::brotli::decoder_result::error) diff --git a/src/serializer.cpp b/src/serializer.cpp index 3c972ceb..9e13e769 100644 --- a/src/serializer.cpp +++ b/src/serializer.cpp @@ -236,7 +236,7 @@ class zlib_filter do_process( buffers::mutable_buffer out, buffers::const_buffer in, - filter::flush flush) noexcept override + bool more) noexcept override { strm_.next_out = static_cast(out.data()); strm_.avail_out = saturate_cast(out.size()); @@ -244,7 +244,9 @@ class zlib_filter strm_.avail_in = saturate_cast(in.size()); auto rs = static_cast( - svc_.deflate(strm_, translate(flush))); + svc_.deflate( + strm_, + more ? rts::zlib::no_flush : rts::zlib::finish)); results rv; rv.out_bytes = saturate_cast(out.size()) - strm_.avail_out; @@ -263,7 +265,7 @@ class brotli_filter { rts::brotli::encode_service& svc_; rts::brotli::encoder_state* state_; - bool flushing_ = false; + public: brotli_filter( rts::context& ctx, @@ -292,29 +294,19 @@ class brotli_filter do_process( buffers::mutable_buffer out, buffers::const_buffer in, - filter::flush flush) noexcept override + bool more) noexcept override { - if(flushing_) - { - // restore partial flush type - flush = filter::flush::partial; - in = {}; - } - else if(flush == filter::flush::partial) - { - // the Brotli api requires continued flushing - // until all output is fully drained. - flushing_ = true; - } - auto* next_in = reinterpret_cast(in.data()); auto available_in = in.size(); auto* next_out = reinterpret_cast(out.data()); auto available_out = out.size(); + using encoder_operation = + rts::brotli::encoder_operation; + bool rs = svc_.compress_stream( state_, - translate(flush), + more ? encoder_operation::process : encoder_operation::finish, &available_in, &next_in, &available_out, @@ -326,34 +318,12 @@ class brotli_filter rv.out_bytes = out.size() - available_out; rv.finished = svc_.is_finished(state_); - if(flushing_ && !svc_.has_more_output(state_)) - flushing_ = false; - // TODO: use proper error code if(rs == false) rv.ec = error::bad_payload; return rv; } - -private: - static - rts::brotli::encoder_operation - translate(filter::flush flush) noexcept - { - switch(flush) - { - case filter::flush::none: - return rts::brotli::encoder_operation::process; - - case filter::flush::partial: - return rts::brotli::encoder_operation::flush; - - case filter::flush::finish: - default: - return rts::brotli::encoder_operation::finish; - } - } }; } // namespace @@ -525,7 +495,7 @@ prepare() -> } case style::stream: - if(is_header_done_ && cb0_.size() == 0) + if(cb0_.size() == 0 && is_header_done_ && more_input_) BOOST_HTTP_PROTO_RETURN_EC( error::need_data); break; @@ -619,40 +589,31 @@ prepare() -> case style::stream: { - appender apndr(cb0_, is_chunked_); - - if(apndr.is_full() || filter_done_) - break; - - // The stream object is expected to - // have already populated cb1_ - if(more_input_ && cb1_.size() == 0) { - if(!prepped_.empty()) + appender apndr(cb0_, is_chunked_); + if(apndr.is_full() || filter_done_) break; - BOOST_HTTP_PROTO_RETURN_EC( - error::need_data); - } - - const auto rs = filter_->process( - buffers::mutable_buffer_span(apndr.prepare()), - cb1_.data(), - more_input_, - prepped_.empty()); // force_flush - - if(rs.ec.failed()) - { - is_done_ = true; - return rs.ec; - } + const auto rs = filter_->process( + buffers::mutable_buffer_span(apndr.prepare()), + cb1_.data(), + more_input_); - cb1_.consume(rs.in_bytes); - apndr.commit(rs.out_bytes, !rs.finished); + if(rs.ec.failed()) + { + is_done_ = true; + return rs.ec; + } - if(rs.finished) - filter_done_ = true; + cb1_.consume(rs.in_bytes); + apndr.commit(rs.out_bytes, !rs.finished); + if(rs.finished) + filter_done_ = true; + } + if(cb0_.size() == 0 && is_header_done_ && more_input_) + BOOST_HTTP_PROTO_RETURN_EC( + error::need_data); break; } } @@ -665,9 +626,6 @@ prepare() -> if(cbp[1].size() != 0) prepped_.append(cbp[1]); - BOOST_ASSERT( - buffers::size(prepped_) > 0); - return const_buffers_type( prepped_.begin(), prepped_.size()); @@ -679,7 +637,7 @@ consume( std::size_t n) { // Precondition violation - if(is_done_ && n != 0) + if(is_done_) detail::throw_logic_error(); if(!is_header_done_) @@ -1051,19 +1009,17 @@ serializer:: stream:: is_open() const noexcept { - if(sr_ == nullptr) - return false; - - return sr_->more_input_; + return sr_ != nullptr; } std::size_t serializer:: stream:: -capacity() const noexcept +capacity() const { + // Precondition violation if(!is_open()) - return 0; + detail::throw_logic_error(); if(sr_->filter_) return sr_->cb1_.capacity(); @@ -1082,11 +1038,12 @@ capacity() const noexcept auto serializer:: stream:: -prepare() noexcept -> +prepare() -> mutable_buffers_type { + // Precondition violation if(!is_open()) - return {}; + detail::throw_logic_error(); if(sr_->filter_) return sr_->cb1_.prepare( @@ -1109,12 +1066,9 @@ commit(std::size_t n) if(!is_open()) detail::throw_logic_error(); + // Precondition violation if(n > capacity()) - { - // n can't be greater than size of - // the buffers returned by prepare() detail::throw_invalid_argument(); - } if(sr_->filter_) return sr_->cb1_.commit(n); @@ -1149,19 +1103,24 @@ close() if(!is_open()) return; // no-op; - sr_->more_input_ = false; - - if(sr_->filter_) - return; + // chunked with no filter + if(!sr_->filter_ && sr_->is_chunked_) + { + write_final_chunk( + sr_->cb0_.prepare( + final_chunk_len)); + sr_->cb0_.commit(final_chunk_len); + } - if(!sr_->is_chunked_) - return; + sr_->more_input_ = false; + sr_ = nullptr; +} - // chunked with no filter - write_final_chunk( - sr_->cb0_.prepare( - final_chunk_len)); - sr_->cb0_.commit(final_chunk_len); +serializer:: +stream:: +~stream() +{ + close(); } } // http_proto diff --git a/test/unit/compression.cpp b/test/unit/compression.cpp index edc87deb..f3dbb89a 100644 --- a/test/unit/compression.cpp +++ b/test/unit/compression.cpp @@ -259,24 +259,20 @@ struct zlib_test auto n = buffers::copy(b, body_); body_ = buffers::sans_prefix(body_, n); rs.bytes = n; - rs.ec = {}; rs.finished = (body_.size() == 0); return rs; } }; - sr.start(res, body); - do { - auto cbs = sr.prepare().value(); - BOOST_TEST_GT(buffers::size(cbs), 0); - auto n = buffers::copy( - out.prepare(buffers::size(cbs)), cbs); - BOOST_TEST_EQ(n, buffers::size(cbs)); - out.commit(n); + auto cbs = sr.prepare(); + auto n = buffers::size(cbs.value()); + BOOST_TEST_GT(n, 0); + buffers::copy(out.prepare(n), cbs.value()); sr.consume(n); + out.commit(n); } while(!sr.is_done()); } @@ -289,33 +285,32 @@ struct zlib_test buffers::string_buffer out) { auto stream = sr.start_stream(res); - - int stream_closed = 0; do { - if(stream_closed == 0) + if(stream.is_open()) { auto mbs = stream.prepare(); - auto n1 = buffers::copy(mbs, body); - body = buffers::sans_prefix(body, n1); - stream.commit(n1); + auto n = buffers::copy(mbs, body); + body = buffers::sans_prefix(body, n); + stream.commit(n); if(body.size() == 0) - stream_closed = 1; + stream.close(); } - auto cbs = sr.prepare().value(); - BOOST_TEST_GT(buffers::size(cbs), 0); - auto n2 = buffers::copy(out.prepare(buffers::size(cbs)), cbs); - BOOST_TEST_EQ(n2, buffers::size(cbs)); - sr.consume(n2); - out.commit(n2); - - if(stream_closed == 1) + auto cbs = sr.prepare(); + if(cbs.has_error()) { - stream_closed = 2; - stream.close(); + BOOST_ASSERT( + cbs.error() == error::need_data); + } + else + { + auto n = buffers::size(cbs.value()); + BOOST_TEST_GT(n, 0); + buffers::copy(out.prepare(n), cbs.value()); + sr.consume(n); + out.commit(n); } - } while(!sr.is_done()); } @@ -328,7 +323,6 @@ struct zlib_test buffers::string_buffer out) { std::vector buf_seq; - do { auto buf_size = std::min(body.size() / 23, body.size()); @@ -339,16 +333,14 @@ struct zlib_test } while(body.size() != 0); sr.start(res, buf_seq); - do { - auto cbs = sr.prepare().value(); - BOOST_TEST_GT(buffers::size(cbs), 0); - auto n = buffers::copy( - out.prepare(buffers::size(cbs)), cbs); - BOOST_TEST_EQ(n, buffers::size(cbs)); - out.commit(n); + auto cbs = sr.prepare(); + auto n = buffers::size(cbs.value()); + BOOST_TEST_GT(n, 0); + buffers::copy(out.prepare(n), cbs.value()); sr.consume(n); + out.commit(n); }while(!sr.is_done()); } diff --git a/test/unit/serializer.cpp b/test/unit/serializer.cpp index ecfb2435..d193823f 100644 --- a/test/unit/serializer.cpp +++ b/test/unit/serializer.cpp @@ -760,6 +760,58 @@ struct serializer_test BOOST_TEST(!stream.is_open()); } + // Empty commits + { + core::string_view sv = + "HTTP/1.1 200 OK\r\n" + "\r\n"; + response res(sv); + rts::context ctx; + install_serializer_service(ctx, {}); + serializer sr(ctx); + auto stream = sr.start_stream(res); + + // consume whole header + { + auto cbs = sr.prepare(); + BOOST_TEST_EQ( + buffers::size(cbs.value()), + sv.size()); + sr.consume(sv.size()); + } + + // error::need_data + { + auto cbs = sr.prepare(); + BOOST_TEST_EQ( + cbs.error(), + error::need_data); + } + + // commit 0 + { + stream.prepare(); + stream.commit(0); + auto cbs = sr.prepare(); + BOOST_TEST_EQ( + cbs.error(), + error::need_data); + } + + // close empty + { + BOOST_TEST(!sr.is_done()); + stream.close(); + auto cbs = sr.prepare(); + BOOST_TEST_EQ( + buffers::size(cbs.value()), + 0); + BOOST_TEST(!sr.is_done()); + sr.consume(0); + BOOST_TEST(sr.is_done()); + } + } + { core::string_view sv = "HTTP/1.1 200 OK\r\n" @@ -782,7 +834,7 @@ struct serializer_test stream.commit(buffers::size(mbs) + 1), std::invalid_argument); - // commit 0 bytes must be possible + // commiting 0 bytes must be possible stream.commit(0); auto mcbs = sr.prepare(); @@ -799,10 +851,14 @@ struct serializer_test stream.close(); BOOST_TEST(!stream.is_open()); - BOOST_TEST(stream.capacity() == 0); - BOOST_TEST(buffers::size(stream.prepare()) == 0); BOOST_TEST_THROWS( - stream.commit(1), + stream.prepare(), + std::logic_error); + BOOST_TEST_THROWS( + stream.capacity(), + std::logic_error); + BOOST_TEST_THROWS( + stream.commit(0), std::logic_error); stream.close(); // fine no-op