From e6a2d20688705eb77de8c664cf8d599d854b2e2c Mon Sep 17 00:00:00 2001 From: Yangmin Zhu Date: Wed, 19 Feb 2020 17:26:45 -0800 Subject: [PATCH] buffer: release empty slices after commit (#116) (#128) Description: Remove empty slices off the end of buffers after calls to OwnedImpl::commit. The slices reserved when OwnedImpl::reserve is called will sit unused in cases where the 0 bytes are commited, for example, when socket read returns 0 bytes EAGAIN. Trapped slices act like a memory leak until there is a successful read or the socket is closed. Risk Level: low Testing: unit Docs Changes: n/a Release Notes: n/a Signed-off-by: Antonio Vicente Signed-off-by: Asra Ali Signed-off-by: Yangmin Zhu --- include/envoy/buffer/buffer.h | 4 +- source/common/buffer/buffer_impl.cc | 13 +++ source/common/buffer/buffer_impl.h | 21 +++++ test/common/buffer/owned_impl_test.cc | 127 +++++++++++++++++++++++--- 4 files changed, 152 insertions(+), 13 deletions(-) diff --git a/include/envoy/buffer/buffer.h b/include/envoy/buffer/buffer.h index 1e29bf5cd953..801a5c0de9ee 100644 --- a/include/envoy/buffer/buffer.h +++ b/include/envoy/buffer/buffer.h @@ -101,8 +101,8 @@ class Instance { /** * Commit a set of slices originally obtained from reserve(). The number of slices should match * the number obtained from reserve(). The size of each slice can also be altered. Commit must - * occur following a reserve() without any mutating operations in between other than to the iovecs - * len_ fields. + * occur once following a reserve() without any mutating operations in between other than to the + * iovecs len_ fields. * @param iovecs supplies the array of slices to commit. * @param num_iovecs supplies the size of the slices array. */ diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index 005ddcd05349..fee0fa89008e 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -150,6 +150,11 @@ void OwnedImpl::commit(RawSlice* iovecs, uint64_t num_iovecs) { } } + // In case an extra slice was reserved, remove empty slices from the end of the buffer. + while (!slices_.empty() && slices_.back()->dataSize() == 0) { + slices_.pop_back(); + } + ASSERT(num_slices_committed > 0); } } @@ -656,6 +661,14 @@ bool OwnedImpl::isSameBufferImpl(const Instance& rhs) const { return usesOldImpl() == other->usesOldImpl(); } +std::vector OwnedImpl::describeSlicesForTest() const { + std::vector slices; + for (const auto& slice : slices_) { + slices.push_back(slice->describeSliceForTest()); + } + return slices; +} + bool OwnedImpl::use_old_impl_ = false; } // namespace Buffer diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index 114d14dabc94..36715ed42d1a 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -174,6 +174,20 @@ class Slice { return copy_size; } + /** + * Describe the in-memory representation of the slice. For use + * in tests that want to make assertions about the specific arrangement of + * bytes in a slice. + */ + struct SliceRepresentation { + uint64_t data; + uint64_t reservable; + uint64_t capacity; + }; + SliceRepresentation describeSliceForTest() const { + return SliceRepresentation{dataSize(), reservableSize(), capacity_}; + } + protected: Slice(uint64_t data, uint64_t reservable, uint64_t capacity) : data_(data), reservable_(reservable), capacity_(capacity) {} @@ -541,6 +555,13 @@ class OwnedImpl : public LibEventInstance { */ static void useOldImpl(bool use_old_impl); + /** + * Describe the in-memory representation of the slices in the buffer. For use + * in tests that want to make assertions about the specific arrangement of + * bytes in the buffer. + */ + std::vector describeSlicesForTest() const; + private: /** * @param rhs another buffer diff --git a/test/common/buffer/owned_impl_test.cc b/test/common/buffer/owned_impl_test.cc index d1b6935546e4..3400e6334d79 100644 --- a/test/common/buffer/owned_impl_test.cc +++ b/test/common/buffer/owned_impl_test.cc @@ -13,6 +13,7 @@ #include "gtest/gtest.h" using testing::_; +using testing::ContainerEq; using testing::Return; namespace Envoy { @@ -34,6 +35,18 @@ class OwnedImplTest : public BufferImplementationParamTest { static void commitReservation(Buffer::RawSlice* iovecs, uint64_t num_iovecs, OwnedImpl& buffer) { buffer.commit(iovecs, num_iovecs); } + + static void expectSlices(std::vector> buffer_list, OwnedImpl& buffer) { + if (buffer.usesOldImpl()) { + return; + } + const auto& buffer_slices = buffer.describeSlicesForTest(); + for (uint64_t i = 0; i < buffer_slices.size(); i++) { + EXPECT_EQ(buffer_slices[i].data, buffer_list[i][0]); + EXPECT_EQ(buffer_slices[i].reservable, buffer_list[i][1]); + EXPECT_EQ(buffer_slices[i].capacity, buffer_list[i][2]); + } + } }; INSTANTIATE_TEST_SUITE_P(OwnedImplTest, OwnedImplTest, @@ -308,23 +321,27 @@ TEST_P(OwnedImplTest, Read) { EXPECT_TRUE(result.ok()); EXPECT_EQ(0, result.rc_); EXPECT_EQ(0, buffer.length()); + EXPECT_THAT(buffer.describeSlicesForTest(), testing::IsEmpty()); EXPECT_CALL(os_sys_calls, readv(_, _, _)).WillOnce(Return(Api::SysCallSizeResult{-1, 0})); result = buffer.read(io_handle, 100); EXPECT_EQ(Api::IoError::IoErrorCode::UnknownError, result.err_->getErrorCode()); EXPECT_EQ(0, result.rc_); EXPECT_EQ(0, buffer.length()); + EXPECT_THAT(buffer.describeSlicesForTest(), testing::IsEmpty()); EXPECT_CALL(os_sys_calls, readv(_, _, _)).WillOnce(Return(Api::SysCallSizeResult{-1, EAGAIN})); result = buffer.read(io_handle, 100); EXPECT_EQ(Api::IoError::IoErrorCode::Again, result.err_->getErrorCode()); EXPECT_EQ(0, result.rc_); EXPECT_EQ(0, buffer.length()); + EXPECT_THAT(buffer.describeSlicesForTest(), testing::IsEmpty()); EXPECT_CALL(os_sys_calls, readv(_, _, _)).Times(0); result = buffer.read(io_handle, 0); EXPECT_EQ(0, result.rc_); EXPECT_EQ(0, buffer.length()); + EXPECT_THAT(buffer.describeSlicesForTest(), testing::IsEmpty()); } TEST_P(OwnedImplTest, ReserveCommit) { @@ -372,38 +389,41 @@ TEST_P(OwnedImplTest, ReserveCommit) { // the last slice, and allow the buffer to use only one slice. This should result in the // creation of a new slice within the buffer. num_reserved = buffer.reserve(4096 - sizeof(OwnedSlice), iovecs, 1); - const void* slice2 = iovecs[0].mem_; EXPECT_EQ(1, num_reserved); - EXPECT_NE(slice1, slice2); + EXPECT_NE(slice1, iovecs[0].mem_); clearReservation(iovecs, num_reserved, buffer); // Request the same size reservation, but allow the buffer to use multiple slices. This - // should result in the buffer splitting the reservation between its last two slices. + // should result in the buffer creating a second slice and splitting the reservation between the + // last two slices. num_reserved = buffer.reserve(4096 - sizeof(OwnedSlice), iovecs, NumIovecs); EXPECT_EQ(2, num_reserved); EXPECT_EQ(slice1, iovecs[0].mem_); - EXPECT_EQ(slice2, iovecs[1].mem_); clearReservation(iovecs, num_reserved, buffer); // Request a reservation that too big to fit in the existing slices. This should result // in the creation of a third slice. + expectSlices({{1, 4055, 4056}}, buffer); + buffer.reserve(4096 - sizeof(OwnedSlice), iovecs, NumIovecs); + expectSlices({{1, 4055, 4056}, {0, 4056, 4056}}, buffer); + const void* slice2 = iovecs[1].mem_; num_reserved = buffer.reserve(8192, iovecs, NumIovecs); + expectSlices({{1, 4055, 4056}, {0, 4056, 4056}, {0, 4056, 4056}}, buffer); EXPECT_EQ(3, num_reserved); EXPECT_EQ(slice1, iovecs[0].mem_); EXPECT_EQ(slice2, iovecs[1].mem_); - const void* slice3 = iovecs[2].mem_; clearReservation(iovecs, num_reserved, buffer); // Append a fragment to the buffer, and then request a small reservation. The buffer // should make a new slice to satisfy the reservation; it cannot safely use any of // the previously seen slices, because they are no longer at the end of the buffer. + expectSlices({{1, 4055, 4056}}, buffer); buffer.addBufferFragment(fragment); EXPECT_EQ(13, buffer.length()); num_reserved = buffer.reserve(1, iovecs, NumIovecs); + expectSlices({{1, 4055, 4056}, {12, 0, 12}, {0, 4056, 4056}}, buffer); EXPECT_EQ(1, num_reserved); EXPECT_NE(slice1, iovecs[0].mem_); - EXPECT_NE(slice2, iovecs[0].mem_); - EXPECT_NE(slice3, iovecs[0].mem_); commitReservation(iovecs, num_reserved, buffer); EXPECT_EQ(14, buffer.length()); } @@ -433,18 +453,20 @@ TEST_P(OwnedImplTest, ReserveCommitReuse) { num_reserved = buffer.reserve(16384, iovecs, NumIovecs); EXPECT_EQ(2, num_reserved); const void* first_slice = iovecs[0].mem_; - const void* second_slice = iovecs[1].mem_; iovecs[0].len_ = 1; + expectSlices({{8000, 4248, 12248}, {0, 12248, 12248}}, buffer); buffer.commit(iovecs, 1); EXPECT_EQ(8001, buffer.length()); + EXPECT_EQ(first_slice, iovecs[0].mem_); + // The second slice is now released because there's nothing in the second slice. + expectSlices({{8001, 4247, 12248}}, buffer); - // Reserve 16KB again, and check whether we get back the uncommitted - // second slice from the previous reservation. + // Reserve 16KB again. num_reserved = buffer.reserve(16384, iovecs, NumIovecs); + expectSlices({{8001, 4247, 12248}, {0, 12248, 12248}}, buffer); EXPECT_EQ(2, num_reserved); EXPECT_EQ(static_cast(first_slice) + 1, static_cast(iovecs[0].mem_)); - EXPECT_EQ(second_slice, iovecs[1].mem_); } TEST_P(OwnedImplTest, ReserveReuse) { @@ -470,6 +492,66 @@ TEST_P(OwnedImplTest, ReserveReuse) { EXPECT_EQ(2, num_reserved); EXPECT_EQ(first_slice, iovecs[0].mem_); EXPECT_EQ(second_slice, iovecs[1].mem_); + expectSlices({{0, 12248, 12248}, {0, 8152, 8152}}, buffer); + + // The remaining tests validate internal manipulations of the new slice + // implementation, so they're not valid for the old evbuffer implementation. + if (buffer.usesOldImpl()) { + return; + } + + // Request a larger reservation, verify that the second entry is replaced with a block with a + // larger size. + num_reserved = buffer.reserve(30000, iovecs, NumIovecs); + const void* third_slice = iovecs[1].mem_; + EXPECT_EQ(2, num_reserved); + EXPECT_EQ(first_slice, iovecs[0].mem_); + EXPECT_EQ(12248, iovecs[0].len_); + EXPECT_NE(second_slice, iovecs[1].mem_); + EXPECT_EQ(30000 - iovecs[0].len_, iovecs[1].len_); + expectSlices({{0, 12248, 12248}, {0, 8152, 8152}, {0, 20440, 20440}}, buffer); + + // Repeating a the reservation request for a smaller block returns the previous entry. + num_reserved = buffer.reserve(16384, iovecs, NumIovecs); + EXPECT_EQ(2, num_reserved); + EXPECT_EQ(first_slice, iovecs[0].mem_); + EXPECT_EQ(second_slice, iovecs[1].mem_); + expectSlices({{0, 12248, 12248}, {0, 8152, 8152}, {0, 20440, 20440}}, buffer); + + // Repeat the larger reservation notice that it doesn't match the prior reservation for 30000 + // bytes. + num_reserved = buffer.reserve(30000, iovecs, NumIovecs); + EXPECT_EQ(2, num_reserved); + EXPECT_EQ(first_slice, iovecs[0].mem_); + EXPECT_EQ(12248, iovecs[0].len_); + EXPECT_NE(second_slice, iovecs[1].mem_); + EXPECT_NE(third_slice, iovecs[1].mem_); + EXPECT_EQ(30000 - iovecs[0].len_, iovecs[1].len_); + expectSlices({{0, 12248, 12248}, {0, 8152, 8152}, {0, 20440, 20440}, {0, 20440, 20440}}, buffer); + + // Commit the most recent reservation and verify the representation. + buffer.commit(iovecs, num_reserved); + expectSlices({{12248, 0, 12248}, {0, 8152, 8152}, {0, 20440, 20440}, {17752, 2688, 20440}}, + buffer); + + // Do another reservation. + num_reserved = buffer.reserve(16384, iovecs, NumIovecs); + EXPECT_EQ(2, num_reserved); + expectSlices({{12248, 0, 12248}, + {0, 8152, 8152}, + {0, 20440, 20440}, + {17752, 2688, 20440}, + {0, 16344, 16344}}, + buffer); + + // And commit. + buffer.commit(iovecs, num_reserved); + expectSlices({{12248, 0, 12248}, + {0, 8152, 8152}, + {0, 20440, 20440}, + {20440, 0, 20440}, + {13696, 2648, 16344}}, + buffer); } TEST_P(OwnedImplTest, Search) { @@ -610,6 +692,29 @@ TEST_P(OwnedImplTest, ReserveZeroCommit) { ASSERT_EQ(::close(pipe_fds[1]), 0); ASSERT_EQ(previous_length, buf.search(data.data(), rc, previous_length)); EXPECT_EQ("bbbbb", buf.toString().substr(0, 5)); + expectSlices({{5, 0, 4056}, {1953, 2103, 4056}}, buf); +} + +TEST_P(OwnedImplTest, ReadReserveAndCommit) { + BufferFragmentImpl frag("", 0, nullptr); + Buffer::OwnedImpl buf; + buf.add("bbbbb"); + + int pipe_fds[2] = {0, 0}; + ASSERT_EQ(::pipe(pipe_fds), 0); + Network::IoSocketHandleImpl io_handle(pipe_fds[0]); + ASSERT_EQ(::fcntl(pipe_fds[0], F_SETFL, O_NONBLOCK), 0); + ASSERT_EQ(::fcntl(pipe_fds[1], F_SETFL, O_NONBLOCK), 0); + + const uint32_t read_length = 32768; + std::string data = "e"; + const ssize_t rc = ::write(pipe_fds[1], data.data(), data.size()); + ASSERT_GT(rc, 0); + Api::IoCallUint64Result result = buf.read(io_handle, read_length); + ASSERT_EQ(result.rc_, static_cast(rc)); + ASSERT_EQ(::close(pipe_fds[1]), 0); + EXPECT_EQ("bbbbbe", buf.toString()); + expectSlices({{6, 4050, 4056}}, buf); } TEST(OverflowDetectingUInt64, Arithmetic) {