diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
index 45683ee2705004..fa04045960791e 100644
--- a/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
+++ b/src/msg/async/rdma/RDMAConnectedSocketImpl.cc
@@ -35,10 +35,13 @@ RDMAConnectedSocketImpl::RDMAConnectedSocketImpl(CephContext *cct, Infiniband* i
   my_msg.peer_qpn = 0;
   my_msg.gid = infiniband->get_gid();
   notify_fd = dispatcher->register_qp(qp, this);
+  worker->perf_logger->inc(l_msgr_rdma_created_queue_pair);
+  worker->perf_logger->inc(l_msgr_rdma_active_queue_pair);
 }
 
 RDMAConnectedSocketImpl::~RDMAConnectedSocketImpl()
 {
+  worker->perf_logger->dec(l_msgr_rdma_active_queue_pair);
   worker->remove_pending_conn(this);
   dispatcher->erase_qpn(my_msg.qpn);
   cleanup();
@@ -192,8 +195,11 @@ void RDMAConnectedSocketImpl::handle_connection() {
   ldout(cct, 20) << __func__ << " QP: " << my_msg.qpn << " tcp_fd: " << tcp_fd << " fd: " << notify_fd << dendl;
   int r = infiniband->recv_msg(cct, tcp_fd, peer_msg);
   if (r < 0) {
-    if (r != -EAGAIN)
+    if (r != -EAGAIN) {
+      worker->perf_logger->inc(l_msgr_rdma_handshake_errors);
+      ldout(cct, 1) << __func__ << " recv handshake msg failed." << dendl;
       fault();
+    }
     return;
   }
 
@@ -209,6 +215,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
     r = infiniband->send_msg(cct, tcp_fd, my_msg);
     if (r < 0) {
       ldout(cct, 1) << __func__ << " send client ack failed." << dendl;
+      worker->perf_logger->inc(l_msgr_rdma_handshake_errors);
       fault();
     }
   } else {
@@ -220,6 +227,7 @@ void RDMAConnectedSocketImpl::handle_connection() {
       r = infiniband->send_msg(cct, tcp_fd, my_msg);
       if (r < 0) {
         ldout(cct, 1) << __func__ << " server ack failed." << dendl;
+        worker->perf_logger->inc(l_msgr_rdma_handshake_errors);
         fault();
         return ;
       }
@@ -258,6 +266,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
     ldout(cct, 25) << __func__ << " chunk length: " << response->byte_len << " bytes." << chunk << dendl;
     chunk->prepare_read(response->byte_len);
     if (response->byte_len == 0) {
+      worker->perf_logger->inc(l_msgr_rdma_rx_fin);
       if (connected) {
         error = ECONNRESET;
         assert(infiniband->post_chunk(chunk) == 0);
@@ -265,6 +274,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
       }
       break;
     }
+    worker->perf_logger->inc(l_msgr_rdma_rx_bytes, response->byte_len);
     //assert(response->byte_len);
     if (read == (ssize_t)len) {
       buffers.push_back(chunk);
@@ -279,6 +289,7 @@ ssize_t RDMAConnectedSocketImpl::read(char* buf, size_t len)
     }
   }
 
+  worker->perf_logger->inc(l_msgr_rdma_rx_chunks, cqe.size());
   if (is_server && connected == 0) {
     ldout(cct, 20) << __func__ << " we do not need last handshake, QP: " << my_msg.qpn << " peer QP: " << peer_msg.qpn << dendl;
     connected = 1; //if so, we don't need the last handshake
@@ -401,7 +412,8 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
   int ret = worker->reserve_message_buffer(this, tx_buffers, bytes);
   if (ret == 0) {
-    ldout(cct, 10) << __func__ << " no enough buffers in worker " << worker << dendl;
+    ldout(cct, 1) << __func__ << " not enough buffers in worker " << worker << dendl;
+    worker->perf_logger->inc(l_msgr_rdma_tx_no_mem);
     return -EAGAIN; // that is ok , cause send will return bytes. == 0 enough buffers, < 0 no buffer, >0 not enough
   }
 
   vector<Chunk*>::iterator current_buffer = tx_buffers.begin();
@@ -427,6 +439,7 @@ ssize_t RDMAConnectedSocketImpl::submit(bool more)
   assert(total <= pending_bl.length());
   bufferlist swapped;
   if (total < pending_bl.length()) {
+    worker->perf_logger->inc(l_msgr_rdma_tx_parital_mem);
     pending_bl.splice(total, pending_bl.length()-total, &swapped);
     pending_bl.swap(swapped);
   } else {
@@ -474,6 +487,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
       ldout(cct, 20) << __func__ << " send_inline." << dendl;
     }*/
 
+    worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
     if (pre_wr)
       pre_wr->next = &iswr[current_swr];
     pre_wr = &iswr[current_swr];
@@ -484,11 +498,13 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
 
   ibv_send_wr *bad_tx_work_request;
   if (ibv_post_send(qp->get_qp(), iswr, &bad_tx_work_request)) {
-    lderr(cct) << __func__ << " failed to send data"
-               << " (most probably should be peer not ready): "
-               << cpp_strerror(errno) << dendl;
+    ldout(cct, 1) << __func__ << " failed to send data"
+                  << " (most probably should be peer not ready): "
+                  << cpp_strerror(errno) << dendl;
+    worker->perf_logger->inc(l_msgr_rdma_tx_failed);
     return -errno;
   }
+  worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
   ldout(cct, 20) << __func__ << " qp state is : " << Infiniband::qp_state_string(qp->get_state()) << dendl;
   return 0;
 }
@@ -502,9 +518,10 @@ void RDMAConnectedSocketImpl::fin() {
   wr.send_flags = IBV_SEND_SIGNALED;
   ibv_send_wr* bad_tx_work_request;
   if (ibv_post_send(qp->get_qp(), &wr, &bad_tx_work_request)) {
-    lderr(cct) << __func__ << " failed to send message="
-               << " ibv_post_send failed(most probably should be peer not ready): "
-               << cpp_strerror(errno) << dendl;
+    ldout(cct, 1) << __func__ << " failed to send message="
+                  << " ibv_post_send failed (most probably should be peer not ready): "
+                  << cpp_strerror(errno) << dendl;
+    worker->perf_logger->inc(l_msgr_rdma_tx_failed);
     return ;
   }
 }
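Note: the constructor/destructor changes above pair a monotonic counter (created_queue_pair) with a gauge (active_queue_pair): both are incremented when a QP is created, and only the gauge is decremented on teardown, so their difference is always zero and the gauge alone tracks live QPs. A minimal self-contained sketch of that pairing, using a hypothetical Counters struct rather than Ceph's PerfCounters API:

```cpp
// Sketch of the created/active counter pairing; Counters and QueuePairHandle
// are hypothetical stand-ins, not Ceph types.
#include <atomic>
#include <cstdint>
#include <cstdio>

struct Counters {
  std::atomic<uint64_t> created_queue_pair{0}; // monotonic: QPs ever created
  std::atomic<int64_t>  active_queue_pair{0};  // gauge: QPs currently alive
};

struct QueuePairHandle {
  Counters &c;
  explicit QueuePairHandle(Counters &ctrs) : c(ctrs) {
    c.created_queue_pair++;  // bumped once at creation...
    c.active_queue_pair++;
  }
  ~QueuePairHandle() {
    c.active_queue_pair--;   // ...and only the gauge drops at teardown
  }
};

int main() {
  Counters ctrs;
  {
    QueuePairHandle a(ctrs), b(ctrs);
  } // both destroyed here
  std::printf("created=%llu active=%lld\n",
              (unsigned long long)ctrs.created_queue_pair.load(),
              (long long)ctrs.active_queue_pair.load()); // created=2 active=0
  return 0;
}
```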
diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc
index 16e996c8e2a82b..9643c426a029d6 100644
--- a/src/msg/async/rdma/RDMAStack.cc
+++ b/src/msg/async/rdma/RDMAStack.cc
@@ -55,13 +55,26 @@ RDMADispatcher::RDMADispatcher(CephContext* c, Infiniband* i, RDMAStack* s)
   assert(rx_cc);
   rx_cq = ib->create_comp_queue(c, rx_cc);
   assert(rx_cq);
+
+  PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
+
+  plb.add_u64_counter(l_msgr_rdma_polling, "polling", "Whether the dispatcher thread is polling");
+  plb.add_u64_counter(l_msgr_rdma_inflight_tx_chunks, "inflight_tx_chunks", "The number of inflight tx chunks");
+
+  plb.add_u64_counter(l_msgr_rdma_rx_total_wc, "rx_total_wc", "The total number of rx work completions");
+  plb.add_u64_counter(l_msgr_rdma_rx_total_wc_errors, "rx_total_wc_errors", "The total number of rx error work completions");
+
+  plb.add_u64_counter(l_msgr_rdma_total_async_events, "total_async_events", "The number of async events");
+  plb.add_u64_counter(l_msgr_rdma_async_last_wqe_events, "async_last_wqe_events", "The number of last wqe events");
+
+  perf_logger = plb.create_perf_counters();
+  cct->get_perfcounters_collection()->add(perf_logger);
+
   t = std::thread(&RDMADispatcher::polling, this);
   cct->register_fork_watcher(this);
 }
 
 void RDMADispatcher::handle_async_event() {
-  ldout(cct, 20) << __func__ << dendl;
+  ldout(cct, 30) << __func__ << dendl;
   while (1) {
     ibv_async_event async_event;
     if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
@@ -70,9 +83,11 @@ void RDMADispatcher::handle_async_event()
                  << " " << cpp_strerror(errno) << ")" << dendl;
       return;
     }
+    perf_logger->inc(l_msgr_rdma_total_async_events);
     // FIXME: Currently we must ensure no other factor make QP in ERROR state,
     // otherwise this qp can't be deleted in current cleanup flow.
     if (async_event.event_type == IBV_EVENT_QP_LAST_WQE_REACHED) {
+      perf_logger->inc(l_msgr_rdma_async_last_wqe_events);
       uint64_t qpn = async_event.element.qp->qp_num;
       ldout(cct, 10) << __func__ << " event associated qp=" << async_event.element.qp
                      << " evt: " << ibv_event_type_str(async_event.event_type) << dendl;
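The dispatcher registers its counters through PerfCountersBuilder, bracketed by the l_msgr_rdma_dispatcher_first/_last sentinels declared in RDMAStack.h. A rough self-contained sketch of that sentinel-range convention follows; DemoCounters is a toy type, not Ceph's implementation, which additionally tracks names, descriptions, and the registered collection:

```cpp
// Sketch of the first/last sentinel convention used by the enums in this
// patch: sentinels bound the index range, and only the entries between them
// get storage.
#include <cstdint>
#include <cstdio>

enum {
  l_demo_first = 94000, // sentinel: one before the first real counter
  l_demo_polling,
  l_demo_rx_total_wc,
  l_demo_last,          // sentinel: one past the last real counter
};

struct DemoCounters {
  // one slot per real counter; the sentinels themselves are not stored
  uint64_t vals[l_demo_last - l_demo_first - 1] = {};
  void inc(int idx, uint64_t v = 1) { vals[idx - l_demo_first - 1] += v; }
  uint64_t get(int idx) const { return vals[idx - l_demo_first - 1]; }
};

int main() {
  DemoCounters c;
  c.inc(l_demo_rx_total_wc, 16); // e.g. 16 work completions in one poll pass
  std::printf("rx_total_wc=%llu\n",
              (unsigned long long)c.get(l_demo_rx_total_wc));
  return 0;
}
```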
@@ -114,6 +129,7 @@ void RDMADispatcher::polling()
       // for dead_queue_pairs).
       // Additionally, don't delete qp while outstanding_buffers isn't empty,
       // because we need to check qp's state before sending
+      perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
       if (!inflight.load()) {
         Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
         while (!dead_queue_pairs.empty()) {
@@ -122,11 +138,11 @@ void RDMADispatcher::polling()
           dead_queue_pairs.pop_back();
         }
       }
-      handle_async_event();
       if (done)
         break;
 
       if ((ceph_clock_now() - last_inactive).to_nsec() / 1000 > cct->_conf->ms_async_rdma_polling_us) {
+        handle_async_event();
        if (!rearmed) {
          // Clean up cq events after rearm notify ensure no new incoming event
          // arrived between polling and rearm
@@ -140,6 +156,7 @@ void RDMADispatcher::polling()
         channel_poll.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
         channel_poll.revents = 0;
         int r = 0;
+        perf_logger->set(l_msgr_rdma_polling, 0);
         while (!done && r == 0) {
           r = poll(&channel_poll, 1, 1);
           if (r < 0) {
@@ -151,6 +168,7 @@ void RDMADispatcher::polling()
         if (r > 0 && rx_cc->get_cq_event())
           ldout(cct, 20) << __func__ << " got cq event." << dendl;
         last_inactive = ceph_clock_now();
+        perf_logger->set(l_msgr_rdma_polling, 1);
         rearmed = false;
       }
       continue;
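The polling gauge above is cleared just before the dispatcher parks in poll(2) on the completion channel and set again once it resumes busy-polling, so a sampled value of 0 means the thread was blocked rather than spinning. A POSIX-only sketch of that pattern, with a pipe standing in for the completion channel fd (all names here are illustrative):

```cpp
// Sketch of the "polling" gauge: drop it to 0 only for the window where the
// thread blocks, restore it to 1 on wakeup.
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <poll.h>
#include <unistd.h>

static std::atomic<uint64_t> polling_gauge{1};

int main() {
  int fds[2];
  if (pipe(fds) != 0)
    return 1;
  if (write(fds[1], "x", 1) != 1) // pretend an event is already pending
    return 1;

  struct pollfd channel_poll;
  channel_poll.fd = fds[0];
  channel_poll.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
  channel_poll.revents = 0;

  polling_gauge = 0;                    // about to block: not polling
  int r = poll(&channel_poll, 1, 1000); // cf. the 1 ms timeout in the loop above
  polling_gauge = 1;                    // woke up: busy-polling again

  std::printf("poll returned %d, gauge=%llu\n", r,
              (unsigned long long)polling_gauge.load());
  close(fds[0]);
  close(fds[1]);
  return 0;
}
```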
@@ -158,12 +176,14 @@ void RDMADispatcher::polling()
 
     ldout(cct, 20) << __func__ << " pool completion queue got " << n
                    << " responses."<< dendl;
+    perf_logger->inc(l_msgr_rdma_rx_total_wc, n);
     Mutex::Locker l(lock);//make sure connected socket alive when pass wc
     for (int i = 0; i < n; ++i) {
       ibv_wc* response = &wc[i];
       Chunk* chunk = reinterpret_cast<Chunk*>(response->wr_id);
 
       if (response->status != IBV_WC_SUCCESS) {
+        perf_logger->inc(l_msgr_rdma_rx_total_wc_errors);
         ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
                       << ") status(" << response->status << ":"
                       << ib->wc_status_to_string(response->status) << dendl;
@@ -295,6 +315,33 @@ RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
   : Worker(c, i), stack(nullptr), infiniband(NULL), tx_handler(new C_handle_cq_tx(this)),
     memory_manager(NULL), lock("RDMAWorker::lock"), pended(false)
 {
+  // initialize perf_logger
+  char name[128];
+  sprintf(name, "AsyncMessenger::RDMAWorker-%u", id);
+  PerfCountersBuilder plb(cct, name, l_msgr_rdma_first, l_msgr_rdma_last);
+
+  plb.add_u64_counter(l_msgr_rdma_handshake_errors, "handshake_errors", "The number of handshake errors");
+
+  plb.add_u64_counter(l_msgr_rdma_tx_total_wc, "tx_total_wc", "The number of tx work completions");
+  plb.add_u64_counter(l_msgr_rdma_tx_total_wc_errors, "tx_total_wc_errors", "The number of tx errors");
+  plb.add_u64_counter(l_msgr_rdma_tx_wc_retry_errors, "tx_retry_errors", "The number of tx retry errors");
+  plb.add_u64_counter(l_msgr_rdma_tx_wc_wr_flush_errors, "tx_wr_flush_errors", "The number of tx work request flush errors");
+
+  plb.add_u64_counter(l_msgr_rdma_tx_no_mem, "tx_no_mem", "The count of no tx buffer");
+  plb.add_u64_counter(l_msgr_rdma_tx_parital_mem, "tx_parital_mem", "The count of partial tx buffers");
+  plb.add_u64_counter(l_msgr_rdma_tx_failed, "tx_failed_post", "The number of failed tx posts");
+  plb.add_u64_counter(l_msgr_rdma_rx_fin, "rx_fin", "The number of rx fin work requests");
+
+  plb.add_u64_counter(l_msgr_rdma_tx_chunks, "tx_chunks", "The number of tx chunks transmitted");
+  plb.add_u64_counter(l_msgr_rdma_tx_bytes, "tx_bytes", "The bytes of tx chunks transmitted");
+  plb.add_u64_counter(l_msgr_rdma_rx_chunks, "rx_chunks", "The number of rx chunks received");
+  plb.add_u64_counter(l_msgr_rdma_rx_bytes, "rx_bytes", "The bytes of rx chunks received");
+
+  plb.add_u64_counter(l_msgr_rdma_created_queue_pair, "created_queue_pair", "Created queue pair number");
+  plb.add_u64_counter(l_msgr_rdma_active_queue_pair, "active_queue_pair", "Active queue pair number");
+
+  perf_logger = plb.create_perf_counters();
+  cct->get_perfcounters_collection()->add(perf_logger);
 }
 
 RDMAWorker::~RDMAWorker()
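The chunk/byte counter pairs registered above are most useful as ratios, e.g. average chunk size per direction. A trivial sketch with made-up sampled values (not real measurements):

```cpp
// Sketch: deriving average tx chunk size from the tx_bytes/tx_chunks pair.
#include <cstdint>
#include <cstdio>

int main() {
  uint64_t tx_bytes  = 52428800; // sampled l_msgr_rdma_tx_bytes (hypothetical)
  uint64_t tx_chunks = 12800;    // sampled l_msgr_rdma_tx_chunks (hypothetical)
  if (tx_chunks > 0)
    std::printf("avg tx chunk: %llu bytes\n",
                (unsigned long long)(tx_bytes / tx_chunks)); // 4096
  return 0;
}
```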
@@ -441,12 +488,15 @@ void RDMAWorker::handle_tx_event()
       ldout(cct, 25) << __func__ << " QP: " << response->qp_num
                      << " len: " << response->byte_len << " , addr:" << chunk
                      << " " << infiniband->wc_status_to_string(response->status) << dendl;
 
     if (response->status != IBV_WC_SUCCESS) {
+      perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
       if (response->status == IBV_WC_RETRY_EXC_ERR) {
         ldout(cct, 1) << __func__ << " connection between server and client not working. Disconnect this now" << dendl;
+        perf_logger->inc(l_msgr_rdma_tx_wc_retry_errors);
       } else if (response->status == IBV_WC_WR_FLUSH_ERR) {
         ldout(cct, 1) << __func__ << " Work Request Flushed Error: this connection's qp="
                       << response->qp_num << " should be down while this WR=" << response->wr_id
                       << " still in flight." << dendl;
+        perf_logger->inc(l_msgr_rdma_tx_wc_wr_flush_errors);
       } else {
         ldout(cct, 1) << __func__ << " send work request returned error for buffer("
                       << response->wr_id << ") status(" << response->status << "): "
@@ -469,6 +519,7 @@ void RDMAWorker::handle_tx_event()
     }
   }
 
+  perf_logger->inc(l_msgr_rdma_tx_total_wc, cqe.size());
   post_tx_buffer(tx_chunks);
 
   ldout(cct, 20) << __func__ << " give back " << tx_chunks.size() << " in Worker " << this << dendl;
@@ -489,6 +540,7 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
     w->set_ib(global_infiniband);
     w->set_stack(this);
   }
+  ldout(cct, 20) << " creating RDMAStack:" << this << " with dispatcher:" << dispatcher << dendl;
 }
 
diff --git a/src/msg/async/rdma/RDMAStack.h b/src/msg/async/rdma/RDMAStack.h
index b3c726d90d689b..ea2eb2048498cb 100644
--- a/src/msg/async/rdma/RDMAStack.h
+++ b/src/msg/async/rdma/RDMAStack.h
@@ -34,6 +34,22 @@ class RDMAServerSocketImpl;
 class RDMAStack;
 class RDMAWorker;
 
+enum {
+  l_msgr_rdma_dispatcher_first = 94000,
+
+  l_msgr_rdma_polling,
+  l_msgr_rdma_inflight_tx_chunks,
+
+  l_msgr_rdma_rx_total_wc,
+  l_msgr_rdma_rx_total_wc_errors,
+
+  l_msgr_rdma_total_async_events,
+  l_msgr_rdma_async_last_wqe_events,
+
+  l_msgr_rdma_dispatcher_last,
+};
+
+
 class RDMADispatcher : public CephContext::ForkWatcher {
   typedef Infiniband::MemoryManager::Chunk Chunk;
   typedef Infiniband::QueuePair QueuePair;
@@ -41,6 +57,7 @@ class RDMADispatcher : public CephContext::ForkWatcher {
   std::thread t;
   CephContext *cct;
   Infiniband* ib;
+  PerfCounters *perf_logger;
   Infiniband::CompletionQueue* rx_cq; // common completion queue for all transmits
   Infiniband::CompletionChannel* rx_cc;
   EventCallbackRef async_handler;
@@ -105,6 +122,32 @@ class RDMADispatcher : public CephContext::ForkWatcher {
 };
 
+enum {
+  l_msgr_rdma_first = 95000,
+
+  l_msgr_rdma_handshake_errors,
+
+  l_msgr_rdma_tx_total_wc,
+  l_msgr_rdma_tx_total_wc_errors,
+  l_msgr_rdma_tx_wc_retry_errors,
+  l_msgr_rdma_tx_wc_wr_flush_errors,
+
+  l_msgr_rdma_tx_no_mem,
+  l_msgr_rdma_tx_parital_mem,
+  l_msgr_rdma_tx_failed,
+  l_msgr_rdma_rx_fin,
+
+  l_msgr_rdma_tx_chunks,
+  l_msgr_rdma_tx_bytes,
+  l_msgr_rdma_rx_chunks,
+  l_msgr_rdma_rx_bytes,
+
+  l_msgr_rdma_created_queue_pair,
+  l_msgr_rdma_active_queue_pair,
+
+  l_msgr_rdma_last,
+};
+
 class RDMAWorker : public Worker {
   typedef Infiniband::CompletionQueue CompletionQueue;
   typedef Infiniband::CompletionChannel CompletionChannel;
@@ -132,6 +175,7 @@ class RDMAWorker : public Worker {
   };
 
  public:
+  PerfCounters *perf_logger;
   explicit RDMAWorker(CephContext *c, unsigned i);
   virtual ~RDMAWorker();
   void notify();
@@ -245,6 +289,7 @@ class RDMAServerSocketImpl : public ServerSocketImpl {
 class RDMAStack : public NetworkStack {
   vector<std::thread> threads;
   RDMADispatcher *dispatcher;
+  PerfCounters *perf_counter;
 
  public:
   explicit RDMAStack(CephContext *cct, const string &t);
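handle_tx_event() classifies failed tx work completions: every failure bumps tx_total_wc_errors, and the two statuses the patch singles out (IBV_WC_RETRY_EXC_ERR and IBV_WC_WR_FLUSH_ERR) also get dedicated counters. A small sketch of that classification, assuming the libibverbs headers are available; the Counters struct is a hypothetical stand-in for the worker's perf_logger:

```cpp
// Sketch of the tx completion-status classification done in handle_tx_event().
#include <infiniband/verbs.h>
#include <cstdint>
#include <cstdio>

struct Counters {
  uint64_t tx_total_wc_errors = 0;
  uint64_t tx_wc_retry_errors = 0;
  uint64_t tx_wc_wr_flush_errors = 0;
};

void classify_tx_wc(const ibv_wc &wc, Counters &c) {
  if (wc.status == IBV_WC_SUCCESS)
    return;
  c.tx_total_wc_errors++;            // counted for every failed wc
  if (wc.status == IBV_WC_RETRY_EXC_ERR)
    c.tx_wc_retry_errors++;          // peer unreachable / retries exhausted
  else if (wc.status == IBV_WC_WR_FLUSH_ERR)
    c.tx_wc_wr_flush_errors++;       // WR flushed because the QP went down
}

int main() {
  Counters c;
  ibv_wc wc = {};
  wc.status = IBV_WC_RETRY_EXC_ERR;
  classify_tx_wc(wc, c);
  std::printf("total_errors=%llu retry_errors=%llu\n",
              (unsigned long long)c.tx_total_wc_errors,
              (unsigned long long)c.tx_wc_retry_errors);
  return 0;
}
```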