Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 134 additions & 29 deletions src/dplx/dp/legacy/chunked_output_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ class chunked_output_stream_base : public output_buffer
reset(mCurrentChunk.data(), mCurrentChunk.size());
return outcome::success();
}
mDecomissionThreshold = static_cast<std::int8_t>(size());
reset(static_cast<std::byte *>(mSmallBuffer), small_buffer_size);
if (requestedSize > small_buffer_size)
{
return errc::buffer_size_exceeded;
}
mDecomissionThreshold = static_cast<std::int8_t>(size());
reset(static_cast<std::byte *>(mSmallBuffer), small_buffer_size);
return outcome::success();
}

Expand All @@ -96,34 +96,38 @@ class chunked_output_stream_base : public output_buffer
data() - static_cast<std::byte *>(mSmallBuffer));
if (consumedSize < static_cast<std::size_t>(mDecomissionThreshold))
{
if (requestedSize > small_buffer_size)
{
return errc::buffer_size_exceeded;
}
std::memcpy(chunkPart.data(),
static_cast<std::byte *>(mSmallBuffer), consumedSize);

reset(static_cast<std::byte *>(mSmallBuffer), small_buffer_size);
mDecomissionThreshold -= static_cast<std::int8_t>(consumedSize);
if (requestedSize > small_buffer_size)
{
return errc::buffer_size_exceeded;
}
return outcome::success();
}

std::memcpy(chunkPart.data(), static_cast<std::byte *>(mSmallBuffer),
chunkPart.size());

DPLX_TRY(acquire_next_chunk());
if (!chunkPart.empty()) [[likely]]
{
std::memcpy(chunkPart.data(),
static_cast<std::byte *>(mSmallBuffer),
chunkPart.size());
}

auto const overlap = consumedSize
- static_cast<std::size_t>(mDecomissionThreshold);
std::memcpy(
mCurrentChunk.data(),
static_cast<std::byte *>(mSmallBuffer)
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+ mDecomissionThreshold,
overlap);
if (auto acquireRx = acquire_next_chunk(); acquireRx.has_error())
[[unlikely]]
{
move_remaining_small_buffer_to_front(overlap);
return std::move(acquireRx).assume_error();
}

mCurrentChunk = mCurrentChunk.subspan(overlap);
mDecomissionThreshold = -1;
if (!try_move_small_buffer_to_next_chunk(overlap) && mRemaining == 0)
{
return errc::end_of_stream;
}
return ensure_size(requestedSize);
}
auto do_bulk_write(std::byte const *src, std::size_t writeAmount) noexcept
Expand All @@ -137,18 +141,25 @@ class chunked_output_stream_base : public output_buffer
static_cast<std::byte *>(mSmallBuffer),
chunkPart.size());

DPLX_TRY(acquire_next_chunk());
// we can use small_buffer_size here as bulk_write is guaranteed to
// fill the buffer completely
// => small_buffer_size == data() - mSmallBuffer
auto const overlap
= small_buffer_size
- static_cast<std::size_t>(mDecomissionThreshold);

auto const overlap = small_buffer_size
- static_cast<unsigned>(mDecomissionThreshold);
std::memcpy(
mCurrentChunk.data(),
static_cast<std::byte *>(mSmallBuffer)
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+ chunkPart.size(),
overlap);
mCurrentChunk = mCurrentChunk.subspan(overlap);
mDecomissionThreshold = -1;
if (auto acquireRx = acquire_next_chunk(); acquireRx.has_error())
[[unlikely]]
{
move_remaining_small_buffer_to_front(overlap);
return std::move(acquireRx).assume_error();
}

if (!try_move_small_buffer_to_next_chunk(overlap)) [[unlikely]]
{
DPLX_TRY(acquire_next_chunk());
}
reset();
}
else
{
Expand Down Expand Up @@ -177,6 +188,100 @@ class chunked_output_stream_base : public output_buffer
reset(mCurrentChunk.data(), mCurrentChunk.size());
return outcome::success();
}

auto do_sync_output() noexcept -> result<void> override
{
if (mDecomissionThreshold < 0)
{
// small buffer is not in use => nothing to do
return outcome::success();
}

auto const chunkPart = mCurrentChunk.last(
static_cast<std::size_t>(mDecomissionThreshold));
auto const consumedSize = static_cast<std::size_t>(
data() - static_cast<std::byte *>(mSmallBuffer));
if (consumedSize <= static_cast<std::size_t>(mDecomissionThreshold))
{
// written data still does not exceed current chunk
std::memcpy(chunkPart.data(),
static_cast<std::byte *>(mSmallBuffer), consumedSize);
if (consumedSize == static_cast<std::size_t>(mDecomissionThreshold))
{
// avoid acquiring a new chunk as sync_output is usually called
// as a cleanup operation and a new chunk would go to waste
// and/or an illegal operation in case of a pre-sized stream
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
reset(mCurrentChunk.data() + mCurrentChunk.size(), 0U);
mDecomissionThreshold = -1;
}
else
{
reset(static_cast<std::byte *>(mSmallBuffer),
small_buffer_size);
mDecomissionThreshold -= static_cast<std::int8_t>(consumedSize);
}
return outcome::success();
}

std::memcpy(chunkPart.data(), static_cast<std::byte *>(mSmallBuffer),
chunkPart.size());
auto const overlap = consumedSize
- static_cast<std::size_t>(mDecomissionThreshold);
if (auto acquireRx = acquire_next_chunk(); acquireRx.has_error())
[[unlikely]]
{
move_remaining_small_buffer_to_front(overlap);
return std::move(acquireRx).assume_error();
}

if (!try_move_small_buffer_to_next_chunk(overlap))
{
return errc::end_of_stream;
}
return outcome::success();
}

auto
try_move_small_buffer_to_next_chunk(std::size_t const remaining) noexcept
-> bool
{
auto const copyAmount = std::min(remaining, mCurrentChunk.size());
if (mCurrentChunk.data() != nullptr) [[likely]]
{
std::memcpy(
mCurrentChunk.data(),
static_cast<std::byte *>(mSmallBuffer)
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+ mDecomissionThreshold,
remaining);
mCurrentChunk = mCurrentChunk.subspan(copyAmount);
}
if (remaining != copyAmount) [[unlikely]]
{
mDecomissionThreshold += static_cast<std::int8_t>(copyAmount);
move_remaining_small_buffer_to_front(remaining - copyAmount);
return false;
}
reset(mCurrentChunk.data(), mCurrentChunk.size());
mDecomissionThreshold = -1;
return true;
}

void
move_remaining_small_buffer_to_front(std::size_t const remaining) noexcept
{
std::memmove(
static_cast<std::byte *>(mSmallBuffer),
static_cast<std::byte *>(mSmallBuffer)
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
+ mDecomissionThreshold,
remaining);
// NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic)
reset(static_cast<std::byte *>(mSmallBuffer) + remaining,
small_buffer_size - remaining);
mDecomissionThreshold = 0;
}
};

} // namespace dplx::dp::legacy
68 changes: 66 additions & 2 deletions src/dplx/dp/legacy/chunked_output_stream.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <catch2/catch_test_macros.hpp>

#include "blob_matcher.hpp"
#include "test_utils.hpp"

namespace dp_tests
Expand All @@ -37,13 +38,13 @@ class test_legacy_chunked_output_stream final
static constexpr auto partition
= dp::minimum_guaranteed_write_size * 2U - 1U;

explicit test_legacy_chunked_output_stream(unsigned int streamSize)
explicit test_legacy_chunked_output_stream(unsigned streamSize)
: base_type({}, streamSize)
, mChunks()
, mNext(0U)

{
constexpr auto invalidItem = std::byte{0xFEU};
constexpr auto invalidItem = std::byte{0xfeU};
assert(streamSize > partition);

mChunks[0].resize(partition);
Expand All @@ -53,6 +54,15 @@ class test_legacy_chunked_output_stream final
std::fill(mChunks[1].begin(), mChunks[1].end(), invalidItem);
}

[[nodiscard]] auto content() const -> std::vector<std::byte>
{
std::vector<std::byte> result;
result.reserve(mChunks[0].size() + mChunks[1].size());
result.insert(result.end(), mChunks[0].begin(), mChunks[0].end());
result.insert(result.end(), mChunks[1].begin(), mChunks[1].end());
return result;
}

private:
auto acquire_next_chunk_impl() -> result<std::span<std::byte>>
{
Expand All @@ -79,4 +89,58 @@ TEST_CASE("legacy_chunked_output_stream smoke tests")
CHECK(subject.size() == subject.partition - 2);
}

// NOLINTBEGIN(cppcoreguidelines-pro-bounds-pointer-arithmetic)
TEST_CASE("legacy_chunked_output_stream wraps correctly with do_grow")
{
constexpr unsigned streamSize = dp::minimum_guaranteed_write_size * 4 - 1;
test_legacy_chunked_output_stream subject(streamSize);

REQUIRE(subject.ensure_size(1U));

std::byte buffer[streamSize] = {};
constexpr std::size_t offset
= test_legacy_chunked_output_stream::partition - 1;
REQUIRE(subject.bulk_write(std::span(buffer).first(offset)));

REQUIRE(subject.ensure_size(3U));
auto *out = subject.data();
buffer[offset] = out[0] = std::byte{'a'};
buffer[offset + 1] = out[1] = std::byte{'b'};
buffer[offset + 2] = out[2] = std::byte{'c'};
subject.commit_written(3U);

CHECK(subject.mChunks[0].back() == std::byte{0xfeU});
REQUIRE(subject.sync_output());
CHECK(subject.mChunks[0].back() == std::byte{'a'});

constexpr auto invalidItem = std::byte{0xfeU};
std::ranges::fill_n(static_cast<std::byte *>(buffer) + offset + 3,
streamSize - offset - 3, invalidItem);

REQUIRE_BLOB_EQ(subject.content(), buffer);
}

TEST_CASE("legacy_chunked_output_stream wraps correctly with bulk_write")
{
constexpr unsigned streamSize = dp::minimum_guaranteed_write_size * 4;
test_legacy_chunked_output_stream subject(streamSize);

std::byte buffer[streamSize] = {};
constexpr std::size_t offset1 = 2U;
REQUIRE(subject.bulk_write(std::span(buffer).first(offset1)));
REQUIRE(subject.ensure_size(test_legacy_chunked_output_stream::partition
- 1));
constexpr auto offset2 = dp::minimum_guaranteed_write_size * 3;

REQUIRE(subject.bulk_write(std::span(buffer).first(offset2)));

constexpr auto invalidItem = std::byte{0xfeU};
std::ranges::fill_n(static_cast<std::byte *>(buffer) + offset1 + offset2,
streamSize - (offset1 + offset2), invalidItem);

REQUIRE_BLOB_EQ(subject.content(), buffer);
}

// NOLINTEND(cppcoreguidelines-pro-bounds-pointer-arithmetic)

} // namespace dp_tests