From 0ba42237e2c1e32507e66c35cc69f3d130184ad9 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Mon, 4 Jul 2016 14:41:13 +0800 Subject: [PATCH 01/16] msg/async/Stack: add abstract Stack Stack is a network IO framework which encapsulates all necessary basic network interface, then it manages threads to work. Different network backend like posix, dpdk even RDMA need to inherit Stack class to implement necessary interfaces. So it will make ease for other network backend to integrated into ceph. Otherwise, each backend need to implement the whole Messenger logics like reconnect, policy handle, session maintain... Signed-off-by: Haomai Wang --- src/CMakeLists.txt | 1 + src/msg/Makefile.am | 2 + src/msg/async/Stack.cc | 162 ++++++++++++++++++++ src/msg/async/Stack.h | 327 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 492 insertions(+) create mode 100644 src/msg/async/Stack.cc create mode 100644 src/msg/async/Stack.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5a7e6126b9fa5..8ade253592b19 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -424,6 +424,7 @@ set(libcommon_files msg/async/Event.cc msg/async/EventEpoll.cc msg/async/EventSelect.cc + msg/async/Stack.cc msg/async/net_handler.cc ${xio_common_srcs} msg/msg_types.cc diff --git a/src/msg/Makefile.am b/src/msg/Makefile.am index 3081566d80d65..0c6ad392c8aed 100644 --- a/src/msg/Makefile.am +++ b/src/msg/Makefile.am @@ -22,6 +22,7 @@ libmsg_la_SOURCES += \ msg/async/AsyncMessenger.cc \ msg/async/Event.cc \ msg/async/net_handler.cc \ + msg/async/Stack.cc \ msg/async/EventSelect.cc if LINUX @@ -47,6 +48,7 @@ noinst_HEADERS += \ msg/async/Event.h \ msg/async/EventEpoll.h \ msg/async/EventSelect.h \ + msg/async/Stack.h \ msg/async/net_handler.h if LINUX diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc new file mode 100644 index 0000000000000..9b27b0ec0c1f3 --- /dev/null +++ b/src/msg/async/Stack.cc @@ -0,0 +1,162 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSky + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/Cond.h" +#include "common/errno.h" + +#include "common/dout.h" +#include "include/assert.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << "stack " + +void NetworkStack::add_thread(unsigned i) +{ + assert(threads.size() <= i); + Worker *w = workers[i]; + threads.emplace_back( + [this, w]() { + const uint64_t InitEventNumber = 5000; + const uint64_t EventMaxWaitUs = 30000000; + w->center.init(InitEventNumber, w->id); + ldout(cct, 10) << __func__ << " starting" << dendl; + w->initialize(); + w->init_done(); + while (!w->done) { + ldout(cct, 30) << __func__ << " calling event process" << dendl; + + int r = w->center.process_events(EventMaxWaitUs); + if (r < 0) { + ldout(cct, 20) << __func__ << " process events failed: " + << cpp_strerror(errno) << dendl; + // TODO do something? + } + } + w->reset(); + } + ); +} + +std::shared_ptr NetworkStack::create(CephContext *c, const string &t) +{ + return nullptr; +} + +Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i) +{ + return nullptr; +} + +NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c) +{ + for (unsigned i = 0; i < cct->_conf->ms_async_max_op_threads; ++i) { + Worker *w = create_worker(cct, type, i); + workers.push_back(w); + } + num_workers = cct->_conf->ms_async_op_threads; +} + +void NetworkStack::start() +{ + pool_spin.lock(); + if (started) { + pool_spin.unlock(); + return ; + } + for (unsigned i = 0; i < num_workers; ++i) + add_thread(i); + spawn_workers(threads); + started = true; + pool_spin.unlock(); + + for (unsigned i = 0; i < num_workers; ++i) + workers[i]->wait_for_init(); +} + +Worker* NetworkStack::get_worker() +{ + ldout(cct, 10) << __func__ << dendl; + + // start with some reasonably large number + unsigned min_load = std::numeric_limits::max(); + Worker* current_best = nullptr; + + pool_spin.lock(); + // find worker with least references + // tempting case is returning on references == 0, but in reality + // this will happen so rarely that there's no need for special case. + for (unsigned i = 0; i < num_workers; ++i) { + unsigned worker_load = workers[i]->references.load(); + if (worker_load < min_load) { + current_best = workers[i]; + min_load = worker_load; + } + } + + pool_spin.unlock(); + assert(current_best); + ++current_best->references; + return current_best; +} + +void NetworkStack::stop() +{ + Spinlock::Locker l(pool_spin); + for (unsigned i = 0; i < num_workers; ++i) { + workers[i]->done = true; + workers[i]->center.wakeup(); + } + join_workers(); + threads.clear(); + started = false; +} + +class C_drain : public EventCallback { + Mutex drain_lock; + Cond drain_cond; + std::atomic drain_count; + + public: + explicit C_drain(size_t c) + : drain_lock("C_drain::drain_lock"), + drain_count(c) {} + void do_request(int id) { + Mutex::Locker l(drain_lock); + drain_count--; + drain_cond.Signal(); + } + void wait() { + Mutex::Locker l(drain_lock); + while (drain_count.load()) + drain_cond.Wait(drain_lock); + } +}; + +void NetworkStack::drain() +{ + ldout(cct, 10) << __func__ << " started." << dendl; + pthread_t cur = pthread_self(); + pool_spin.lock(); + C_drain drain(num_workers); + for (unsigned i = 0; i < num_workers; ++i) { + assert(cur != workers[i]->center.get_owner()); + workers[i]->center.dispatch_event_external(EventCallbackRef(&drain)); + } + pool_spin.unlock(); + drain.wait(); + ldout(cct, 10) << __func__ << " end." << dendl; +} diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h new file mode 100644 index 0000000000000..9996ed2841085 --- /dev/null +++ b/src/msg/async/Stack.h @@ -0,0 +1,327 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MSG_ASYNC_STACK_H +#define CEPH_MSG_ASYNC_STACK_H + +#include "include/Spinlock.h" +#include "common/perf_counters.h" +#include "common/simple_spin.h" +#include "msg/msg_types.h" +#include "msg/async/Event.h" + +class ConnectedSocketImpl { + public: + virtual ~ConnectedSocketImpl() {} + virtual int is_connected() = 0; + virtual ssize_t read(char*, size_t) = 0; + virtual ssize_t zero_copy_read(bufferptr&) = 0; + virtual ssize_t send(bufferlist &bl, bool more) = 0; + virtual void shutdown() = 0; + virtual void close() = 0; + virtual int fd() const = 0; +}; + +class ConnectedSocket; +struct SocketOptions { + bool nonblock = true; + bool nodelay = true; + int rcbuf_size = 0; +}; + +/// \cond internal +class ServerSocketImpl { + public: + virtual ~ServerSocketImpl() {} + virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) = 0; + virtual void abort_accept() = 0; + /// Get file descriptor + virtual int fd() const = 0; +}; +/// \endcond + +/// \addtogroup networking-module +/// @{ + +/// A TCP (or other stream-based protocol) connection. +/// +/// A \c ConnectedSocket represents a full-duplex stream between +/// two endpoints, a local endpoint and a remote endpoint. +class ConnectedSocket { + std::unique_ptr _csi; + + public: + /// Constructs a \c ConnectedSocket not corresponding to a connection + ConnectedSocket() {}; + /// \cond internal + explicit ConnectedSocket(std::unique_ptr csi) + : _csi(std::move(csi)) {} + /// \endcond + ~ConnectedSocket() { + if (_csi) + _csi->close(); + } + /// Moves a \c ConnectedSocket object. + ConnectedSocket(ConnectedSocket&& cs) = default; + /// Move-assigns a \c ConnectedSocket object. + ConnectedSocket& operator=(ConnectedSocket&& cs) = default; + + int is_connected() { + return _csi->is_connected(); + } + /// Read the input stream with copy. + /// + /// Copy an object returning data sent from the remote endpoint. + ssize_t read(char* buf, size_t len) { + return _csi->read(buf, len); + } + /// Gets the input stream. + /// + /// Gets an object returning data sent from the remote endpoint. + ssize_t zero_copy_read(bufferptr &data) { + return _csi->zero_copy_read(data); + } + /// Gets the output stream. + /// + /// Gets an object that sends data to the remote endpoint. + ssize_t send(bufferlist &bl, bool more) { + return _csi->send(bl, more); + } + /// Disables output to the socket. + /// + /// Current or future writes that have not been successfully flushed + /// will immediately fail with an error. This is useful to abort + /// operations on a socket that is not making progress due to a + /// peer failure. + void shutdown() { + return _csi->shutdown(); + } + /// Disables input from the socket. + /// + /// Current or future reads will immediately fail with an error. + /// This is useful to abort operations on a socket that is not making + /// progress due to a peer failure. + void close() { + _csi->close(); + _csi.reset(); + } + + /// Get file descriptor + int fd() const { + return _csi->fd(); + } + + explicit operator bool() const { + return _csi.get(); + } +}; +/// @} + +/// \addtogroup networking-module +/// @{ + +/// A listening socket, waiting to accept incoming network connections. +class ServerSocket { + std::unique_ptr _ssi; + public: + /// Constructs a \c ServerSocket not corresponding to a connection + ServerSocket() {} + /// \cond internal + explicit ServerSocket(std::unique_ptr ssi) + : _ssi(std::move(ssi)) {} + ~ServerSocket() { + if (_ssi) + _ssi->abort_accept(); + } + /// \endcond + /// Moves a \c ServerSocket object. + ServerSocket(ServerSocket&& ss) = default; + /// Move-assigns a \c ServerSocket object. + ServerSocket& operator=(ServerSocket&& cs) = default; + + /// Accepts the next connection to successfully connect to this socket. + /// + /// \Accepts a \ref ConnectedSocket representing the connection, and + /// a \ref entity_addr_t describing the remote endpoint. + int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) { + return _ssi->accept(sock, opt, out); + } + + /// Stops any \ref accept() in progress. + /// + /// Current and future \ref accept() calls will terminate immediately + /// with an error. + void abort_accept() { + _ssi->abort_accept(); + _ssi.reset(); + } + + /// Get file descriptor + int fd() const { + return _ssi->fd(); + } + + explicit operator bool() const { + return _ssi.get(); + } +}; +/// @} + +class NetworkStack; + +enum { + l_msgr_first = 94000, + l_msgr_recv_messages, + l_msgr_send_messages, + l_msgr_send_messages_inline, + l_msgr_recv_bytes, + l_msgr_send_bytes, + l_msgr_created_connections, + l_msgr_active_connections, + l_msgr_last, +}; + +class Worker { + std::mutex init_lock; + std::condition_variable init_cond; + bool init = false; + + public: + bool done = false; + + CephContext *cct; + PerfCounters *perf_logger; + unsigned id; + + std::atomic_uint references; + EventCenter center; + + Worker(const Worker&) = delete; + Worker& operator=(const Worker&) = delete; + + Worker(CephContext *c, unsigned i) + : cct(c), perf_logger(NULL), id(i), references(0), center(c) { + char name[128]; + sprintf(name, "AsyncMessenger::Worker-%d", id); + // initialize perf_logger + PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last); + + plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages"); + plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages"); + plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages"); + plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes"); + plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes"); + plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number"); + plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number"); + + perf_logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(perf_logger); + } + virtual ~Worker() { + if (perf_logger) { + cct->get_perfcounters_collection()->remove(perf_logger); + delete perf_logger; + } + } + + virtual int listen(entity_addr_t &addr, + const SocketOptions &opts, ServerSocket *) = 0; + virtual int connect(const entity_addr_t &addr, + const SocketOptions &opts, ConnectedSocket *socket) = 0; + + virtual void initialize() {} + PerfCounters *get_perf_counter() { return perf_logger; } + void release_worker() { + int oldref = references.fetch_sub(1); + assert(oldref > 0); + } + void init_done() { + init_lock.lock(); + init = true; + init_cond.notify_all(); + init_lock.unlock(); + } + void wait_for_init() { + std::unique_lock l(init_lock); + while (!init) + init_cond.wait(l); + } + void reset() { + init_lock.lock(); + init = false; + init_cond.notify_all(); + init_lock.unlock(); + done = false; + } +}; + +class NetworkStack { + std::string type; + std::atomic_bool started; + unsigned num_workers = 0; + Spinlock pool_spin; + + void add_thread(unsigned i); + + protected: + CephContext *cct; + vector workers; + std::vector> threads; + // Used to indicate whether thread started + + explicit NetworkStack(CephContext *c, const string &t); + public: + virtual ~NetworkStack() { + for (auto &&w : workers) + delete w; + } + + static std::shared_ptr create( + CephContext *c, const string &type); + + static Worker* create_worker( + CephContext *c, const string &t, unsigned i); + // backend need to override this method if supports zero copy read + virtual bool support_zero_copy_read() const { return false; } + // backend need to override this method if backend doesn't support shared + // listen table. + // For example, posix backend has in kernel global listen table. If one + // thread bind a port, other threads also aware this. + // But for dpdk backend, we maintain listen table in each thread. So we + // need to let each thread do binding port. + virtual bool support_local_listen_table() const { return false; } + + void start(); + void stop(); + virtual Worker *get_worker(); + Worker *get_worker(unsigned i) { + return workers[i]; + } + void drain(); + unsigned get_num_worker() const { + return num_workers; + } + + // direct is used in tests only + virtual void spawn_workers(std::vector> &) = 0; + virtual void join_workers() = 0; + + private: + NetworkStack(const NetworkStack &); + NetworkStack& operator=(const NetworkStack &); +}; + +#endif //CEPH_MSG_ASYNC_STACK_H From 31bbe39ac288c7e7353e829863cdacbcfcf06009 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 13 Jul 2016 15:56:56 +0800 Subject: [PATCH 02/16] msg/async: add PosixStack support Add default posix backend support, it should be the full replacement for the original AsyncMessenger IO logics. Signed-off-by: Haomai Wang --- src/CMakeLists.txt | 1 + src/msg/Makefile.am | 2 + src/msg/async/PosixStack.cc | 370 ++++++++++++++++++++++++++++++++++++ src/msg/async/PosixStack.h | 62 ++++++ src/msg/async/Stack.cc | 6 + 5 files changed, 441 insertions(+) create mode 100644 src/msg/async/PosixStack.cc create mode 100644 src/msg/async/PosixStack.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 8ade253592b19..0596491aece3f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -425,6 +425,7 @@ set(libcommon_files msg/async/EventEpoll.cc msg/async/EventSelect.cc msg/async/Stack.cc + msg/async/PosixStack.cc msg/async/net_handler.cc ${xio_common_srcs} msg/msg_types.cc diff --git a/src/msg/Makefile.am b/src/msg/Makefile.am index 0c6ad392c8aed..f05788d69677d 100644 --- a/src/msg/Makefile.am +++ b/src/msg/Makefile.am @@ -23,6 +23,7 @@ libmsg_la_SOURCES += \ msg/async/Event.cc \ msg/async/net_handler.cc \ msg/async/Stack.cc \ + msg/async/PosixStack.cc \ msg/async/EventSelect.cc if LINUX @@ -49,6 +50,7 @@ noinst_HEADERS += \ msg/async/EventEpoll.h \ msg/async/EventSelect.h \ msg/async/Stack.h \ + msg/async/PosixStack.h \ msg/async/net_handler.h if LINUX diff --git a/src/msg/async/PosixStack.cc b/src/msg/async/PosixStack.cc new file mode 100644 index 0000000000000..909cec3614f83 --- /dev/null +++ b/src/msg/async/PosixStack.cc @@ -0,0 +1,370 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include +#include +#include + +#include + +#include "PosixStack.h" + +#include "include/buffer.h" +#include "include/str_list.h" +#include "include/sock_compat.h" +#include "common/errno.h" +#include "common/strtol.h" +#include "common/dout.h" +#include "include/assert.h" +#include "common/simple_spin.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << "PosixStack " + +class PosixConnectedSocketImpl final : public ConnectedSocketImpl { + NetHandler &handler; + int _fd; + entity_addr_t sa; + bool connected; +#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) + sigset_t sigpipe_mask; + bool sigpipe_pending; + bool sigpipe_unblock; +#endif + + public: + explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected) + : handler(h), _fd(f), sa(sa), connected(connected) {} + + virtual int is_connected() override { + if (connected) + return 1; + + int r = handler.reconnect(sa, _fd); + if (r == 0) { + connected = true; + return 1; + } else if (r < 0) { + return r; + } else { + return 0; + } + } + + virtual ssize_t zero_copy_read(bufferptr&) override { + return -EOPNOTSUPP; + } + + virtual ssize_t read(char *buf, size_t len) override { + ssize_t r = ::read(_fd, buf, len); + if (r < 0) + r = -errno; + return r; + } + + /* + SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL + http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html + http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html + */ + static void suppress_sigpipe() + { + #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) + /* + We want to ignore possible SIGPIPE that we can generate on write. + SIGPIPE is delivered *synchronously* and *only* to the thread + doing the write. So if it is reported as already pending (which + means the thread blocks it), then we do nothing: if we generate + SIGPIPE, it will be merged with the pending one (there's no + queuing), and that suits us well. If it is not pending, we block + it in this thread (and we avoid changing signal action, because it + is per-process). + */ + sigset_t pending; + sigemptyset(&pending); + sigpending(&pending); + sigpipe_pending = sigismember(&pending, SIGPIPE); + if (!sigpipe_pending) { + sigset_t blocked; + sigemptyset(&blocked); + pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked); + + /* Maybe is was blocked already? */ + sigpipe_unblock = ! sigismember(&blocked, SIGPIPE); + } + #endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */ + } + + static void restore_sigpipe() + { + #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) + /* + If SIGPIPE was pending already we do nothing. Otherwise, if it + become pending (i.e., we generated it), then we sigwait() it (thus + clearing pending status). Then we unblock SIGPIPE, but only if it + were us who blocked it. + */ + if (!sigpipe_pending) { + sigset_t pending; + sigemptyset(&pending); + sigpending(&pending); + if (sigismember(&pending, SIGPIPE)) { + /* + Protect ourselves from a situation when SIGPIPE was sent + by the user to the whole process, and was delivered to + other thread before we had a chance to wait for it. + */ + static const struct timespec nowait = { 0, 0 }; + TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait)); + } + + if (sigpipe_unblock) + pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL); + } + #endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */ + } + + // return the sent length + // < 0 means error occured + static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more) + { + suppress_sigpipe(); + + ssize_t sent = 0; + while (1) { + ssize_t r; + #if defined(MSG_NOSIGNAL) + r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); + #else + r = ::sendmsg(fd, &msg, (more ? MSG_MORE : 0)); + #endif /* defined(MSG_NOSIGNAL) */ + + if (r < 0) { + if (errno == EINTR) { + continue; + } else if (errno == EAGAIN) { + break; + } + return -errno; + } + + sent += r; + if (len == sent) break; + + while (r > 0) { + if (msg.msg_iov[0].iov_len <= (size_t)r) { + // drain this whole item + r -= msg.msg_iov[0].iov_len; + msg.msg_iov++; + msg.msg_iovlen--; + } else { + msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r; + msg.msg_iov[0].iov_len -= r; + break; + } + } + } + restore_sigpipe(); + return (ssize_t)sent; + } + + virtual ssize_t send(bufferlist &bl, bool more) { + ssize_t sent_bytes = 0; + std::list::const_iterator pb = bl.buffers().begin(); + uint64_t left_pbrs = bl.buffers().size(); + while (left_pbrs) { + struct msghdr msg; + struct iovec msgvec[IOV_MAX]; + uint64_t size = MIN(left_pbrs, IOV_MAX); + left_pbrs -= size; + memset(&msg, 0, sizeof(msg)); + msg.msg_iovlen = 0; + msg.msg_iov = msgvec; + unsigned msglen = 0; + while (size > 0) { + msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()); + msgvec[msg.msg_iovlen].iov_len = pb->length(); + msg.msg_iovlen++; + msglen += pb->length(); + ++pb; + size--; + } + + ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more); + if (r < 0) + return r; + + // "r" is the remaining length + sent_bytes += r; + if (r < msglen) + break; + // only "r" == 0 continue + } + + if (sent_bytes) { + bufferlist swapped; + if (sent_bytes < bl.length()) { + bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped); + bl.swap(swapped); + } else { + bl.clear(); + } + } + + return sent_bytes; + } + virtual void shutdown() { + ::shutdown(_fd, SHUT_RDWR); + } + virtual void close() { + ::close(_fd); + } + virtual int fd() const override { + return _fd; + } + friend class PosixServerSocketImpl; + friend class PosixNetworkStack; +}; + +class PosixServerSocketImpl : public ServerSocketImpl { + NetHandler &handler; + entity_addr_t sa; + int _fd; + + public: + explicit PosixServerSocketImpl(NetHandler &h, const entity_addr_t &sa, int f): handler(h), sa(sa), _fd(f) {} + virtual int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out) override; + virtual void abort_accept() override { + ::close(_fd); + } + virtual int fd() const override { + return _fd; + } +}; + +int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) { + assert(sock); + sockaddr_storage ss; + socklen_t slen = sizeof(ss); + int sd = ::accept(_fd, (sockaddr*)&ss, &slen); + if (sd < 0) { + return -errno; + } + + handler.set_close_on_exec(sd); + int r = handler.set_nonblock(sd); + if (r < 0) { + ::close(sd); + return -errno; + } + + r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size); + if (r < 0) { + ::close(sd); + return -errno; + } + + std::unique_ptr csi(new PosixConnectedSocketImpl(handler, *out, sd, true)); + *sock = ConnectedSocket(std::move(csi)); + if (out) + out->set_sockaddr((sockaddr*)&ss); + return 0; +} + +void PosixWorker::initialize() +{ +} + +int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt, + ServerSocket *sock) +{ + int listen_sd = net.create_socket(sa.get_family(), true); + if (listen_sd < 0) { + return -errno; + } + + int r = net.set_nonblock(listen_sd); + if (r < 0) { + ::close(listen_sd); + return -errno; + } + + net.set_close_on_exec(listen_sd); + r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size); + if (r < 0) { + ::close(listen_sd); + return -errno; + } + + r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len()); + if (r < 0) { + r = -errno; + lderr(cct) << __func__ << " unable to bind to " << sa.get_sockaddr() + << ": " << cpp_strerror(r) << dendl; + ::close(listen_sd); + return r; + } + + r = ::listen(listen_sd, 128); + if (r < 0) { + r = -errno; + lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl; + ::close(listen_sd); + return r; + } + + *sock = ServerSocket( + std::unique_ptr( + new PosixServerSocketImpl(net, sa, listen_sd))); + return 0; +} + +int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) { + int sd; + + if (opts.nonblock) { + sd = net.nonblock_connect(addr); + } else { + sd = net.connect(addr); + } + + if (sd < 0) { + ::close(sd); + return -errno; + } + + *socket = ConnectedSocket( + std::unique_ptr(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock))); + return 0; +} + +PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t) + : NetworkStack(c, t) +{ + vector corestrs; + get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs); + for (auto & corestr : corestrs) { + string err; + int coreid = strict_strtol(corestr.c_str(), 10, &err); + if (err == "") + coreids.push_back(coreid); + else + lderr(cct) << __func__ << " failed to parse " << corestr << " in " << cct->_conf->ms_async_affinity_cores << dendl; + } +} diff --git a/src/msg/async/PosixStack.h b/src/msg/async/PosixStack.h new file mode 100644 index 0000000000000..eeb7f318c8dd0 --- /dev/null +++ b/src/msg/async/PosixStack.h @@ -0,0 +1,62 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef CEPH_MSG_ASYNC_POSIXSTACK_H +#define CEPH_MSG_ASYNC_POSIXSTACK_H + +#include + +#include "msg/msg_types.h" +#include "msg/async/net_handler.h" + +#include "Stack.h" + +class PosixWorker : public Worker { + NetHandler net; + std::thread t; + virtual void initialize(); + public: + PosixWorker(CephContext *c, unsigned i) + : Worker(c, i), net(c) {} + virtual int listen(entity_addr_t &sa, const SocketOptions &opt, + ServerSocket *socks) override; + virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override; +}; + +class PosixNetworkStack : public NetworkStack { + vector coreids; + vector threads; + + public: + explicit PosixNetworkStack(CephContext *c, const string &t); + + int get_cpuid(int id) const { + if (coreids.empty()) + return -1; + return coreids[id % coreids.size()]; + } + virtual void spawn_workers(std::vector> &funcs) override { + for (unsigned i = threads.size(); i < funcs.size(); ++i) + threads.emplace_back(std::thread(std::move(funcs[i]))); + } + virtual void join_workers() override { + for (auto &&t : threads) + t.join(); + threads.clear(); + } +}; + +#endif //CEPH_MSG_ASYNC_POSIXSTACK_H diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc index 9b27b0ec0c1f3..6755d1e8af5b5 100644 --- a/src/msg/async/Stack.cc +++ b/src/msg/async/Stack.cc @@ -16,6 +16,7 @@ #include "common/Cond.h" #include "common/errno.h" +#include "PosixStack.h" #include "common/dout.h" #include "include/assert.h" @@ -53,11 +54,16 @@ void NetworkStack::add_thread(unsigned i) std::shared_ptr NetworkStack::create(CephContext *c, const string &t) { + if (t == "posix") + return std::make_shared(c, t); + return nullptr; } Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i) { + if (type == "posix") + return new PosixWorker(c, i); return nullptr; } From 57629c67007817eff0ee73c1552d55667cb25ca0 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 13 Jul 2016 15:58:25 +0800 Subject: [PATCH 03/16] msg/async/net_handler: let set_socket_options return value Signed-off-by: Haomai Wang --- src/msg/async/net_handler.cc | 10 ++++++---- src/msg/async/net_handler.h | 5 ++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/msg/async/net_handler.cc b/src/msg/async/net_handler.cc index d092a6609926f..c4c2bfe4b09ab 100644 --- a/src/msg/async/net_handler.cc +++ b/src/msg/async/net_handler.cc @@ -89,19 +89,20 @@ void NetHandler::set_close_on_exec(int sd) } } -void NetHandler::set_socket_options(int sd, bool nodelay, int size) +int NetHandler::set_socket_options(int sd, bool nodelay, int size) { + int r = 0; // disable Nagle algorithm? if (nodelay) { int flag = 1; - int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); + r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); if (r < 0) { r = -errno; ldout(cct, 0) << "couldn't set TCP_NODELAY: " << cpp_strerror(r) << dendl; } } if (size) { - int r = ::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size)); + r = ::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size)); if (r < 0) { r = -errno; ldout(cct, 0) << "couldn't set SO_RCVBUF to " << size << ": " << cpp_strerror(r) << dendl; @@ -111,12 +112,13 @@ void NetHandler::set_socket_options(int sd, bool nodelay, int size) // block ESIGPIPE #ifdef SO_NOSIGPIPE int val = 1; - int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val)); + r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val)); if (r) { r = -errno; ldout(cct,0) << "couldn't set SO_NOSIGPIPE: " << cpp_strerror(r) << dendl; } #endif + return r; } void NetHandler::set_priority(int sd, int prio) diff --git a/src/msg/async/net_handler.h b/src/msg/async/net_handler.h index eb43f54f7bb4f..311276dba8bab 100644 --- a/src/msg/async/net_handler.h +++ b/src/msg/async/net_handler.h @@ -20,16 +20,15 @@ namespace ceph { class NetHandler { - private: - int create_socket(int domain, bool reuse_addr=false); int generic_connect(const entity_addr_t& addr, bool nonblock); CephContext *cct; public: + int create_socket(int domain, bool reuse_addr=false); explicit NetHandler(CephContext *c): cct(c) {} int set_nonblock(int sd); void set_close_on_exec(int sd); - void set_socket_options(int sd, bool nodelay, int size); + int set_socket_options(int sd, bool nodelay, int size); int connect(const entity_addr_t &addr); /** From d0cd88b3ee805854c6c0c5272f14793e95211b43 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 13 Jul 2016 15:59:12 +0800 Subject: [PATCH 04/16] msg/async/AsyncConnection: support NetworkStack api instead of posix 1. replace sd to ConnectedSocket 2. Replace WorkerPool with Stack 3. Use Stack worker Signed-off-by: Haomai Wang --- src/common/config_opts.h | 5 +- src/msg/async/AsyncConnection.cc | 303 +++++---------------- src/msg/async/AsyncConnection.h | 26 +- src/msg/async/AsyncMessenger.cc | 440 ++++++++----------------------- src/msg/async/AsyncMessenger.h | 74 +----- 5 files changed, 192 insertions(+), 656 deletions(-) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 72cee895d64e7..b42bab3ffdd1c 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -197,8 +197,9 @@ OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1] OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at -OPTION(ms_async_op_threads, OPT_INT, 3) // number of worker processing threads for async messenger created on init -OPTION(ms_async_max_op_threads, OPT_INT, 5) // max number of worker processing threads for async messenger +OPTION(ms_async_transport_type, OPT_STR, "posix") +OPTION(ms_async_op_threads, OPT_U64, 3) // number of worker processing threads for async messenger created on init +OPTION(ms_async_max_op_threads, OPT_U64, 5) // max number of worker processing threads for async messenger OPTION(ms_async_set_affinity, OPT_BOOL, true) // example: ms_async_affinity_cores = 0,1 // The number of coreset is expected to equal to ms_async_op_threads, otherwise diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 80756a0e27825..5012e92df0328 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -14,8 +14,6 @@ * */ -#include -#include #include #include "include/Context.h" @@ -23,8 +21,6 @@ #include "AsyncMessenger.h" #include "AsyncConnection.h" -#include "include/sock_compat.h" - // Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR #define SEQ_MASK 0x7fffffff @@ -32,8 +28,9 @@ #undef dout_prefix #define dout_prefix _conn_prefix(_dout) ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) { + int fd = cs ? cs.fd() : -1; return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this - << " sd=" << sd << " :" << port + << " sd=" << fd << " :" << port << " s=" << get_state_name(state) << " pgs=" << peer_global_seq << " cs=" << connect_seq @@ -122,7 +119,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu Worker *w) : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()), logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0), - out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1), + out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), port(-1), dispatch_queue(q), can_write(WriteStatus::NOWRITE), open_write(false), keepalive(false), recv_buf(NULL), recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)), @@ -130,7 +127,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu last_active(ceph::coarse_mono_clock::now()), inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000), got_bad_auth(false), authorizer(NULL), replacing(false), - is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct), + is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), worker(w), center(&w->center) { read_handler = new C_handle_read(this); @@ -167,203 +164,52 @@ void AsyncConnection::maybe_start_delay_thread() /* return -1 means `fd` occurs error or closed, it should be closed * return 0 means EAGAIN or EINTR */ -ssize_t AsyncConnection::read_bulk(int fd, char *buf, unsigned len) +ssize_t AsyncConnection::read_bulk(char *buf, unsigned len) { ssize_t nread; again: - nread = ::read(fd, buf, len); - if (nread == -1) { - if (errno == EAGAIN) { + nread = cs.read(buf, len); + if (nread < 0) { + if (nread == -EAGAIN) { nread = 0; - } else if (errno == EINTR) { + } else if (nread == -EINTR) { goto again; } else { - ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << fd - << " : "<< strerror(errno) << dendl; + ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd() + << " : "<< strerror(nread) << dendl; return -1; } } else if (nread == 0) { ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor " - << fd << dendl; + << cs.fd() << dendl; return -1; } return nread; } -/* - SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL - http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html - http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html -*/ -void AsyncConnection::suppress_sigpipe() -{ -#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) - /* - We want to ignore possible SIGPIPE that we can generate on write. - SIGPIPE is delivered *synchronously* and *only* to the thread - doing the write. So if it is reported as already pending (which - means the thread blocks it), then we do nothing: if we generate - SIGPIPE, it will be merged with the pending one (there's no - queuing), and that suits us well. If it is not pending, we block - it in this thread (and we avoid changing signal action, because it - is per-process). - */ - sigset_t pending; - sigemptyset(&pending); - sigpending(&pending); - sigpipe_pending = sigismember(&pending, SIGPIPE); - if (!sigpipe_pending) { - sigset_t blocked; - sigemptyset(&blocked); - pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked); - - /* Maybe is was blocked already? */ - sigpipe_unblock = ! sigismember(&blocked, SIGPIPE); - } -#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */ -} - - -void AsyncConnection::restore_sigpipe() -{ -#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) - /* - If SIGPIPE was pending already we do nothing. Otherwise, if it - become pending (i.e., we generated it), then we sigwait() it (thus - clearing pending status). Then we unblock SIGPIPE, but only if it - were us who blocked it. - */ - if (!sigpipe_pending) { - sigset_t pending; - sigemptyset(&pending); - sigpending(&pending); - if (sigismember(&pending, SIGPIPE)) { - /* - Protect ourselves from a situation when SIGPIPE was sent - by the user to the whole process, and was delivered to - other thread before we had a chance to wait for it. - */ - static const struct timespec nowait = { 0, 0 }; - TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait)); - } - - if (sigpipe_unblock) - pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL); - } -#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */ -} - -// return the length of msg needed to be sent, -// < 0 means error occured -ssize_t AsyncConnection::do_sendmsg(struct msghdr &msg, unsigned len, bool more) -{ - suppress_sigpipe(); - - while (len > 0) { - ssize_t r; -#if defined(MSG_NOSIGNAL) - r = ::sendmsg(sd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); -#else - r = ::sendmsg(sd, &msg, (more ? MSG_MORE : 0)); -#endif /* defined(MSG_NOSIGNAL) */ - - if (r == 0) { - ldout(async_msgr->cct, 10) << __func__ << " sendmsg got r==0!" << dendl; - } else if (r < 0) { - if (errno == EINTR) { - continue; - } else if (errno == EAGAIN) { - break; - } else { - ldout(async_msgr->cct, 1) << __func__ << " sendmsg error: " << cpp_strerror(errno) << dendl; - restore_sigpipe(); - return r; - } - } - - len -= r; - if (len == 0) break; - - // hrmph. drain r bytes from the front of our message. - ldout(async_msgr->cct, 20) << __func__ << " short write did " << r << ", still have " << len << dendl; - while (r > 0) { - if (msg.msg_iov[0].iov_len <= (size_t)r) { - // drain this whole item - r -= msg.msg_iov[0].iov_len; - msg.msg_iov++; - msg.msg_iovlen--; - } else { - msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r; - msg.msg_iov[0].iov_len -= r; - break; - } - } - } - restore_sigpipe(); - return (ssize_t)len; -} - // return the remaining bytes, it may larger than the length of ptr // else return < 0 means error ssize_t AsyncConnection::_try_send(bool more) { - if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) { if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) { ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl; - ::shutdown(sd, SHUT_RDWR); - } - } - - uint64_t sent_bytes = 0; - list::const_iterator pb = outcoming_bl.buffers().begin(); - uint64_t left_pbrs = outcoming_bl.buffers().size(); - while (left_pbrs) { - struct msghdr msg; - uint64_t size = MIN(left_pbrs, ASYNC_IOV_MAX); - left_pbrs -= size; - memset(&msg, 0, sizeof(msg)); - msg.msg_iovlen = 0; - msg.msg_iov = msgvec; - unsigned msglen = 0; - while (size > 0) { - msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()); - msgvec[msg.msg_iovlen].iov_len = pb->length(); - msg.msg_iovlen++; - msglen += pb->length(); - ++pb; - size--; + cs.shutdown(); } - - ssize_t r = do_sendmsg(msg, msglen, left_pbrs || more); - if (r < 0) - return r; - - // "r" is the remaining length - sent_bytes += msglen - r; - if (r > 0) { - ldout(async_msgr->cct, 5) << __func__ << " remaining " << r - << " needed to be sent, creating event for writing" - << dendl; - break; - } - // only "r" == 0 continue } - // trim already sent for outcoming_bl - if (sent_bytes) { - if (sent_bytes < outcoming_bl.length()) { - outcoming_bl.splice(0, sent_bytes); - } else { - outcoming_bl.clear(); - } + ssize_t r = cs.send(outcoming_bl, more); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl; + return r; } - ldout(async_msgr->cct, 20) << __func__ << " sent bytes " << sent_bytes + ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r << " remaining bytes " << outcoming_bl.length() << dendl; if (!open_write && is_queued()) { if (center->in_thread()) { - center->create_file_event(sd, EVENT_WRITABLE, write_handler); + center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler); open_write = true; } else { center->dispatch_event_external(write_handler); @@ -372,7 +218,7 @@ ssize_t AsyncConnection::_try_send(bool more) if (open_write && !is_queued()) { if (center->in_thread()) { - center->delete_file_event(sd, EVENT_WRITABLE); + center->delete_file_event(cs.fd(), EVENT_WRITABLE); open_write = false; } else { center->dispatch_event_external(write_handler); @@ -398,10 +244,10 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p) ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is " << state_offset << dendl; - if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) { + if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) { if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) { ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl; - ::shutdown(sd, SHUT_RDWR); + cs.shutdown(); } } @@ -426,7 +272,7 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p) if (len > recv_max_prefetch) { /* this was a large read, we don't prefetch for these */ do { - r = read_bulk(sd, p+state_offset, left); + r = read_bulk(p+state_offset, left); ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl; if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl; @@ -440,7 +286,7 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p) } while (r > 0); } else { do { - r = read_bulk(sd, recv_buf+recv_end, recv_max_prefetch); + r = read_bulk(recv_buf+recv_end, recv_max_prefetch); ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end << " left is " << left << " got " << r << dendl; if (r < 0) { @@ -1011,35 +857,34 @@ ssize_t AsyncConnection::_process_connection() global_seq = async_msgr->get_global_seq(); // close old socket. this is safe because we stopped the reader thread above. - if (sd >= 0) { - center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); - ::close(sd); + if (cs) { + center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE); + cs.close(); } - sd = net.nonblock_connect(get_peer_addr()); - if (sd < 0) { + SocketOptions opts; + r = worker->connect(get_peer_addr(), opts, &cs); + if (r < 0) goto fail; - } - center->create_file_event(sd, EVENT_READABLE, read_handler); + center->create_file_event(cs.fd(), EVENT_READABLE, read_handler); state = STATE_CONNECTING_RE; break; } case STATE_CONNECTING_RE: { - r = net.reconnect(get_peer_addr(), sd); + r = cs.is_connected(); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " reconnect failed " << dendl; goto fail; - } else if (r > 0) { + } else if (r == 0) { ldout(async_msgr->cct, 10) << __func__ << " nonblock connect inprogress" << dendl; - center->create_file_event(sd, EVENT_WRITABLE, read_handler); + center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler); break; } - center->delete_file_event(sd, EVENT_WRITABLE); - net.set_priority(sd, async_msgr->get_socket_priority()); + center->delete_file_event(cs.fd(), EVENT_WRITABLE); ldout(async_msgr->cct, 10) << __func__ << " connect successfully, ready to send banner" << dendl; bufferlist bl; @@ -1092,7 +937,7 @@ ssize_t AsyncConnection::_process_connection() goto fail; } ldout(async_msgr->cct, 20) << __func__ << " connect read peer addr " - << paddr << " on socket " << sd << dendl; + << paddr << " on socket " << cs.fd() << dendl; if (peer_addr != paddr) { if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() && peer_addr.get_nonce() == paddr.get_nonce()) { @@ -1139,7 +984,7 @@ ssize_t AsyncConnection::_process_connection() << async_msgr->get_myaddr() << dendl; } else { ldout(async_msgr->cct, 2) << __func__ << " connect couldn't write my addr, " - << cpp_strerror(errno) << dendl; + << cpp_strerror(r) << dendl; goto fail; } @@ -1185,7 +1030,7 @@ ssize_t AsyncConnection::_process_connection() ldout(async_msgr->cct, 10) << __func__ << " continue send reply " << dendl; } else { ldout(async_msgr->cct, 2) << __func__ << " connect couldn't send reply " - << cpp_strerror(errno) << dendl; + << cpp_strerror(r) << dendl; goto fail; } @@ -1340,30 +1185,14 @@ ssize_t AsyncConnection::_process_connection() case STATE_ACCEPTING: { bufferlist bl; - - if (net.set_nonblock(sd) < 0) - goto fail; - - net.set_socket_options(sd, async_msgr->cct->_conf->ms_tcp_nodelay, async_msgr->cct->_conf->ms_tcp_rcvbuf); - net.set_priority(sd, async_msgr->get_socket_priority()); - center->create_file_event(sd, EVENT_READABLE, read_handler); + center->create_file_event(cs.fd(), EVENT_READABLE, read_handler); bl.append(CEPH_BANNER, strlen(CEPH_BANNER)); ::encode(async_msgr->get_myaddr(), bl, 0); // legacy port = async_msgr->get_myaddr().get_port(); - // and peer's socket addr (they might not know their ip) - sockaddr_storage ss; - socklen_t len = sizeof(ss); - r = ::getpeername(sd, (sockaddr*)&ss, &len); - if (r < 0) { - ldout(async_msgr->cct, 0) << __func__ << " failed to getpeername " - << cpp_strerror(errno) << dendl; - goto fail; - } - socket_addr.set_sockaddr((sockaddr*)&ss); ::encode(socket_addr, bl, 0); // legacy - ldout(async_msgr->cct, 1) << __func__ << " sd=" << sd << " " << socket_addr << dendl; + ldout(async_msgr->cct, 1) << __func__ << " sd=" << cs.fd() << " " << socket_addr << dendl; r = try_send(bl); if (r == 0) { @@ -1813,7 +1642,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis existing->is_reset_from_peer = true; } - center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); + center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE); // Clean up output buffer existing->outcoming_bl.clear(); @@ -1824,13 +1653,13 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis existing->requeue_sent(); existing->reset_recv_state(); - int new_fd = sd; + auto temp_cs = std::move(cs); EventCenter *new_center = center; Worker *new_worker = worker; // avoid _stop shutdown replacing socket - sd = -1; // queue a reset on the new connection, which we're dumping for the old _stop(); + dispatch_queue->queue_reset(this); ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl; existing->can_write = WriteStatus::REPLACING; @@ -1844,25 +1673,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis // there shouldn't exist any buffer assert(recv_start == recv_end); - existing->write_lock.unlock(); - // new sd now isn't registered any event while origin events - // have been deleted. - // previous existing->sd now is still open, event will continue to - // notify previous existing->center from now. - // From now, no one will dispatch event to `existing` - // Note: we must use async dispatch instead of execute this inline - // even existing->center == center. Because we must ensure below - // event executed after all pending external events like - // "dispatch_state->queue" - existing->center->submit_to( - existing->center->get_id(), - [existing, new_fd, new_worker, new_center, connect, reply, authorizer_reply]() mutable { + auto deactivate_existing = std::bind( + [existing, new_worker, new_center, connect, reply, authorizer_reply](ConnectedSocket &cs) mutable { // we need to delete time event in original thread { std::lock_guard l(existing->lock); if (existing->state == STATE_NONE) { existing->shutdown_socket(); - existing->sd = new_fd; + existing->cs = std::move(cs); existing->worker->references--; new_worker->references++; existing->logger = new_worker->get_perf_counter(); @@ -1871,7 +1689,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis if (existing->delay_state) existing->delay_state->set_center(new_center); } else if (existing->state == STATE_CLOSED) { - ::close(new_fd); + cs.close(); return ; } else { assert(0); @@ -1881,15 +1699,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis // Before changing existing->center, it may already exists some events in existing->center's queue. // Then if we mark down `existing`, it will execute in another thread and clean up connection. // Previous event will result in segment fault - auto transfer_existing = [existing, new_fd, connect, reply, authorizer_reply]() mutable { + auto transfer_existing = [existing, connect, reply, authorizer_reply]() mutable { std::lock_guard l(existing->lock); if (existing->state == STATE_CLOSED) return ; - assert(new_fd == existing->sd); assert(existing->state == STATE_NONE); - + existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG; - existing->center->create_file_event(existing->sd, EVENT_READABLE, existing->read_handler); + existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler); reply.global_seq = existing->peer_global_seq; if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) { // handle error @@ -1901,8 +1718,11 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis else existing->center->submit_to( existing->center->get_id(), std::move(transfer_existing), true); - }, true); + }, std::move(temp_cs)); + existing->center->submit_to( + existing->center->get_id(), std::move(deactivate_existing), true); + existing->write_lock.unlock(); existing->lock.unlock(); return 0; } @@ -2012,13 +1832,14 @@ void AsyncConnection::_connect() center->dispatch_event_external(read_handler); } -void AsyncConnection::accept(int incoming) +void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr) { - ldout(async_msgr->cct, 10) << __func__ << " sd=" << incoming << dendl; - assert(sd < 0); + ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd() << dendl; + assert(socket.fd() > 0); std::lock_guard l(lock); - sd = incoming; + cs = std::move(socket); + socket_addr = addr; state = STATE_ACCEPTING; // rescheduler connection in order to avoid lock dep center->dispatch_event_external(read_handler); @@ -2407,7 +2228,7 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more) ssize_t rc = _try_send(more); if (rc < 0) { ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", " - << cpp_strerror(errno) << dendl; + << cpp_strerror(rc) << dendl; } else if (rc == 0) { ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl; } else { @@ -2617,7 +2438,7 @@ void AsyncConnection::handle_write() if (state == STATE_STANDBY && !policy.server && is_queued()) { ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl; _connect(); - } else if (sd >= 0 && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) { + } else if (cs && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) { r = _try_send(); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl; diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h index 3988bf9a7447b..9d6c3435eef64 100644 --- a/src/msg/async/AsyncConnection.h +++ b/src/msg/async/AsyncConnection.h @@ -34,7 +34,7 @@ using namespace std; #include "msg/Messenger.h" #include "Event.h" -#include "net_handler.h" +#include "Stack.h" class AsyncMessenger; class Worker; @@ -50,9 +50,7 @@ static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX); */ class AsyncConnection : public Connection { - ssize_t read_bulk(int fd, char *buf, unsigned len); - void suppress_sigpipe(); - void restore_sigpipe(); + ssize_t read_bulk(char *buf, unsigned len); ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more); ssize_t try_send(bufferlist &bl, bool more=false) { std::lock_guard l(write_lock); @@ -110,11 +108,10 @@ class AsyncConnection : public Connection { center->delete_time_event(last_tick_id); last_tick_id = 0; } - if (sd >= 0) { - center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); - ::shutdown(sd, SHUT_RDWR); - ::close(sd); - sd = -1; + if (cs) { + center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE); + cs.shutdown(); + cs.close(); } } Message *_get_next_outgoing(bufferlist *bl) { @@ -209,7 +206,7 @@ class AsyncConnection : public Connection { _connect(); } // Only call when AsyncConnection first construct - void accept(int sd); + void accept(ConnectedSocket socket, entity_addr_t &addr); int send_message(Message *m) override; void send_keepalive() override; @@ -303,7 +300,7 @@ class AsyncConnection : public Connection { atomic64_t ack_left, in_seq; int state; int state_after_send; - int sd; + ConnectedSocket cs; int port; Messenger::Policy policy; @@ -372,17 +369,10 @@ class AsyncConnection : public Connection { char *state_buffer; // used only by "read_until" uint64_t state_offset; - NetHandler net; Worker *worker; EventCenter *center; ceph::shared_ptr session_security; -#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) - sigset_t sigpipe_mask; - bool sigpipe_pending; - bool sigpipe_unblock; -#endif - public: // used by eventcallback void handle_write(); diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 53eff733e2703..95ec0ebd1409f 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -16,19 +16,14 @@ #include "acconfig.h" -#include #include #include #include "AsyncMessenger.h" -#include "include/str_list.h" -#include "common/strtol.h" #include "common/config.h" #include "common/Timer.h" #include "common/errno.h" -#include "auth/Crypto.h" -#include "include/Spinlock.h" #define dout_subsys ceph_subsys_ms #undef dout_prefix @@ -41,14 +36,6 @@ static ostream& _prefix(std::ostream *_dout, Processor *p) { return *_dout << " Processor -- "; } -static ostream& _prefix(std::ostream *_dout, Worker *w) { - return *_dout << " Worker -- "; -} - -static ostream& _prefix(std::ostream *_dout, WorkerPool *p) { - return *_dout << " WorkerPool -- "; -} - /******************* * Processor @@ -64,13 +51,9 @@ class Processor::C_processor_accept : public EventCallback { } }; -Processor::Processor(AsyncMessenger *r, CephContext *c, uint64_t n) - : msgr(r), - net(c), - worker(NULL), - listen_sd(-1), - nonce(n), - listen_handler(new C_processor_accept(this)) {} +Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c, uint64_t n) + : msgr(r), net(c), worker(w), nonce(n), + listen_handler(new C_processor_accept(this)) {} int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) { @@ -90,30 +73,16 @@ int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET; } - /* socket creation */ - listen_sd = ::socket(family, SOCK_STREAM, 0); - if (listen_sd < 0) { - lderr(msgr->cct) << __func__ << " unable to create socket: " - << cpp_strerror(errno) << dendl; - return -errno; - } - - int r = net.set_nonblock(listen_sd); - if (r < 0) { - ::close(listen_sd); - listen_sd = -1; - return r; - } - net.set_close_on_exec(listen_sd); - net.set_socket_options(listen_sd, msgr->cct->_conf->ms_tcp_nodelay, msgr->cct->_conf->ms_tcp_rcvbuf); + SocketOptions opts; + opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay; + opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf; // use whatever user specified (if anything) entity_addr_t listen_addr = bind_addr; listen_addr.set_family(family); /* bind to port */ - int rc = -1; - r = -1; + int r = -1; for (int i = 0; i < conf->ms_bind_retry_count; i++) { if (i > 0) { @@ -123,22 +92,12 @@ int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) } if (listen_addr.get_port()) { - // specific port - // reuse addr+port when possible - int on = 1; - rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); - if (rc < 0) { - lderr(msgr->cct) << __func__ << " unable to setsockopt: " << cpp_strerror(errno) << dendl; - r = -errno; - continue; - } - - rc = ::bind(listen_sd, listen_addr.get_sockaddr(), - listen_addr.get_sockaddr_len()); - if (rc < 0) { + worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() { + r = worker->listen(listen_addr, opts, &listen_socket); + }, false); + if (r < 0) { lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr - << ": " << cpp_strerror(errno) << dendl; - r = -errno; + << ": " << cpp_strerror(r) << dendl; continue; } } else { @@ -148,60 +107,34 @@ int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) continue; listen_addr.set_port(port); - rc = ::bind(listen_sd, listen_addr.get_sockaddr(), - listen_addr.get_sockaddr_len()); - if (rc == 0) + worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() { + r = worker->listen(listen_addr, opts, &listen_socket); + }, false); + if (r == 0) break; } - if (rc < 0) { + if (r < 0) { lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr << " on any port in range " << msgr->cct->_conf->ms_bind_port_min << "-" << msgr->cct->_conf->ms_bind_port_max << ": " - << cpp_strerror(errno) << dendl; - r = -errno; + << cpp_strerror(r) << dendl; listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again. continue; } ldout(msgr->cct, 10) << __func__ << " bound on random port " << listen_addr << dendl; } - if (rc == 0) + if (r == 0) break; } // It seems that binding completely failed, return with that exit status - if (rc < 0) { + if (r < 0) { lderr(msgr->cct) << __func__ << " was unable to bind after " << conf->ms_bind_retry_count - << " attempts: " << cpp_strerror(errno) << dendl; - ::close(listen_sd); - listen_sd = -1; + << " attempts: " << cpp_strerror(r) << dendl; return r; } - // what port did we get? - sockaddr_storage ss; - socklen_t llen = sizeof(ss); - rc = getsockname(listen_sd, (sockaddr*)&ss, &llen); - if (rc < 0) { - rc = -errno; - lderr(msgr->cct) << __func__ << " failed getsockname: " << cpp_strerror(rc) << dendl; - ::close(listen_sd); - listen_sd = -1; - return rc; - } - listen_addr.set_sockaddr((sockaddr*)&ss); - ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl; - // listen! - rc = ::listen(listen_sd, 128); - if (rc < 0) { - rc = -errno; - lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr - << ": " << cpp_strerror(rc) << dendl; - ::close(listen_sd); - listen_sd = -1; - return rc; - } - msgr->set_myaddr(bind_addr); if (bind_addr != entity_addr_t()) msgr->learned_addr(bind_addr); @@ -237,47 +170,48 @@ int Processor::rebind(const set& avoid_ports) return bind(addr, new_avoid); } -void Processor::start(Worker *w) +void Processor::start() { - ldout(msgr->cct, 1) << __func__ << " " << dendl; + ldout(msgr->cct, 1) << __func__ << dendl; // start thread - if (listen_sd >= 0) { - worker = w; + if (listen_socket) { worker->center.submit_to(worker->center.get_id(), [this]() { - worker->center.create_file_event(listen_sd, EVENT_READABLE, listen_handler); }); + worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler); }, false); } } void Processor::accept() { - ldout(msgr->cct, 10) << __func__ << " listen_sd=" << listen_sd << dendl; + ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd() << dendl; + SocketOptions opts; + opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay; + opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf; while (true) { - sockaddr_storage ss; - socklen_t slen = sizeof(ss); - int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen); - if (sd >= 0) { - net.set_close_on_exec(sd); - ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << sd << dendl; - - msgr->add_accept(sd); + entity_addr_t addr; + ConnectedSocket cli_socket; + int r = listen_socket.accept(&cli_socket, opts, &addr); + if (r == 0) { + ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << cli_socket.fd() << dendl; + + msgr->add_accept(worker, std::move(cli_socket), addr); continue; } else { - if (errno == EINTR) { + if (r == -EINTR) { continue; - } else if (errno == EAGAIN) { + } else if (r == -EAGAIN) { break; - } else if (errno == EMFILE || errno == ENFILE) { - lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << sd - << " errno " << errno << " " << cpp_strerror(errno) << dendl; + } else if (r == -EMFILE || r == -ENFILE) { + lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << listen_socket.fd() + << " errno " << r << " " << cpp_strerror(r) << dendl; break; - } else if (errno == ECONNABORTED) { - ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << sd - << " errno " << errno << " " << cpp_strerror(errno) << dendl; + } else if (r == -ECONNABORTED) { + ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd() + << " errno " << r << " " << cpp_strerror(r) << dendl; continue; } else { - lderr(msgr->cct) << __func__ << " no incoming connection? sd = " << sd - << " errno " << errno << " " << cpp_strerror(errno) << dendl; + lderr(msgr->cct) << __func__ << " no incoming connection?" + << " errno " << r << " " << cpp_strerror(r) << dendl; break; } } @@ -288,208 +222,25 @@ void Processor::stop() { ldout(msgr->cct,10) << __func__ << dendl; - if (listen_sd >= 0) { + if (listen_socket) { worker->center.submit_to(worker->center.get_id(), [this]() { - worker->center.delete_file_event(listen_sd, EVENT_READABLE); - ::shutdown(listen_sd, SHUT_RDWR); - ::close(listen_sd); - listen_sd = -1; - }); - } -} - -void Worker::stop() -{ - ldout(cct, 10) << __func__ << dendl; - done = true; - center.wakeup(); -} - -class WorkerPool { - CephContext *cct; - vector workers; - vector coreids; - // Used to indicate whether thread started - bool started; - Mutex barrier_lock; - Cond barrier_cond; - atomic_t barrier_count; - simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER; - - class C_barrier : public EventCallback { - WorkerPool *pool; - public: - explicit C_barrier(WorkerPool *p): pool(p) {} - void do_request(int id) { - Mutex::Locker l(pool->barrier_lock); - pool->barrier_count.dec(); - pool->barrier_cond.Signal(); - delete this; - } - }; - friend class C_barrier; - public: - std::atomic_uint pending; - explicit WorkerPool(CephContext *c); - WorkerPool(const WorkerPool &) = delete; - WorkerPool& operator=(const WorkerPool &) = delete; - virtual ~WorkerPool(); - void start(); - Worker *get_worker(); - int get_cpuid(int id) { - if (coreids.empty()) - return -1; - return coreids[id % coreids.size()]; - } - void barrier(); -}; - -void *Worker::entry() -{ - ldout(cct, 10) << __func__ << " starting" << dendl; - if (cct->_conf->ms_async_set_affinity) { - int cid = pool->get_cpuid(id); - if (cid >= 0 && set_affinity(cid)) { - ldout(cct, 0) << __func__ << " sched_setaffinity failed: " - << cpp_strerror(errno) << dendl; - } - } - - center.set_owner(); - pool->pending--; - while (!done) { - ldout(cct, 20) << __func__ << " calling event process" << dendl; - - int r = center.process_events(EventMaxWaitUs); - if (r < 0) { - ldout(cct, 20) << __func__ << " process events failed: " - << cpp_strerror(errno) << dendl; - // TODO do something? - } - } - - return 0; -} - -/******************* - * WorkerPool - *******************/ -WorkerPool::WorkerPool(CephContext *c): cct(c), started(false), - barrier_lock("WorkerPool::WorkerPool::barrier_lock"), - barrier_count(0), pending(0) -{ - assert(cct->_conf->ms_async_op_threads > 0); - // make sure user won't try to force some crazy number of worker threads - assert(cct->_conf->ms_async_max_op_threads >= cct->_conf->ms_async_op_threads && - cct->_conf->ms_async_op_threads <= 32); - for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) { - Worker *w = new Worker(cct, this, i); - workers.push_back(w); - } - vector corestrs; - get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs); - for (vector::iterator it = corestrs.begin(); - it != corestrs.end(); ++it) { - string err; - int coreid = strict_strtol(it->c_str(), 10, &err); - if (err == "") - coreids.push_back(coreid); - else - lderr(cct) << __func__ << " failed to parse " << *it << " in " << cct->_conf->ms_async_affinity_cores << dendl; - } - -} - -WorkerPool::~WorkerPool() -{ - for (uint64_t i = 0; i < workers.size(); ++i) { - if (workers[i]->is_started()) { - workers[i]->stop(); - workers[i]->join(); - } - delete workers[i]; - } -} - -void WorkerPool::start() -{ - if (!started) { - for (uint64_t i = 0; i < workers.size(); ++i) { - pending++; - workers[i]->create("ms_async_worker"); - } - started = true; + worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE); + listen_socket.abort_accept(); + }, false); } - while (pending) - usleep(50); } -Worker* WorkerPool::get_worker() -{ - ldout(cct, 10) << __func__ << dendl; - - // start with some reasonably large number - unsigned min_load = std::numeric_limits::max(); - Worker* current_best = nullptr; - - simple_spin_lock(&pool_spin); - // find worker with least references - // tempting case is returning on references == 0, but in reality - // this will happen so rarely that there's no need for special case. - for (auto p = workers.begin(); p != workers.end(); ++p) { - unsigned worker_load = (*p)->references.load(); - ldout(cct, 20) << __func__ << " Worker " << *p << " load: " << worker_load << dendl; - if (worker_load < min_load) { - current_best = *p; - min_load = worker_load; - } - } - // if minimum load exceeds amount of workers, make a new worker - // logic behind this is that we're not going to create new worker - // just because others have *some* load, we'll defer worker creation - // until others have *plenty* of load. This will cause new worker - // to get assigned to all new connections *unless* one or more - // of workers get their load reduced - in that case, this worker - // will be assigned to new connection. - // TODO: add more logic and heuristics, so connections known to be - // of light workload (heartbeat service, etc.) won't overshadow - // heavy workload (clients, etc). - if (!current_best || ((workers.size() < (unsigned)cct->_conf->ms_async_max_op_threads) - && (min_load > workers.size()))) { - ldout(cct, 20) << __func__ << " creating worker" << dendl; - current_best = new Worker(cct, this, workers.size()); - workers.push_back(current_best); - pending++; - current_best->create("ms_async_worker"); - } else { - ldout(cct, 20) << __func__ << " picked " << current_best - << " as best worker with load " << min_load << dendl; +struct StackSingleton { + std::shared_ptr stack; + StackSingleton(CephContext *c) { + stack = NetworkStack::create(c, c->_conf->ms_async_transport_type); } - - ++current_best->references; - simple_spin_unlock(&pool_spin); - - while (pending) - usleep(50); - assert(current_best); - return current_best; -} - -void WorkerPool::barrier() -{ - ldout(cct, 10) << __func__ << " started." << dendl; - for (vector::iterator it = workers.begin(); it != workers.end(); ++it) { - barrier_count.inc(); - (*it)->center.dispatch_event_external(EventCallbackRef(new C_barrier(this))); + ~StackSingleton() { + stack->stop(); } - ldout(cct, 10) << __func__ << " wait for " << barrier_count.read() << " barrier" << dendl; - Mutex::Locker l(barrier_lock); - while (barrier_count.read()) - barrier_cond.Wait(barrier_lock); +}; - ldout(cct, 10) << __func__ << " end." << dendl; -} class C_handle_reap : public EventCallback { AsyncMessenger *msgr; @@ -509,7 +260,6 @@ class C_handle_reap : public EventCallback { AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, string mname, uint64_t _nonce, uint64_t features) : SimplePolicyMessenger(cct, name,mname, _nonce), - processor(this, cct, _nonce), dispatch_queue(cct, this, mname), lock("AsyncMessenger::lock"), nonce(_nonce), need_addr(true), did_bind(false), @@ -517,13 +267,20 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, cluster_protocol(0), stopped(true) { ceph_spin_init(&global_seq_lock); - // uniq name for CephContext to distinguish from other objects - cct->lookup_or_create_singleton_object(pool, "AsyncMessenger::WorkerPool"); - local_worker = pool->get_worker(); + StackSingleton *single; + cct->lookup_or_create_singleton_object(single, "AsyncMessenger::NetworkStack"); + stack = single->stack.get(); + stack->start(); + local_worker = stack->get_worker(); local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker); local_features = features; init_local_connection(); reap_handler = new C_handle_reap(this); + unsigned processor_num = 1; + if (stack->support_local_listen_table()) + processor_num = stack->get_num_worker(); + for (unsigned i = 0; i < processor_num; ++i) + processors.push_back(new Processor(this, stack->get_worker(i), cct, _nonce)); } /** @@ -535,6 +292,8 @@ AsyncMessenger::~AsyncMessenger() delete reap_handler; assert(!did_bind); // either we didn't bind or we shut down the Processor local_connection->mark_down(); + for (auto &&p : processors) + delete p; } void AsyncMessenger::ready() @@ -542,9 +301,8 @@ void AsyncMessenger::ready() ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; Mutex::Locker l(lock); - pool->start(); - Worker *w = pool->get_worker(); - processor.start(w); + for (auto &&p : processors) + p->start(); dispatch_queue.start(); } @@ -552,17 +310,18 @@ int AsyncMessenger::shutdown() { ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; + // done! clean up. + for (auto &&p : processors) + p->stop(); mark_down_all(); // break ref cycles on the loopback connection local_connection->set_priv(NULL); - // done! clean up. - processor.stop(); did_bind = false; lock.Lock(); stop_cond.Signal(); stopped = true; lock.Unlock(); - pool->barrier(); + stack->drain(); return 0; } @@ -580,7 +339,25 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr) // bind to a socket set avoid_ports; - int r = processor.bind(bind_addr, avoid_ports); + int r = 0; + unsigned i = 0; + for (auto &&p : processors) { + r = p->bind(bind_addr, avoid_ports); + if (r < 0) { + // Note: this is related to local tcp listen table problem. + // Posix(default kernel implementation) backend shares listen table + // in the kernel, so all threads can use the same listen table naturally + // and only one thread need to bind. But other backends(like dpdk) uses local + // listen table, we need to bind/listen tcp port for each worker. So if the + // first worker failed to bind, it could be think the normal error then handle + // it, like port is used case. But if the first worker successfully to bind + // but the second worker failed, it's not expected and we need to assert + // here + assert(i == 0); + break; + } + ++i; + } if (r >= 0) did_bind = true; return r; @@ -591,12 +368,20 @@ int AsyncMessenger::rebind(const set& avoid_ports) ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl; assert(did_bind); - processor.stop(); + for (auto &&p : processors) + p->stop(); mark_down_all(); - int r = processor.rebind(avoid_ports); - if (r == 0) { - Worker *w = pool->get_worker(); - processor.start(w); + unsigned i = 0; + int r = 0; + for (auto &&p : processors) { + r = p->rebind(avoid_ports); + if (r == 0) { + p->start(); + } else { + assert(i == 0); + break; + } + i++; } return r; } @@ -644,19 +429,20 @@ void AsyncMessenger::wait() // close all connections shutdown_connections(false); - pool->barrier(); + stack->drain(); ldout(cct, 10) << __func__ << ": done." << dendl; ldout(cct, 1) << __func__ << " complete." << dendl; started = false; } -AsyncConnectionRef AsyncMessenger::add_accept(int sd) +AsyncConnectionRef AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr) { lock.Lock(); - Worker *w = pool->get_worker(); + if (!stack->support_local_listen_table()) + w = stack->get_worker(); AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w); - conn->accept(sd); + conn->accept(std::move(cli_socket), addr); accepting_conns.insert(conn); lock.Unlock(); return conn; @@ -671,7 +457,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int << ", creating connection and registering" << dendl; // create connection - Worker *w = pool->get_worker(); + Worker *w = stack->get_worker(); AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w); conn->connect(addr, type); assert(!conns.count(addr)); diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 8125cff2f91ce..47898f49d91d4 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -36,70 +36,9 @@ using namespace std; #include "include/assert.h" #include "AsyncConnection.h" #include "Event.h" -#include "common/simple_spin.h" class AsyncMessenger; -class WorkerPool; - -enum { - l_msgr_first = 94000, - l_msgr_recv_messages, - l_msgr_send_messages, - l_msgr_send_messages_inline, - l_msgr_recv_bytes, - l_msgr_send_bytes, - l_msgr_created_connections, - l_msgr_active_connections, - l_msgr_last, -}; - - -class Worker : public Thread { - static const uint64_t InitEventNumber = 5000; - static const uint64_t EventMaxWaitUs = 30000000; - CephContext *cct; - WorkerPool *pool; - bool done; - int id; - PerfCounters *perf_logger; - - public: - EventCenter center; - std::atomic_uint references; - Worker(CephContext *c, WorkerPool *p, int i) - : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) { - center.init(InitEventNumber, i); - char name[128]; - sprintf(name, "AsyncMessenger::Worker-%d", id); - // initialize perf_logger - PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last); - - plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages"); - plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages"); - plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages"); - plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes"); - plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes"); - plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number"); - plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number"); - - perf_logger = plb.create_perf_counters(); - cct->get_perfcounters_collection()->add(perf_logger); - } - ~Worker() { - if (perf_logger) { - cct->get_perfcounters_collection()->remove(perf_logger); - delete perf_logger; - } - } - void *entry(); - void stop(); - PerfCounters *get_perf_counter() { return perf_logger; } - void release_worker() { - int oldref = references.fetch_sub(1); - assert(oldref > 0); - } -}; /** * If the Messenger binds to a specific address, the Processor runs @@ -109,20 +48,20 @@ class Processor { AsyncMessenger *msgr; NetHandler net; Worker *worker; - int listen_sd; + ServerSocket listen_socket; uint64_t nonce; EventCallbackRef listen_handler; class C_processor_accept; public: - Processor(AsyncMessenger *r, CephContext *c, uint64_t n); + Processor(AsyncMessenger *r, Worker *w, CephContext *c, uint64_t n); ~Processor() { delete listen_handler; }; void stop(); int bind(const entity_addr_t &bind_addr, const set& avoid_ports); int rebind(const set& avoid_port); - void start(Worker *w); + void start(); void accept(); }; @@ -276,9 +215,8 @@ class AsyncMessenger : public SimplePolicyMessenger { private: static const uint64_t ReapDeadConnectionThreshold = 5; - WorkerPool *pool; - - Processor processor; + NetworkStack *stack; + std::vector processors; friend class Processor; DispatchQueue dispatch_queue; @@ -413,7 +351,7 @@ class AsyncMessenger : public SimplePolicyMessenger { } void learned_addr(const entity_addr_t &peer_addr_for_me); - AsyncConnectionRef add_accept(int sd); + AsyncConnectionRef add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr); /** * This wraps ms_deliver_get_authorizer. We use it for AsyncConnection. From fca0de1f074926c1316261ac0feb83c0985e53ec Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Sun, 10 Jul 2016 00:18:22 +0800 Subject: [PATCH 05/16] msg/async/Event: debug event address when dispatching and executing Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 1388ee9bd5a1b..b3b12ddd03036 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -370,6 +370,7 @@ int EventCenter::process_events(int timeout_microseconds) external_lock.unlock(); while (!cur_process.empty()) { EventCallbackRef e = cur_process.front(); + ldout(cct, 20) << __func__ << " do " << e << dendl; if (e) e->do_request(0); cur_process.pop_front(); @@ -389,5 +390,5 @@ void EventCenter::dispatch_event_external(EventCallbackRef e) if (!in_thread()) wakeup(); - ldout(cct, 10) << __func__ << " " << e << " pending " << num << dendl; + ldout(cct, 20) << __func__ << " " << e << " pending " << num << dendl; } From f62f28ceea5649dc821365348093f8541db66e88 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Sun, 10 Jul 2016 16:19:29 +0800 Subject: [PATCH 06/16] msg/async/Event: remove event wakeup flag Now only dispatch external event will wakeup event thread(previously delete_time_event will call wakeup), we only need to use "external_num_events" to indicate whether we have extra events. Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 56 ++++++++++++++++++++---------------------- src/msg/async/Event.h | 6 +---- 2 files changed, 27 insertions(+), 35 deletions(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index b3b12ddd03036..5162059552b6d 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -39,14 +39,14 @@ class C_handle_notify : public EventCallback { C_handle_notify(EventCenter *c, CephContext *cc): center(c), cct(cc) {} void do_request(int fd_or_id) { char c[256]; + int r = 0; do { - center->already_wakeup.set(0); - int r = read(fd_or_id, c, sizeof(c)); + r = read(fd_or_id, c, sizeof(c)); if (r < 0) { - ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl; - break; + if (errno != EAGAIN) + ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl; } - } while (center->already_wakeup.read()); + } while (r > 0); } }; @@ -260,15 +260,15 @@ void EventCenter::delete_time_event(uint64_t id) void EventCenter::wakeup() { - ldout(cct, 1) << __func__ << dendl; - already_wakeup.compare_and_swap(0, 1); - - char buf[1]; - buf[0] = 'c'; - // wake up "event_wait" - int n = write(notify_send_fd, buf, 1); - // FIXME ? - assert(n == 1); + ldout(cct, 1) << __func__ << dendl; + + char buf = 'c'; + // wake up "event_wait" + int n = write(notify_send_fd, &buf, sizeof(buf)); + if (n < 0) { + ldout(cct, 1) << __func__ << " write notify pipe failed: " << cpp_strerror(errno) << dendl; + assert(0); + } } int EventCenter::process_time_events() @@ -361,21 +361,16 @@ int EventCenter::process_events(int timeout_microseconds) if (external_num_events.load()) { external_lock.lock(); - if (external_events.empty()) { - external_lock.unlock(); - } else { - deque cur_process; - cur_process.swap(external_events); - external_num_events.store(0); - external_lock.unlock(); - while (!cur_process.empty()) { - EventCallbackRef e = cur_process.front(); - ldout(cct, 20) << __func__ << " do " << e << dendl; - if (e) - e->do_request(0); - cur_process.pop_front(); - numevents++; - } + deque cur_process; + cur_process.swap(external_events); + external_num_events.store(0); + external_lock.unlock(); + while (!cur_process.empty()) { + EventCallbackRef e = cur_process.front(); + ldout(cct, 20) << __func__ << " do " << e << dendl; + e->do_request(0); + cur_process.pop_front(); + numevents++; } } return numevents; @@ -385,9 +380,10 @@ void EventCenter::dispatch_event_external(EventCallbackRef e) { external_lock.lock(); external_events.push_back(e); + bool wake = !external_num_events.load(); uint64_t num = ++external_num_events; external_lock.unlock(); - if (!in_thread()) + if (!in_thread() && wake) wakeup(); ldout(cct, 20) << __func__ << " " << e << " pending " << num << dendl; diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index a320489ceda2c..daa4537eef6e8 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -136,16 +136,12 @@ class EventCenter { } public: - atomic_t already_wakeup; - explicit EventCenter(CephContext *c): cct(c), nevent(0), external_num_events(0), driver(NULL), time_event_next_id(1), notify_receive_fd(-1), notify_send_fd(-1), net(c), - notify_handler(NULL), - already_wakeup(0) { - } + notify_handler(NULL) { } ~EventCenter(); ostream& _event_prefix(std::ostream *_dout); From 4e57181b54649f1483a223a8d8f87260afff2f72 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Mon, 11 Jul 2016 19:59:23 +0800 Subject: [PATCH 07/16] msg/async/PosixStack: increase log level AsyncMessenger will try to loop the bind port range, so it will produce some addr inuse errors which is not abnormal. Signed-off-by: Haomai Wang --- src/msg/async/PosixStack.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/msg/async/PosixStack.cc b/src/msg/async/PosixStack.cc index 909cec3614f83..54ddf4217dade 100644 --- a/src/msg/async/PosixStack.cc +++ b/src/msg/async/PosixStack.cc @@ -315,8 +315,8 @@ int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt, r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len()); if (r < 0) { r = -errno; - lderr(cct) << __func__ << " unable to bind to " << sa.get_sockaddr() - << ": " << cpp_strerror(r) << dendl; + ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr() + << ": " << cpp_strerror(r) << dendl; ::close(listen_sd); return r; } From 31833e8b488797ea956a7c877a22d74abb4df510 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 12 Jul 2016 10:16:33 +0800 Subject: [PATCH 08/16] msg/async/Stack: disable smart thread spawn now New async msgr runtime need to spawn threads when binding, but ceph-osd will call daemon() after binding port. So we need to respawn threads if forked. Then thread spawn delay will increase complexity for this change and it's really a simple strategy which help less, we disable auto spawn now. Signed-off-by: Haomai Wang --- src/msg/async/AsyncMessenger.cc | 1 + src/msg/async/PosixStack.h | 13 ++++++------- src/msg/async/Stack.cc | 26 ++++++++++++++++---------- src/msg/async/Stack.h | 13 ++++++++----- 4 files changed, 31 insertions(+), 22 deletions(-) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 95ec0ebd1409f..0964dad30334b 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -300,6 +300,7 @@ void AsyncMessenger::ready() { ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; + stack->start(); Mutex::Locker l(lock); for (auto &&p : processors) p->start(); diff --git a/src/msg/async/PosixStack.h b/src/msg/async/PosixStack.h index eeb7f318c8dd0..149db320d8df2 100644 --- a/src/msg/async/PosixStack.h +++ b/src/msg/async/PosixStack.h @@ -48,14 +48,13 @@ class PosixNetworkStack : public NetworkStack { return -1; return coreids[id % coreids.size()]; } - virtual void spawn_workers(std::vector> &funcs) override { - for (unsigned i = threads.size(); i < funcs.size(); ++i) - threads.emplace_back(std::thread(std::move(funcs[i]))); + virtual void spawn_worker(unsigned i, std::function &&func) override { + threads.resize(i+1); + threads[i] = std::move(std::thread(func)); } - virtual void join_workers() override { - for (auto &&t : threads) - t.join(); - threads.clear(); + virtual void join_worker(unsigned i) override { + assert(threads.size() > i && threads[i].joinable()); + threads[i].join(); } }; diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc index 6755d1e8af5b5..52f17df0de603 100644 --- a/src/msg/async/Stack.cc +++ b/src/msg/async/Stack.cc @@ -25,11 +25,10 @@ #undef dout_prefix #define dout_prefix *_dout << "stack " -void NetworkStack::add_thread(unsigned i) +void NetworkStack::add_thread(unsigned i, std::function &thread) { - assert(threads.size() <= i); Worker *w = workers[i]; - threads.emplace_back( + thread = std::move( [this, w]() { const uint64_t InitEventNumber = 5000; const uint64_t EventMaxWaitUs = 30000000; @@ -69,11 +68,11 @@ Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c) { - for (unsigned i = 0; i < cct->_conf->ms_async_max_op_threads; ++i) { + num_workers = cct->_conf->ms_async_op_threads; + for (unsigned i = 0; i < num_workers; ++i) { Worker *w = create_worker(cct, type, i); workers.push_back(w); } - num_workers = cct->_conf->ms_async_op_threads; } void NetworkStack::start() @@ -83,9 +82,17 @@ void NetworkStack::start() pool_spin.unlock(); return ; } - for (unsigned i = 0; i < num_workers; ++i) - add_thread(i); - spawn_workers(threads); + + if (started) { + return ; + } + for (unsigned i = 0; i < num_workers; ++i) { + if (workers[i]->is_init()) + continue; + std::function thread; + add_thread(i, thread); + spawn_worker(i, std::move(thread)); + } started = true; pool_spin.unlock(); @@ -125,9 +132,8 @@ void NetworkStack::stop() for (unsigned i = 0; i < num_workers; ++i) { workers[i]->done = true; workers[i]->center.wakeup(); + join_worker(i); } - join_workers(); - threads.clear(); started = false; } diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h index 9996ed2841085..5b75aa699dc28 100644 --- a/src/msg/async/Stack.h +++ b/src/msg/async/Stack.h @@ -254,6 +254,10 @@ class Worker { init_cond.notify_all(); init_lock.unlock(); } + bool is_init() { + std::lock_guard l(init_lock); + return init; + } void wait_for_init() { std::unique_lock l(init_lock); while (!init) @@ -270,16 +274,15 @@ class Worker { class NetworkStack { std::string type; - std::atomic_bool started; unsigned num_workers = 0; Spinlock pool_spin; + bool started = false; - void add_thread(unsigned i); + void add_thread(unsigned i, std::function &ts); protected: CephContext *cct; vector workers; - std::vector> threads; // Used to indicate whether thread started explicit NetworkStack(CephContext *c, const string &t); @@ -316,8 +319,8 @@ class NetworkStack { } // direct is used in tests only - virtual void spawn_workers(std::vector> &) = 0; - virtual void join_workers() = 0; + virtual void spawn_worker(unsigned i, std::function &&) = 0; + virtual void join_worker(unsigned i) = 0; private: NetworkStack(const NetworkStack &); From 524553ff31d29d029f08011053104fd004f7eeb6 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 12 Jul 2016 12:26:30 +0800 Subject: [PATCH 09/16] common/ceph_context: add prefork/postfork wathcher support Because daemon() will termniate all existing threads, it will make something go wrong. So we want to add hook at CephContext, do something before/after fork. Signed-off-by: Haomai Wang --- src/common/ceph_context.cc | 2 ++ src/common/ceph_context.h | 30 ++++++++++++++++++++++++++++++ src/global/global_init.cc | 2 ++ 3 files changed, 34 insertions(+) diff --git a/src/common/ceph_context.cc b/src/common/ceph_context.cc index abf9f07e05bca..b4cb9cabe2060 100644 --- a/src/common/ceph_context.cc +++ b/src/common/ceph_context.cc @@ -472,6 +472,7 @@ CephContext::CephContext(uint32_t module_type_, int init_flags_) { ceph_spin_init(&_service_thread_lock); ceph_spin_init(&_associated_objs_lock); + ceph_spin_init(&_fork_watchers_lock); ceph_spin_init(&_feature_lock); ceph_spin_init(&_cct_perf_lock); @@ -575,6 +576,7 @@ CephContext::~CephContext() delete _conf; ceph_spin_destroy(&_service_thread_lock); + ceph_spin_destroy(&_fork_watchers_lock); ceph_spin_destroy(&_associated_objs_lock); ceph_spin_destroy(&_feature_lock); ceph_spin_destroy(&_cct_perf_lock); diff --git a/src/common/ceph_context.h b/src/common/ceph_context.h index 86c1a4a736de8..6bf03b3214b81 100644 --- a/src/common/ceph_context.h +++ b/src/common/ceph_context.h @@ -177,6 +177,33 @@ class CephContext { return _set_gid_string; } + class ForkWatcher { + public: + virtual ~ForkWatcher() {} + virtual void handle_pre_fork() = 0; + virtual void handle_post_fork() = 0; + }; + + void register_fork_watcher(ForkWatcher *w) { + ceph_spin_lock(&_fork_watchers_lock); + _fork_watchers.push_back(w); + ceph_spin_unlock(&_fork_watchers_lock); + } + + void notify_pre_fork() { + ceph_spin_lock(&_fork_watchers_lock); + for (auto &&t : _fork_watchers) + t->handle_pre_fork(); + ceph_spin_unlock(&_fork_watchers_lock); + } + + void notify_post_fork() { + ceph_spin_lock(&_fork_watchers_lock); + for (auto &&t : _fork_watchers) + t->handle_post_fork(); + ceph_spin_unlock(&_fork_watchers_lock); + } + private: struct SingletonWrapper : boost::noncopyable { virtual ~SingletonWrapper() {} @@ -235,6 +262,9 @@ class CephContext { ceph_spinlock_t _associated_objs_lock; std::map _associated_objs; + ceph_spinlock_t _fork_watchers_lock; + std::vector _fork_watchers; + // crypto CryptoHandler *_crypto_none; CryptoHandler *_crypto_aes; diff --git a/src/global/global_init.cc b/src/global/global_init.cc index cda4438da9039..033516240b6c5 100644 --- a/src/global/global_init.cc +++ b/src/global/global_init.cc @@ -339,6 +339,7 @@ int global_init_prefork(CephContext *cct) return -1; } + cct->notify_pre_fork(); // stop log thread cct->_log->flush(); cct->_log->stop(); @@ -370,6 +371,7 @@ void global_init_postfork_start(CephContext *cct) { // restart log thread cct->_log->start(); + cct->notify_post_fork(); /* This is the old trick where we make file descriptors 0, 1, and possibly 2 * point to /dev/null. From 8d63e2c709d736bf9d5fda8d14109cf93d823f26 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 12 Jul 2016 12:30:50 +0800 Subject: [PATCH 10/16] msg/async/Stack: register fork watcher Signed-off-by: Haomai Wang --- src/msg/async/Stack.cc | 1 + src/msg/async/Stack.h | 10 +++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc index 52f17df0de603..8ca2270c982a9 100644 --- a/src/msg/async/Stack.cc +++ b/src/msg/async/Stack.cc @@ -73,6 +73,7 @@ NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(fa Worker *w = create_worker(cct, type, i); workers.push_back(w); } + cct->register_fork_watcher(this); } void NetworkStack::start() diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h index 5b75aa699dc28..d6d7a33b0f0c5 100644 --- a/src/msg/async/Stack.h +++ b/src/msg/async/Stack.h @@ -272,7 +272,7 @@ class Worker { } }; -class NetworkStack { +class NetworkStack : public CephContext::ForkWatcher { std::string type; unsigned num_workers = 0; Spinlock pool_spin; @@ -322,6 +322,14 @@ class NetworkStack { virtual void spawn_worker(unsigned i, std::function &&) = 0; virtual void join_worker(unsigned i) = 0; + virtual void handle_pre_fork() override { + stop(); + } + + virtual void handle_post_fork() override { + start(); + } + private: NetworkStack(const NetworkStack &); NetworkStack& operator=(const NetworkStack &); From 154376b211a5463a0aa770236df083397345b489 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 12 Jul 2016 14:15:23 +0800 Subject: [PATCH 11/16] msg/async: allow EventCenter::set_owner reentry If daemonlize we need to respawn event threads, it need to allow set_owner again Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 17 +++++++++-------- src/msg/async/Event.h | 3 ++- src/msg/async/Stack.cc | 5 +++-- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 5162059552b6d..0b31b1eb3caf9 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -135,16 +135,17 @@ EventCenter::~EventCenter() void EventCenter::set_owner() { - cct->lookup_or_create_singleton_object( - global_centers, "AsyncMessenger::EventCenter::global_center"); - assert(global_centers && !global_centers->centers[idx]); - global_centers->centers[idx] = this; owner = pthread_self(); ldout(cct, 1) << __func__ << " idx=" << idx << " owner=" << owner << dendl; - - notify_handler = new C_handle_notify(this, cct); - int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler); - assert(r == 0); + if (!global_centers) { + cct->lookup_or_create_singleton_object( + global_centers, "AsyncMessenger::EventCenter::global_center"); + assert(global_centers && !global_centers->centers[idx]); + global_centers->centers[idx] = this; + notify_handler = new C_handle_notify(this, cct); + int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler); + assert(r == 0); + } } int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt) diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index daa4537eef6e8..eb3397623c255 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -127,7 +127,7 @@ class EventCenter { NetHandler net; EventCallbackRef notify_handler; unsigned idx = 10000; - AssociatedCenters *global_centers; + AssociatedCenters *global_centers = nullptr; int process_time_events(); FileEvent *_get_file_event(int fd) { @@ -147,6 +147,7 @@ class EventCenter { int init(int nevent, unsigned idx); void set_owner(); + pthread_t get_owner() const { return owner; } unsigned get_id() const { return idx; } // Used by internal thread diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc index 8ca2270c982a9..25e1d5862f1a9 100644 --- a/src/msg/async/Stack.cc +++ b/src/msg/async/Stack.cc @@ -30,9 +30,8 @@ void NetworkStack::add_thread(unsigned i, std::function &thread) Worker *w = workers[i]; thread = std::move( [this, w]() { - const uint64_t InitEventNumber = 5000; const uint64_t EventMaxWaitUs = 30000000; - w->center.init(InitEventNumber, w->id); + w->center.set_owner(); ldout(cct, 10) << __func__ << " starting" << dendl; w->initialize(); w->init_done(); @@ -68,9 +67,11 @@ Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c) { + const uint64_t InitEventNumber = 5000; num_workers = cct->_conf->ms_async_op_threads; for (unsigned i = 0; i < num_workers; ++i) { Worker *w = create_worker(cct, type, i); + w->center.init(InitEventNumber, i); workers.push_back(w); } cct->register_fork_watcher(this); From 1d4ff2a30a9c55736ccfeb79787e901bac2fb471 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Tue, 12 Jul 2016 15:51:23 +0800 Subject: [PATCH 12/16] msg/async: readd set_priority call Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 1 + src/msg/async/AsyncMessenger.cc | 1 + src/msg/async/PosixStack.cc | 2 ++ src/msg/async/Stack.h | 1 + 4 files changed, 5 insertions(+) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 5012e92df0328..c682dd1158584 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -863,6 +863,7 @@ ssize_t AsyncConnection::_process_connection() } SocketOptions opts; + opts.priority = async_msgr->get_socket_priority(); r = worker->connect(get_peer_addr(), opts, &cs); if (r < 0) goto fail; diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 0964dad30334b..a7270cd52c712 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -187,6 +187,7 @@ void Processor::accept() SocketOptions opts; opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay; opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf; + opts.priority = msgr->get_socket_priority(); while (true) { entity_addr_t addr; ConnectedSocket cli_socket; diff --git a/src/msg/async/PosixStack.cc b/src/msg/async/PosixStack.cc index 54ddf4217dade..65d0bf64b77f0 100644 --- a/src/msg/async/PosixStack.cc +++ b/src/msg/async/PosixStack.cc @@ -279,6 +279,7 @@ int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &op ::close(sd); return -errno; } + handler.set_priority(sd, opt.priority); std::unique_ptr csi(new PosixConnectedSocketImpl(handler, *out, sd, true)); *sock = ConnectedSocket(std::move(csi)); @@ -349,6 +350,7 @@ int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, C return -errno; } + net.set_priority(sd, opts.priority); *socket = ConnectedSocket( std::unique_ptr(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock))); return 0; diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h index d6d7a33b0f0c5..c397f942612be 100644 --- a/src/msg/async/Stack.h +++ b/src/msg/async/Stack.h @@ -40,6 +40,7 @@ struct SocketOptions { bool nonblock = true; bool nodelay = true; int rcbuf_size = 0; + int priority = -1; }; /// \cond internal From 3a43a0bb34b1c74b19dcdc0401fd6adbc6b94b34 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Wed, 13 Jul 2016 01:26:04 +0800 Subject: [PATCH 13/16] msg/async/AsyncConnection: fix _conn_prefix racing when stopped When the connection is lossy and enter fault, it will dispatch reset event. If cleanup handler is executed as well as ms_handle_reset call mark_down, it may exists racing for "cs". cleanup handler will reset "cs" but _conn_prefix in mark_down will access "cs". Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index c682dd1158584..498cb03ff665e 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -28,7 +28,7 @@ #undef dout_prefix #define dout_prefix _conn_prefix(_dout) ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) { - int fd = cs ? cs.fd() : -1; + int fd = (cs && state != STATE_CLOSED) ? cs.fd() : -1; return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this << " sd=" << fd << " :" << port << " s=" << get_state_name(state) From 6d3e495de2cfc6e5e0068e1b32cca0f5b16cef79 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Fri, 15 Jul 2016 12:14:17 +0800 Subject: [PATCH 14/16] test_async_networkstack: add networkstack tests Signed-off-by: Haomai Wang --- src/test/Makefile-server.am | 5 + src/test/msgr/CMakeLists.txt | 9 + src/test/msgr/test_async_networkstack.cc | 1045 ++++++++++++++++++++++ 3 files changed, 1059 insertions(+) create mode 100644 src/test/msgr/test_async_networkstack.cc diff --git a/src/test/Makefile-server.am b/src/test/Makefile-server.am index 29c09da349f94..85117028d7c40 100644 --- a/src/test/Makefile-server.am +++ b/src/test/Makefile-server.am @@ -8,6 +8,11 @@ ceph_test_msgr_LDADD = $(LIBOS) $(UNITTEST_LDADD) $(CEPH_GLOBAL) ceph_test_msgr_CXXFLAGS = $(UNITTEST_CXXFLAGS) bin_DEBUGPROGRAMS += ceph_test_msgr +ceph_test_async_networkstack_SOURCES = test/msgr/test_async_networkstack.cc +ceph_test_async_networkstack_LDADD = $(LIBOS) $(UNITTEST_LDADD) $(CEPH_GLOBAL) +ceph_test_async_networkstack_CXXFLAGS = $(UNITTEST_CXXFLAGS) +bin_DEBUGPROGRAMS += ceph_test_async_networkstack + ceph_test_trans_SOURCES = test/test_trans.cc ceph_test_trans_LDADD = $(LIBOS) $(CEPH_GLOBAL) bin_DEBUGPROGRAMS += ceph_test_trans diff --git a/src/test/msgr/CMakeLists.txt b/src/test/msgr/CMakeLists.txt index 4eb10acfa17a6..a792233ffc749 100644 --- a/src/test/msgr/CMakeLists.txt +++ b/src/test/msgr/CMakeLists.txt @@ -14,6 +14,14 @@ set_target_properties(ceph_test_msgr PROPERTIES COMPILE_FLAGS ${UNITTEST_CXX_FLAGS}) target_link_libraries(ceph_test_msgr os global ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS} ${UNITTEST_LIBS}) +# ceph_test_async_networkstack +add_executable(ceph_test_async_networkstack + test_async_networkstack.cc + ) +set_target_properties(ceph_test_async_networkstack PROPERTIES COMPILE_FLAGS + ${UNITTEST_CXX_FLAGS}) +target_link_libraries(ceph_test_async_networkstack global ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS} ${UNITTEST_LIBS}) + #ceph_perf_msgr_server add_executable(ceph_perf_msgr_server perf_msgr_server.cc) set_target_properties(ceph_perf_msgr_server PROPERTIES COMPILE_FLAGS @@ -29,6 +37,7 @@ target_link_libraries(ceph_perf_msgr_client os global ${UNITTEST_LIBS}) install(TARGETS ceph_test_async_driver ceph_test_msgr + ceph_test_async_networkstack ceph_perf_msgr_server ceph_perf_msgr_client DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/src/test/msgr/test_async_networkstack.cc b/src/test/msgr/test_async_networkstack.cc new file mode 100644 index 0000000000000..fa14e81c6d6dc --- /dev/null +++ b/src/test/msgr/test_async_networkstack.cc @@ -0,0 +1,1045 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSky + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "acconfig.h" +#include "include/Context.h" +#include "common/ceph_argparse.h" +#include "global/global_init.h" + +#include "msg/async/Event.h" +#include "msg/async/Stack.h" + +#if GTEST_HAS_PARAM_TEST + +class NetworkWorkerTest : public ::testing::TestWithParam { + public: + std::shared_ptr stack; + string addr, port_addr; + + NetworkWorkerTest() {} + virtual void SetUp() { + cerr << __func__ << " start set up " << GetParam() << std::endl; + addr = "127.0.0.1:15000"; + port_addr = "127.0.0.1:15001"; + stack = NetworkStack::create(g_ceph_context, GetParam()); + stack->start(); + } + virtual void TearDown() { + stack->stop(); + } + string get_addr() const { + return addr; + } + string get_ip_different_port() const { + return port_addr; + } + string get_different_ip() const { + return "10.0.123.100:4323"; + } + EventCenter *get_center(unsigned i) { + return &stack->get_worker(i)->center; + } + Worker *get_worker(unsigned i) { + return stack->get_worker(i); + } + template + class C_dispatch : public EventCallback { + Worker *worker; + func f; + std::atomic_bool done; + public: + C_dispatch(Worker *w, func &&_f): worker(w), f(std::move(_f)), done(false) {} + void do_request(int id) { + f(worker); + done = true; + } + void wait() { + int us = 1000 * 1000 * 1000; + while (!done) { + ASSERT_TRUE(us > 0); + usleep(100); + us -= 100; + } + } + }; + template + void exec_events(func &&f) { + std::vector*> dis; + for (unsigned i = 0; i < stack->get_num_worker(); ++i) { + Worker *w = stack->get_worker(i); + C_dispatch *e = new C_dispatch(w, std::move(f)); + stack->get_worker(i)->center.dispatch_event_external(e); + dis.push_back(e); + } + + for (auto &&e : dis) { + e->wait(); + delete e; + } + } +}; + +class C_poll : public EventCallback { + EventCenter *center; + std::atomic woken; + static const int sleepus = 500; + + public: + C_poll(EventCenter *c): center(c), woken(false) {} + void do_request(int r) { + woken = true; + } + bool poll(int milliseconds) { + auto start = ceph::coarse_real_clock::now(g_ceph_context); + while (!woken) { + center->process_events(sleepus); + usleep(sleepus); + auto r = std::chrono::duration_cast( + ceph::coarse_real_clock::now(g_ceph_context) - start); + if (r >= std::chrono::milliseconds(milliseconds)) + break; + } + return woken; + } + void reset() { + woken = false; + } +}; + +TEST_P(NetworkWorkerTest, SimpleTest) { + entity_addr_t bind_addr; + ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); + std::atomic_bool accepted(false); + std::atomic_bool *accepted_p = &accepted; + + exec_events([this, accepted_p, bind_addr](Worker *worker) mutable { + entity_addr_t cli_addr; + SocketOptions options; + ServerSocket bind_socket; + EventCenter *center = &worker->center; + ssize_t r = 0; + if (stack->support_local_listen_table() || worker->id == 0) + r = worker->listen(bind_addr, options, &bind_socket); + ASSERT_EQ(0, r); + + ConnectedSocket cli_socket, srv_socket; + if (worker->id == 0) { + r = worker->connect(bind_addr, options, &cli_socket); + ASSERT_EQ(0, r); + } + + bool is_my_accept = false; + if (bind_socket) { + C_poll cb(center); + center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb); + if (cb.poll(500)) { + *accepted_p = true; + is_my_accept = true; + } + ASSERT_TRUE(*accepted_p); + center->delete_file_event(bind_socket.fd(), EVENT_READABLE); + } + + if (is_my_accept) { + r = bind_socket.accept(&srv_socket, options, &cli_addr); + ASSERT_EQ(0, r); + ASSERT_TRUE(srv_socket.fd() > 0); + } + + if (worker->id == 0) { + C_poll cb(center); + center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb); + r = cli_socket.is_connected(); + if (r == 0) { + ASSERT_EQ(true, cb.poll(500)); + r = cli_socket.is_connected(); + } + ASSERT_EQ(1, r); + center->delete_file_event(cli_socket.fd(), EVENT_READABLE); + } + + const char *message = "this is a new message"; + int len = strlen(message); + bufferlist bl; + bl.append(message, len); + if (worker->id == 0) { + r = cli_socket.send(bl, false); + ASSERT_EQ(len, r); + } + + char buf[1024]; + C_poll cb(center); + if (is_my_accept) { + center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb); + { + r = srv_socket.read(buf, sizeof(buf)); + while (r == -EAGAIN) { + ASSERT_TRUE(cb.poll(500)); + r = srv_socket.read(buf, sizeof(buf)); + } + ASSERT_EQ(len, r); + ASSERT_EQ(0, memcmp(buf, message, len)); + } + bind_socket.abort_accept(); + } + if (worker->id == 0) { + cli_socket.shutdown(); + // ack delay is 200 ms + } + + bl.clear(); + bl.append(message, len); + if (worker->id == 0) { + r = cli_socket.send(bl, false); + ASSERT_EQ(-EPIPE, r); + } + if (is_my_accept) { + cb.reset(); + ASSERT_TRUE(cb.poll(500)); + r = srv_socket.read(buf, sizeof(buf)); + if (r == -EAGAIN) { + cb.reset(); + ASSERT_TRUE(cb.poll(1000*500)); + r = srv_socket.read(buf, sizeof(buf)); + } + ASSERT_EQ(0, r); + center->delete_file_event(srv_socket.fd(), EVENT_READABLE); + srv_socket.close(); + } + }); +} + +TEST_P(NetworkWorkerTest, ConnectFailedTest) { + entity_addr_t bind_addr; + ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); + + exec_events([this, bind_addr](Worker *worker) mutable { + EventCenter *center = &worker->center; + entity_addr_t cli_addr; + SocketOptions options; + ServerSocket bind_socket; + int r = 0; + if (stack->support_local_listen_table() || worker->id == 0) + r = worker->listen(bind_addr, options, &bind_socket); + ASSERT_EQ(0, r); + + ConnectedSocket cli_socket1, cli_socket2; + if (worker->id == 0) { + ASSERT_TRUE(cli_addr.parse(get_ip_different_port().c_str())); + r = worker->connect(cli_addr, options, &cli_socket1); + ASSERT_EQ(0, r); + C_poll cb(center); + center->create_file_event(cli_socket1.fd(), EVENT_READABLE, &cb); + r = cli_socket1.is_connected(); + if (r == 0) { + ASSERT_TRUE(cb.poll(500)); + r = cli_socket1.is_connected(); + } + ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET); + } + + if (worker->id == 1) { + ASSERT_TRUE(cli_addr.parse(get_different_ip().c_str())); + r = worker->connect(cli_addr, options, &cli_socket2); + ASSERT_EQ(0, r); + C_poll cb(center); + center->create_file_event(cli_socket2.fd(), EVENT_READABLE, &cb); + r = cli_socket2.is_connected(); + if (r == 0) { + ASSERT_FALSE(cb.poll(500)); + r = cli_socket2.is_connected(); + } + ASSERT_TRUE(r != 1); + center->delete_file_event(cli_socket2.fd(), EVENT_READABLE); + } + }); +} + +TEST_P(NetworkWorkerTest, ListenTest) { + Worker *worker = get_worker(0); + entity_addr_t bind_addr; + ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); + SocketOptions options; + ServerSocket bind_socket1, bind_socket2; + int r = worker->listen(bind_addr, options, &bind_socket1); + ASSERT_EQ(0, r); + + r = worker->listen(bind_addr, options, &bind_socket2); + ASSERT_EQ(-EADDRINUSE, r); +} + +TEST_P(NetworkWorkerTest, AcceptAndCloseTest) { + entity_addr_t bind_addr; + ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); + std::atomic_bool accepted(false); + std::atomic_bool *accepted_p = &accepted; + std::atomic_int unbind_count(stack->get_num_worker()); + std::atomic_int *count_p = &unbind_count; + exec_events([this, bind_addr, accepted_p, count_p](Worker *worker) mutable { + SocketOptions options; + EventCenter *center = &worker->center; + entity_addr_t cli_addr; + int r = 0; + { + ServerSocket bind_socket; + if (stack->support_local_listen_table() || worker->id == 0) + r = worker->listen(bind_addr, options, &bind_socket); + ASSERT_EQ(0, r); + + ConnectedSocket srv_socket, cli_socket; + if (bind_socket) { + r = bind_socket.accept(&srv_socket, options, &cli_addr); + ASSERT_EQ(-EAGAIN, r); + } + + C_poll cb(center); + if (worker->id == 0) { + center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb); + r = worker->connect(bind_addr, options, &cli_socket); + ASSERT_EQ(0, r); + ASSERT_TRUE(cb.poll(500)); + } + + if (bind_socket) { + cb.reset(); + cb.poll(500); + ConnectedSocket srv_socket2; + do { + r = bind_socket.accept(&srv_socket2, options, &cli_addr); + usleep(100); + } while (r == -EAGAIN && !*accepted_p); + if (r == 0) + *accepted_p = true; + ASSERT_TRUE(*accepted_p); + // srv_socket2 closed + center->delete_file_event(bind_socket.fd(), EVENT_READABLE); + } + + if (worker->id == 0) { + char buf[100]; + cb.reset(); + center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb); + int i = 3; + while (!i--) { + ASSERT_TRUE(cb.poll(500)); + r = cli_socket.read(buf, sizeof(buf)); + if (r == 0) + break; + } + ASSERT_EQ(0, r); + center->delete_file_event(cli_socket.fd(), EVENT_READABLE); + } + + if (bind_socket) + center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb); + if (worker->id == 0) { + *accepted_p = false; + r = worker->connect(bind_addr, options, &cli_socket); + ASSERT_EQ(0, r); + cb.reset(); + ASSERT_TRUE(cb.poll(500)); + cli_socket.close(); + } + + if (bind_socket) { + do { + r = bind_socket.accept(&srv_socket, options, &cli_addr); + usleep(100); + } while (r == -EAGAIN && !*accepted_p); + if (r == 0) + *accepted_p = true; + ASSERT_TRUE(*accepted_p); + center->delete_file_event(bind_socket.fd(), EVENT_READABLE); + } + // unbind + } + + --*count_p; + while (*count_p > 0) + usleep(100); + + ConnectedSocket cli_socket; + r = worker->connect(bind_addr, options, &cli_socket); + ASSERT_EQ(0, r); + { + C_poll cb(center); + center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb); + r = cli_socket.is_connected(); + if (r == 0) { + ASSERT_TRUE(cb.poll(500)); + r = cli_socket.is_connected(); + } + ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET); + } + }); +} + +TEST_P(NetworkWorkerTest, ComplexTest) { + entity_addr_t bind_addr; + std::atomic_bool accepted(false); + std::atomic_bool *accepted_p = &accepted; + std::atomic_bool done(false); + std::atomic_bool *done_p = &done; + ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); + exec_events([this, bind_addr, accepted_p, done_p](Worker *worker) mutable { + entity_addr_t cli_addr; + EventCenter *center = &worker->center; + SocketOptions options; + ServerSocket bind_socket; + int r = 0; + if (stack->support_local_listen_table() || worker->id == 0) { + r = worker->listen(bind_addr, options, &bind_socket); + ASSERT_EQ(0, r); + } + ConnectedSocket cli_socket, srv_socket; + if (worker->id == 1) { + r = worker->connect(bind_addr, options, &cli_socket); + ASSERT_EQ(0, r); + } + + if (bind_socket) { + C_poll cb(center); + center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb); + if (cb.poll(500)) { + r = bind_socket.accept(&srv_socket, options, &cli_addr); + ASSERT_EQ(0, r); + *accepted_p = true; + } + ASSERT_TRUE(*accepted_p); + center->delete_file_event(bind_socket.fd(), EVENT_READABLE); + } + + if (worker->id == 1) { + C_poll cb(center); + center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb); + r = cli_socket.is_connected(); + if (r == 0) { + ASSERT_TRUE(cb.poll(500)); + r = cli_socket.is_connected(); + } + ASSERT_EQ(1, r); + center->delete_file_event(cli_socket.fd(), EVENT_READABLE); + } + + const size_t message_size = 10240; + size_t count = 100; + string message(message_size, '!'); + for (size_t i = 0; i < message_size; i += 100) + message[i] = ','; + size_t len = message_size * count; + C_poll cb(center); + if (worker->id == 1) + center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb); + if (srv_socket) + center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb); + size_t left = len; + len *= 2; + string read_string; + int again_count = 0; + int c = 2; + bufferlist bl; + for (size_t i = 0; i < count; ++i) + bl.push_back(bufferptr((char*)message.data(), message_size)); + while (!*done_p) { + again_count = 0; + if (worker->id == 1) { + if (c > 0) { + ssize_t r = 0; + usleep(100); + if (left > 0) { + r = cli_socket.send(bl, false); + ASSERT_TRUE(r >= 0 || r == -EAGAIN); + if (r > 0) + left -= r; + if (r == -EAGAIN) + ++again_count; + } + if (left == 0) { + --c; + left = message_size * count; + ASSERT_EQ(0U, bl.length()); + for (size_t i = 0; i < count; ++i) + bl.push_back(bufferptr((char*)message.data(), message_size)); + } + } + } + + if (srv_socket) { + char buf[1000]; + if (len > 0) { + r = srv_socket.read(buf, sizeof(buf)); + ASSERT_TRUE(r > 0 || r == -EAGAIN); + if (r > 0) { + read_string.append(buf, r); + len -= r; + } else if (r == -EAGAIN) { + ++again_count; + } + } + if (len == 0) { + for (size_t i = 0; i < read_string.size(); i += message_size) + ASSERT_EQ(0, memcmp(read_string.c_str()+i, message.c_str(), message_size)); + *done_p = true; + } + } + if (again_count) { + cb.reset(); + cb.poll(500); + } + } + if (worker->id == 1) + center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE); + if (srv_socket) + center->delete_file_event(srv_socket.fd(), EVENT_READABLE); + + if (bind_socket) + bind_socket.abort_accept(); + if (srv_socket) + srv_socket.close(); + if (worker->id == 1) + cli_socket.close(); + }); +} + +class StressFactory { + struct Client; + struct Server; + struct ThreadData { + Worker *worker; + std::set clients; + std::set servers; + ~ThreadData() { + for (auto && i : clients) + delete i; + for (auto && i : servers) + delete i; + } + }; + + struct RandomString { + size_t slen; + vector strs; + std::random_device rd; + std::default_random_engine rng; + + RandomString(size_t s): slen(s), rng(rd()) {} + void prepare(size_t n) { + static const char alphabet[] = + "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "0123456789"; + + std::uniform_int_distribution<> dist( + 0, sizeof(alphabet) / sizeof(*alphabet) - 2); + + strs.reserve(n); + std::generate_n( + std::back_inserter(strs), strs.capacity(), [&] { + std::string str; + str.reserve(slen); + std::generate_n(std::back_inserter(str), slen, [&]() { + return alphabet[dist(rng)]; + }); + return str; + } + ); + } + std::string &get_random_string() { + std::uniform_int_distribution<> dist( + 0, strs.size() - 1); + return strs[dist(rng)]; + } + }; + struct Message { + size_t idx; + size_t len; + std::string content; + + explicit Message(RandomString &rs, size_t i, size_t l): idx(i) { + size_t slen = rs.slen; + len = std::max(slen, l); + + std::vector strs; + strs.reserve(len / slen); + std::generate_n( + std::back_inserter(strs), strs.capacity(), [&] { + return rs.get_random_string(); + } + ); + len = slen * strs.size(); + content.reserve(len); + for (auto &&s : strs) + content.append(s); + } + bool verify(const char *b, size_t len = 0) const { + return content.compare(0, len, b, 0, len) == 0; + } + }; + + template + class C_delete : public EventCallback { + T *ctxt; + public: + C_delete(T *c): ctxt(c) {} + void do_request(int id) { + delete ctxt; + delete this; + } + }; + + class Client { + StressFactory *factory; + EventCenter *center; + ConnectedSocket socket; + std::deque acking; + std::deque writings; + std::string buffer; + size_t index = 0; + size_t left; + bool write_enabled = false; + size_t read_offset = 0, write_offset = 0; + bool first = true; + bool dead = false; + StressFactory::Message homeless_message; + + class Client_read_handle : public EventCallback { + Client *c; + public: + Client_read_handle(Client *_c): c(_c) {} + void do_request(int id) { + c->do_read_request(); + } + } read_ctxt; + + class Client_write_handle : public EventCallback { + Client *c; + public: + Client_write_handle(Client *_c): c(_c) {} + void do_request(int id) { + c->do_write_request(); + } + } write_ctxt; + + public: + Client(StressFactory *f, EventCenter *cen, ConnectedSocket s, size_t c) + : factory(f), center(cen), socket(std::move(s)), left(c), homeless_message(factory->rs, -1, 1024), + read_ctxt(this), write_ctxt(this) { + center->create_file_event( + socket.fd(), EVENT_READABLE, &read_ctxt); + center->dispatch_event_external(&read_ctxt); + } + void close() { + ASSERT_FALSE(write_enabled); + dead = true; + socket.shutdown(); + center->delete_file_event(socket.fd(), EVENT_READABLE); + center->dispatch_event_external(new C_delete(this)); + } + + void do_read_request() { + if (dead) + return ; + ASSERT_TRUE(socket.is_connected() >= 0); + if (!socket.is_connected()) + return ; + ASSERT_TRUE(!acking.empty() || first); + if (first) { + first = false; + center->dispatch_event_external(&write_ctxt); + if (acking.empty()) + return ; + } + StressFactory::Message *m = acking.front(); + int r = 0; + if (buffer.empty()) + buffer.resize(m->len); + bool must_no = false; + while (true) { + r = socket.read((char*)buffer.data() + read_offset, + m->len - read_offset); + ASSERT_TRUE(r == -EAGAIN || r > 0); + if (r == -EAGAIN) + break; + read_offset += r; + + std::cerr << " client " << this << " receive " << m->idx << " len " << r << " content: " << std::endl; + ASSERT_FALSE(must_no); + if ((m->len - read_offset) == 0) { + ASSERT_TRUE(m->verify(buffer.data(), 0)); + delete m; + acking.pop_front(); + read_offset = 0; + buffer.clear(); + if (acking.empty()) { + m = &homeless_message; + must_no = true; + } else { + m = acking.front(); + buffer.resize(m->len); + } + } + } + if (acking.empty()) { + center->dispatch_event_external(&write_ctxt); + return ; + } + } + + void do_write_request() { + if (dead) + return ; + ASSERT_TRUE(socket.is_connected() > 0); + + while (left > 0 && factory->queue_depth > writings.size() + acking.size()) { + StressFactory::Message *m = new StressFactory::Message( + factory->rs, ++index, + factory->rd() % factory->max_message_length); + std::cerr << " client " << this << " generate message " << m->idx << " length " << m->len << std::endl; + ASSERT_EQ(m->len, m->content.size()); + writings.push_back(m); + --left; + --factory->message_left; + } + + while (!writings.empty()) { + StressFactory::Message *m = writings.front(); + bufferlist bl; + bl.append(m->content.data() + write_offset, m->content.size() - write_offset); + ssize_t r = socket.send(bl, false); + if (r == 0) + break; + std::cerr << " client " << this << " send " << m->idx << " len " << r << " content: " << std::endl; + ASSERT_TRUE(r >= 0); + write_offset += r; + if (write_offset == m->content.size()) { + write_offset = 0; + writings.pop_front(); + acking.push_back(m); + } + } + if (writings.empty() && write_enabled) { + center->delete_file_event(socket.fd(), EVENT_WRITABLE); + write_enabled = false; + } else if (!writings.empty() && !write_enabled) { + ASSERT_EQ(0, center->create_file_event( + socket.fd(), EVENT_WRITABLE, &write_ctxt)); + write_enabled = true; + } + } + + bool finish() const { + return left == 0 && acking.empty() && writings.empty(); + } + }; + friend class Client; + + class Server { + StressFactory *factory; + EventCenter *center; + ConnectedSocket socket; + std::deque buffers; + bool write_enabled = false; + bool dead = false; + + class Server_read_handle : public EventCallback { + Server *s; + public: + Server_read_handle(Server *_s): s(_s) {} + void do_request(int id) { + s->do_read_request(); + } + } read_ctxt; + + class Server_write_handle : public EventCallback { + Server *s; + public: + Server_write_handle(Server *_s): s(_s) {} + void do_request(int id) { + s->do_write_request(); + } + } write_ctxt; + + public: + Server(StressFactory *f, EventCenter *c, ConnectedSocket s): + factory(f), center(c), socket(std::move(s)), read_ctxt(this), write_ctxt(this) { + center->create_file_event(socket.fd(), EVENT_READABLE, &read_ctxt); + center->dispatch_event_external(&read_ctxt); + } + void close() { + ASSERT_FALSE(write_enabled); + socket.shutdown(); + center->delete_file_event(socket.fd(), EVENT_READABLE); + center->dispatch_event_external(new C_delete(this)); + } + void do_read_request() { + if (dead) + return ; + int r = 0; + while (true) { + char buf[4096]; + bufferptr data; + if (factory->zero_copy_read) { + r = socket.zero_copy_read(data); + } else { + r = socket.read(buf, sizeof(buf)); + } + ASSERT_TRUE(r == -EAGAIN || (r >= 0 && (size_t)r <= sizeof(buf))); + if (r == 0) { + ASSERT_TRUE(buffers.empty()); + dead = true; + return ; + } else if (r == -EAGAIN) + break; + if (factory->zero_copy_read) { + buffers.emplace_back(data.c_str(), 0, data.length()); + } else { + buffers.emplace_back(buf, 0, r); + } + std::cerr << " server " << this << " receive " << r << " content: " << std::endl; + } + if (!buffers.empty() && !write_enabled) + center->dispatch_event_external(&write_ctxt); + } + + void do_write_request() { + if (dead) + return ; + + while (!buffers.empty()) { + bufferlist bl; + auto it = buffers.begin(); + for (size_t i = 0; i < buffers.size(); ++i) { + bl.push_back(bufferptr((char*)it->data(), it->size())); + ++it; + } + + ssize_t r = socket.send(bl, false); + std::cerr << " server " << this << " send " << r << std::endl; + if (r == 0) + break; + ASSERT_TRUE(r >= 0); + while (r > 0) { + ASSERT_TRUE(!buffers.empty()); + string &buffer = buffers.front(); + if (r >= (int)buffer.size()) { + r -= (int)buffer.size(); + buffers.pop_front(); + } else { + std::cerr << " server " << this << " sent " << r << std::endl; + buffer = buffer.substr(r, buffer.size()); + break; + } + } + } + if (buffers.empty()) { + if (write_enabled) { + center->delete_file_event(socket.fd(), EVENT_WRITABLE); + write_enabled = false; + } + } else if (!write_enabled) { + ASSERT_EQ(0, center->create_file_event( + socket.fd(), EVENT_WRITABLE, &write_ctxt)); + write_enabled = true; + } + } + + bool finish() { + return dead; + } + }; + friend class Server; + + class C_accept : public EventCallback { + StressFactory *factory; + ServerSocket bind_socket; + ThreadData *t_data; + + public: + C_accept(StressFactory *f, ServerSocket s, ThreadData *data) + : factory(f), bind_socket(std::move(s)), t_data(data) {} + void do_request(int id) { + while (true) { + entity_addr_t cli_addr; + ConnectedSocket srv_socket; + SocketOptions options; + int r = bind_socket.accept(&srv_socket, options, &cli_addr); + if (r == -EAGAIN) { + break; + } + ASSERT_EQ(0, r); + ASSERT_TRUE(srv_socket.fd() > 0); + Server *cb = new Server(factory, &t_data->worker->center, std::move(srv_socket)); + t_data->servers.insert(cb); + } + } + }; + friend class C_accept; + + public: + static const size_t min_client_send_messages = 100; + static const size_t max_client_send_messages = 1000; + std::shared_ptr stack; + RandomString rs; + std::random_device rd; + const size_t client_num, queue_depth, max_message_length; + atomic_int message_count, message_left; + entity_addr_t bind_addr; + bool zero_copy_read; + SocketOptions options; + + explicit StressFactory(std::shared_ptr s, const string &addr, + size_t cli, size_t qd, size_t mc, size_t l, bool zero_copy) + : stack(s), rs(128), client_num(cli), queue_depth(qd), + max_message_length(l), message_count(mc), message_left(mc), + zero_copy_read(zero_copy) { + bind_addr.parse(addr.c_str()); + rs.prepare(100); + } + ~StressFactory() { + } + + void add_client(ThreadData *t_data) { + static Mutex lock("add_client_lock"); + Mutex::Locker l(lock); + ConnectedSocket sock; + int r = t_data->worker->connect(bind_addr, options, &sock); + std::default_random_engine rng(rd()); + std::uniform_int_distribution<> dist( + min_client_send_messages, max_client_send_messages); + ASSERT_EQ(0, r); + int c = dist(rng); + if (c > message_count.load()) + c = message_count.load(); + Client *cb = new Client(this, &t_data->worker->center, std::move(sock), c); + t_data->clients.insert(cb); + message_count -= c; + } + + void drop_client(ThreadData *t_data, Client *c) { + c->close(); + ASSERT_EQ(1U, t_data->clients.erase(c)); + } + + void drop_server(ThreadData *t_data, Server *s) { + s->close(); + ASSERT_EQ(1U, t_data->servers.erase(s)); + } + + void start(Worker *worker) { + int r = 0; + ThreadData t_data; + t_data.worker = worker; + ServerSocket bind_socket; + if (stack->support_local_listen_table() || worker->id == 0) { + r = worker->listen(bind_addr, options, &bind_socket); + ASSERT_EQ(0, r); + } + C_accept *accept_handler = nullptr; + int bind_fd = 0; + if (bind_socket) { + bind_fd = bind_socket.fd(); + accept_handler = new C_accept(this, std::move(bind_socket), &t_data); + ASSERT_EQ(0, worker->center.create_file_event( + bind_fd, EVENT_READABLE, accept_handler)); + } + + int echo_throttle = message_count; + while (message_count > 0 || !t_data.clients.empty() || !t_data.servers.empty()) { + if (message_count > 0 && t_data.clients.size() < client_num && t_data.servers.size() < client_num) + add_client(&t_data); + for (auto &&c : t_data.clients) { + if (c->finish()) { + drop_client(&t_data, c); + break; + } + } + for (auto &&s : t_data.servers) { + if (s->finish()) { + drop_server(&t_data, s); + break; + } + } + + worker->center.process_events(1); + if (echo_throttle > message_left) { + std::cerr << " clients " << t_data.clients.size() << " servers " << t_data.servers.size() + << " message count " << message_left << std::endl; + echo_throttle -= 100; + } + } + if (bind_fd) + worker->center.delete_file_event(bind_fd, EVENT_READABLE); + delete accept_handler; + } +}; + +TEST_P(NetworkWorkerTest, StressTest) { + StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024, false); + StressFactory *f = &factory; + exec_events([f](Worker *worker) mutable { + f->start(worker); + }); + ASSERT_EQ(0, factory.message_left); +} + + +INSTANTIATE_TEST_CASE_P( + NetworkStack, + NetworkWorkerTest, + ::testing::Values( + "posix" + ) +); + +#else + +// Google Test may not support value-parameterized tests with some +// compilers. If we use conditional compilation to compile out all +// code referring to the gtest_main library, MSVC linker will not link +// that library at all and consequently complain about missing entry +// point defined in that library (fatal error LNK1561: entry point +// must be defined). This dummy test keeps gtest_main linked in. +TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {} + +#endif + + +int main(int argc, char **argv) { + vector args; + argv_to_vec(argc, (const char **)argv, args); + + global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); + common_init_finish(g_ceph_context); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +/* + * Local Variables: + * compile-command: "cd ../.. ; make ceph_test_async_networkstack && + * ./ceph_test_async_networkstack + * + * End: + */ From 9bfd11e7f99dc556db7b0d226a2247fb853b7e53 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Fri, 15 Jul 2016 15:20:10 +0800 Subject: [PATCH 15/16] msg/async/Event: remove global_centers assert In tests we allow to reset EventCenter instance in the same CephContext, so it may let global_centers->centers to set the same position multi times. Signed-off-by: Haomai Wang --- src/msg/async/Event.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 0b31b1eb3caf9..1582dc5a16b34 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -140,7 +140,7 @@ void EventCenter::set_owner() if (!global_centers) { cct->lookup_or_create_singleton_object( global_centers, "AsyncMessenger::EventCenter::global_center"); - assert(global_centers && !global_centers->centers[idx]); + assert(global_centers); global_centers->centers[idx] = this; notify_handler = new C_handle_notify(this, cct); int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler); From f6c3fd9994a11b7f8e588524d08857049ef17f89 Mon Sep 17 00:00:00 2001 From: Haomai Wang Date: Mon, 18 Jul 2016 17:27:31 +0800 Subject: [PATCH 16/16] test_msgr: ensure wait for ms_dispatch executed Otherwise in slow runner, ms_dispatch may not executed when assert Signed-off-by: Haomai Wang --- src/msg/async/AsyncConnection.cc | 2 +- src/test/msgr/test_msgr.cc | 30 +++++++++++++++++++++++++----- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc index 498cb03ff665e..96142c3b661e5 100644 --- a/src/msg/async/AsyncConnection.cc +++ b/src/msg/async/AsyncConnection.cc @@ -2006,8 +2006,8 @@ void AsyncConnection::fault() } write_lock.lock(); - shutdown_socket(); can_write = WriteStatus::NOWRITE; + shutdown_socket(); open_write = false; // queue delayed items immediately diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index a94c77125b294..05fcbbe4ba437 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -175,6 +175,7 @@ class FakeDispatcher : public Dispatcher { s->put(); } got_remote_reset = true; + cond.Signal(); } void ms_fast_dispatch(Message *m) { Mutex::Locker l(lock); @@ -450,6 +451,7 @@ TEST_P(MessengerTest, StatefulTest) { // don't lose state ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); + srv_dispatcher.got_new = false; conn = client_msgr->get_connection(server_msgr->get_myinst()); { m = new MPing(); @@ -461,14 +463,26 @@ TEST_P(MessengerTest, StatefulTest) { } ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); server_conn = server_msgr->get_connection(client_msgr->get_myinst()); - ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); + { + Mutex::Locker l(srv_dispatcher.lock); + while (!srv_dispatcher.got_remote_reset) + srv_dispatcher.cond.Wait(srv_dispatcher.lock); + } // 2. test for client reconnect ASSERT_FALSE(cli_dispatcher.got_remote_reset); cli_dispatcher.got_connect = false; + cli_dispatcher.got_new = false; + cli_dispatcher.got_remote_reset = false; server_conn->mark_down(); ASSERT_FALSE(server_conn->is_connected()); // ensure client detect server socket closed + { + Mutex::Locker l(cli_dispatcher.lock); + while (!cli_dispatcher.got_remote_reset) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_remote_reset = false; + } { Mutex::Locker l(cli_dispatcher.lock); while (!cli_dispatcher.got_connect) @@ -477,6 +491,7 @@ TEST_P(MessengerTest, StatefulTest) { } CHECK_AND_WAIT_TRUE(conn->is_connected()); ASSERT_TRUE(conn->is_connected()); + { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); @@ -487,10 +502,9 @@ TEST_P(MessengerTest, StatefulTest) { cli_dispatcher.got_new = false; } // resetcheck happen - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(static_cast(conn->get_priv())->get_count(), 1); server_conn = server_msgr->get_connection(client_msgr->get_myinst()); - ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); - ASSERT_TRUE(cli_dispatcher.got_remote_reset); + ASSERT_EQ(static_cast(server_conn->get_priv())->get_count(), 1); cli_dispatcher.got_remote_reset = false; server_msgr->shutdown(); @@ -529,6 +543,7 @@ TEST_P(MessengerTest, StatelessTest) { conn->mark_down(); ASSERT_FALSE(conn->is_connected()); + srv_dispatcher.got_new = false; conn = client_msgr->get_connection(server_msgr->get_myinst()); { m = new MPing(); @@ -541,7 +556,12 @@ TEST_P(MessengerTest, StatelessTest) { ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); // server lose state - ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); + { + Mutex::Locker l(srv_dispatcher.lock); + while (!srv_dispatcher.got_new) + srv_dispatcher.cond.Wait(srv_dispatcher.lock); + } + ASSERT_EQ(static_cast(server_conn->get_priv())->get_count(), 1); // 2. test for client lossy server_conn->mark_down();