Merge pull request #13484 from yuyuyu101/wip-rdma-perf-counter
msg/async/rdma: add perf counters to RDMA backend

Reviewed-by: Adir lev <adirl@mellanox.com>
yuyuyu101 committed Feb 18, 2017
2 parents 408053a + 4225e2b commit a6db737
Showing 3 changed files with 128 additions and 10 deletions.
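All three files apply Ceph's standard PerfCountersBuilder pattern, so it helps to see it once in isolation before the per-file diffs. The sketch below is a condensed, illustrative composite of the registration code this commit adds (it assumes Ceph's common/perf_counters.h types; error handling omitted):

    // Condensed sketch of the pattern used in both RDMADispatcher and
    // RDMAWorker: declare counters between two enum sentinels, register the
    // logger with the collection, then bump counters on the hot paths.
    PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher",
                            l_msgr_rdma_dispatcher_first,
                            l_msgr_rdma_dispatcher_last);
    plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors",
                        "The number of handshake errors");
    PerfCounters *perf_logger = plb.create_perf_counters();
    cct->get_perfcounters_collection()->add(perf_logger);  // visible in perf dump
    // ...later, on an error path:
    perf_logger->inc(l_msgr_rdma_handshake_errors);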
33 changes: 25 additions & 8 deletions src/msg/async/rdma/RDMAConnectedSocketImpl.cc
@@ -35,11 +35,14 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* i
my_msg.peer_qpn = 0;
my_msg.gid = infiniband->get_gid();
notify_fd = dispatcher->register_qp(qp, this);
dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
}

RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
{
ldout(cct, 20) << __func__ << " destruct." << dendl;
dispatcher->perf_logger->dec(l_msgr_rdma_active_queue_pair);
worker->remove_pending_conn(this);
dispatcher->erase_qpn(my_msg.qpn);
cleanup();
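The two queue-pair counters above are deliberately asymmetric: created_queue_pair is only ever incremented (a lifetime total), while active_queue_pair is incremented in the constructor and decremented in the destructor, making it a gauge of live QPs. An illustrative RAII restatement of that pairing (QpCounterGuard is hypothetical, not part of this commit):

    // Hypothetical guard expressing the ctor/dtor counter pairing above.
    struct QpCounterGuard {
      PerfCounters *pl;
      explicit QpCounterGuard(PerfCounters *p) : pl(p) {
        pl->inc(l_msgr_rdma_created_queue_pair);  // lifetime total, never dec'd
        pl->inc(l_msgr_rdma_active_queue_pair);   // gauge of live QPs
      }
      ~QpCounterGuard() { pl->dec(l_msgr_rdma_active_queue_pair); }
    };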
@@ -193,8 +196,11 @@ void RDMAConnectedSocketImpl::handle_connection() {
ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl;
int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
if (r < 0) {
if (r != -EAGAIN)
if (r != -EAGAIN) {
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
fault();
}
return;
}

@@ -210,6 +216,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
r = infiniband->send_msg(cct, tcp_fd, my_msg);
if (r < 0) {
ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
fault();
}
} else {
@@ -221,6 +228,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
r = infiniband->send_msg(cct, tcp_fd, my_msg);
if (r < 0) {
ldout(cct, 1) << __func__ << " server ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
fault();
return ;
}
@@ -259,13 +267,15 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
chunk->prepare_read(response->byte_len);
if (response->byte_len == 0) {
dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
if (connected) {
error = ECONNRESET;
assert(infiniband->post_chunk(chunk) == 0);
ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
}
break;
}
worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
//assert(response->byte_len);
if (read == (ssize_t)len) {
buffers.push_back(chunk);
@@ -280,6 +290,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
}
}

worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
if (is_server && connected == 0) {
ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
connected = 1; //if so, we don't need the last handshake
@@ -402,7 +413,8 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)

int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);
if (ret == 0) {
ldout(cct, 10) << __func__ << " no enough buffers in worker " << worker << dendl;
ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
return -EAGAIN; // that is ok , cause send will return bytes. == 0 enough buffers, < 0 no buffer, >0 not enough
}
vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
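The -EAGAIN above is backpressure, not failure: the unsent bytes remain queued in pending_bl, and tx_no_mem records how often the worker ran out of tx chunks. A hedged caller-side sketch of that contract (try_flush is illustrative, not from this commit):

    // Illustrative caller: -EAGAIN means "buffered, retry later", so only
    // other negative values are real errors. Chunks recycled by
    // handle_tx_event() eventually unblock the pended connection.
    ssize_t try_flush(RDMAConnectedSocketImpl &sock) {
      ssize_t r = sock.submit(false);
      if (r == -EAGAIN)
        return 0;   // l_msgr_rdma_tx_no_mem was bumped; data still queued
      return r;     // 0 on success, negative errno on a real failure
    }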
@@ -428,6 +440,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
assert(total <= pending_bl.length());
bufferlist swapped;
if (total < pending_bl.length()) {
worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
pending_bl.splice(total, pending_bl.length()-total, &swapped);
pending_bl.swap(swapped);
} else {
@@ -475,6 +488,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
ldout(cct, 20) << __func__ << " send_inline." << dendl;
}*/

worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
if (pre_wr)
pre_wr->next = &iswr[current_swr];
pre_wr = &iswr[current_swr];
@@ -485,11 +499,13 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)

ibv_send_wr *bad_tx_work_request;
if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
lderr(cct) << __func__ << " failed to send data"
<< " (most probably should be peer not ready): "
<< cpp_strerror(errno) << dendl;
ldout(cct, 1) << __func__ << " failed to send data"
<< " (most probably should be peer not ready): "
<< cpp_strerror(errno) << dendl;
worker->perf_logger->inc(l_msgr_rdma_tx_failed);
return -errno;
}
worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
ldout(cct, 20) << __func__ << " qp state is : " << Infiniband::qp_state_string(qp->get_state()) << dendl;
return 0;
}
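post_work_request() links one ibv_send_wr per chunk through the next pointer, so the whole batch reaches the HCA in a single ibv_post_send() call, and tx_bytes is accumulated per scatter-gather entry before posting. A minimal standalone sketch of that chaining (standard libibverbs types; bufs, lens, lkeys, qp, and perf_logger are placeholders):

    // Chain a small batch of work requests and post them with one verb call,
    // mirroring the loop above.
    constexpr size_t N = 2;
    ibv_sge isge[N];
    ibv_send_wr iswr[N] = {};
    for (size_t i = 0; i < N; ++i) {
      isge[i].addr    = reinterpret_cast<uint64_t>(bufs[i]);  // chunk payload
      isge[i].length  = lens[i];
      isge[i].lkey    = lkeys[i];
      iswr[i].wr_id   = reinterpret_cast<uint64_t>(bufs[i]);
      iswr[i].sg_list = &isge[i];
      iswr[i].num_sge = 1;
      iswr[i].opcode  = IBV_WR_SEND;
      iswr[i].next    = (i + 1 < N) ? &iswr[i + 1] : nullptr;  // the chain
    }
    ibv_send_wr *bad_wr = nullptr;
    if (ibv_post_send(qp, iswr, &bad_wr))        // one call posts the batch
      perf_logger->inc(l_msgr_rdma_tx_failed);
    else
      perf_logger->inc(l_msgr_rdma_tx_chunks, N);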
@@ -503,9 +519,10 @@ void RDMAConnectedSocketImpl::fin() {
wr.send_flags = IBV_SEND_SIGNALED;
ibv_send_wr* bad_tx_work_request;
if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
lderr(cct) << __func__ << " failed to send message="
<< " ibv_post_send failed(most probably should be peer not ready): "
<< cpp_strerror(errno) << dendl;
ldout(cct, 1) << __func__ << " failed to send message="
<< " ibv_post_send failed(most probably should be peer not ready): "
<< cpp_strerror(errno) << dendl;
worker->perf_logger->inc(l_msgr_rdma_tx_failed);
return ;
}
}
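fin() and the read() path above are two halves of one convention: a send work request with no scatter-gather entries completes on the peer as a receive with byte_len == 0, which read() counts as rx_fin and turns into ECONNRESET. A minimal sketch of the zero-length FIN request (condensed from the hunk above):

    // Zero-length close marker: no SGEs, signaled so the completion is seen.
    ibv_send_wr wr = {};
    wr.wr_id      = reinterpret_cast<uint64_t>(this);  // locate socket in CQ
    wr.num_sge    = 0;                                 // zero-byte message
    wr.opcode     = IBV_WR_SEND;
    wr.send_flags = IBV_SEND_SIGNALED;
    ibv_send_wr *bad_wr = nullptr;
    if (ibv_post_send(qp->get_qp(), &wr, &bad_wr))
      worker->perf_logger->inc(l_msgr_rdma_tx_failed);  // as counted above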
59 changes: 57 additions & 2 deletions src/msg/async/rdma/RDMAStack.cc
@@ -55,13 +55,35 @@ RDMADispatcher::RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
assert(rx_cc);
rx_cq = ib->create_comp_queue(c, rx_cc);
assert(rx_cq);

PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);

plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether the dispatcher thread is polling");
plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");

plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The total number of rx work completions");
plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The total number of rx error work completions");
plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx FIN work requests");

plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events");
plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events");

plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors");


plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "Created queue pair number");
plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "Active queue pair number");

perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);

t = std::thread(&RDMADispatcher::polling, this);
cct->register_fork_watcher(this);
}

void RDMADispatcher::handle_async_event()
{
ldout(cct, 20) << __func__ << dendl;
ldout(cct, 30) << __func__ << dendl;
while (1) {
ibv_async_event async_event;
if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
@@ -70,9 +92,11 @@ void RDMADispatcher::handle_async_event()
<< " " << cpp_strerror(errno) << ")" << dendl;
return;
}
perf_logger->inc(l_msgr_rdma_total_async_events);
// FIXME: Currently we must ensure no other factor make QP in ERROR state,
// otherwise this qp can't be deleted in current cleanup flow.
if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
uint64_t qpn = async_event.element.qp->qp_num;
ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
<< " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
@@ -114,6 +138,7 @@ void RDMADispatcher::polling()
// for dead_queue_pairs).
// Additionally, don't delete qp while outstanding_buffers isn't empty,
// because we need to check qp's state before sending
perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
if (!inflight.load()) {
Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
while (!dead_queue_pairs.empty()) {
@@ -122,11 +147,11 @@
dead_queue_pairs.pop_back();
}
}
handle_async_event();
if (done)
break;

if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
handle_async_event();
if (!rearmed) {
// Clean up cq events after rearm notify ensure no new incoming event
// arrived between polling and rearm
@@ -140,6 +165,7 @@
channel_poll.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
channel_poll.revents = 0;
int r = 0;
perf_logger->set(l_msgr_rdma_polling, 0);
while (!done && r == 0) {
r = poll(&channel_poll, 1, 1);
if (r < 0) {
@@ -151,19 +177,22 @@
if (r > 0 && rx_cc->get_cq_event())
ldout(cct, 20) << __func__ << " got cq event." << dendl;
last_inactive = ceph_clock_now();
perf_logger->set(l_msgr_rdma_polling, 1);
rearmed = false;
}
continue;
}

ldout(cct, 20) << __func__ << " pool completion queue got " << n
<< " responses."<< dendl;
perf_logger->inc(l_msgr_rdma_rx_total_wc, n);
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
for (int i = 0; i < n; ++i) {
ibv_wc* response = &wc[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);

if (response->status != IBV_WC_SUCCESS) {
perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
<< ib->wc_status_to_string(response->status) << dendl;
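Two of the dispatcher metrics are used as gauges via set() rather than inc(): polling flips to 0 while the thread blocks in poll(2) on the completion channel and back to 1 when it resumes busy-polling, and inflight_tx_chunks mirrors an atomic on every loop pass. A condensed, illustrative restatement of that pattern from the hunks above:

    // Gauge-style counters in the polling loop (condensed sketch).
    perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);  // each pass
    // ...once the idle threshold is reached:
    perf_logger->set(l_msgr_rdma_polling, 0);  // about to block in poll(2)
    while (!done && r == 0)
      r = poll(&channel_poll, 1, 1);
    perf_logger->set(l_msgr_rdma_polling, 1);  // busy-polling again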
@@ -295,6 +324,27 @@ RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
: Worker(c, i), stack(nullptr), infiniband(NULL),
tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL), lock("RDMAWorker::lock"), pended(false)
{
// initialize perf_logger
char name[128];
sprintf(name, "AsyncMessenger::RDMAWorker-%u", id);
PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);

plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work completions");
plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");

plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of tx buffer exhaustion");
plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of partial tx buffer allocations");
plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of failed tx posts");

plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");
plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks received");
plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks received");

perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);
}
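Each RDMAWorker registers its own logger, with the worker id embedded in the name so several workers can coexist in one PerfCountersCollection. A hedged restatement of the naming step (snprintf is a bounded alternative; the commit itself uses sprintf into a 128-byte buffer):

    // Per-instance logger naming, e.g. "AsyncMessenger::RDMAWorker-0".
    char name[128];
    snprintf(name, sizeof(name), "AsyncMessenger::RDMAWorker-%u", id);
    PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);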

RDMAWorker::~RDMAWorker()
@@ -441,12 +491,15 @@ void RDMAWorker::handle_tx_event()
ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk << " " << infiniband->wc_status_to_string(response->status) << dendl;

if (response->status != IBV_WC_SUCCESS) {
perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
if (response->status == IBV_WC_RETRY_EXC_ERR) {
ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
} else if (response->status == IBV_WC_WR_FLUSH_ERR) {
ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
<< response->qp_num << " should be down while this WR=" << response->wr_id
<< " still in flight." << dendl;
perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
} else {
ldout(cct, 1) << __func__ << " send work request returned error for buffer("
<< response->wr_id << ") status(" << response->status << "): "
@@ -469,6 +522,7 @@ void RDMAWorker::handle_tx_event()
}
}

perf_logger->inc(l_msgr_rdma_tx_total_wc, cqe.size());
post_tx_buffer(tx_chunks);

ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl;
@@ -489,6 +543,7 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
w->set_ib(global_infiniband);
w->set_stack(this);
}

ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
}

46 changes: 46 additions & 0 deletions src/msg/async/rdma/RDMAStack.h
@@ -34,6 +34,28 @@ class RDMAServerSocketImpl;
class RDMAStack;
class RDMAWorker;

enum {
l_msgr_rdma_dispatcher_first = 94000,

l_msgr_rdma_polling,
l_msgr_rdma_inflight_tx_chunks,

l_msgr_rdma_rx_total_wc,
l_msgr_rdma_rx_total_wc_errors,
l_msgr_rdma_rx_fin,

l_msgr_rdma_handshake_errors,

l_msgr_rdma_total_async_events,
l_msgr_rdma_async_last_wqe_events,

l_msgr_rdma_created_queue_pair,
l_msgr_rdma_active_queue_pair,

l_msgr_rdma_dispatcher_last,
};
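The _first/_last enumerators are sentinels, not counters: they bound the index range handed to PerfCountersBuilder, and the dispatcher block starting at 94000 stays disjoint from the per-worker block starting at 95000 below, so the two loggers cannot overlap. Adding a metric later means inserting an enumerator strictly between the sentinels; an illustrative, hypothetical extension:

    // Hypothetical: a new dispatcher counter slots in between the sentinels,
    // keeping l_msgr_rdma_dispatcher_last as the builder's upper bound.
    enum {
      l_msgr_rdma_dispatcher_first = 94000,
      // ...existing counters...
      l_msgr_rdma_example_new_counter,   // hypothetical addition
      l_msgr_rdma_dispatcher_last,
    };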


class RDMADispatcher : public CephContext::ForkWatcher {
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::QueuePair QueuePair;
@@ -84,6 +106,8 @@ class RDMADispatcher : public CephContext::ForkWatcher {
};

public:
PerfCounters *perf_logger;

explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s);
virtual ~RDMADispatcher();
void handle_async_event();
@@ -105,6 +129,26 @@
};


enum {
l_msgr_rdma_first = 95000,

l_msgr_rdma_tx_total_wc,
l_msgr_rdma_tx_total_wc_errors,
l_msgr_rdma_tx_wc_retry_errors,
l_msgr_rdma_tx_wc_wr_flush_errors,

l_msgr_rdma_tx_no_mem,
l_msgr_rdma_tx_parital_mem,
l_msgr_rdma_tx_failed,

l_msgr_rdma_tx_chunks,
l_msgr_rdma_tx_bytes,
l_msgr_rdma_rx_chunks,
l_msgr_rdma_rx_bytes,

l_msgr_rdma_last,
};

class RDMAWorker : public Worker {
typedef Infiniband::CompletionQueue CompletionQueue;
typedef Infiniband::CompletionChannel CompletionChannel;
@@ -132,6 +176,7 @@ class RDMAWorker : public Worker {
};

public:
PerfCounters *perf_logger;
explicit RDMAWorker(CephContext *c, unsigned i);
virtual ~RDMAWorker();
void notify();
@@ -245,6 +290,7 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
class RDMAStack : public NetworkStack {
vector<std::thread> threads;
RDMADispatcher *dispatcher;
PerfCounters *perf_counter;

public:
explicit RDMAStack(CephContext *cct, const string &t);
