msg/async/rdma: add perf counters to RDMA backend #13484

Merged
merged 1 commit on Feb 18, 2017
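
For context, the counters added here follow Ceph's standard PerfCounters pattern: one enum range per logger, bounded by *_first/*_last sentinels, built with PerfCountersBuilder, and registered with the context's perf counters collection. A minimal sketch of that pattern (the l_example_* names and the helper function are illustrative, not code from this diff):

```cpp
#include "common/ceph_context.h"
#include "common/perf_counters.h"

// Illustrative enum range; the real ranges for this PR live in RDMAStack.h.
enum {
  l_example_first = 94000,
  l_example_polling,
  l_example_handshake_errors,
  l_example_last,
};

static PerfCounters *register_example_counters(CephContext *cct)
{
  PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher",
                          l_example_first, l_example_last);
  plb.add_u64_counter(l_example_polling, "polling",
                      "Whether dispatcher thread is polling");
  plb.add_u64_counter(l_example_handshake_errors, "handshake_errors",
                      "The number of handshake errors");
  PerfCounters *logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(logger);
  return logger;  // hot paths then call logger->inc()/set()/dec() on the enum ids
}
```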
33 changes: 25 additions & 8 deletions src/msg/async/rdma/RDMAConnectedSocketImpl.cc
@@ -35,11 +35,14 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* i
my_msg.peer_qpn = 0;
my_msg.gid = infiniband->get_gid();
notify_fd = dispatcher->register_qp(qp, this);
dispatcher->perf_logger->inc(l_msgr_rdma_created_queue_pair);
dispatcher->perf_logger->inc(l_msgr_rdma_active_queue_pair);
}

RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
{
ldout(cct, 20) << __func__ << " destruct." << dendl;
dispatcher->perf_logger->dec(l_msgr_rdma_active_queue_pair);
worker->remove_pending_conn(this);
dispatcher->erase_qpn(my_msg.qpn);
cleanup();
@@ -193,8 +196,11 @@ void RDMAConnectedSocketImpl::handle_connection() {
ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl;
int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
if (r < 0) {
if (r != -EAGAIN)
if (r != -EAGAIN) {
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
fault();
}
return;
}

@@ -210,6 +216,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
r = infiniband->send_msg(cct, tcp_fd, my_msg);
if (r < 0) {
ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
fault();
}
} else {
@@ -221,6 +228,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
r = infiniband->send_msg(cct, tcp_fd, my_msg);
if (r < 0) {
ldout(cct, 1) << __func__ << " server ack failed." << dendl;
dispatcher->perf_logger->inc(l_msgr_rdma_handshake_errors);
fault();
return ;
}
@@ -259,13 +267,15 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
chunk->prepare_read(response->byte_len);
if (response->byte_len == 0) {
dispatcher->perf_logger->inc(l_msgr_rdma_rx_fin);
if (connected) {
error = ECONNRESET;
assert(infiniband->post_chunk(chunk) == 0);
ldout(cct, 20) << __func__ << " got remote close msg..." << dendl;
}
break;
}
worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
//assert(response->byte_len);
if (read == (ssize_t)len) {
buffers.push_back(chunk);
@@ -280,6 +290,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
}
}

worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
if (is_server && connected == 0) {
ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
connected = 1; //if so, we don't need the last handshake
@@ -402,7 +413,8 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)

int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);
if (ret == 0) {
ldout(cct, 10) << __func__ << " no enough buffers in worker " << worker << dendl;
ldout(cct, 1) << __func__ << " no enough buffers in worker " << worker << dendl;
worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
return -EAGAIN; // that is ok , cause send will return bytes. == 0 enough buffers, < 0 no buffer, >0 not enough
}
vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
@@ -428,6 +440,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
assert(total <= pending_bl.length());
bufferlist swapped;
if (total < pending_bl.length()) {
worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
pending_bl.splice(total, pending_bl.length()-total, &swapped);
pending_bl.swap(swapped);
} else {
@@ -475,6 +488,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
ldout(cct, 20) << __func__ << " send_inline." << dendl;
}*/

worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
if (pre_wr)
pre_wr->next = &iswr[current_swr];
pre_wr = &iswr[current_swr];
@@ -485,11 +499,13 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)

ibv_send_wr *bad_tx_work_request;
if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
lderr(cct) << __func__ << " failed to send data"
<< " (most probably should be peer not ready): "
<< cpp_strerror(errno) << dendl;
ldout(cct, 1) << __func__ << " failed to send data"
<< " (most probably should be peer not ready): "
<< cpp_strerror(errno) << dendl;
worker->perf_logger->inc(l_msgr_rdma_tx_failed);
return -errno;
}
worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
ldout(cct, 20) << __func__ << " qp state is : " << Infiniband::qp_state_string(qp->get_state()) << dendl;
return 0;
}
@@ -503,9 +519,10 @@ void RDMAConnectedSocketImpl::fin() {
wr.send_flags = IBV_SEND_SIGNALED;
ibv_send_wr* bad_tx_work_request;
if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
lderr(cct) << __func__ << " failed to send message="
<< " ibv_post_send failed(most probably should be peer not ready): "
<< cpp_strerror(errno) << dendl;
ldout(cct, 1) << __func__ << " failed to send message="
<< " ibv_post_send failed(most probably should be peer not ready): "
<< cpp_strerror(errno) << dendl;
worker->perf_logger->inc(l_msgr_rdma_tx_failed);
return ;
}
}
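
With the constructor/destructor changes at the top of this file, created_queue_pair only ever increases while active_queue_pair acts as a gauge (incremented on construction, decremented on destruction). A short illustration of what an observer can derive from the pair; PerfCounters::get(int) is assumed to be available here, as it is elsewhere in Ceph, and the helper is hypothetical, not part of this PR:

```cpp
// Illustrative check (not part of this PR): derive how many QPs have been
// torn down so far from the two queue-pair counters on the dispatcher logger.
static uint64_t destroyed_qps(PerfCounters *dispatcher_perf_logger)
{
  uint64_t created = dispatcher_perf_logger->get(l_msgr_rdma_created_queue_pair);
  uint64_t active  = dispatcher_perf_logger->get(l_msgr_rdma_active_queue_pair);
  return created - active;  // QPs already destructed
}
```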
59 changes: 57 additions & 2 deletions src/msg/async/rdma/RDMAStack.cc
@@ -55,13 +55,35 @@ RDMADispatcher::RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
assert(rx_cc);
rx_cq = ib->create_comp_queue(c, rx_cc);
assert(rx_cq);

PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);

plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether dispatcher thread is polling");
plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");

plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The number of total rx work completions");
plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The number of total rx error work completions");
plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx finish work requests");

plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events");
plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events");

plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors");


plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "Created queue pair number");
plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "Active queue pair number");

perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);

t = std::thread(&RDMADispatcher::polling, this);
cct->register_fork_watcher(this);
}

void RDMADispatcher::handle_async_event()
{
ldout(cct, 20) << __func__ << dendl;
ldout(cct, 30) << __func__ << dendl;
while (1) {
ibv_async_event async_event;
if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
@@ -70,9 +92,11 @@ void RDMADispatcher::handle_async_event()
<< " " << cpp_strerror(errno) << ")" << dendl;
return;
}
perf_logger->inc(l_msgr_rdma_total_async_events);
// FIXME: Currently we must ensure no other factor make QP in ERROR state,
// otherwise this qp can't be deleted in current cleanup flow.
if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
uint64_t qpn = async_event.element.qp->qp_num;
ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
<< " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
@@ -114,6 +138,7 @@ void RDMADispatcher::polling()
// for dead_queue_pairs).
// Additionally, don't delete qp while outstanding_buffers isn't empty,
// because we need to check qp's state before sending
perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
if (!inflight.load()) {
Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
while (!dead_queue_pairs.empty()) {
@@ -122,11 +147,11 @@
dead_queue_pairs.pop_back();
}
}
handle_async_event();
if (done)
break;

if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
handle_async_event();
if (!rearmed) {
// Clean up cq events after rearm notify ensure no new incoming event
// arrived between polling and rearm
@@ -140,6 +165,7 @@
channel_poll.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
channel_poll.revents = 0;
int r = 0;
perf_logger->set(l_msgr_rdma_polling, 0);
while (!done && r == 0) {
r = poll(&channel_poll, 1, 1);
if (r < 0) {
@@ -151,19 +177,22 @@
if (r > 0 && rx_cc->get_cq_event())
ldout(cct, 20) << __func__ << " got cq event." << dendl;
last_inactive = ceph_clock_now();
perf_logger->set(l_msgr_rdma_polling, 1);
rearmed = false;
}
continue;
}

ldout(cct, 20) << __func__ << " pool completion queue got " << n
<< " responses."<< dendl;
perf_logger->inc(l_msgr_rdma_rx_total_wc, n);
Mutex::Locker l(lock);//make sure connected socket alive when pass wc
for (int i = 0; i < n; ++i) {
ibv_wc* response = &wc[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);

if (response->status != IBV_WC_SUCCESS) {
perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
<< ib->wc_status_to_string(response->status) << dendl;
@@ -295,6 +324,27 @@ RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
: Worker(c, i), stack(nullptr), infiniband(NULL),
tx_handler(new C_handle_cq_tx(this)), memory_manager(NULL), lock("RDMAWorker::lock"), pended(false)
{
// initialize perf_logger
char name[128];
sprintf(name, "AsyncMessenger::RDMAWorker-%u", id);
PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);

plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work completions");
plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");

plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of partial tx buffer");
plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of failed tx posts");

plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");
plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks received");
plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks received");

perf_logger = plb.create_perf_counters();
cct->get_perfcounters_collection()->add(perf_logger);
}

RDMAWorker::~RDMAWorker()
@@ -441,12 +491,15 @@ void RDMAWorker::handle_tx_event()
ldout(cct, 25) << __func__ << " QP: " << response->qp_num << " len: " << response->byte_len << " , addr:" << chunk << " " << infiniband->wc_status_to_string(response->status) << dendl;

if (response->status != IBV_WC_SUCCESS) {
perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
if (response->status == IBV_WC_RETRY_EXC_ERR) {
ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
} else if (response->status == IBV_WC_WR_FLUSH_ERR) {
ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
<< response->qp_num << " should be down while this WR=" << response->wr_id
<< " still in flight." << dendl;
perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
} else {
ldout(cct, 1) << __func__ << " send work request returned error for buffer("
<< response->wr_id << ") status(" << response->status << "): "
@@ -469,6 +522,7 @@
}
}

perf_logger->inc(l_msgr_rdma_tx_total_wc, cqe.size());
post_tx_buffer(tx_chunks);

ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl;
@@ -489,6 +543,7 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
w->set_ib(global_infiniband);
w->set_stack(this);
}

ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
}

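Two of the dispatcher entries are used as gauges rather than monotonic counters: polling() publishes the current in-flight tx chunk depth on every pass and drops the polling flag to 0 while it blocks on the completion channel, raising it back to 1 on wake-up. A condensed, self-contained sketch of that gauge handling (wait_for_cq_event() is a hypothetical stand-in for the relevant part of the real loop in the hunks above):

```cpp
#include <poll.h>
#include <atomic>
#include "common/perf_counters.h"

// Hypothetical condensation of RDMADispatcher::polling()'s gauge updates:
// report the in-flight tx chunk count, mark the thread as not polling while
// it waits on the completion channel fd, and mark it polling again afterwards.
static void wait_for_cq_event(PerfCounters *perf_logger, int channel_fd,
                              const std::atomic<unsigned> &inflight)
{
  perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight.load());
  perf_logger->set(l_msgr_rdma_polling, 0);   // parked, not burning CPU

  struct pollfd channel_poll = { channel_fd, POLLIN | POLLERR | POLLNVAL | POLLHUP, 0 };
  poll(&channel_poll, 1, 1);                  // 1 ms timeout, as in the diff

  perf_logger->set(l_msgr_rdma_polling, 1);   // back to busy-polling the CQ
}
```
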
46 changes: 46 additions & 0 deletions src/msg/async/rdma/RDMAStack.h
@@ -34,6 +34,28 @@ class RDMAServerSocketImpl;
class RDMAStack;
class RDMAWorker;

enum {
l_msgr_rdma_dispatcher_first = 94000,

l_msgr_rdma_polling,
l_msgr_rdma_inflight_tx_chunks,

l_msgr_rdma_rx_total_wc,
l_msgr_rdma_rx_total_wc_errors,
l_msgr_rdma_rx_fin,

l_msgr_rdma_handshake_errors,

l_msgr_rdma_total_async_events,
l_msgr_rdma_async_last_wqe_events,

l_msgr_rdma_created_queue_pair,
l_msgr_rdma_active_queue_pair,

l_msgr_rdma_dispatcher_last,
};


class RDMADispatcher : public CephContext::ForkWatcher {
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::QueuePair QueuePair;
@@ -84,6 +106,8 @@ class RDMADispatcher : public CephContext::ForkWatcher {
};

public:
PerfCounters *perf_logger;

explicit RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s);
virtual ~RDMADispatcher();
void handle_async_event();
@@ -105,6 +129,26 @@
};


enum {
l_msgr_rdma_first = 95000,

l_msgr_rdma_tx_total_wc,
l_msgr_rdma_tx_total_wc_errors,
l_msgr_rdma_tx_wc_retry_errors,
l_msgr_rdma_tx_wc_wr_flush_errors,

l_msgr_rdma_tx_no_mem,
l_msgr_rdma_tx_parital_mem,
l_msgr_rdma_tx_failed,

l_msgr_rdma_tx_chunks,
l_msgr_rdma_tx_bytes,
l_msgr_rdma_rx_chunks,
l_msgr_rdma_rx_bytes,

l_msgr_rdma_last,
};

class RDMAWorker : public Worker {
typedef Infiniband::CompletionQueue CompletionQueue;
typedef Infiniband::CompletionChannel CompletionChannel;
@@ -132,6 +176,7 @@ class RDMAWorker : public Worker {
};

public:
PerfCounters *perf_logger;
explicit RDMAWorker(CephContext *c, unsigned i);
virtual ~RDMAWorker();
void notify();
@@ -245,6 +290,7 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
class RDMAStack : public NetworkStack {
vector<std::thread> threads;
RDMADispatcher *dispatcher;
PerfCounters *perf_counter;

public:
explicit RDMAStack(CephContext *cct, const string &t);
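
Note that the two index ranges in RDMAStack.h are deliberately disjoint (dispatcher counters start at 94000, worker counters at 95000) and each is bracketed by first/last sentinels, which PerfCountersBuilder uses to bound valid counter ids. A sketch of how a later counter would be slotted in (l_msgr_rdma_tx_example is hypothetical, not part of this PR):

```cpp
// Hypothetical future addition to the worker range: append new entries just
// before the l_msgr_rdma_last sentinel so existing ids keep their values and
// the PerfCountersBuilder(cct, name, l_msgr_rdma_first, l_msgr_rdma_last)
// bounds still cover every counter.
enum {
  l_msgr_rdma_first = 95000,
  // ... existing tx/rx counters from this PR ...
  l_msgr_rdma_tx_example,  // hypothetical new counter, appended before _last
  l_msgr_rdma_last,
};
```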