Skip to content

Commit

Permalink
buffer: release empty slices after commit (envoyproxy#116) (envoyprox…
Browse files Browse the repository at this point in the history
…y#128)

Description: Remove empty slices off the end of buffers after calls to OwnedImpl::commit. The slices reserved when OwnedImpl::reserve is called will sit unused in cases where the 0 bytes are commited, for example, when socket read returns 0 bytes EAGAIN. Trapped slices act like a memory leak until there is a successful read or the socket is closed.
Risk Level: low
Testing: unit
Docs Changes: n/a
Release Notes: n/a

Signed-off-by: Antonio Vicente <avd@google.com>
Signed-off-by: Asra Ali <asraa@google.com>
Signed-off-by: Yangmin Zhu <ymzhu@google.com>
  • Loading branch information
yangminzhu authored and LukeShu committed Feb 27, 2020
1 parent f5a6933 commit 7f49e25
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 13 deletions.
4 changes: 2 additions & 2 deletions include/envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ class Instance {
/**
* Commit a set of slices originally obtained from reserve(). The number of slices should match
* the number obtained from reserve(). The size of each slice can also be altered. Commit must
* occur following a reserve() without any mutating operations in between other than to the iovecs
* len_ fields.
* occur once following a reserve() without any mutating operations in between other than to the
* iovecs len_ fields.
* @param iovecs supplies the array of slices to commit.
* @param num_iovecs supplies the size of the slices array.
*/
Expand Down
13 changes: 13 additions & 0 deletions source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ void OwnedImpl::commit(RawSlice* iovecs, uint64_t num_iovecs) {
}
}

// In case an extra slice was reserved, remove empty slices from the end of the buffer.
while (!slices_.empty() && slices_.back()->dataSize() == 0) {
slices_.pop_back();
}

ASSERT(num_slices_committed > 0);
}
}
Expand Down Expand Up @@ -656,6 +661,14 @@ bool OwnedImpl::isSameBufferImpl(const Instance& rhs) const {
return usesOldImpl() == other->usesOldImpl();
}

std::vector<OwnedSlice::SliceRepresentation> OwnedImpl::describeSlicesForTest() const {
std::vector<OwnedSlice::SliceRepresentation> slices;
for (const auto& slice : slices_) {
slices.push_back(slice->describeSliceForTest());
}
return slices;
}

bool OwnedImpl::use_old_impl_ = false;

} // namespace Buffer
Expand Down
21 changes: 21 additions & 0 deletions source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,20 @@ class Slice {
return copy_size;
}

/**
* Describe the in-memory representation of the slice. For use
* in tests that want to make assertions about the specific arrangement of
* bytes in a slice.
*/
struct SliceRepresentation {
uint64_t data;
uint64_t reservable;
uint64_t capacity;
};
SliceRepresentation describeSliceForTest() const {
return SliceRepresentation{dataSize(), reservableSize(), capacity_};
}

protected:
Slice(uint64_t data, uint64_t reservable, uint64_t capacity)
: data_(data), reservable_(reservable), capacity_(capacity) {}
Expand Down Expand Up @@ -541,6 +555,13 @@ class OwnedImpl : public LibEventInstance {
*/
static void useOldImpl(bool use_old_impl);

/**
* Describe the in-memory representation of the slices in the buffer. For use
* in tests that want to make assertions about the specific arrangement of
* bytes in the buffer.
*/
std::vector<OwnedSlice::SliceRepresentation> describeSlicesForTest() const;

private:
/**
* @param rhs another buffer
Expand Down
127 changes: 116 additions & 11 deletions test/common/buffer/owned_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "gtest/gtest.h"

using testing::_;
using testing::ContainerEq;
using testing::Return;

namespace Envoy {
Expand All @@ -34,6 +35,18 @@ class OwnedImplTest : public BufferImplementationParamTest {
static void commitReservation(Buffer::RawSlice* iovecs, uint64_t num_iovecs, OwnedImpl& buffer) {
buffer.commit(iovecs, num_iovecs);
}

static void expectSlices(std::vector<std::vector<int>> buffer_list, OwnedImpl& buffer) {
if (buffer.usesOldImpl()) {
return;
}
const auto& buffer_slices = buffer.describeSlicesForTest();
for (uint64_t i = 0; i < buffer_slices.size(); i++) {
EXPECT_EQ(buffer_slices[i].data, buffer_list[i][0]);
EXPECT_EQ(buffer_slices[i].reservable, buffer_list[i][1]);
EXPECT_EQ(buffer_slices[i].capacity, buffer_list[i][2]);
}
}
};

INSTANTIATE_TEST_SUITE_P(OwnedImplTest, OwnedImplTest,
Expand Down Expand Up @@ -308,23 +321,27 @@ TEST_P(OwnedImplTest, Read) {
EXPECT_TRUE(result.ok());
EXPECT_EQ(0, result.rc_);
EXPECT_EQ(0, buffer.length());
EXPECT_THAT(buffer.describeSlicesForTest(), testing::IsEmpty());

EXPECT_CALL(os_sys_calls, readv(_, _, _)).WillOnce(Return(Api::SysCallSizeResult{-1, 0}));
result = buffer.read(io_handle, 100);
EXPECT_EQ(Api::IoError::IoErrorCode::UnknownError, result.err_->getErrorCode());
EXPECT_EQ(0, result.rc_);
EXPECT_EQ(0, buffer.length());
EXPECT_THAT(buffer.describeSlicesForTest(), testing::IsEmpty());

EXPECT_CALL(os_sys_calls, readv(_, _, _)).WillOnce(Return(Api::SysCallSizeResult{-1, EAGAIN}));
result = buffer.read(io_handle, 100);
EXPECT_EQ(Api::IoError::IoErrorCode::Again, result.err_->getErrorCode());
EXPECT_EQ(0, result.rc_);
EXPECT_EQ(0, buffer.length());
EXPECT_THAT(buffer.describeSlicesForTest(), testing::IsEmpty());

EXPECT_CALL(os_sys_calls, readv(_, _, _)).Times(0);
result = buffer.read(io_handle, 0);
EXPECT_EQ(0, result.rc_);
EXPECT_EQ(0, buffer.length());
EXPECT_THAT(buffer.describeSlicesForTest(), testing::IsEmpty());
}

TEST_P(OwnedImplTest, ReserveCommit) {
Expand Down Expand Up @@ -372,38 +389,41 @@ TEST_P(OwnedImplTest, ReserveCommit) {
// the last slice, and allow the buffer to use only one slice. This should result in the
// creation of a new slice within the buffer.
num_reserved = buffer.reserve(4096 - sizeof(OwnedSlice), iovecs, 1);
const void* slice2 = iovecs[0].mem_;
EXPECT_EQ(1, num_reserved);
EXPECT_NE(slice1, slice2);
EXPECT_NE(slice1, iovecs[0].mem_);
clearReservation(iovecs, num_reserved, buffer);

// Request the same size reservation, but allow the buffer to use multiple slices. This
// should result in the buffer splitting the reservation between its last two slices.
// should result in the buffer creating a second slice and splitting the reservation between the
// last two slices.
num_reserved = buffer.reserve(4096 - sizeof(OwnedSlice), iovecs, NumIovecs);
EXPECT_EQ(2, num_reserved);
EXPECT_EQ(slice1, iovecs[0].mem_);
EXPECT_EQ(slice2, iovecs[1].mem_);
clearReservation(iovecs, num_reserved, buffer);

// Request a reservation that too big to fit in the existing slices. This should result
// in the creation of a third slice.
expectSlices({{1, 4055, 4056}}, buffer);
buffer.reserve(4096 - sizeof(OwnedSlice), iovecs, NumIovecs);
expectSlices({{1, 4055, 4056}, {0, 4056, 4056}}, buffer);
const void* slice2 = iovecs[1].mem_;
num_reserved = buffer.reserve(8192, iovecs, NumIovecs);
expectSlices({{1, 4055, 4056}, {0, 4056, 4056}, {0, 4056, 4056}}, buffer);
EXPECT_EQ(3, num_reserved);
EXPECT_EQ(slice1, iovecs[0].mem_);
EXPECT_EQ(slice2, iovecs[1].mem_);
const void* slice3 = iovecs[2].mem_;
clearReservation(iovecs, num_reserved, buffer);

// Append a fragment to the buffer, and then request a small reservation. The buffer
// should make a new slice to satisfy the reservation; it cannot safely use any of
// the previously seen slices, because they are no longer at the end of the buffer.
expectSlices({{1, 4055, 4056}}, buffer);
buffer.addBufferFragment(fragment);
EXPECT_EQ(13, buffer.length());
num_reserved = buffer.reserve(1, iovecs, NumIovecs);
expectSlices({{1, 4055, 4056}, {12, 0, 12}, {0, 4056, 4056}}, buffer);
EXPECT_EQ(1, num_reserved);
EXPECT_NE(slice1, iovecs[0].mem_);
EXPECT_NE(slice2, iovecs[0].mem_);
EXPECT_NE(slice3, iovecs[0].mem_);
commitReservation(iovecs, num_reserved, buffer);
EXPECT_EQ(14, buffer.length());
}
Expand Down Expand Up @@ -433,18 +453,20 @@ TEST_P(OwnedImplTest, ReserveCommitReuse) {
num_reserved = buffer.reserve(16384, iovecs, NumIovecs);
EXPECT_EQ(2, num_reserved);
const void* first_slice = iovecs[0].mem_;
const void* second_slice = iovecs[1].mem_;
iovecs[0].len_ = 1;
expectSlices({{8000, 4248, 12248}, {0, 12248, 12248}}, buffer);
buffer.commit(iovecs, 1);
EXPECT_EQ(8001, buffer.length());
EXPECT_EQ(first_slice, iovecs[0].mem_);
// The second slice is now released because there's nothing in the second slice.
expectSlices({{8001, 4247, 12248}}, buffer);

// Reserve 16KB again, and check whether we get back the uncommitted
// second slice from the previous reservation.
// Reserve 16KB again.
num_reserved = buffer.reserve(16384, iovecs, NumIovecs);
expectSlices({{8001, 4247, 12248}, {0, 12248, 12248}}, buffer);
EXPECT_EQ(2, num_reserved);
EXPECT_EQ(static_cast<const uint8_t*>(first_slice) + 1,
static_cast<const uint8_t*>(iovecs[0].mem_));
EXPECT_EQ(second_slice, iovecs[1].mem_);
}

TEST_P(OwnedImplTest, ReserveReuse) {
Expand All @@ -470,6 +492,66 @@ TEST_P(OwnedImplTest, ReserveReuse) {
EXPECT_EQ(2, num_reserved);
EXPECT_EQ(first_slice, iovecs[0].mem_);
EXPECT_EQ(second_slice, iovecs[1].mem_);
expectSlices({{0, 12248, 12248}, {0, 8152, 8152}}, buffer);

// The remaining tests validate internal manipulations of the new slice
// implementation, so they're not valid for the old evbuffer implementation.
if (buffer.usesOldImpl()) {
return;
}

// Request a larger reservation, verify that the second entry is replaced with a block with a
// larger size.
num_reserved = buffer.reserve(30000, iovecs, NumIovecs);
const void* third_slice = iovecs[1].mem_;
EXPECT_EQ(2, num_reserved);
EXPECT_EQ(first_slice, iovecs[0].mem_);
EXPECT_EQ(12248, iovecs[0].len_);
EXPECT_NE(second_slice, iovecs[1].mem_);
EXPECT_EQ(30000 - iovecs[0].len_, iovecs[1].len_);
expectSlices({{0, 12248, 12248}, {0, 8152, 8152}, {0, 20440, 20440}}, buffer);

// Repeating a the reservation request for a smaller block returns the previous entry.
num_reserved = buffer.reserve(16384, iovecs, NumIovecs);
EXPECT_EQ(2, num_reserved);
EXPECT_EQ(first_slice, iovecs[0].mem_);
EXPECT_EQ(second_slice, iovecs[1].mem_);
expectSlices({{0, 12248, 12248}, {0, 8152, 8152}, {0, 20440, 20440}}, buffer);

// Repeat the larger reservation notice that it doesn't match the prior reservation for 30000
// bytes.
num_reserved = buffer.reserve(30000, iovecs, NumIovecs);
EXPECT_EQ(2, num_reserved);
EXPECT_EQ(first_slice, iovecs[0].mem_);
EXPECT_EQ(12248, iovecs[0].len_);
EXPECT_NE(second_slice, iovecs[1].mem_);
EXPECT_NE(third_slice, iovecs[1].mem_);
EXPECT_EQ(30000 - iovecs[0].len_, iovecs[1].len_);
expectSlices({{0, 12248, 12248}, {0, 8152, 8152}, {0, 20440, 20440}, {0, 20440, 20440}}, buffer);

// Commit the most recent reservation and verify the representation.
buffer.commit(iovecs, num_reserved);
expectSlices({{12248, 0, 12248}, {0, 8152, 8152}, {0, 20440, 20440}, {17752, 2688, 20440}},
buffer);

// Do another reservation.
num_reserved = buffer.reserve(16384, iovecs, NumIovecs);
EXPECT_EQ(2, num_reserved);
expectSlices({{12248, 0, 12248},
{0, 8152, 8152},
{0, 20440, 20440},
{17752, 2688, 20440},
{0, 16344, 16344}},
buffer);

// And commit.
buffer.commit(iovecs, num_reserved);
expectSlices({{12248, 0, 12248},
{0, 8152, 8152},
{0, 20440, 20440},
{20440, 0, 20440},
{13696, 2648, 16344}},
buffer);
}

TEST_P(OwnedImplTest, Search) {
Expand Down Expand Up @@ -610,6 +692,29 @@ TEST_P(OwnedImplTest, ReserveZeroCommit) {
ASSERT_EQ(::close(pipe_fds[1]), 0);
ASSERT_EQ(previous_length, buf.search(data.data(), rc, previous_length));
EXPECT_EQ("bbbbb", buf.toString().substr(0, 5));
expectSlices({{5, 0, 4056}, {1953, 2103, 4056}}, buf);
}

TEST_P(OwnedImplTest, ReadReserveAndCommit) {
BufferFragmentImpl frag("", 0, nullptr);
Buffer::OwnedImpl buf;
buf.add("bbbbb");

int pipe_fds[2] = {0, 0};
ASSERT_EQ(::pipe(pipe_fds), 0);
Network::IoSocketHandleImpl io_handle(pipe_fds[0]);
ASSERT_EQ(::fcntl(pipe_fds[0], F_SETFL, O_NONBLOCK), 0);
ASSERT_EQ(::fcntl(pipe_fds[1], F_SETFL, O_NONBLOCK), 0);

const uint32_t read_length = 32768;
std::string data = "e";
const ssize_t rc = ::write(pipe_fds[1], data.data(), data.size());
ASSERT_GT(rc, 0);
Api::IoCallUint64Result result = buf.read(io_handle, read_length);
ASSERT_EQ(result.rc_, static_cast<uint64_t>(rc));
ASSERT_EQ(::close(pipe_fds[1]), 0);
EXPECT_EQ("bbbbbe", buf.toString());
expectSlices({{6, 4050, 4056}}, buf);
}

TEST(OverflowDetectingUInt64, Arithmetic) {
Expand Down

0 comments on commit 7f49e25

Please sign in to comment.