Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

buffer: raw_combined allocations buffer and ref count together #7612

Merged
merged 12 commits into from Mar 2, 2016
Merged
91 changes: 80 additions & 11 deletions src/common/buffer.cc
Expand Up @@ -39,6 +39,9 @@
#include <ostream>
namespace ceph {

#define CEPH_BUFFER_ALLOC_UNIT (MIN(CEPH_PAGE_SIZE, 4096))
#define CEPH_BUFFER_APPEND_SIZE (CEPH_BUFFER_ALLOC_UNIT - sizeof(raw_combined))

#ifdef BUFFER_DEBUG
static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
# define bdout { simple_spin_lock(&buffer_debug_lock); std::cout
Expand Down Expand Up @@ -235,6 +238,55 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
}
};

/*
* raw_combined is always placed within a single allocation along
* with the data buffer. the data goes at the beginning, and
* raw_combined at the end.
*
* Layout of the single allocation made by create():
*
*   ptr                          ptr + datalen
*   | data (len bytes + padding) | raw_combined object |
*
* Instances are built by create() with placement new; the class-specific
* operator delete releases the whole allocation by freeing the data
* pointer (the start of the allocation), not the object's own address.
*/
class buffer::raw_combined : public buffer::raw {
// Alignment the data buffer was requested with; kept so that
// clone_empty() can ask for the same alignment again.
size_t alignment;
public:
// Wraps an already-allocated data region (dataptr, l bytes) and calls
// inc_total_alloc(len). Does not allocate anything itself; see create().
// NOTE(review): len/data are not declared here, presumably members of
// buffer::raw set by raw(dataptr, l) -- confirm against the base class.
raw_combined(char *dataptr, unsigned l, unsigned align=0)
: raw(dataptr, l),
alignment(align) {
inc_total_alloc(len);
}
// Undoes the accounting done in the constructor. The memory itself is
// released afterwards by operator delete below.
~raw_combined() {
dec_total_alloc(len);
}
// Returns a fresh raw_combined with the same length and alignment; the
// contents are not copied.
raw* clone_empty() {
return create(len, alignment);
}

// Allocates one chunk holding both the data buffer and the raw_combined
// header, and constructs the header in place at the tail of that chunk.
//   len   - number of data bytes requested
//   align - alignment for the start of the data; 0 selects sizeof(size_t)
// Throws bad_alloc if the underlying allocation fails.
static raw_combined *create(unsigned len, unsigned align=0) {
if (!align)
align = sizeof(size_t);
// Space reserved for the header, rounded up to its own alignment.
size_t rawlen = ROUND_UP_TO(sizeof(buffer::raw_combined),
alignof(buffer::raw_combined));
// Data length padded so that ptr + datalen is a multiple of
// alignof(raw_combined) past ptr, i.e. a valid spot for the header
// (assuming ptr itself is at least that aligned -- TODO confirm for
// callers passing a small align).
size_t datalen = ROUND_UP_TO(len, alignof(buffer::raw_combined));

#ifdef DARWIN
// valloc() takes no alignment argument, so align is not passed here.
char *ptr = (char *) valloc(rawlen + datalen);
#else
char *ptr = 0;
// posix_memalign reports failure through its return value, not errno.
int r = ::posix_memalign((void**)(void*)&ptr, align, rawlen + datalen);
if (r)
throw bad_alloc();
#endif /* DARWIN */
// Covers the valloc() path, which signals failure by returning null.
if (!ptr)
throw bad_alloc();

// actual data first, since it has presumably larger alignment restriction
// then put the raw_combined at the end
return new (ptr + datalen) raw_combined(ptr, len, align);
}

// Class-specific deallocation: ptr is the address of the raw_combined
// object, which sits at the tail of the allocation, so it must not be
// passed to free() directly. The data pointer is the address create()
// got from the allocator, so that is what gets freed.
// NOTE(review): this reads raw->data after the destructor has already
// run; it relies on the member still being readable at that point.
static void operator delete(void *ptr) {
raw_combined *raw = (raw_combined *)ptr;
::free((void *)raw->data);
}
};

class buffer::raw_malloc : public buffer::raw {
public:
explicit raw_malloc(unsigned l) : raw(l) {
Expand Down Expand Up @@ -635,12 +687,13 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
#endif /* HAVE_XIO */

// Returns a newly allocated raw buffer whose data holds a copy of the len
// bytes starting at c.
// NOTE(review): this listing is a diff rendering with the +/- markers
// stripped; the two declarations of r are presumably the removed
// (raw_char) and added (create_aligned) lines -- TODO confirm.
buffer::raw* buffer::copy(const char *c, unsigned len) {
raw* r = new raw_char(len);
raw* r = buffer::create_aligned(len, sizeof(size_t));
memcpy(r->data, c, len);
return r;
}

// Returns a newly allocated raw buffer of len bytes; contents are not
// initialized here.
// NOTE(review): the two return statements are presumably the removed
// (raw_char) and added (create_aligned with sizeof(size_t) alignment)
// lines of the diff -- TODO confirm.
buffer::raw* buffer::create(unsigned len) {
return new raw_char(len);
return buffer::create_aligned(len, sizeof(size_t));
}
buffer::raw* buffer::claim_char(unsigned len, char *buf) {
return new raw_char(len, buf);
Expand All @@ -654,14 +707,28 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
// Wraps the caller-supplied buffer buf (len bytes) in a raw_static object.
// Note the argument order is swapped when forwarding: raw_static takes
// (buf, len).
buffer::raw* buffer::create_static(unsigned len, char *buf) {
return new raw_static(buf, len);
}

// Returns a raw buffer of len bytes whose data is aligned to align.
// Chooses between a separately allocated aligned buffer and a
// raw_combined (data and header in one allocation) using the test below.
buffer::raw* buffer::create_aligned(unsigned len, unsigned align) {
// If alignment is a page multiple, use a separate buffer::raw to
// avoid fragmenting the heap.
//
// Somewhat unexpectedly, I see consistently better performance
// from raw_combined than from raw even when the allocation size is
// a page multiple (but alignment is not).
//
// I also see better performance from a separate buffer::raw once the
// size passes 8KB.
//
// NOTE(review): the size threshold in the code is CEPH_PAGE_SIZE * 2,
// which equals the 8KB mentioned above only for 4KB pages. The alignment
// test also passes for align == 0 -- confirm that is intended.
if ((align & ~CEPH_PAGE_MASK) == 0 ||
len >= CEPH_PAGE_SIZE * 2) {
#ifndef __CYGWIN__
//return new raw_mmap_pages(len);
return new raw_posix_aligned(len, align);
return new raw_posix_aligned(len, align);
#else
return new raw_hack_aligned(len, align);
return new raw_hack_aligned(len, align);
#endif
}
// Small, non-page-aligned request: put data and header in one allocation.
return raw_combined::create(len, align);
}

// Convenience wrapper: create_aligned() with CEPH_PAGE_SIZE as the
// alignment.
buffer::raw* buffer::create_page_aligned(unsigned len) {
return create_aligned(len, CEPH_PAGE_SIZE);
}
Expand Down Expand Up @@ -1590,7 +1657,7 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
unsigned gap = append_buffer.unused_tail_length();
if (!gap) {
// make a new append_buffer!
append_buffer = create_aligned(CEPH_BUFFER_APPEND_SIZE, CEPH_BUFFER_APPEND_SIZE);
append_buffer = raw_combined::create(CEPH_BUFFER_APPEND_SIZE);
append_buffer.set_length(0); // unused, so far.
}
append(append_buffer, append_buffer.append(c) - 1, 1); // add segment to the list
Expand All @@ -1612,9 +1679,12 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
if (len == 0)
break; // done!

// make a new append_buffer!
unsigned alen = CEPH_BUFFER_APPEND_SIZE * (((len-1) / CEPH_BUFFER_APPEND_SIZE) + 1);
append_buffer = create_aligned(alen, CEPH_BUFFER_APPEND_SIZE);
// make a new append_buffer. fill out a complete page, factoring in the
// raw_combined overhead.
size_t need = ROUND_UP_TO(len, sizeof(size_t)) + sizeof(raw_combined);
size_t alen = ROUND_UP_TO(need, CEPH_BUFFER_ALLOC_UNIT) -
sizeof(raw_combined);
append_buffer = raw_combined::create(alen);
append_buffer.set_length(0); // unused, so far.
}
}
Expand Down Expand Up @@ -1945,8 +2015,7 @@ ssize_t buffer::list::read_fd(int fd, size_t len)
// available for raw_pipe until we actually inspect the data
return 0;
}
int s = ROUND_UP_TO(len, CEPH_BUFFER_APPEND_SIZE);
bufferptr bp = buffer::create_aligned(s, CEPH_BUFFER_APPEND_SIZE);
bufferptr bp = buffer::create(len);
ssize_t ret = safe_read(fd, (void*)bp.c_str(), len);
if (ret >= 0) {
bp.set_length(ret);
Expand Down
7 changes: 5 additions & 2 deletions src/include/buffer.h
Expand Up @@ -70,8 +70,6 @@ class XioDispatchHook;

namespace ceph {

const static int CEPH_BUFFER_APPEND_SIZE(4096);

namespace buffer CEPH_BUFFER_API {
/*
* exceptions
Expand Down Expand Up @@ -136,6 +134,7 @@ namespace buffer CEPH_BUFFER_API {
class raw_char;
class raw_pipe;
class raw_unshareable; // diagnostic, unshareable char buffer
class raw_combined;


class xio_mempool;
Expand Down Expand Up @@ -386,6 +385,10 @@ namespace buffer CEPH_BUFFER_API {
return *this;
}

unsigned get_num_buffers() const { return _buffers.size(); }
const ptr& front() const { return _buffers.front(); }
const ptr& back() const { return _buffers.back(); }

unsigned get_memcopy_count() const {return _memcopy_count; }
const std::list<ptr>& buffers() const { return _buffers; }
void swap(list& other);
Expand Down
4 changes: 2 additions & 2 deletions src/include/encoding.h
Expand Up @@ -233,8 +233,8 @@ inline void decode(buffer::ptr& bp, bufferlist::iterator& p)
p.copy(len, s);

if (len) {
if (s.buffers().size() == 1)
bp = s.buffers().front();
if (s.get_num_buffers() == 1)
bp = s.front();
else
bp = buffer::copy(s.c_str(), s.length());
}
Expand Down
2 changes: 1 addition & 1 deletion src/os/filestore/FileJournal.cc
Expand Up @@ -1422,7 +1422,7 @@ int FileJournal::write_aio_bl(off64_t& pos, bufferlist& bl, uint64_t seq)
dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl;

while (bl.length() > 0) {
int max = MIN(bl.buffers().size(), IOV_MAX-1);
int max = MIN(bl.get_num_buffers(), IOV_MAX-1);
iovec *iov = new iovec[max];
int n = 0;
unsigned len = 0;
Expand Down
4 changes: 2 additions & 2 deletions src/rbd_replay/BufferReader.cc
Expand Up @@ -16,9 +16,9 @@ BufferReader::BufferReader(int fd, size_t min_bytes, size_t max_bytes)
int BufferReader::fetch(bufferlist::iterator **it) {
if (m_bl_it.get_remaining() < m_min_bytes) {
ssize_t bytes_to_read = ROUND_UP_TO(m_max_bytes - m_bl_it.get_remaining(),
CEPH_BUFFER_APPEND_SIZE);
CEPH_PAGE_SIZE);
while (!m_eof_reached && bytes_to_read > 0) {
int r = m_bl.read_fd(m_fd, CEPH_BUFFER_APPEND_SIZE);
int r = m_bl.read_fd(m_fd, CEPH_PAGE_SIZE);
if (r < 0) {
return r;
}
Expand Down