Skip to content

Commit

Permalink
Merge pull request #16893 from yuyuyu101/wip-fix-multicct-ib
Browse files Browse the repository at this point in the history
msg/async/rdma: fix conflict between multiple CephContexts
  • Loading branch information
yuyuyu101 committed Aug 10, 2017
2 parents c8c837f + a394213 commit 20a03bb
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 31 deletions.
74 changes: 44 additions & 30 deletions src/msg/async/rdma/RDMAStack.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,20 @@
#undef dout_prefix
#define dout_prefix *_dout << "RDMAStack "

static Tub<Infiniband> global_infiniband;
struct InfinibandSingleton {
CephContext *cct;
std::shared_ptr<Infiniband> ib;

InfinibandSingleton(CephContext *c): cct(c), ib(nullptr) {}
void ready() {
if (!ib) {
ib.reset(new Infiniband(cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num));
}
}
~InfinibandSingleton() {
}
};


RDMADispatcher::~RDMADispatcher()
{
Expand All @@ -48,11 +61,11 @@ RDMADispatcher::~RDMADispatcher()
delete rx_cc;
delete async_handler;

global_infiniband->set_dispatcher(nullptr);
ib->set_dispatcher(nullptr);
}

RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)
: cct(c), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s, Infiniband *i)
: cct(c), ib(i), async_handler(new C_handle_cq_async(this)), lock("RDMADispatcher::lock"),
w_lock("RDMADispatcher::for worker pending list"), stack(s)
{
PerfCountersBuilder plb(cct, "AsyncMessenger::RDMADispatcher", l_msgr_rdma_dispatcher_first, l_msgr_rdma_dispatcher_last);
Expand Down Expand Up @@ -86,13 +99,13 @@ RDMADispatcher::RDMADispatcher(CephContext* c, RDMAStack* s)

void RDMADispatcher::polling_start()
{
tx_cc = global_infiniband->create_comp_channel(cct);
tx_cc = ib->create_comp_channel(cct);
assert(tx_cc);
rx_cc = global_infiniband->create_comp_channel(cct);
rx_cc = ib->create_comp_channel(cct);
assert(rx_cc);
tx_cq = global_infiniband->create_comp_queue(cct, tx_cc);
tx_cq = ib->create_comp_queue(cct, tx_cc);
assert(tx_cq);
rx_cq = global_infiniband->create_comp_queue(cct, rx_cc);
rx_cq = ib->create_comp_queue(cct, rx_cc);
assert(rx_cq);

t = std::thread(&RDMADispatcher::polling, this);
Expand All @@ -109,7 +122,7 @@ void RDMADispatcher::handle_async_event()
ldout(cct, 30) << __func__ << dendl;
while (1) {
ibv_async_event async_event;
if (ibv_get_async_event(global_infiniband->get_device()->ctxt, &async_event)) {
if (ibv_get_async_event(ib->get_device()->ctxt, &async_event)) {
if (errno != EAGAIN)
lderr(cct) << __func__ << " ibv_get_async_event failed. (errno=" << errno
<< " " << cpp_strerror(errno) << ")" << dendl;
Expand All @@ -133,7 +146,7 @@ void RDMADispatcher::handle_async_event()
erase_qpn_lockless(qpn);
}
} else {
ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << global_infiniband->get_device()->ctxt
ldout(cct, 1) << __func__ << " ibv_get_async_event: dev=" << ib->get_device()->ctxt
<< " evt: " << ibv_event_type_str(async_event.event_type)
<< dendl;
}
Expand All @@ -143,7 +156,7 @@ void RDMADispatcher::handle_async_event()

// Return a received chunk to the shared Infiniband pool and drop the
// in-use rx-buffer perf counter. (The scraped diff left both the old
// `global_infiniband->` call and the new `ib->` call in this span; only the
// post-patch statement is kept — posting twice would corrupt the pool.)
void RDMADispatcher::post_chunk_to_pool(Chunk* chunk) {
  // Hold the dispatcher lock so the pool update and the counter decrement
  // stay consistent with the polling thread, which takes the same lock.
  Mutex::Locker l(lock);
  ib->post_chunk_to_pool(chunk);
  perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
}

Expand Down Expand Up @@ -176,7 +189,7 @@ void RDMADispatcher::polling()
perf_logger->inc(l_msgr_rdma_rx_bufs_in_use, rx_ret);

Mutex::Locker l(lock);//make sure connected socket alive when pass wc
global_infiniband->post_chunks_to_srq(rx_ret);
ib->post_chunks_to_srq(rx_ret);
for (int i = 0; i < rx_ret; ++i) {
ibv_wc* response = &wc[i];
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
Expand All @@ -188,7 +201,7 @@ void RDMADispatcher::polling()
conn = get_conn_lockless(response->qp_num);
if (!conn) {
ldout(cct, 1) << __func__ << " csi with qpn " << response->qp_num << " may be dead. chunk " << chunk << " will be back ? " << r << dendl;
global_infiniband->post_chunk_to_pool(chunk);
ib->post_chunk_to_pool(chunk);
perf_logger->dec(l_msgr_rdma_rx_bufs_in_use);
} else {
polled[conn].push_back(*response);
Expand All @@ -199,12 +212,12 @@ void RDMADispatcher::polling()

ldout(cct, 1) << __func__ << " work request returned error for buffer(" << chunk
<< ") status(" << response->status << ":"
<< global_infiniband->wc_status_to_string(response->status) << ")" << dendl;
<< ib->wc_status_to_string(response->status) << ")" << dendl;
conn = get_conn_lockless(response->qp_num);
if (conn && conn->is_connected())
conn->fault();

global_infiniband->post_chunk_to_pool(chunk);
ib->post_chunk_to_pool(chunk);
}
}
for (auto &&i : polled)
Expand Down Expand Up @@ -335,7 +348,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
Chunk* chunk = reinterpret_cast<Chunk *>(response->wr_id);
ldout(cct, 25) << __func__ << " QP: " << response->qp_num
<< " len: " << response->byte_len << " , addr:" << chunk
<< " " << global_infiniband->wc_status_to_string(response->status) << dendl;
<< " " << ib->wc_status_to_string(response->status) << dendl;

if (response->status != IBV_WC_SUCCESS) {
perf_logger->inc(l_msgr_rdma_tx_total_wc_errors);
Expand All @@ -350,7 +363,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)
} else {
ldout(cct, 1) << __func__ << " send work request returned error for buffer("
<< response->wr_id << ") status(" << response->status << "): "
<< global_infiniband->wc_status_to_string(response->status) << dendl;
<< ib->wc_status_to_string(response->status) << dendl;
}

Mutex::Locker l(lock);//make sure connected socket alive when pass wc
Expand All @@ -366,7 +379,7 @@ void RDMADispatcher::handle_tx_event(ibv_wc *cqe, int n)

//TX completion may come either from regular send message or from 'fin' message.
//In the case of 'fin' wr_id points to the QueuePair.
if (global_infiniband->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
if (ib->get_memory_manager()->is_tx_buffer(chunk->buffer)) {
tx_chunks.push_back(chunk);
} else if (reinterpret_cast<QueuePair*>(response->wr_id)->get_local_qp_number() == response->qp_num ) {
ldout(cct, 1) << __func__ << " sending of the disconnect msg completed" << dendl;
Expand Down Expand Up @@ -394,7 +407,7 @@ void RDMADispatcher::post_tx_buffer(std::vector<Chunk*> &chunks)
return ;

inflight -= chunks.size();
global_infiniband->get_memory_manager()->return_tx(chunks);
ib->get_memory_manager()->return_tx(chunks);
ldout(cct, 30) << __func__ << " release " << chunks.size()
<< " chunks, inflight " << inflight << dendl;
notify_pending_workers();
Expand Down Expand Up @@ -438,9 +451,9 @@ void RDMAWorker::initialize()

int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket *sock)
{
global_infiniband->init();
dispatcher->get_ib()->init();

auto p = new RDMAServerSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this, sa);
auto p = new RDMAServerSocketImpl(cct, dispatcher->get_ib(), get_stack()->get_dispatcher(), this, sa);
int r = p->listen(sa, opt);
if (r < 0) {
delete p;
Expand All @@ -453,9 +466,9 @@ int RDMAWorker::listen(entity_addr_t &sa, const SocketOptions &opt,ServerSocket

int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket)
{
global_infiniband->init();
dispatcher->get_ib()->init();

RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, global_infiniband.get(), get_stack()->get_dispatcher(), this);
RDMAConnectedSocketImpl* p = new RDMAConnectedSocketImpl(cct, dispatcher->get_ib(), get_stack()->get_dispatcher(), this);
int r = p->try_connect(addr, opts);

if (r < 0) {
Expand All @@ -471,9 +484,9 @@ int RDMAWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, Co
int RDMAWorker::get_reged_mem(RDMAConnectedSocketImpl *o, std::vector<Chunk*> &c, size_t bytes)
{
assert(center.in_thread());
int r = global_infiniband->get_tx_buffers(c, bytes);
int r = dispatcher->get_ib()->get_tx_buffers(c, bytes);
assert(r >= 0);
size_t got = global_infiniband->get_memory_manager()->get_tx_buffer_size() * r;
size_t got = dispatcher->get_ib()->get_memory_manager()->get_tx_buffer_size() * r;
ldout(cct, 30) << __func__ << " need " << bytes << " bytes, reserve " << got << " registered bytes, inflight " << dispatcher->inflight << dendl;
stack->get_dispatcher()->inflight += r;
if (got >= bytes)
Expand Down Expand Up @@ -543,12 +556,13 @@ RDMAStack::RDMAStack(CephContext *cct, const string &t): NetworkStack(cct, t)
" We recommend setting this parameter to infinity" << dendl;
}

if (!global_infiniband)
global_infiniband.construct(
cct, cct->_conf->ms_async_rdma_device_name, cct->_conf->ms_async_rdma_port_num);
InfinibandSingleton *single;
cct->lookup_or_create_singleton_object<InfinibandSingleton>(single, "Infiniband");
single->ready();

ldout(cct, 20) << __func__ << " constructing RDMAStack..." << dendl;
dispatcher = new RDMADispatcher(cct, this);
global_infiniband->set_dispatcher(dispatcher);
dispatcher = new RDMADispatcher(cct, this, single->ib.get());
single->ib->set_dispatcher(dispatcher);

unsigned num = get_num_worker();
for (unsigned i = 0; i < num; ++i) {
Expand Down
4 changes: 3 additions & 1 deletion src/msg/async/rdma/RDMAStack.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class RDMADispatcher {

std::thread t;
CephContext *cct;
Infiniband *ib;
Infiniband::CompletionQueue* tx_cq;
Infiniband::CompletionQueue* rx_cq;
Infiniband::CompletionChannel *tx_cc, *rx_cc;
Expand Down Expand Up @@ -89,7 +90,7 @@ class RDMADispatcher {
public:
PerfCounters *perf_logger;

explicit RDMADispatcher(CephContext* c, RDMAStack* s);
explicit RDMADispatcher(CephContext* c, RDMAStack* s, Infiniband *ib);
virtual ~RDMADispatcher();
void handle_async_event();

Expand All @@ -106,6 +107,7 @@ class RDMADispatcher {
++num_pending_workers;
}
RDMAStack* get_stack() { return stack; }  // non-owning back-pointer to the stack supplied at construction
Infiniband *get_ib() { return ib; }  // non-owning; the device is owned by the per-CephContext singleton's shared_ptr
RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
void erase_qpn_lockless(uint32_t qpn);
void erase_qpn(uint32_t qpn);
Expand Down

0 comments on commit 20a03bb

Please sign in to comment.