Skip to content

Commit

Permalink
Merge pull request #18053 from ownedu/wip-fix-async-rdma-tx-buffer-le…
Browse files Browse the repository at this point in the history
…akage

msg/async/rdma: fix Tx buffer leakage that can introduce "heartbeat no reply"

Reviewed-by: Haomai Wang <haomai@xsky.com>
Reviewed-by: Alex Mikheev <alexm@mellanox.com>
  • Loading branch information
yuyuyu101 committed Oct 3, 2017
2 parents 0ee5afd + e323771 commit 5d6d138
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 5 deletions.
5 changes: 5 additions & 0 deletions src/msg/async/rdma/Infiniband.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <infiniband/verbs.h>

#include <atomic>
#include <string>
#include <vector>

Expand Down Expand Up @@ -464,6 +465,9 @@ class Infiniband {
* Return true if the queue pair is in an error state, false otherwise.
*/
bool is_error() const;
void add_tx_wr(uint32_t amt) { tx_wr_inflight += amt; }
void dec_tx_wr(uint32_t amt) { tx_wr_inflight -= amt; }
uint32_t get_tx_wr() const { return tx_wr_inflight; }
ibv_qp* get_qp() const { return qp; }
Infiniband::CompletionQueue* get_tx_cq() const { return txcq; }
Infiniband::CompletionQueue* get_rx_cq() const { return rxcq; }
Expand All @@ -486,6 +490,7 @@ class Infiniband {
uint32_t max_recv_wr;
uint32_t q_key;
bool dead;
std::atomic<uint32_t> tx_wr_inflight = {0}; // counter for inflight Tx WQEs
};

public:
Expand Down
4 changes: 4 additions & 0 deletions src/msg/async/rdma/RDMAConnectedSocketImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
ibv_send_wr iswr[tx_buffers.size()];
uint32_t current_swr = 0;
ibv_send_wr* pre_wr = NULL;
uint32_t num = 0;

memset(iswr, 0, sizeof(iswr));
memset(isge, 0, sizeof(isge));
Expand All @@ -558,6 +559,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
ldout(cct, 20) << __func__ << " send_inline." << dendl;
}*/

num++;
worker->perf_logger->inc(l_msgr_rdma_tx_bytes, isge[current_sge].length);
if (pre_wr)
pre_wr->next = &iswr[current_swr];
Expand All @@ -575,6 +577,7 @@ int RDMAConnectedSocketImpl::post_work_request(std::vector<Chunk*> &tx_buffers)
worker->perf_logger->inc(l_msgr_rdma_tx_failed);
return -errno;
}
qp->add_tx_wr(num);
worker->perf_logger->inc(l_msgr_rdma_tx_chunks, tx_buffers.size());
ldout(cct, 20) << __func__ << " qp state is " << Infiniband::qp_state_string(qp->get_state()) << dendl;
return 0;
Expand All @@ -595,6 +598,7 @@ void RDMAConnectedSocketImpl::fin() {
worker->perf_logger->inc(l_msgr_rdma_tx_failed);
return ;
}
qp->add_tx_wr(1);
}

void RDMAConnectedSocketImpl::cleanup() {
Expand Down
35 changes: 30 additions & 5 deletions src/msg/async/rdma/RDMAStack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,20 @@ void RDMADispatcher::polling()
perf_logger->set(l_msgr_rdma_inflight_tx_chunks, inflight);
if (num_dead_queue_pair) {
Mutex::Locker l(lock); // FIXME reuse dead qp because creating one qp costs 1 ms
while (!dead_queue_pairs.empty()) {
ldout(cct, 10) << __func__ << " finally delete qp=" << dead_queue_pairs.back() << dendl;
delete dead_queue_pairs.back();
for (auto &i : dead_queue_pairs) {
// Bypass QPs that do not collect all Tx completions yet.
if (i->get_tx_wr())
continue;
ldout(cct, 10) << __func__ << " finally delete qp=" << i << dendl;
delete i;
auto it = std::find(dead_queue_pairs.begin(), dead_queue_pairs.end(), i);
if (it != dead_queue_pairs.end())
dead_queue_pairs.erase(it);
perf_logger->dec(l_msgr_rdma_active_queue_pair);
dead_queue_pairs.pop_back();
--num_dead_queue_pair;
}
}
if (!num_qp_conn && done)
if (!num_qp_conn && done && dead_queue_pairs.empty())
break;

uint64_t now = Cycles::rdtsc();
Expand Down Expand Up @@ -333,6 +338,22 @@ RDMAConnectedSocketImpl* RDMADispatcher::get_conn_lockless(uint32_t qp)
return it->second.second;
}

Infiniband::QueuePair* RDMADispatcher::get_qp(uint32_t qp)
{
Mutex::Locker l(lock);
// Try to find the QP in qp_conns firstly.
auto it = qp_conns.find(qp);
if (it != qp_conns.end())
return it->second.first;

// Try again in dead_queue_pairs.
for (auto &i: dead_queue_pairs)
if (i->get_local_qp_number() == qp)
return i;

return nullptr;
}

void RDMADispatcher::erase_qpn_lockless(uint32_t qpn)
{
auto it = qp_conns.find(qpn);
Expand Down Expand Up @@ -361,6 +382,10 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
<< " len: " << response->byte_len << " , addr:" << chunk
<< " " << get_stack()->get_infiniband().wc_status_to_string(response->status) << dendl;

QueuePair *qp = get_qp(response->qp_num);
if (qp)
qp->dec_tx_wr(1);

if (response->status != IBV_WC_SUCCESS) {
perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
if (response->status == IBV_WC_RETRY_EXC_ERR) {
Expand Down
1 change: 1 addition & 0 deletions src/msg/async/rdma/RDMAStack.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class RDMADispatcher {
}
RDMAStack* get_stack() { return stack; }
RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
QueuePair* get_qp(uint32_t qp);
void erase_qpn_lockless(uint32_t qpn);
void erase_qpn(uint32_t qpn);
Infiniband::CompletionQueue* get_tx_cq() const { return tx_cq; }
Expand Down

0 comments on commit 5d6d138

Please sign in to comment.