Skip to content

Commit

Permalink
msg/async/rdma: Extract sockets stuff from RDMAStack.h
Browse files Browse the repository at this point in the history
This is a preparation commit, in order to make review easier. In this
commit I move code from RDMAStack.h into the new file
RDMAConnectedSocketImpl.h - without changing the code.

In the next commit, the actual logic changes will be done and socket
classes will be split into a base RDMAConnected classes and child
classes with TCP connection establishment specific code.

Issue: 995322
Change-Id: I639fda490a6fbd02addb95d3158c5ac1e7390ef0
Signed-off-by: Amir Vadai <amir@vadai.me>
  • Loading branch information
amirv authored and Adir Lev committed Mar 28, 2017
1 parent 94eddb1 commit 1890e92
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 96 deletions.
126 changes: 126 additions & 0 deletions src/msg/async/rdma/RDMAConnectedSocketImpl.h
@@ -0,0 +1,126 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2016 XSKY <haomai@xsky.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#ifndef CEPH_MSG_RDMA_CONNECTED_SOCKET_IMPL_H
#define CEPH_MSG_RDMA_CONNECTED_SOCKET_IMPL_H

#include "common/ceph_context.h"
#include "common/debug.h"
#include "common/errno.h"
#include "msg/async/Stack.h"
#include "Infiniband.h"

class RDMAWorker;
class RDMADispatcher;

class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
public:
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::CompletionChannel CompletionChannel;
typedef Infiniband::CompletionQueue CompletionQueue;

private:
CephContext *cct;
Infiniband::QueuePair *qp;
Device *ibdev;
int ibport;
IBSYNMsg peer_msg;
IBSYNMsg my_msg;
int connected;
int error;
Infiniband* infiniband;
RDMADispatcher* dispatcher;
RDMAWorker* worker;
std::vector<Chunk*> buffers;
int notify_fd = -1;
bufferlist pending_bl;

Mutex lock;
std::vector<ibv_wc> wc;
bool is_server;
EventCallbackRef con_handler;
int tcp_fd = -1;
bool active;// qp is active ?

void notify();
ssize_t read_buffers(char* buf, size_t len);
int post_work_request(std::vector<Chunk*>&);

public:
RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
RDMAWorker *w);
virtual ~RDMAConnectedSocketImpl();

Device *get_device() { return ibdev; }

void pass_wc(std::vector<ibv_wc> &&v);
void get_wc(std::vector<ibv_wc> &w);
virtual int is_connected() override { return connected; }

virtual ssize_t read(char* buf, size_t len) override;
virtual ssize_t zero_copy_read(bufferptr &data) override;
virtual ssize_t send(bufferlist &bl, bool more) override;
virtual void shutdown() override;
virtual void close() override;
virtual int fd() const override { return notify_fd; }
void fault();
const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
ssize_t submit(bool more);
int activate();
void fin();
void handle_connection();
void cleanup();
void set_accept_fd(int sd);
int try_connect(const entity_addr_t&, const SocketOptions &opt);

class C_handle_connection : public EventCallback {
RDMAConnectedSocketImpl *csi;
bool active;
public:
C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
void do_request(int fd) {
if (active)
csi->handle_connection();
}
void close() {
active = false;
}
};
};

class RDMAServerSocketImpl : public ServerSocketImpl {
CephContext *cct;
Device *ibdev;
int ibport;
NetHandler net;
int server_setup_socket;
Infiniband* infiniband;
RDMADispatcher *dispatcher;
RDMAWorker *worker;
entity_addr_t sa;

public:
RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);

int listen(entity_addr_t &sa, const SocketOptions &opt);
virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
virtual void abort_accept() override;
virtual int fd() const override { return server_setup_socket; }
int get_fd() { return server_setup_socket; }
};

#endif

97 changes: 1 addition & 96 deletions src/msg/async/rdma/RDMAStack.h
Expand Up @@ -28,6 +28,7 @@
#include "common/errno.h"
#include "msg/async/Stack.h"
#include "Infiniband.h"
#include "RDMAConnectedSocketImpl.h"

class RDMAConnectedSocketImpl;
class RDMAServerSocketImpl;
Expand Down Expand Up @@ -190,101 +191,6 @@ class RDMAWorker : public Worker {
}
};

class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
public:
typedef Infiniband::MemoryManager::Chunk Chunk;
typedef Infiniband::CompletionChannel CompletionChannel;
typedef Infiniband::CompletionQueue CompletionQueue;

private:
CephContext *cct;
Infiniband::QueuePair *qp;
Device *ibdev;
int ibport;
IBSYNMsg peer_msg;
IBSYNMsg my_msg;
int connected;
int error;
Infiniband* infiniband;
RDMADispatcher* dispatcher;
RDMAWorker* worker;
std::vector<Chunk*> buffers;
int notify_fd = -1;
bufferlist pending_bl;

Mutex lock;
std::vector<ibv_wc> wc;
bool is_server;
EventCallbackRef con_handler;
int tcp_fd = -1;
bool active;// qp is active ?

void notify();
ssize_t read_buffers(char* buf, size_t len);
int post_work_request(std::vector<Chunk*>&);

public:
RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
RDMAWorker *w);
virtual ~RDMAConnectedSocketImpl();

Device *get_device() { return ibdev; }

void pass_wc(std::vector<ibv_wc> &&v);
void get_wc(std::vector<ibv_wc> &w);
virtual int is_connected() override { return connected; }

virtual ssize_t read(char* buf, size_t len) override;
virtual ssize_t zero_copy_read(bufferptr &data) override;
virtual ssize_t send(bufferlist &bl, bool more) override;
virtual void shutdown() override;
virtual void close() override;
virtual int fd() const override { return notify_fd; }
void fault();
const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
ssize_t submit(bool more);
int activate();
void fin();
void handle_connection();
void cleanup();
void set_accept_fd(int sd);
int try_connect(const entity_addr_t&, const SocketOptions &opt);

class C_handle_connection : public EventCallback {
RDMAConnectedSocketImpl *csi;
bool active;
public:
C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
void do_request(int fd) {
if (active)
csi->handle_connection();
}
void close() {
active = false;
}
};
};

class RDMAServerSocketImpl : public ServerSocketImpl {
CephContext *cct;
Device *ibdev;
int ibport;
NetHandler net;
int server_setup_socket;
Infiniband* infiniband;
RDMADispatcher *dispatcher;
RDMAWorker *worker;
entity_addr_t sa;

public:
RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);

int listen(entity_addr_t &sa, const SocketOptions &opt);
virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
virtual void abort_accept() override;
virtual int fd() const override { return server_setup_socket; }
int get_fd() { return server_setup_socket; }
};

class RDMAStack : public NetworkStack {
vector<std::thread> threads;
Expand All @@ -301,5 +207,4 @@ class RDMAStack : public NetworkStack {
virtual void join_worker(unsigned i) override;
RDMADispatcher *get_dispatcher() { return dispatcher; }
};

#endif

0 comments on commit 1890e92

Please sign in to comment.