Merge pull request #16693 from alex-mikheev/wip_rdma_rxpool_1
msg/async/rdma: improves RX buffer management

Reviewed-by: Haomai Wang <haomai@xsky.com>
yuyuyu101 committed Aug 7, 2017
2 parents 03e6f97 + 720d044 commit bb772b0
Showing 7 changed files with 310 additions and 176 deletions.
3 changes: 3 additions & 0 deletions src/common/legacy_config_opts.h
@@ -165,7 +165,10 @@ OPTION(ms_async_rdma_device_name, OPT_STR)
OPTION(ms_async_rdma_enable_hugepage, OPT_BOOL)
OPTION(ms_async_rdma_buffer_size, OPT_INT)
OPTION(ms_async_rdma_send_buffers, OPT_U32)
// size of the receive buffer pool, 0 is unlimited
OPTION(ms_async_rdma_receive_buffers, OPT_U32)
// max number of work requests (WRs) in the SRQ
OPTION(ms_async_rdma_receive_queue_len, OPT_U32)
OPTION(ms_async_rdma_port_num, OPT_U32)
OPTION(ms_async_rdma_polling_us, OPT_U32)
OPTION(ms_async_rdma_local_gid, OPT_STR) // GID format: "fe80:0000:0000:0000:7efe:90ff:fe72:6efe", no zero folding
6 changes: 5 additions & 1 deletion src/common/options.cc
@@ -736,7 +736,11 @@ std::vector<Option> get_global_options() {
.set_description(""),

Option("ms_async_rdma_receive_buffers", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(1024)
.set_default(32768)
.set_description(""),

Option("ms_async_rdma_receive_queue_len", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
.set_default(4096)
.set_description(""),

Option("ms_async_rdma_port_num", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
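These two options now play distinct roles: ms_async_rdma_receive_queue_len bounds the number of receive WRs posted to the shared receive queue, while ms_async_rdma_receive_buffers caps the total RX buffer pool (0 means unbounded). The MemoryManager constructor further down seeds the pool at twice the queue length so that the first incoming packets never trigger pool expansion. A minimal sketch of that sizing arithmetic, assuming only the option names and defaults above:

```cpp
// Sketch of the initial-pool sizing used by the MemoryManager constructor
// in this diff (illustration only, not a Ceph API).
#include <algorithm>
#include <cstdint>

uint64_t initial_rx_pool_size(uint64_t receive_buffers,   // 0 == unlimited pool
                              uint64_t receive_queue_len) {
  if (receive_buffers == 0)
    return 2 * receive_queue_len;   // unbounded pool: any seed size is fine
  // bounded pool: never seed beyond the configured cap
  return std::min(receive_buffers, 2 * receive_queue_len);
}
// With the defaults above: min(32768, 2 * 4096) == 8192 buffers up front,
// of which 4096 are immediately posted to the SRQ.
```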
241 changes: 160 additions & 81 deletions src/msg/async/rdma/Infiniband.cc
@@ -150,7 +150,7 @@ Infiniband::QueuePair::QueuePair(
CephContext *c, Infiniband& infiniband, ibv_qp_type type,
int port, ibv_srq *srq,
Infiniband::CompletionQueue* txcq, Infiniband::CompletionQueue* rxcq,
uint32_t max_send_wr, uint32_t max_recv_wr, uint32_t q_key)
uint32_t tx_queue_len, uint32_t rx_queue_len, uint32_t q_key)
: cct(c), infiniband(infiniband),
type(type),
ctxt(infiniband.device->ctxt),
@@ -161,8 +161,8 @@ Infiniband::QueuePair::QueuePair(
txcq(txcq),
rxcq(rxcq),
initial_psn(0),
max_send_wr(max_send_wr),
max_recv_wr(max_recv_wr),
max_send_wr(tx_queue_len),
max_recv_wr(rx_queue_len),
q_key(q_key),
dead(false)
{
@@ -192,7 +192,7 @@ int Infiniband::QueuePair::init()
if (qp == NULL) {
lderr(cct) << __func__ << " failed to create queue pair" << cpp_strerror(errno) << dendl;
if (errno == ENOMEM) {
lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_buffers, "
lderr(cct) << __func__ << " try reducing ms_async_rdma_receive_queue_len, "
" ms_async_rdma_send_buffers or"
" ms_async_rdma_buffer_size" << dendl;
}
@@ -554,11 +554,6 @@ void Infiniband::MemoryManager::Chunk::clear()
bound = 0;
}

void Infiniband::MemoryManager::Chunk::post_srq(Infiniband *ib)
{
ib->post_chunk(this);
}

Infiniband::MemoryManager::Cluster::Cluster(MemoryManager& m, uint32_t s)
: manager(m), buffer_size(s), lock("cluster_lock")
{
@@ -574,22 +569,16 @@ Infiniband::MemoryManager::Cluster::~Cluster()
}

::free(chunk_base);
if (manager.enabled_huge_page)
manager.free_huge_pages(base);
else
::free(base);
manager.free(base);
}

int Infiniband::MemoryManager::Cluster::fill(uint32_t num)
{
assert(!base);
num_chunk = num;
uint32_t bytes = buffer_size * num;
if (manager.enabled_huge_page) {
base = (char*)manager.malloc_huge_pages(bytes);
} else {
base = (char*)memalign(CEPH_PAGE_SIZE, bytes);
}

base = (char*)manager.malloc(bytes);
end = base + bytes;
assert(base);
chunk_base = static_cast<Chunk*>(::malloc(sizeof(Chunk) * num));
@@ -642,35 +631,104 @@ int Infiniband::MemoryManager::Cluster::get_buffers(std::vector<Chunk*> &chunks,
return r;
}

Infiniband::MemoryManager* Infiniband::MemoryManager::RxAllocator::manager = nullptr;
PerfCounters *Infiniband::MemoryManager::RxAllocator::perf_logger = nullptr;

unsigned Infiniband::MemoryManager::RxAllocator::n_bufs_allocated = 0;
unsigned Infiniband::MemoryManager::RxAllocator::max_bufs = 0;


char *Infiniband::MemoryManager::RxAllocator::malloc(const size_type bytes)
{
mem_info *m;
Chunk *ch;
size_t rx_buf_size;
unsigned nbufs;

rx_buf_size = sizeof(Chunk) + manager->cct->_conf->ms_async_rdma_buffer_size;
nbufs = bytes/rx_buf_size;

if (max_bufs > 0 && n_bufs_allocated + nbufs > max_bufs) {
return NULL;
}

m = static_cast<mem_info *>(manager->malloc(bytes + sizeof(*m)));
if (!m)
return NULL;

m->mr = ibv_reg_mr(manager->pd->pd, m->chunks, bytes, IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_LOCAL_WRITE);
assert(m->mr);
m->nbufs = nbufs;

n_bufs_allocated += nbufs;
// note that the memory can be allocated before perf logger is set
if (perf_logger)
perf_logger->inc(l_msgr_rdma_rx_bufs_total, nbufs);

/* initialize chunks */
ch = m->chunks;
for (unsigned i = 0; i < nbufs; i++) {
ch->lkey = m->mr->lkey;
ch->bytes = manager->cct->_conf->ms_async_rdma_buffer_size;
ch->offset = 0;
ch->buffer = ch->data; // TODO: refactor tx and remove buffer
ch = reinterpret_cast<Chunk *>(reinterpret_cast<char *>(ch) + rx_buf_size);
}

return reinterpret_cast<char *>(m->chunks);
}
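The static malloc(size_type)/free(char* const) pair above matches boost::pool's UserAllocator concept. The header declaring rxbuf_pool is not part of this excerpt, but the constructor arguments below (element size, initial element count) strongly suggest a boost::pool<RxAllocator>. A minimal sketch of the concept, under that assumption:

```cpp
// Minimal UserAllocator for boost::pool, the shape RxAllocator models here.
// The real allocator additionally calls ibv_reg_mr(), carves the block into
// Chunks, and returns NULL once max_bufs is reached to cap pool growth.
#include <boost/pool/pool.hpp>
#include <cstddef>
#include <cstdlib>

struct SketchAllocator {
  typedef std::size_t size_type;
  typedef std::ptrdiff_t difference_type;
  static char *malloc(const size_type bytes) {
    return static_cast<char *>(std::malloc(bytes)); // returning NULL refuses growth
  }
  static void free(char *const block) { std::free(block); }
};

int main() {
  boost::pool<SketchAllocator> pool(128 /* element size */, 64 /* first block: 64 elements */);
  void *buf = pool.malloc();   // first call invokes SketchAllocator::malloc
  pool.free(buf);              // returns the element to the pool, not the OS
}
```

Returning NULL from the allocator is what turns ms_async_rdma_receive_buffers into a hard cap: boost::pool fails the allocation instead of growing.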


Infiniband::MemoryManager::MemoryManager(Device *d, ProtectionDomain *p, bool hugepage)
: device(d), pd(p)
void Infiniband::MemoryManager::RxAllocator::free(char * const block)
{
mem_info *m;

m = reinterpret_cast<mem_info *>(block) - 1;
n_bufs_allocated -= m->nbufs;
if (perf_logger)
perf_logger->dec(l_msgr_rdma_rx_bufs_total, m->nbufs);
ibv_dereg_mr(m->mr);
manager->free(m);
}
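malloc() above over-allocates by sizeof(mem_info), records the memory registration and buffer count in that header, and hands the pool a pointer just past it; free() steps back one header to recover the bookkeeping. The same header-before-payload pattern in isolation (standalone sketch; mem_info's full definition lives in the header file, which is not shown here):

```cpp
// Header-before-payload pattern used by RxAllocator (standalone sketch).
#include <cstdlib>

struct Header {
  unsigned nbufs;   // bookkeeping recovered on free()
};

char *alloc_with_header(std::size_t bytes, unsigned nbufs) {
  void *raw = std::malloc(sizeof(Header) + bytes);
  if (!raw) return nullptr;
  Header *h = static_cast<Header *>(raw);
  h->nbufs = nbufs;
  return reinterpret_cast<char *>(h + 1);   // payload starts right after the header
}

void free_with_header(char *block) {
  Header *h = reinterpret_cast<Header *>(block) - 1;  // step back over the header
  // h->nbufs is available here for accounting, as in RxAllocator::free()
  std::free(h);
}
```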

Infiniband::MemoryManager::MemoryManager(CephContext *c, Device *d, ProtectionDomain *p)
: cct(c), device(d), pd(p),
rxbuf_pool(sizeof(Chunk) + c->_conf->ms_async_rdma_buffer_size,
c->_conf->ms_async_rdma_receive_buffers > 0 ?
// if possible, make the initial pool size 2 * receive_queue_len
// so that there is no pool expansion upon receipt of the
// first packet.
(c->_conf->ms_async_rdma_receive_buffers < 2 * c->_conf->ms_async_rdma_receive_queue_len ?
c->_conf->ms_async_rdma_receive_buffers : 2 * c->_conf->ms_async_rdma_receive_queue_len) :
// rx pool is infinite, we can set any initial size that we want
2 * c->_conf->ms_async_rdma_receive_queue_len)
{
enabled_huge_page = hugepage;
RxAllocator::set_memory_manager(this);
// remember the setting because cct may not be available when
// global infiniband is destroyed
hp_enabled = cct->_conf->ms_async_rdma_enable_hugepage;
}
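get_rx_buffer(), used by post_chunks_to_srq() later in this diff, is declared in Infiniband.h, which is not shown here. Assuming rxbuf_pool is the boost::pool described above, it and its release counterpart are likely thin wrappers along these lines (hypothetical sketch, not code from this PR):

```cpp
// Hypothetical: the real declarations live in Infiniband.h, outside this diff.
Infiniband::MemoryManager::Chunk *Infiniband::MemoryManager::get_rx_buffer() {
  // pool expansion (and hence RxAllocator::malloc) happens here when empty;
  // returns NULL once the ms_async_rdma_receive_buffers cap is hit
  return reinterpret_cast<Chunk *>(rxbuf_pool.malloc());
}

void Infiniband::MemoryManager::release_rx_buffer(Chunk *chunk) {
  rxbuf_pool.free(chunk);   // back to the pool, ready to be re-posted to the SRQ
}
```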

Infiniband::MemoryManager::~MemoryManager()
{
if (channel)
delete channel;
if (send)
delete send;
}

void* Infiniband::MemoryManager::malloc_huge_pages(size_t size)
void* Infiniband::MemoryManager::huge_pages_malloc(size_t size)
{
size_t real_size = ALIGN_TO_PAGE_SIZE(size + HUGE_PAGE_SIZE);
char *ptr = (char *)mmap(NULL, real_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB, -1, 0);
if (ptr == MAP_FAILED) {
ptr = (char *)malloc(real_size);
ptr = (char *)std::malloc(real_size);
if (ptr == NULL) return NULL;
real_size = 0;
}
*((size_t *)ptr) = real_size;
return ptr + HUGE_PAGE_SIZE;
}

void Infiniband::MemoryManager::free_huge_pages(void *ptr)
void Infiniband::MemoryManager::huge_pages_free(void *ptr)
{
if (ptr == NULL) return;
void *real_ptr = (char *)ptr - HUGE_PAGE_SIZE;
@@ -679,15 +737,30 @@ void Infiniband::MemoryManager::free_huge_pages(void *ptr)
if (real_size != 0)
munmap(real_ptr, real_size);
else
free(real_ptr);
std::free(real_ptr);
}

void Infiniband::MemoryManager::register_rx_tx(uint32_t size, uint32_t rx_num, uint32_t tx_num)

void* Infiniband::MemoryManager::malloc(size_t size)
{
if (hp_enabled)
return huge_pages_malloc(size);
else
return std::malloc(size);
}

void Infiniband::MemoryManager::free(void *ptr)
{
if (hp_enabled)
huge_pages_free(ptr);
else
std::free(ptr);
}
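One contract worth noting: with hugepages enabled, every pointer returned by MemoryManager::malloc() sits one huge page past the real mapping, and the gap stores the mapping size. It must therefore be released through MemoryManager::free(), never through ::free(). A usage sketch:

```cpp
// Usage sketch (illustration only): allocations and frees must pair up
// through the MemoryManager so the hidden size header is handled correctly.
void example(Infiniband::MemoryManager *mm) {
  void *buf = mm->malloc(1 << 20);  // hugepage-backed if ms_async_rdma_enable_hugepage
  if (buf) {
    // ... fill buf, register it, post it ...
    mm->free(buf);                  // never ::free(buf) or munmap(buf, ...)
  }
}
```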

void Infiniband::MemoryManager::create_tx_pool(uint32_t size, uint32_t tx_num)
{
assert(device);
assert(pd);
channel = new Cluster(*this, size);
channel->fill(rx_num);

send = new Cluster(*this, size);
send->fill(tx_num);
@@ -703,12 +776,6 @@ int Infiniband::MemoryManager::get_send_buffers(std::vector<Chunk*> &c, size_t b
return send->get_buffers(c, bytes);
}

int Infiniband::MemoryManager::get_channel_buffers(std::vector<Chunk*> &chunks, size_t bytes)
{
return channel->get_buffers(chunks, bytes);
}


Infiniband::Infiniband(CephContext *cct, const std::string &device_name, uint8_t port_num)
: cct(cct), lock("IB lock"), device_name(device_name), port_num(port_num)
{
@@ -731,33 +798,44 @@ void Infiniband::init()
pd = new ProtectionDomain(cct, device);
assert(NetHandler(cct).set_nonblock(device->ctxt->async_fd) == 0);

max_recv_wr = device->device_attr->max_srq_wr;
if (max_recv_wr > cct->_conf->ms_async_rdma_receive_buffers) {
max_recv_wr = cct->_conf->ms_async_rdma_receive_buffers;
ldout(cct, 1) << __func__ << " assigning: " << max_recv_wr << " receive buffers" << dendl;
rx_queue_len = device->device_attr->max_srq_wr;
if (rx_queue_len > cct->_conf->ms_async_rdma_receive_queue_len) {
rx_queue_len = cct->_conf->ms_async_rdma_receive_queue_len;
ldout(cct, 1) << __func__ << " receive queue length is " << rx_queue_len << dendl;
} else {
ldout(cct, 1) << __func__ << " using the max allowed receive buffers: " << max_recv_wr << dendl;
ldout(cct, 0) << __func__ << " requested receive queue length " <<
cct->_conf->ms_async_rdma_receive_queue_len <<
" is too big. Setting " << rx_queue_len << dendl;
}

// check for misconfiguration
if (cct->_conf->ms_async_rdma_receive_buffers > 0 &&
rx_queue_len > (unsigned)cct->_conf->ms_async_rdma_receive_buffers) {
lderr(cct) << __func__ << " rdma_receive_queue_len (" <<
rx_queue_len << ") > ms_async_rdma_receive_buffers(" <<
cct->_conf->ms_async_rdma_receive_buffers << ")." << dendl;
ceph_abort();
}

max_send_wr = device->device_attr->max_qp_wr;
if (max_send_wr > cct->_conf->ms_async_rdma_send_buffers) {
max_send_wr = cct->_conf->ms_async_rdma_send_buffers;
ldout(cct, 1) << __func__ << " assigning: " << max_send_wr << " send buffers" << dendl;
MemoryManager::RxAllocator::set_max_bufs(cct->_conf->ms_async_rdma_receive_buffers);

tx_queue_len = device->device_attr->max_qp_wr;
if (tx_queue_len > cct->_conf->ms_async_rdma_send_buffers) {
tx_queue_len = cct->_conf->ms_async_rdma_send_buffers;
ldout(cct, 1) << __func__ << " assigning: " << tx_queue_len << " send buffers" << dendl;
} else {
ldout(cct, 1) << __func__ << " using the max allowed send buffers: " << max_send_wr << dendl;
ldout(cct, 0) << __func__ << " using the max allowed send buffers: " << tx_queue_len << dendl;
}

ldout(cct, 1) << __func__ << " device allow " << device->device_attr->max_cqe
<< " completion entries" << dendl;

memory_manager = new MemoryManager(device, pd,
cct->_conf->ms_async_rdma_enable_hugepage);
memory_manager->register_rx_tx(
cct->_conf->ms_async_rdma_buffer_size, max_recv_wr, max_send_wr);
memory_manager = new MemoryManager(cct, device, pd);
memory_manager->create_tx_pool(cct->_conf->ms_async_rdma_buffer_size, tx_queue_len);

srq = create_shared_receive_queue(max_recv_wr, MAX_SHARED_RX_SGE_COUNT);
post_channel_cluster();
srq = create_shared_receive_queue(rx_queue_len, MAX_SHARED_RX_SGE_COUNT);

post_chunks_to_srq(rx_queue_len); // pre-post receive buffers to the SRQ
dispatcher->polling_start();
}
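Tracing the sizing in init() with this PR's defaults makes the interplay concrete; the device limit below is an arbitrary illustration, not a value from the diff:

```cpp
// Worked example of the init() sizing logic (compiles standalone).
constexpr unsigned device_max_srq_wr = 16384;  // hypothetical device capability
constexpr unsigned receive_queue_len = 4096;   // ms_async_rdma_receive_queue_len default
constexpr unsigned receive_buffers = 32768;    // ms_async_rdma_receive_buffers default

// clamp the SRQ depth to what the device supports
constexpr unsigned rx_queue_len =
    device_max_srq_wr < receive_queue_len ? device_max_srq_wr : receive_queue_len;  // == 4096

// the misconfiguration check: a bounded pool smaller than the SRQ would abort
static_assert(receive_buffers == 0 || rx_queue_len <= receive_buffers,
              "init() would ceph_abort() with this configuration");
```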

@@ -779,6 +857,8 @@ void Infiniband::set_dispatcher(RDMADispatcher *d)
assert(!d ^ !dispatcher);

dispatcher = d;
if (dispatcher != nullptr)
MemoryManager::RxAllocator::set_perf_logger(dispatcher->perf_logger);
}

/**
@@ -819,45 +899,44 @@ int Infiniband::get_tx_buffers(std::vector<Chunk*> &c, size_t bytes)
Infiniband::QueuePair* Infiniband::create_queue_pair(CephContext *cct, CompletionQueue *tx, CompletionQueue* rx, ibv_qp_type type)
{
Infiniband::QueuePair *qp = new QueuePair(
cct, *this, type, ib_physical_port, srq, tx, rx, max_send_wr, max_recv_wr);
cct, *this, type, ib_physical_port, srq, tx, rx, tx_queue_len, rx_queue_len);
if (qp->init()) {
delete qp;
return NULL;
}
return qp;
}

int Infiniband::post_chunk(Chunk* chunk)
void Infiniband::post_chunks_to_srq(int num)
{
ibv_sge isge;
isge.addr = reinterpret_cast<uint64_t>(chunk->buffer);
isge.length = chunk->bytes;
isge.lkey = chunk->mr->lkey;
ibv_recv_wr rx_work_request;

memset(&rx_work_request, 0, sizeof(rx_work_request));
rx_work_request.wr_id = reinterpret_cast<uint64_t>(chunk);// stash descriptor ptr
rx_work_request.next = NULL;
rx_work_request.sg_list = &isge;
rx_work_request.num_sge = 1;

ibv_recv_wr *badWorkRequest;
int ret = ibv_post_srq_recv(srq, &rx_work_request, &badWorkRequest);
if (ret)
return -errno;
return 0;
}
int ret, i = 0;
ibv_sge isge[num];
Chunk *chunk;
ibv_recv_wr rx_work_request[num];

int Infiniband::post_channel_cluster()
{
vector<Chunk*> free_chunks;
int r = memory_manager->get_channel_buffers(free_chunks, 0);
assert(r > 0);
for (vector<Chunk*>::iterator iter = free_chunks.begin(); iter != free_chunks.end(); ++iter) {
r = post_chunk(*iter);
assert(r == 0);
while (i < num) {
chunk = get_memory_manager()->get_rx_buffer();

assert(chunk != NULL);

isge[i].addr = reinterpret_cast<uint64_t>(chunk->data);
isge[i].length = chunk->bytes;
isge[i].lkey = chunk->lkey;

memset(&rx_work_request[i], 0, sizeof(rx_work_request[i]));
rx_work_request[i].wr_id = reinterpret_cast<uint64_t>(chunk); // stash descriptor ptr
if (i == num - 1) {
rx_work_request[i].next = 0;
} else {
rx_work_request[i].next = &rx_work_request[i+1];
}
rx_work_request[i].sg_list = &isge[i];
rx_work_request[i].num_sge = 1;
i++;
}
return 0;
ibv_recv_wr *badworkrequest;
ret = ibv_post_srq_recv(srq, &rx_work_request[0], &badworkrequest);
assert(ret == 0);
}
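Chaining the work requests through next lets the whole batch go to the NIC in a single ibv_post_srq_recv() call instead of one verbs call per chunk. The code above asserts success; for reference, the verbs API also reports partial failure through the bad-WR out-parameter. A sketch of that contract (not code from this PR):

```cpp
// Sketch of ibv_post_srq_recv()'s partial-failure contract: on error, *bad
// points at the first WR that was not posted; everything before it in the
// chain was accepted.
#include <infiniband/verbs.h>

int post_chain(ibv_srq *srq, ibv_recv_wr *chain) {
  ibv_recv_wr *bad = nullptr;
  int ret = ibv_post_srq_recv(srq, chain, &bad);
  if (ret != 0) {
    int posted = 0;
    for (ibv_recv_wr *w = chain; w != bad; w = w->next)
      posted++;   // WRs accepted before the failure; a caller could retry from 'bad'
    (void)posted;
  }
  return ret;
}
```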

Infiniband::CompletionChannel* Infiniband::create_comp_channel(CephContext *c)