Skip to content

Commit

Permalink
Merge pull request #23 from uchenily/main
Browse files Browse the repository at this point in the history
[WIP] Update BufReader
  • Loading branch information
8sileus committed May 14, 2024
2 parents 8e8aa4b + fd1fbfe commit e7dd3f4
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 40 deletions.
2 changes: 1 addition & 1 deletion zedio/common/concepts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ concept is_awaiter = requires(IOAwaiter awaiter) {
};

template <typename C>
concept constructible_to_char_splice = requires(C c) { std::span<const char>{c}; };
concept constructible_to_char_slice = requires(C c) { std::span<const char>{c}; };

} // namespace zedio
12 changes: 6 additions & 6 deletions zedio/io/buf/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class StreamBuffer {
}

[[nodiscard]]
auto r_splice() const noexcept -> std::span<const char> {
auto r_slice() const noexcept -> std::span<const char> {
return {r_begin(), r_end()};
}

Expand All @@ -78,7 +78,7 @@ class StreamBuffer {
}

[[nodiscard]]
auto w_splice() noexcept -> std::span<char> {
auto w_slice() noexcept -> std::span<char> {
return {w_begin(), w_end()};
}

Expand Down Expand Up @@ -112,8 +112,8 @@ class StreamBuffer {
}

[[nodiscard]]
auto find_flag_and_return_splice(std::string_view end_str) noexcept -> std::span<const char> {
auto pos = std::string_view{r_splice()}.find(end_str);
auto find_flag_and_return_slice(std::string_view end_str) noexcept -> std::span<const char> {
auto pos = std::string_view{r_slice()}.find(end_str);
if (pos == std::string_view::npos) {
return {};
} else {
Expand All @@ -122,8 +122,8 @@ class StreamBuffer {
}

[[nodiscard]]
auto find_flag_and_return_splice(char end_char) noexcept -> std::span<const char> {
auto pos = std::string_view{r_splice()}.find(end_char);
auto find_flag_and_return_slice(char end_char) noexcept -> std::span<const char> {
auto pos = std::string_view{r_slice()}.find(end_char);
if (pos == std::string_view::npos) {
return {};
} else {
Expand Down
4 changes: 2 additions & 2 deletions zedio/io/impl/impl_async_read.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct ImplAsyncRead {
}

template <typename... Ts>
requires(constructible_to_char_splice<Ts> && ...)
requires(constructible_to_char_slice<Ts> && ...)
[[REMEMBER_CO_AWAIT]]
auto read_vectored(Ts &&...bufs) const noexcept {
constexpr std::size_t N = sizeof...(Ts);
Expand Down Expand Up @@ -65,4 +65,4 @@ struct ImplAsyncRead {
}
};

} // namespace zedio::io::detail
} // namespace zedio::io::detail
4 changes: 2 additions & 2 deletions zedio/io/impl/impl_async_write.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ struct ImplAsyncWrite {
}

template <typename... Ts>
requires(constructible_to_char_splice<Ts> && ...)
requires(constructible_to_char_slice<Ts> && ...)
[[REMEMBER_CO_AWAIT]]
auto write_vectored(Ts &&...bufs) noexcept {
constexpr std::size_t N = sizeof...(Ts);
Expand Down Expand Up @@ -65,4 +65,4 @@ struct ImplAsyncWrite {
}
};

} // namespace zedio::io::detail
} // namespace zedio::io::detail
55 changes: 33 additions & 22 deletions zedio/io/impl/impl_buf_read.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class ImplBufRead {
public:
[[nodiscard]]
auto buffer() noexcept {
return static_cast<B *>(this)->r_stream_.r_splice();
return static_cast<B *>(this)->r_stream_.r_slice();
}

[[nodiscard]]
Expand All @@ -25,12 +25,13 @@ class ImplBufRead {
[[REMEMBER_CO_AWAIT]]
auto fill_buf() -> zedio::async::Task<Result<void>> {
auto ret = co_await io::read(static_cast<B *>(this)->io_.fd(),
static_cast<B *>(this)->r_stream_.w_splice().data(),
static_cast<B *>(this)->r_stream_.w_splice().size_bytes(),
static_cast<B *>(this)->r_stream_.w_slice().data(),
static_cast<B *>(this)->r_stream_.w_slice().size_bytes(),
static_cast<uint64_t>(-1));
if (!ret) [[unlikely]] {
co_return std::unexpected{ret.error()};
}
static_cast<B *>(this)->r_stream_.w_increase(ret.value());
co_return Result<void>{};
}

Expand All @@ -56,30 +57,29 @@ class ImplBufRead {
-> zedio::async::Task<Result<std::size_t>> {
Result<std::size_t> ret;
while (true) {
if (auto splice
= static_cast<B *>(this)->r_stream_.find_flag_and_return_splice(end_flag);
!splice.empty()) {
if (auto slice = static_cast<B *>(this)->r_stream_.find_flag_and_return_slice(end_flag);
!slice.empty()) {
if constexpr (std::is_same_v<std::string, C>) {
buf.append(splice.begin(), splice.end());
buf.append(slice.begin(), slice.end());
} else {
buf.insert(buf.end(), splice.begin(), splice.end());
buf.insert(buf.end(), slice.begin(), slice.end());
}
static_cast<B *>(this)->r_stream_.r_increase(splice.size_bytes());
static_cast<B *>(this)->r_stream_.r_increase(slice.size_bytes());
break;
} else {
if constexpr (std::is_same_v<std::string, C>) {
buf.append(static_cast<B *>(this)->r_stream_.r_splice().begin(),
static_cast<B *>(this)->r_stream_.r_splice().end());
buf.append(static_cast<B *>(this)->r_stream_.r_slice().begin(),
static_cast<B *>(this)->r_stream_.r_slice().end());
} else {
buf.insert(buf.end(),
static_cast<B *>(this)->r_stream_.r_splice().begin(),
static_cast<B *>(this)->r_stream_.r_splice().end());
static_cast<B *>(this)->r_stream_.r_slice().begin(),
static_cast<B *>(this)->r_stream_.r_slice().end());
}
static_cast<B *>(this)->r_stream_.reset_pos();
}

ret = co_await static_cast<B *>(this)->io_.read(
static_cast<B *>(this)->r_stream_.w_splice());
static_cast<B *>(this)->r_stream_.w_slice());
if (!ret) [[unlikely]] {
co_return std::unexpected{ret.error()};
}
Expand All @@ -104,23 +104,34 @@ class ImplBufRead {
if (static_cast<B *>(this)->r_stream_.capacity() < buf.size_bytes()) {
auto len = static_cast<B *>(this)->r_stream_.write_to(buf);
buf = buf.subspan(len, buf.size_bytes() - len);
co_return co_await static_cast<B *>(this)->io_.read(buf);
if constexpr (eof_is_error) {
auto ret = co_await static_cast<B *>(this)->io_.read_exact(buf);
if (ret) {
co_return len + buf.size_bytes();
}
} else {
auto ret = co_await static_cast<B *>(this)->io_.read(buf);
if (ret) {
ret.value() += len;
}
co_return ret;
}
}

if (pred()) {
co_return static_cast<B *>(this)->r_stream_.write_to(buf);
}

if (static_cast<B *>(this)->r_stream_.w_remaining() < buf.size_bytes()) {
static_cast<B *>(this)->r_stream_.reset_data();
}

assert(static_cast<B *>(this)->r_stream_.w_remaining() >= buf.size_bytes());

Result<std::size_t> ret;
while (true) {
if (static_cast<B *>(this)->r_stream_.r_remaining()
+ static_cast<B *>(this)->r_stream_.w_remaining()
< buf.size_bytes()) {
static_cast<B *>(this)->r_stream_.reset_data();
}

ret = co_await static_cast<B *>(this)->io_.read(
static_cast<B *>(this)->r_stream_.w_splice());
static_cast<B *>(this)->r_stream_.w_slice());
if (!ret) [[unlikely]] {
co_return std::unexpected{ret.error()};
}
Expand Down
12 changes: 6 additions & 6 deletions zedio/io/impl/impl_buf_write.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class ImplBufWrite {
public:
[[nodiscard]]
auto buffer() noexcept {
return static_cast<B *>(this)->w_stream_.r_splice();
return static_cast<B *>(this)->w_stream_.r_slice();
}

[[nodiscard]]
Expand All @@ -19,18 +19,18 @@ class ImplBufWrite {

[[REMEMBER_CO_AWAIT]]
auto flush() -> zedio::async::Task<Result<void>> {
auto splice = buffer();
auto slice = buffer();
Result<std::size_t> ret;
while (!splice.empty()) {
ret = co_await static_cast<B *>(this)->io_.write(splice);
while (!slice.empty()) {
ret = co_await static_cast<B *>(this)->io_.write(slice);
if (!ret) [[unlikely]] {
co_return std::unexpected{ret.error()};
}
if (ret.value() == 0) {
co_return std::unexpected{make_zedio_error(Error::WriteZero)};
}
static_cast<B *>(this)->w_stream_.r_increase(ret.value());
splice = splice.subspan(ret.value(), splice.size_bytes() - ret.value());
slice = slice.subspan(ret.value(), slice.size_bytes() - ret.value());
}
static_cast<B *>(this)->w_stream_.reset_pos();
co_return Result<void>{};
Expand All @@ -56,7 +56,7 @@ class ImplBufWrite {
Result<std::size_t> ret;
do {
ret = co_await static_cast<B *>(this)->io_.write_vectored(
static_cast<B *>(this)->w_stream_.r_splice(),
static_cast<B *>(this)->w_stream_.r_slice(),
buf);
if (!ret) {
co_return std::unexpected{ret.error()};
Expand Down
2 changes: 1 addition & 1 deletion zedio/socket/impl/impl_stream_write.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ struct ImplStreamWrite {
}

template <typename... Ts>
requires(constructible_to_char_splice<Ts> && ...)
requires(constructible_to_char_slice<Ts> && ...)
[[REMEMBER_CO_AWAIT]]
auto write_vectored(Ts &&...bufs) noexcept {
constexpr auto N = sizeof...(Ts);
Expand Down

0 comments on commit e7dd3f4

Please sign in to comment.