Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

msg/async/rdma: Introduce RDMAConnMgr + Debug prints #14201

Merged
merged 2 commits into from Mar 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Expand Up @@ -349,6 +349,7 @@ if(HAVE_RDMA)
msg/async/rdma/Infiniband.cc
msg/async/rdma/Device.cc
msg/async/rdma/RDMAConnectedSocketImpl.cc
msg/async/rdma/RDMAConnTCP.cc
msg/async/rdma/RDMAServerSocketImpl.cc
msg/async/rdma/RDMAStack.cc)
endif(HAVE_RDMA)
Expand Down
88 changes: 0 additions & 88 deletions src/msg/async/rdma/Infiniband.cc
Expand Up @@ -26,7 +26,6 @@
#define dout_prefix *_dout << "Infiniband "

static const uint32_t MAX_INLINE_DATA = 0;
static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");

Infiniband::QueuePair::QueuePair(
CephContext *c, Device &device, ibv_qp_type type,
Expand Down Expand Up @@ -613,93 +612,6 @@ Device* Infiniband::get_device(const char* device_name)
return device_list->get_device(device_name);
}

// 1 means no valid buffer read, 0 means got enough buffer
// else return < 0 means error
int Infiniband::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
{
char msg[TCP_MSG_LEN];
char gid[33];
ssize_t r = ::read(sd, &msg, sizeof(msg));
// Drop incoming qpt
if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
return -EINVAL;
}
}
if (r < 0) {
r = -errno;
lderr(cct) << __func__ << " got error " << r << ": "
<< cpp_strerror(r) << dendl;
} else if (r == 0) { // valid disconnect message of length 0
ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
} else if ((size_t)r != sizeof(msg)) { // invalid message
ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
r = -EINVAL;
} else { // valid message
sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);
wire_gid_to_gid(gid, &(im.gid));
ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl;
}
return r;
}

int Infiniband::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
{
int retry = 0;
ssize_t r;

char msg[TCP_MSG_LEN];
char gid[33];
retry:
gid_to_wire_gid(&(im.gid), gid);
sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
<< ", " << im.peer_qpn << ", " << gid << dendl;
r = ::write(sd, msg, sizeof(msg));
// Drop incoming qpt
if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
return -EINVAL;
}
}

if ((size_t)r != sizeof(msg)) {
// FIXME need to handle EAGAIN instead of retry
if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
retry++;
goto retry;
}
if (r < 0)
lderr(cct) << __func__ << " send returned error " << errno << ": "
<< cpp_strerror(errno) << dendl;
else
lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
return -errno;
}
return 0;
}

void Infiniband::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)
{
char tmp[9];
uint32_t v32;
int i;

for (tmp[8] = 0, i = 0; i < 4; ++i) {
memcpy(tmp, wgid + i * 8, 8);
sscanf(tmp, "%x", &v32);
*(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);
}
}

void Infiniband::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])
{
for (int i = 0; i < 4; ++i)
sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));
}

Infiniband::QueuePair::~QueuePair()
{
if (qp) {
Expand Down
17 changes: 12 additions & 5 deletions src/msg/async/rdma/Infiniband.h
Expand Up @@ -18,6 +18,7 @@
#define CEPH_INFINIBAND_H

#include <infiniband/verbs.h>
#include <rdma/rdma_cma.h>

#include <string>
#include <vector>
Expand All @@ -30,6 +31,12 @@
#include "msg/async/net_handler.h"
#include "common/Mutex.h"

#define RDMA_DEBUG 0

#if RDMA_DEBUG
#include "ib_dbg.h"
#endif

#define HUGE_PAGE_SIZE (2 * 1024 * 1024)
#define ALIGN_TO_PAGE_SIZE(x) \
(((x) + HUGE_PAGE_SIZE -1) / HUGE_PAGE_SIZE * HUGE_PAGE_SIZE)
Expand Down Expand Up @@ -144,9 +151,6 @@ class Infiniband {
DeviceList *device_list;
RDMADispatcher *dispatcher = nullptr;

void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);

public:
explicit Infiniband(CephContext *c);
~Infiniband();
Expand Down Expand Up @@ -268,8 +272,6 @@ class Infiniband {
};

public:
int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
static const char* wc_status_to_string(int status);
static const char* qp_state_string(int status);

Expand All @@ -285,4 +287,9 @@ class Infiniband {
RDMADispatcher *get_dispatcher() { return dispatcher; }
};

inline ostream& operator<<(ostream& out, const Infiniband::QueuePair &qp)
{
return out << qp.get_local_qp_number();
}

#endif
117 changes: 117 additions & 0 deletions src/msg/async/rdma/RDMAConnTCP.cc
@@ -0,0 +1,117 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2016 XSKY <haomai@xsky.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#include "RDMAStack.h"
#include "Device.h"
#include "RDMAConnTCP.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << " RDMAConnTCP "

static const uint32_t TCP_MSG_LEN = sizeof("0000:00000000:00000000:00000000:00000000000000000000000000000000");

// 1 means no valid buffer read, 0 means got enough buffer
// else return < 0 means error
int RDMAConnTCP::recv_msg(CephContext *cct, int sd, IBSYNMsg& im)
{
char msg[TCP_MSG_LEN];
char gid[33];
ssize_t r = ::read(sd, &msg, sizeof(msg));
// Drop incoming qpt
if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
return -EINVAL;
}
}
if (r < 0) {
r = -errno;
lderr(cct) << __func__ << " got error " << r << ": "
<< cpp_strerror(r) << dendl;
} else if (r == 0) { // valid disconnect message of length 0
ldout(cct, 10) << __func__ << " got disconnect message " << dendl;
} else if ((size_t)r != sizeof(msg)) { // invalid message
ldout(cct, 1) << __func__ << " got bad length (" << r << ") " << dendl;
r = -EINVAL;
} else { // valid message
sscanf(msg, "%hu:%x:%x:%x:%s", &(im.lid), &(im.qpn), &(im.psn), &(im.peer_qpn),gid);
wire_gid_to_gid(gid, &(im.gid));
ldout(cct, 5) << __func__ << " recevd: " << im.lid << ", " << im.qpn << ", " << im.psn << ", " << im.peer_qpn << ", " << gid << dendl;
}
return r;
}

int RDMAConnTCP::send_msg(CephContext *cct, int sd, IBSYNMsg& im)
{
int retry = 0;
ssize_t r;

char msg[TCP_MSG_LEN];
char gid[33];
retry:
gid_to_wire_gid(&(im.gid), gid);
sprintf(msg, "%04x:%08x:%08x:%08x:%s", im.lid, im.qpn, im.psn, im.peer_qpn, gid);
ldout(cct, 10) << __func__ << " sending: " << im.lid << ", " << im.qpn << ", " << im.psn
<< ", " << im.peer_qpn << ", " << gid << dendl;
r = ::write(sd, msg, sizeof(msg));
// Drop incoming qpt
if (cct->_conf->ms_inject_socket_failures && sd >= 0) {
if (rand() % cct->_conf->ms_inject_socket_failures == 0) {
ldout(cct, 0) << __func__ << " injecting socket failure" << dendl;
return -EINVAL;
}
}

if ((size_t)r != sizeof(msg)) {
// FIXME need to handle EAGAIN instead of retry
if (r < 0 && (errno == EINTR || errno == EAGAIN) && retry < 3) {
retry++;
goto retry;
}
if (r < 0)
lderr(cct) << __func__ << " send returned error " << errno << ": "
<< cpp_strerror(errno) << dendl;
else
lderr(cct) << __func__ << " send got bad length (" << r << ") " << cpp_strerror(errno) << dendl;
return -errno;
}
return 0;
}

void RDMAConnTCP::wire_gid_to_gid(const char *wgid, union ibv_gid *gid)
{
char tmp[9];
uint32_t v32;
int i;

for (tmp[8] = 0, i = 0; i < 4; ++i) {
memcpy(tmp, wgid + i * 8, 8);
sscanf(tmp, "%x", &v32);
*(uint32_t *)(&gid->raw[i * 4]) = ntohl(v32);
}
}

void RDMAConnTCP::gid_to_wire_gid(const union ibv_gid *gid, char wgid[])
{
for (int i = 0; i < 4; ++i)
sprintf(&wgid[i * 8], "%08x", htonl(*(uint32_t *)(gid->raw + i * 4)));
}

ostream &RDMAConnTCP::print(ostream &out) const
{
return out << "TCP {tcp_fd: " << tcp_fd << "}";
}
84 changes: 84 additions & 0 deletions src/msg/async/rdma/RDMAConnTCP.h
@@ -0,0 +1,84 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2016 XSKY <haomai@xsky.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#ifndef CEPH_MSG_RDMA_CONNECTED_SOCKET_TCP_H
#define CEPH_MSG_RDMA_CONNECTED_SOCKET_TCP_H

#include "common/ceph_context.h"
#include "common/debug.h"
#include "common/errno.h"
#include "msg/async/Stack.h"
#include "Infiniband.h"
#include "RDMAConnectedSocketImpl.h"

class RDMAWorker;
class RDMADispatcher;

class RDMAConnTCP : public RDMAConnMgr {
class C_handle_connection : public EventCallback {
RDMAConnTCP *cst;
bool active;
public:
C_handle_connection(RDMAConnTCP *w): cst(w), active(true) {};
void do_request(int fd) {
if (active)
cst->handle_connection();
};
void close() {
active = false;
};
};

IBSYNMsg peer_msg;
IBSYNMsg my_msg;
EventCallbackRef con_handler;
int tcp_fd = -1;

private:
void handle_connection();
int send_msg(CephContext *cct, int sd, IBSYNMsg& msg);
int recv_msg(CephContext *cct, int sd, IBSYNMsg& msg);
int activate();
void wire_gid_to_gid(const char *wgid, union ibv_gid *gid);
void gid_to_wire_gid(const union ibv_gid *gid, char wgid[]);

public:
RDMAConnTCP(CephContext *cct, RDMAConnectedSocketImpl *sock,
Infiniband* ib, RDMADispatcher* s, RDMAWorker *w);
virtual ~RDMAConnTCP();

virtual ostream &print(ostream &out) const override;

void set_accept_fd(int sd);

virtual void cleanup() override;
virtual int try_connect(const entity_addr_t&, const SocketOptions &opt) override;
};

class RDMAServerConnTCP : public RDMAServerSocketImpl {
NetHandler net;
int server_setup_socket;

public:
RDMAServerConnTCP(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);

int listen(entity_addr_t &sa, const SocketOptions &opt);
virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
virtual void abort_accept() override;
virtual int fd() const override { return server_setup_socket; }
};

#endif