Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

msg/async: ibverbs/rdma support #11531

Merged
merged 7 commits into from Nov 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions CMakeLists.txt
Expand Up @@ -180,6 +180,12 @@ else(ENABLE_SHARED)
endif(ENABLE_SHARED)
set(CMAKE_POSITION_INDEPENDENT_CODE ${ENABLE_SHARED})

# Optional RDMA transport for the async messenger; disabled unless the user
# passes -DWITH_RDMA=ON.
option(WITH_RDMA "Enable RDMA in async messenger" OFF)
if(WITH_RDMA)
  # Locate the rdma package (cmake/modules/Findrdma.cmake) and mirror its
  # result into HAVE_RDMA, which the rest of the build tests.
  find_package(rdma REQUIRED)
  set(HAVE_RDMA ${RDMA_FOUND})
endif()

find_package(Backtrace)

if(LINUX)
Expand Down
38 changes: 38 additions & 0 deletions cmake/modules/Findrdma.cmake
@@ -0,0 +1,38 @@
# - Find rdma
# Find the rdma (libibverbs) library and includes
#
# Input:
#  RDMA_NAMES       - optional library names to try before "ibverbs".
#
# Result:
#  RDMA_INCLUDE_DIR - where to find infiniband/verbs.h.
#  RDMA_LIBRARIES   - List of libraries when using ibverbs.
#  RDMA_FOUND       - True if ibverbs found.

find_path(RDMA_INCLUDE_DIR infiniband/verbs.h)

# Caller-supplied names are searched first, then the default "ibverbs".
set(RDMA_NAMES ${RDMA_NAMES} ibverbs)
find_library(RDMA_LIBRARY NAMES ${RDMA_NAMES})

# Handle the QUIETLY and REQUIRED arguments and report the result.
# The first argument must be the package name exactly as it is passed to
# find_package() ("rdma"): find_package() sets rdma_FIND_REQUIRED and
# rdma_FIND_QUIETLY, and those are the variables the helper consults. With a
# different name (or a different case) REQUIRED is silently ignored and a
# missing libibverbs does not stop the configure step.
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(rdma DEFAULT_MSG RDMA_LIBRARY RDMA_INCLUDE_DIR)

# Set the documented result variables explicitly so callers that test
# RDMA_FOUND / RDMA_LIBRARIES do not depend on which spelling of the _FOUND
# variable the helper above defines.
if(RDMA_INCLUDE_DIR AND RDMA_LIBRARY)
  set(RDMA_FOUND TRUE)
  set(RDMA_LIBRARIES ${RDMA_LIBRARY})
else()
  set(RDMA_FOUND FALSE)
  set(RDMA_LIBRARIES)
endif()

# Hide the raw search results from the default cache view.
mark_as_advanced(
  RDMA_INCLUDE_DIR
  RDMA_LIBRARY
)
19 changes: 17 additions & 2 deletions src/CMakeLists.txt
Expand Up @@ -226,6 +226,11 @@ if(HAVE_XIO)
list(APPEND EXTRALIBS ${XIO_LIBRARY} pthread rt)
endif(HAVE_XIO)

# When RDMA support is enabled, add the RDMA include directory to the C++
# compiler flags and append the RDMA libraries (plus pthread and rt) to the
# EXTRALIBS list.
if(HAVE_RDMA)
  set(CMAKE_CXX_FLAGS
    "${CMAKE_CXX_FLAGS} -I${RDMA_INCLUDE_DIR}")
  list(APPEND EXTRALIBS
    ${RDMA_LIBRARIES}
    pthread
    rt)
endif()

# sort out which allocator to use
if(ALLOCATOR STREQUAL "tcmalloc")
set(ALLOC_LIBS ${GPERFTOOLS_TCMALLOC_LIBRARY})
Expand Down Expand Up @@ -309,6 +314,15 @@ if(HAVE_XIO)
msg/xio/QueueStrategy.cc)
endif(HAVE_XIO)

# Sources of the RDMA backend of the async messenger. The list stays empty
# unless HAVE_RDMA is set, so it can be expanded unconditionally elsewhere.
set(async_rdma_common_srcs)
if(HAVE_RDMA)
  list(APPEND async_rdma_common_srcs
    msg/async/rdma/Infiniband.cc
    msg/async/rdma/RDMAConnectedSocketImpl.cc
    msg/async/rdma/RDMAServerSocketImpl.cc
    msg/async/rdma/RDMAStack.cc)
endif()

if(HAVE_GOOD_YASM_ELF64)
set(yasm_srcs
common/crc32c_intel_fast_asm.S
Expand Down Expand Up @@ -394,6 +408,7 @@ set(libcommon_files
msg/async/PosixStack.cc
msg/async/net_handler.cc
${xio_common_srcs}
${async_rdma_common_srcs}
msg/msg_types.cc
common/hobject.cc
osd/OSDMap.cc
Expand Down Expand Up @@ -543,7 +558,7 @@ endif (WITH_MGR)
set(librados_config_srcs
librados-config.cc)
add_executable(librados-config ${librados_config_srcs})
target_link_libraries(librados-config librados global ${BLKID_LIBRARIES}
target_link_libraries(librados-config librados global ${BLKID_LIBRARIES} ${RDMA_LIBRARIES}
${CMAKE_DL_LIBS})

install(TARGETS librados-config DESTINATION bin)
Expand Down Expand Up @@ -684,7 +699,7 @@ set(ceph_osd_srcs
add_executable(ceph-osd ${ceph_osd_srcs}
$<TARGET_OBJECTS:common_util_obj>)
add_dependencies(ceph-osd erasure_code_plugins)
target_link_libraries(ceph-osd osd os global ${BLKID_LIBRARIES})
target_link_libraries(ceph-osd osd os global ${BLKID_LIBRARIES} ${RDMA_LIBRARIES})
if(WITH_FUSE)
target_link_libraries(ceph-osd ${FUSE_LIBRARIES})
endif()
Expand Down
6 changes: 6 additions & 0 deletions src/common/config_opts.h
Expand Up @@ -213,6 +213,12 @@ OPTION(ms_async_set_affinity, OPT_BOOL, true)
// core
OPTION(ms_async_affinity_cores, OPT_STR, "")
OPTION(ms_async_send_inline, OPT_BOOL, false)
OPTION(ms_async_rdma_device_name, OPT_STR, "")
OPTION(ms_async_rdma_enable_hugepage, OPT_BOOL, false)
OPTION(ms_async_rdma_buffer_size, OPT_INT, 8192)
OPTION(ms_async_rdma_send_buffers, OPT_U32, 10240)
OPTION(ms_async_rdma_receive_buffers, OPT_U32, 10240)
OPTION(ms_async_rdma_port_num, OPT_U32, 1)

OPTION(inject_early_sigterm, OPT_BOOL, false)

Expand Down
3 changes: 3 additions & 0 deletions src/include/config-h.in.cmake
Expand Up @@ -123,6 +123,9 @@
/* Accelio conditional compilation */
#cmakedefine HAVE_XIO

/* AsyncMessenger RDMA conditional compilation */
#cmakedefine HAVE_RDMA

/* define if radosgw enabled */
#cmakedefine WITH_RADOSGW

Expand Down
3 changes: 2 additions & 1 deletion src/msg/async/AsyncConnection.cc
Expand Up @@ -885,7 +885,8 @@ ssize_t AsyncConnection::_process_connection()
goto fail;
} else if (r == 0) {
ldout(async_msgr->cct, 10) << __func__ << " nonblock connect inprogress" << dendl;
center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler);
if (async_msgr->get_stack()->nonblock_connect_need_writable_event())
center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler);
break;
}

Expand Down
12 changes: 6 additions & 6 deletions src/msg/async/AsyncMessenger.cc
Expand Up @@ -194,11 +194,14 @@ void Processor::accept()
while (true) {
entity_addr_t addr;
ConnectedSocket cli_socket;
int r = listen_socket.accept(&cli_socket, opts, &addr);
Worker *w = worker;
if (!msgr->get_stack()->support_local_listen_table())
w = msgr->get_stack()->get_worker();
int r = listen_socket.accept(&cli_socket, opts, &addr, w);
if (r == 0) {
ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << cli_socket.fd() << dendl;

msgr->add_accept(worker, std::move(cli_socket), addr);
msgr->add_accept(w, std::move(cli_socket), addr);
continue;
} else {
if (r == -EINTR) {
Expand Down Expand Up @@ -440,16 +443,13 @@ void AsyncMessenger::wait()
started = false;
}

AsyncConnectionRef AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
void AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
{
lock.Lock();
if (!stack->support_local_listen_table())
w = stack->get_worker();
AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
conn->accept(std::move(cli_socket), addr);
accepting_conns.insert(conn);
lock.Unlock();
return conn;
}

AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
Expand Down
5 changes: 4 additions & 1 deletion src/msg/async/AsyncMessenger.h
Expand Up @@ -350,7 +350,10 @@ class AsyncMessenger : public SimplePolicyMessenger {
}

void learned_addr(const entity_addr_t &peer_addr_for_me);
AsyncConnectionRef add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr);
void add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr);
NetworkStack *get_stack() {
return stack;
}

/**
* This wraps ms_deliver_get_authorizer. We use it for AsyncConnection.
Expand Down
4 changes: 2 additions & 2 deletions src/msg/async/PosixStack.cc
Expand Up @@ -249,7 +249,7 @@ class PosixServerSocketImpl : public ServerSocketImpl {

public:
explicit PosixServerSocketImpl(NetHandler &h, const entity_addr_t &sa, int f): handler(h), sa(sa), _fd(f) {}
virtual int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out) override;
virtual int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
virtual void abort_accept() override {
::close(_fd);
}
Expand All @@ -258,7 +258,7 @@ class PosixServerSocketImpl : public ServerSocketImpl {
}
};

int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) {
int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
assert(sock);
sockaddr_storage ss;
socklen_t slen = sizeof(ss);
Expand Down
11 changes: 11 additions & 0 deletions src/msg/async/Stack.cc
Expand Up @@ -17,6 +17,9 @@
#include "common/Cond.h"
#include "common/errno.h"
#include "PosixStack.h"
#ifdef HAVE_RDMA
#include "rdma/RDMAStack.h"
#endif

#include "common/dout.h"
#include "include/assert.h"
Expand Down Expand Up @@ -54,6 +57,10 @@ std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string
{
if (t == "posix")
return std::make_shared<PosixNetworkStack>(c, t);
#ifdef HAVE_RDMA
else if (t == "rdma")
return std::make_shared<RDMAStack>(c, t);
#endif

return nullptr;
}
Expand All @@ -62,6 +69,10 @@ Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned
{
if (type == "posix")
return new PosixWorker(c, i);
#ifdef HAVE_RDMA
else if (type == "rdma")
return new RDMAWorker(c, i);
#endif
return nullptr;
}

Expand Down
8 changes: 5 additions & 3 deletions src/msg/async/Stack.h
Expand Up @@ -23,6 +23,7 @@
#include "msg/msg_types.h"
#include "msg/async/Event.h"

class Worker;
class ConnectedSocketImpl {
public:
virtual ~ConnectedSocketImpl() {}
Expand All @@ -47,7 +48,7 @@ struct SocketOptions {
class ServerSocketImpl {
public:
virtual ~ServerSocketImpl() {}
virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) = 0;
virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) = 0;
virtual void abort_accept() = 0;
/// Get file descriptor
virtual int fd() const = 0;
Expand Down Expand Up @@ -157,8 +158,8 @@ class ServerSocket {
///
/// \Accepts a \ref ConnectedSocket representing the connection, and
/// a \ref entity_addr_t describing the remote endpoint.
int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) {
return _ssi->accept(sock, opt, out);
int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out, Worker *w) {
return _ssi->accept(sock, opt, out, w);
}

/// Stops any \ref accept() in progress.
Expand Down Expand Up @@ -309,6 +310,7 @@ class NetworkStack : public CephContext::ForkWatcher {
// But for dpdk backend, we maintain listen table in each thread. So we
// need to let each thread do binding port.
virtual bool support_local_listen_table() const { return false; }
virtual bool nonblock_connect_need_writable_event() const { return true; }

void start();
void stop();
Expand Down