diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 5a7e6126b9fa5..0596491aece3f 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -424,6 +424,8 @@ set(libcommon_files
   msg/async/Event.cc
   msg/async/EventEpoll.cc
   msg/async/EventSelect.cc
+  msg/async/Stack.cc
+  msg/async/PosixStack.cc
   msg/async/net_handler.cc
   ${xio_common_srcs}
   msg/msg_types.cc
diff --git a/src/common/ceph_context.cc b/src/common/ceph_context.cc
index abf9f07e05bca..b4cb9cabe2060 100644
--- a/src/common/ceph_context.cc
+++ b/src/common/ceph_context.cc
@@ -472,6 +472,7 @@ CephContext::CephContext(uint32_t module_type_, int init_flags_)
 {
   ceph_spin_init(&_service_thread_lock);
   ceph_spin_init(&_associated_objs_lock);
+  ceph_spin_init(&_fork_watchers_lock);
   ceph_spin_init(&_feature_lock);
   ceph_spin_init(&_cct_perf_lock);
@@ -575,6 +576,7 @@ CephContext::~CephContext()
   delete _conf;
 
   ceph_spin_destroy(&_service_thread_lock);
+  ceph_spin_destroy(&_fork_watchers_lock);
   ceph_spin_destroy(&_associated_objs_lock);
   ceph_spin_destroy(&_feature_lock);
   ceph_spin_destroy(&_cct_perf_lock);
diff --git a/src/common/ceph_context.h b/src/common/ceph_context.h
index 86c1a4a736de8..6bf03b3214b81 100644
--- a/src/common/ceph_context.h
+++ b/src/common/ceph_context.h
@@ -177,6 +177,33 @@ class CephContext {
     return _set_gid_string;
   }
 
+  class ForkWatcher {
+   public:
+    virtual ~ForkWatcher() {}
+    virtual void handle_pre_fork() = 0;
+    virtual void handle_post_fork() = 0;
+  };
+
+  void register_fork_watcher(ForkWatcher *w) {
+    ceph_spin_lock(&_fork_watchers_lock);
+    _fork_watchers.push_back(w);
+    ceph_spin_unlock(&_fork_watchers_lock);
+  }
+
+  void notify_pre_fork() {
+    ceph_spin_lock(&_fork_watchers_lock);
+    for (auto &&t : _fork_watchers)
+      t->handle_pre_fork();
+    ceph_spin_unlock(&_fork_watchers_lock);
+  }
+
+  void notify_post_fork() {
+    ceph_spin_lock(&_fork_watchers_lock);
+    for (auto &&t : _fork_watchers)
+      t->handle_post_fork();
+    ceph_spin_unlock(&_fork_watchers_lock);
+  }
+
  private:
   struct SingletonWrapper : boost::noncopyable {
     virtual ~SingletonWrapper() {}
@@ -235,6 +262,9 @@ class CephContext {
   ceph_spinlock_t _associated_objs_lock;
   std::map<std::string, SingletonWrapper*> _associated_objs;
 
+  ceph_spinlock_t _fork_watchers_lock;
+  std::vector<ForkWatcher*> _fork_watchers;
+
   // crypto
   CryptoHandler *_crypto_none;
   CryptoHandler *_crypto_aes;
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 72cee895d64e7..b42bab3ffdd1c 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -197,8 +197,9 @@ OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1]
 OPTION(ms_inject_internal_delays, OPT_DOUBLE, 0) // seconds
 OPTION(ms_dump_on_send, OPT_BOOL, false) // hexdump msg to log on send
 OPTION(ms_dump_corrupt_message_level, OPT_INT, 1) // debug level to hexdump undecodeable messages at
-OPTION(ms_async_op_threads, OPT_INT, 3) // number of worker processing threads for async messenger created on init
-OPTION(ms_async_max_op_threads, OPT_INT, 5) // max number of worker processing threads for async messenger
+OPTION(ms_async_transport_type, OPT_STR, "posix")
+OPTION(ms_async_op_threads, OPT_U64, 3) // number of worker processing threads for async messenger created on init
+OPTION(ms_async_max_op_threads, OPT_U64, 5) // max number of worker processing threads for async messenger
 OPTION(ms_async_set_affinity, OPT_BOOL, true)
 // example: ms_async_affinity_cores = 0,1
 // The number of coreset is expected to equal to ms_async_op_threads, otherwise
diff --git a/src/global/global_init.cc b/src/global/global_init.cc
index cda4438da9039..033516240b6c5 100644
--- a/src/global/global_init.cc
+++ b/src/global/global_init.cc
@@ -339,6 +339,7 @@ int global_init_prefork(CephContext *cct)
     return -1;
   }
 
+  cct->notify_pre_fork();
   // stop log thread
   cct->_log->flush();
   cct->_log->stop();
@@ -370,6 +371,7 @@ void global_init_postfork_start(CephContext *cct)
 {
   // restart log thread
   cct->_log->start();
+  cct->notify_post_fork();
 
   /* This is the old trick where we make file descriptors 0, 1, and possibly 2
    * point to /dev/null.
diff --git a/src/msg/Makefile.am b/src/msg/Makefile.am
index 3081566d80d65..f05788d69677d 100644
--- a/src/msg/Makefile.am
+++ b/src/msg/Makefile.am
@@ -22,6 +22,8 @@ libmsg_la_SOURCES += \
 	msg/async/AsyncMessenger.cc \
 	msg/async/Event.cc \
 	msg/async/net_handler.cc \
+	msg/async/Stack.cc \
+	msg/async/PosixStack.cc \
 	msg/async/EventSelect.cc
 
 if LINUX
@@ -47,6 +49,8 @@ noinst_HEADERS += \
 	msg/async/Event.h \
 	msg/async/EventEpoll.h \
 	msg/async/EventSelect.h \
+	msg/async/Stack.h \
+	msg/async/PosixStack.h \
 	msg/async/net_handler.h
 
 if LINUX
diff --git a/src/msg/async/AsyncConnection.cc b/src/msg/async/AsyncConnection.cc
index 80756a0e27825..96142c3b661e5 100644
--- a/src/msg/async/AsyncConnection.cc
+++ b/src/msg/async/AsyncConnection.cc
@@ -14,8 +14,6 @@
  *
  */
 
-#include
-#include
 #include
 
 #include "include/Context.h"
@@ -23,8 +21,6 @@
 #include "AsyncMessenger.h"
 #include "AsyncConnection.h"
 
-#include "include/sock_compat.h"
-
 // Constant to limit starting sequence number to 2^31. Nothing special about it, just a big number. PLR
 #define SEQ_MASK 0x7fffffff
@@ -32,8 +28,9 @@
 #undef dout_prefix
 #define dout_prefix _conn_prefix(_dout)
 ostream& AsyncConnection::_conn_prefix(std::ostream *_dout) {
+  int fd = (cs && state != STATE_CLOSED) ? cs.fd() : -1;
   return *_dout << "-- " << async_msgr->get_myinst().addr << " >> " << peer_addr << " conn(" << this
-                << " sd=" << sd << " :" << port
+                << " sd=" << fd << " :" << port
                 << " s=" << get_state_name(state)
                 << " pgs=" << peer_global_seq
                 << " cs=" << connect_seq
@@ -122,7 +119,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
                                  Worker *w)
   : Connection(cct, m), delay_state(NULL), async_msgr(m), conn_id(q->get_id()),
     logger(w->get_perf_counter()), global_seq(0), connect_seq(0), peer_global_seq(0),
-    out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), sd(-1), port(-1),
+    out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(STATE_NONE), port(-1),
     dispatch_queue(q), can_write(WriteStatus::NOWRITE),
     open_write(false), keepalive(false), recv_buf(NULL),
     recv_max_prefetch(MAX(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
@@ -130,7 +127,7 @@ AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQu
     last_active(ceph::coarse_mono_clock::now()),
     inactive_timeout_us(cct->_conf->ms_tcp_read_timeout*1000*1000),
     got_bad_auth(false), authorizer(NULL), replacing(false),
-    is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0), net(cct),
+    is_reset_from_peer(false), once_ready(false), state_buffer(NULL), state_offset(0),
     worker(w), center(&w->center)
 {
   read_handler = new C_handle_read(this);
@@ -167,203 +164,52 @@ void AsyncConnection::maybe_start_delay_thread()
 
 /* return -1 means `fd` occurs error or closed, it should be closed
  * return 0 means EAGAIN or EINTR */
-ssize_t AsyncConnection::read_bulk(int fd, char *buf, unsigned len)
+ssize_t AsyncConnection::read_bulk(char *buf, unsigned len)
 {
   ssize_t nread;
  again:
-  nread = ::read(fd, buf, len);
-  if (nread == -1) {
-    if (errno == EAGAIN) {
+  nread = cs.read(buf, len);
+  if (nread < 0) {
+    if (nread == -EAGAIN) {
       nread = 0;
-    } else if (errno == EINTR) {
+    } else if (nread == -EINTR) {
       goto again;
     } else {
-      ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << fd
-                                << " : "<< strerror(errno) << dendl;
+      ldout(async_msgr->cct, 1) << __func__ << " reading from fd=" << cs.fd()
+                                << " : " << cpp_strerror(nread) << dendl;
       return -1;
     }
   } else if (nread == 0) {
     ldout(async_msgr->cct, 1) << __func__ << " peer close file descriptor "
-                              << fd << dendl;
+                              << cs.fd() << dendl;
     return -1;
   }
   return nread;
 }
 
-/*
- SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
-  http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html
-  http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
-*/
-void AsyncConnection::suppress_sigpipe()
-{
-#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
-  /*
-    We want to ignore possible SIGPIPE that we can generate on write.
-    SIGPIPE is delivered *synchronously* and *only* to the thread
-    doing the write.  So if it is reported as already pending (which
-    means the thread blocks it), then we do nothing: if we generate
-    SIGPIPE, it will be merged with the pending one (there's no
-    queuing), and that suits us well.  If it is not pending, we block
-    it in this thread (and we avoid changing signal action, because it
-    is per-process).
-  */
-  sigset_t pending;
-  sigemptyset(&pending);
-  sigpending(&pending);
-  sigpipe_pending = sigismember(&pending, SIGPIPE);
-  if (!sigpipe_pending) {
-    sigset_t blocked;
-    sigemptyset(&blocked);
-    pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked);
-
-    /* Maybe is was blocked already?  */
-    sigpipe_unblock = ! sigismember(&blocked, SIGPIPE);
-  }
-#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
-}
-
-
-void AsyncConnection::restore_sigpipe()
-{
-#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
-  /*
-    If SIGPIPE was pending already we do nothing.  Otherwise, if it
-    become pending (i.e., we generated it), then we sigwait() it (thus
-    clearing pending status).  Then we unblock SIGPIPE, but only if it
-    were us who blocked it.
-  */
-  if (!sigpipe_pending) {
-    sigset_t pending;
-    sigemptyset(&pending);
-    sigpending(&pending);
-    if (sigismember(&pending, SIGPIPE)) {
-      /*
-        Protect ourselves from a situation when SIGPIPE was sent
-        by the user to the whole process, and was delivered to
-        other thread before we had a chance to wait for it.
-      */
-      static const struct timespec nowait = { 0, 0 };
-      TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait));
-    }
-
-    if (sigpipe_unblock)
-      pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL);
-  }
-#endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
-}
-
-// return the length of msg needed to be sent,
-// < 0 means error occured
-ssize_t AsyncConnection::do_sendmsg(struct msghdr &msg, unsigned len, bool more)
-{
-  suppress_sigpipe();
-
-  while (len > 0) {
-    ssize_t r;
-#if defined(MSG_NOSIGNAL)
-    r = ::sendmsg(sd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0));
-#else
-    r = ::sendmsg(sd, &msg, (more ? MSG_MORE : 0));
-#endif /* defined(MSG_NOSIGNAL) */
-
-    if (r == 0) {
-      ldout(async_msgr->cct, 10) << __func__ << " sendmsg got r==0!" << dendl;
-    } else if (r < 0) {
-      if (errno == EINTR) {
-        continue;
-      } else if (errno == EAGAIN) {
-        break;
-      } else {
-        ldout(async_msgr->cct, 1) << __func__ << " sendmsg error: " << cpp_strerror(errno) << dendl;
-        restore_sigpipe();
-        return r;
-      }
-    }
-
-    len -= r;
-    if (len == 0) break;
-
-    // hrmph. drain r bytes from the front of our message.
-    ldout(async_msgr->cct, 20) << __func__ << " short write did " << r << ", still have " << len << dendl;
-    while (r > 0) {
-      if (msg.msg_iov[0].iov_len <= (size_t)r) {
-        // drain this whole item
-        r -= msg.msg_iov[0].iov_len;
-        msg.msg_iov++;
-        msg.msg_iovlen--;
-      } else {
-        msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r;
-        msg.msg_iov[0].iov_len -= r;
-        break;
-      }
-    }
-  }
-  restore_sigpipe();
-  return (ssize_t)len;
-}
-
 // return the remaining bytes, it may larger than the length of ptr
 // else return < 0 means error
 ssize_t AsyncConnection::_try_send(bool more)
 {
-  if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
+  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
     if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
       ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
-      ::shutdown(sd, SHUT_RDWR);
-    }
-  }
-
-  uint64_t sent_bytes = 0;
-  list<bufferptr>::const_iterator pb = outcoming_bl.buffers().begin();
-  uint64_t left_pbrs = outcoming_bl.buffers().size();
-  while (left_pbrs) {
-    struct msghdr msg;
-    uint64_t size = MIN(left_pbrs, ASYNC_IOV_MAX);
-    left_pbrs -= size;
-    memset(&msg, 0, sizeof(msg));
-    msg.msg_iovlen = 0;
-    msg.msg_iov = msgvec;
-    unsigned msglen = 0;
-    while (size > 0) {
-      msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str());
-      msgvec[msg.msg_iovlen].iov_len = pb->length();
-      msg.msg_iovlen++;
-      msglen += pb->length();
-      ++pb;
-      size--;
+      cs.shutdown();
     }
-
-    ssize_t r = do_sendmsg(msg, msglen, left_pbrs || more);
-    if (r < 0)
-      return r;
-
-    // "r" is the remaining length
-    sent_bytes += msglen - r;
-    if (r > 0) {
-      ldout(async_msgr->cct, 5) << __func__ << " remaining " << r
-                                << " needed to be sent, creating event for writing"
-                                << dendl;
-      break;
-    }
-    // only "r" == 0 continue
   }
 
-  // trim already sent for outcoming_bl
-  if (sent_bytes) {
-    if (sent_bytes < outcoming_bl.length()) {
-      outcoming_bl.splice(0, sent_bytes);
-    } else {
-      outcoming_bl.clear();
-    }
+  ssize_t r = cs.send(outcoming_bl, more);
+  if (r < 0) {
+    ldout(async_msgr->cct, 1) << __func__ << " send error: " << cpp_strerror(r) << dendl;
+    return r;
   }
 
-  ldout(async_msgr->cct, 20) << __func__ << " sent bytes " << sent_bytes
+  ldout(async_msgr->cct, 10) << __func__ << " sent bytes " << r
                              << " remaining bytes " << outcoming_bl.length() << dendl;
 
   if (!open_write && is_queued()) {
     if (center->in_thread()) {
-      center->create_file_event(sd, EVENT_WRITABLE, write_handler);
+      center->create_file_event(cs.fd(), EVENT_WRITABLE, write_handler);
       open_write = true;
     } else {
       center->dispatch_event_external(write_handler);
@@ -372,7 +218,7 @@ ssize_t AsyncConnection::_try_send(bool more)
 
   if (open_write && !is_queued()) {
     if (center->in_thread()) {
-      center->delete_file_event(sd, EVENT_WRITABLE);
+      center->delete_file_event(cs.fd(), EVENT_WRITABLE);
       open_write = false;
     } else {
       center->dispatch_event_external(write_handler);
@@ -398,10 +244,10 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p)
   ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is "
                              << state_offset << dendl;
 
-  if (async_msgr->cct->_conf->ms_inject_socket_failures && sd >= 0) {
+  if (async_msgr->cct->_conf->ms_inject_socket_failures && cs) {
    if (rand() % async_msgr->cct->_conf->ms_inject_socket_failures == 0) {
      ldout(async_msgr->cct, 0) << __func__ << " injecting socket failure" << dendl;
-      ::shutdown(sd, SHUT_RDWR);
+      cs.shutdown();
    }
   }
 
@@ -426,7 +272,7 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p)
     if (len > recv_max_prefetch) {
       /* this was a large read, we don't prefetch for these */
       do {
-        r = read_bulk(sd, p+state_offset, left);
+        r = read_bulk(p+state_offset, left);
         ldout(async_msgr->cct, 25) << __func__ << " read_bulk left is " << left << " got " << r << dendl;
         if (r < 0) {
           ldout(async_msgr->cct, 1) << __func__ << " read failed" << dendl;
@@ -440,7 +286,7 @@ ssize_t AsyncConnection::read_until(unsigned len, char *p)
       } while (r > 0);
     } else {
       do {
-        r = read_bulk(sd, recv_buf+recv_end, recv_max_prefetch);
+        r = read_bulk(recv_buf+recv_end, recv_max_prefetch);
         ldout(async_msgr->cct, 25) << __func__ << " read_bulk recv_end is " << recv_end
                                    << " left is " << left << " got " << r << dendl;
         if (r < 0) {
@@ -1011,35 +857,35 @@ ssize_t AsyncConnection::_process_connection()
         global_seq = async_msgr->get_global_seq();
         // close old socket.  this is safe because we stopped the reader thread above.
-        if (sd >= 0) {
-          center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
-          ::close(sd);
+        if (cs) {
+          center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
+          cs.close();
         }
 
-        sd = net.nonblock_connect(get_peer_addr());
-        if (sd < 0) {
+        SocketOptions opts;
+        opts.priority = async_msgr->get_socket_priority();
+        r = worker->connect(get_peer_addr(), opts, &cs);
+        if (r < 0)
           goto fail;
-        }
 
-        center->create_file_event(sd, EVENT_READABLE, read_handler);
+        center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
         state = STATE_CONNECTING_RE;
         break;
       }
 
     case STATE_CONNECTING_RE:
       {
-        r = net.reconnect(get_peer_addr(), sd);
+        r = cs.is_connected();
         if (r < 0) {
           ldout(async_msgr->cct, 1) << __func__ << " reconnect failed " << dendl;
           goto fail;
-        } else if (r > 0) {
+        } else if (r == 0) {
           ldout(async_msgr->cct, 10) << __func__ << " nonblock connect inprogress" << dendl;
-          center->create_file_event(sd, EVENT_WRITABLE, read_handler);
+          center->create_file_event(cs.fd(), EVENT_WRITABLE, read_handler);
           break;
         }
 
-        center->delete_file_event(sd, EVENT_WRITABLE);
-        net.set_priority(sd, async_msgr->get_socket_priority());
+        center->delete_file_event(cs.fd(), EVENT_WRITABLE);
         ldout(async_msgr->cct, 10) << __func__ << " connect successfully, ready to send banner" << dendl;
 
         bufferlist bl;
@@ -1092,7 +938,7 @@ ssize_t AsyncConnection::_process_connection()
           goto fail;
         }
         ldout(async_msgr->cct, 20) << __func__ << " connect read peer addr "
-                             << paddr << " on socket " << sd << dendl;
+                             << paddr << " on socket " << cs.fd() << dendl;
         if (peer_addr != paddr) {
           if (paddr.is_blank_ip() && peer_addr.get_port() == paddr.get_port() &&
               peer_addr.get_nonce() == paddr.get_nonce()) {
@@ -1139,7 +985,7 @@ ssize_t AsyncConnection::_process_connection()
                                     << async_msgr->get_myaddr() << dendl;
         } else {
           ldout(async_msgr->cct, 2) << __func__ << " connect couldn't write my addr, "
-                                    << cpp_strerror(errno) << dendl;
+                                    << cpp_strerror(r) << dendl;
           goto fail;
         }
 
@@ -1185,7 +1031,7 @@ ssize_t AsyncConnection::_process_connection()
           ldout(async_msgr->cct, 10) << __func__ << " continue send reply " << dendl;
         } else {
           ldout(async_msgr->cct, 2) << __func__ << " connect couldn't send reply "
-                                    << cpp_strerror(errno) << dendl;
+                                    << cpp_strerror(r) << dendl;
           goto fail;
         }
 
@@ -1340,30 +1186,14 @@ ssize_t AsyncConnection::_process_connection()
     case STATE_ACCEPTING:
       {
         bufferlist bl;
-
-        if (net.set_nonblock(sd) < 0)
-          goto fail;
-
-        net.set_socket_options(sd, async_msgr->cct->_conf->ms_tcp_nodelay, async_msgr->cct->_conf->ms_tcp_rcvbuf);
-        net.set_priority(sd, async_msgr->get_socket_priority());
-
-        center->create_file_event(sd, EVENT_READABLE, read_handler);
+        center->create_file_event(cs.fd(), EVENT_READABLE, read_handler);
 
         bl.append(CEPH_BANNER, strlen(CEPH_BANNER));
 
         ::encode(async_msgr->get_myaddr(), bl, 0); // legacy
         port = async_msgr->get_myaddr().get_port();
-        // and peer's socket addr (they might not know their ip)
-        sockaddr_storage ss;
-        socklen_t len = sizeof(ss);
-        r = ::getpeername(sd, (sockaddr*)&ss, &len);
-        if (r < 0) {
-          ldout(async_msgr->cct, 0) << __func__ << " failed to getpeername "
-                                    << cpp_strerror(errno) << dendl;
-          goto fail;
-        }
-        socket_addr.set_sockaddr((sockaddr*)&ss);
         ::encode(socket_addr, bl, 0); // legacy
-        ldout(async_msgr->cct, 1) << __func__ << " sd=" << sd << " " << socket_addr << dendl;
+        ldout(async_msgr->cct, 1) << __func__ << " sd=" << cs.fd() << " " << socket_addr << dendl;
 
         r = try_send(bl);
         if (r == 0) {
@@ -1813,7 +1643,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
           existing->is_reset_from_peer = true;
         }
 
-        center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+        center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
 
         // Clean up output buffer
         existing->outcoming_bl.clear();
@@ -1824,13 +1654,13 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
         existing->requeue_sent();
         existing->reset_recv_state();
 
-        int new_fd = sd;
+        auto temp_cs = std::move(cs);
         EventCenter *new_center = center;
         Worker *new_worker = worker;
         // avoid _stop shutdown replacing socket
-        sd = -1;
         // queue a reset on the new connection, which we're dumping for the old
         _stop();
+
         dispatch_queue->queue_reset(this);
         ldout(async_msgr->cct, 1) << __func__ << " stop myself to swap existing" << dendl;
         existing->can_write = WriteStatus::REPLACING;
@@ -1844,25 +1674,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
         // there shouldn't exist any buffer
         assert(recv_start == recv_end);
 
-        existing->write_lock.unlock();
-        // new sd now isn't registered any event while origin events
-        // have been deleted.
-        // previous existing->sd now is still open, event will continue to
-        // notify previous existing->center from now.
-        // From now, no one will dispatch event to `existing`
-        // Note: we must use async dispatch instead of execute this inline
-        // even existing->center == center. Because we must ensure below
-        // event executed after all pending external events like
-        // "dispatch_state->queue"
-        existing->center->submit_to(
-            existing->center->get_id(),
-            [existing, new_fd, new_worker, new_center, connect, reply, authorizer_reply]() mutable {
+        auto deactivate_existing = std::bind(
+            [existing, new_worker, new_center, connect, reply, authorizer_reply](ConnectedSocket &cs) mutable {
           // we need to delete time event in original thread
           {
            std::lock_guard<std::mutex> l(existing->lock);
            if (existing->state == STATE_NONE) {
              existing->shutdown_socket();
-              existing->sd = new_fd;
+              existing->cs = std::move(cs);
              existing->worker->references--;
              new_worker->references++;
              existing->logger = new_worker->get_perf_counter();
@@ -1871,7 +1690,7 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
              if (existing->delay_state)
                existing->delay_state->set_center(new_center);
            } else if (existing->state == STATE_CLOSED) {
-              ::close(new_fd);
+              cs.close();
              return ;
            } else {
              assert(0);
@@ -1881,15 +1700,14 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
          // Before changing existing->center, it may already exists some events in existing->center's queue.
          // Then if we mark down `existing`, it will execute in another thread and clean up connection.
          // Previous event will result in segment fault
-          auto transfer_existing = [existing, new_fd, connect, reply, authorizer_reply]() mutable {
+          auto transfer_existing = [existing, connect, reply, authorizer_reply]() mutable {
            std::lock_guard<std::mutex> l(existing->lock);
            if (existing->state == STATE_CLOSED)
              return ;
-            assert(new_fd == existing->sd);
            assert(existing->state == STATE_NONE);
-
+
            existing->state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
-            existing->center->create_file_event(existing->sd, EVENT_READABLE, existing->read_handler);
+            existing->center->create_file_event(existing->cs.fd(), EVENT_READABLE, existing->read_handler);
            reply.global_seq = existing->peer_global_seq;
            if (existing->_reply_accept(CEPH_MSGR_TAG_RETRY_GLOBAL, connect, reply, authorizer_reply) < 0) {
              // handle error
@@ -1901,8 +1719,11 @@ ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlis
          else
            existing->center->submit_to(
                existing->center->get_id(), std::move(transfer_existing), true);
-        }, true);
+        }, std::move(temp_cs));
 
+        existing->center->submit_to(
+            existing->center->get_id(), std::move(deactivate_existing), true);
+        existing->write_lock.unlock();
         existing->lock.unlock();
         return 0;
       }
@@ -2012,13 +1833,14 @@ void AsyncConnection::_connect()
   center->dispatch_event_external(read_handler);
 }
 
-void AsyncConnection::accept(int incoming)
+void AsyncConnection::accept(ConnectedSocket socket, entity_addr_t &addr)
 {
-  ldout(async_msgr->cct, 10) << __func__ << " sd=" << incoming << dendl;
-  assert(sd < 0);
+  ldout(async_msgr->cct, 10) << __func__ << " sd=" << socket.fd() << dendl;
+  assert(socket.fd() > 0);
   std::lock_guard<std::mutex> l(lock);
-  sd = incoming;
+  cs = std::move(socket);
+  socket_addr = addr;
   state = STATE_ACCEPTING;
   // rescheduler connection in order to avoid lock dep
   center->dispatch_event_external(read_handler);
@@ -2184,8 +2006,8 @@ void AsyncConnection::fault()
   }
 
   write_lock.lock();
-  shutdown_socket();
   can_write = WriteStatus::NOWRITE;
+  shutdown_socket();
   open_write = false;
 
   // queue delayed items immediately
@@ -2407,7 +2229,7 @@ ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl, bool more)
   ssize_t rc = _try_send(more);
   if (rc < 0) {
     ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
-                              << cpp_strerror(errno) << dendl;
+                              << cpp_strerror(rc) << dendl;
   } else if (rc == 0) {
     ldout(async_msgr->cct, 10) << __func__ << " sending " << m << " done." << dendl;
   } else {
@@ -2617,7 +2439,7 @@ void AsyncConnection::handle_write()
     if (state == STATE_STANDBY && !policy.server && is_queued()) {
       ldout(async_msgr->cct, 10) << __func__ << " policy.server is false" << dendl;
       _connect();
-    } else if (sd >= 0 && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
+    } else if (cs && state != STATE_NONE && state != STATE_CONNECTING && state != STATE_CONNECTING_RE && state != STATE_CLOSED) {
       r = _try_send();
       if (r < 0) {
         ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl;
diff --git a/src/msg/async/AsyncConnection.h b/src/msg/async/AsyncConnection.h
index 3988bf9a7447b..9d6c3435eef64 100644
--- a/src/msg/async/AsyncConnection.h
+++ b/src/msg/async/AsyncConnection.h
@@ -34,7 +34,7 @@ using namespace std;
 #include "msg/Messenger.h"
 #include "Event.h"
-#include "net_handler.h"
+#include "Stack.h"
 
 class AsyncMessenger;
 class Worker;
@@ -50,9 +50,7 @@ static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
  */
 class AsyncConnection : public Connection {
 
-  ssize_t read_bulk(int fd, char *buf, unsigned len);
-  void suppress_sigpipe();
-  void restore_sigpipe();
+  ssize_t read_bulk(char *buf, unsigned len);
   ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more);
   ssize_t try_send(bufferlist &bl, bool more=false) {
     std::lock_guard<std::mutex> l(write_lock);
@@ -110,11 +108,10 @@ class AsyncConnection : public Connection {
       center->delete_time_event(last_tick_id);
       last_tick_id = 0;
     }
-    if (sd >= 0) {
-      center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
-      ::shutdown(sd, SHUT_RDWR);
-      ::close(sd);
-      sd = -1;
+    if (cs) {
+      center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
+      cs.shutdown();
+      cs.close();
     }
   }
   Message *_get_next_outgoing(bufferlist *bl) {
@@ -209,7 +206,7 @@ class AsyncConnection : public Connection {
     _connect();
   }
   // Only call when AsyncConnection first construct
-  void accept(int sd);
+  void accept(ConnectedSocket socket, entity_addr_t &addr);
   int send_message(Message *m) override;
 
   void send_keepalive() override;
@@ -303,7 +300,7 @@ class AsyncConnection : public Connection {
   atomic64_t ack_left, in_seq;
   int state;
   int state_after_send;
-  int sd;
+  ConnectedSocket cs;
   int port;
   Messenger::Policy policy;
 
@@ -372,17 +369,10 @@ class AsyncConnection : public Connection {
   char *state_buffer; // used only by "read_until"
   uint64_t state_offset;
-  NetHandler net;
   Worker *worker;
   EventCenter *center;
   ceph::shared_ptr<AuthSessionHandler> session_security;
 
-#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
-  sigset_t sigpipe_mask;
-  bool sigpipe_pending;
-  bool sigpipe_unblock;
-#endif
-
  public:
   // used by eventcallback
   void handle_write();
diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc
index 53eff733e2703..a7270cd52c712 100644
--- a/src/msg/async/AsyncMessenger.cc
+++ b/src/msg/async/AsyncMessenger.cc
@@ -16,19 +16,14 @@
 
 #include "acconfig.h"
 
-#include
 #include
 #include
 
 #include "AsyncMessenger.h"
 
-#include "include/str_list.h"
-#include "common/strtol.h"
 #include "common/config.h"
 #include "common/Timer.h"
 #include "common/errno.h"
-#include "auth/Crypto.h"
-#include "include/Spinlock.h"
 
 #define dout_subsys ceph_subsys_ms
 #undef dout_prefix
@@ -41,14 +36,6 @@ static ostream& _prefix(std::ostream *_dout, Processor *p) {
   return *_dout << " Processor -- ";
 }
 
-static ostream& _prefix(std::ostream *_dout, Worker *w) {
-  return *_dout << " Worker -- ";
-}
-
-static ostream& _prefix(std::ostream *_dout, WorkerPool *p) {
-  return *_dout << " WorkerPool -- ";
-}
-
 
 /*******************
  * Processor
@@ -64,13 +51,9 @@ class Processor::C_processor_accept : public EventCallback {
   }
 };
 
-Processor::Processor(AsyncMessenger *r, CephContext *c, uint64_t n)
-  : msgr(r),
-    net(c),
-    worker(NULL),
-    listen_sd(-1),
-    nonce(n),
-    listen_handler(new C_processor_accept(this)) {}
+Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c, uint64_t n)
+  : msgr(r), net(c), worker(w), nonce(n),
+    listen_handler(new C_processor_accept(this)) {}
 
 int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
 {
@@ -90,30 +73,16 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
     family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET;
   }
 
-  /* socket creation */
-  listen_sd = ::socket(family, SOCK_STREAM, 0);
-  if (listen_sd < 0) {
-    lderr(msgr->cct) << __func__ << " unable to create socket: "
-                     << cpp_strerror(errno) << dendl;
-    return -errno;
-  }
-
-  int r = net.set_nonblock(listen_sd);
-  if (r < 0) {
-    ::close(listen_sd);
-    listen_sd = -1;
-    return r;
-  }
-
-  net.set_close_on_exec(listen_sd);
-  net.set_socket_options(listen_sd, msgr->cct->_conf->ms_tcp_nodelay, msgr->cct->_conf->ms_tcp_rcvbuf);
+  SocketOptions opts;
+  opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
+  opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
 
   // use whatever user specified (if anything)
   entity_addr_t listen_addr = bind_addr;
   listen_addr.set_family(family);
 
   /* bind to port */
-  int rc = -1;
-  r = -1;
+  int r = -1;
 
   for (int i = 0; i < conf->ms_bind_retry_count; i++) {
     if (i > 0) {
@@ -123,22 +92,12 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
     }
 
     if (listen_addr.get_port()) {
-      // specific port
-      // reuse addr+port when possible
-      int on = 1;
-      rc = ::setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
-      if (rc < 0) {
-        lderr(msgr->cct) << __func__ << " unable to setsockopt: " << cpp_strerror(errno) << dendl;
-        r = -errno;
-        continue;
-      }
-
-      rc = ::bind(listen_sd, listen_addr.get_sockaddr(),
-                  listen_addr.get_sockaddr_len());
-      if (rc < 0) {
+      worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
+        r = worker->listen(listen_addr, opts, &listen_socket);
+      }, false);
+      if (r < 0) {
         lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
-                         << ": " << cpp_strerror(errno) << dendl;
-        r = -errno;
+                         << ": " << cpp_strerror(r) << dendl;
         continue;
       }
     } else {
@@ -148,60 +107,34 @@ int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
          continue;
        listen_addr.set_port(port);
 
-        rc = ::bind(listen_sd, listen_addr.get_sockaddr(),
-                    listen_addr.get_sockaddr_len());
-        if (rc == 0)
+        worker->center.submit_to(worker->center.get_id(), [this, &listen_addr, &opts, &r]() {
+          r = worker->listen(listen_addr, opts, &listen_socket);
+        }, false);
+        if (r == 0)
          break;
      }
-      if (rc < 0) {
+      if (r < 0) {
        lderr(msgr->cct) << __func__ << " unable to bind to " << listen_addr
                         << " on any port in range " << msgr->cct->_conf->ms_bind_port_min
                         << "-" << msgr->cct->_conf->ms_bind_port_max << ": "
-                         << cpp_strerror(errno) << dendl;
-        r = -errno;
+                         << cpp_strerror(r) << dendl;
        listen_addr.set_port(0); // Clear port before retry, otherwise we shall fail again.
        continue;
      }
      ldout(msgr->cct, 10) << __func__ << " bound on random port " << listen_addr << dendl;
    }
-    if (rc == 0)
+    if (r == 0)
      break;
  }
 
  // It seems that binding completely failed, return with that exit status
-  if (rc < 0) {
+  if (r < 0) {
    lderr(msgr->cct) << __func__ << " was unable to bind after " << conf->ms_bind_retry_count
-                     << " attempts: " << cpp_strerror(errno) << dendl;
-    ::close(listen_sd);
-    listen_sd = -1;
+                     << " attempts: " << cpp_strerror(r) << dendl;
    return r;
  }
 
-  // what port did we get?
-  sockaddr_storage ss;
-  socklen_t llen = sizeof(ss);
-  rc = getsockname(listen_sd, (sockaddr*)&ss, &llen);
-  if (rc < 0) {
-    rc = -errno;
-    lderr(msgr->cct) << __func__ << " failed getsockname: " << cpp_strerror(rc) << dendl;
-    ::close(listen_sd);
-    listen_sd = -1;
-    return rc;
-  }
-  listen_addr.set_sockaddr((sockaddr*)&ss);
-
-  ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl;
-
-  // listen!
-  rc = ::listen(listen_sd, 128);
-  if (rc < 0) {
-    rc = -errno;
-    lderr(msgr->cct) << __func__ << " unable to listen on " << listen_addr
-                     << ": " << cpp_strerror(rc) << dendl;
-    ::close(listen_sd);
-    listen_sd = -1;
-    return rc;
-  }
-
  msgr->set_myaddr(bind_addr);
  if (bind_addr != entity_addr_t())
    msgr->learned_addr(bind_addr);
@@ -237,47 +170,49 @@ int Processor::rebind(const set<int>& avoid_ports)
  return bind(addr, new_avoid);
 }
 
-void Processor::start(Worker *w)
+void Processor::start()
 {
-  ldout(msgr->cct, 1) << __func__ << " " << dendl;
+  ldout(msgr->cct, 1) << __func__ << dendl;
 
  // start thread
-  if (listen_sd >= 0) {
-    worker = w;
+  if (listen_socket) {
    worker->center.submit_to(worker->center.get_id(), [this]() {
-      worker->center.create_file_event(listen_sd, EVENT_READABLE, listen_handler); });
+      worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler); }, false);
  }
 }
 
 void Processor::accept()
 {
-  ldout(msgr->cct, 10) << __func__ << " listen_sd=" << listen_sd << dendl;
+  ldout(msgr->cct, 10) << __func__ << " listen_fd=" << listen_socket.fd() << dendl;
+  SocketOptions opts;
+  opts.nodelay = msgr->cct->_conf->ms_tcp_nodelay;
+  opts.rcbuf_size = msgr->cct->_conf->ms_tcp_rcvbuf;
+  opts.priority = msgr->get_socket_priority();
  while (true) {
-    sockaddr_storage ss;
-    socklen_t slen = sizeof(ss);
-    int sd = ::accept(listen_sd, (sockaddr*)&ss, &slen);
-    if (sd >= 0) {
-      net.set_close_on_exec(sd);
-      ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << sd << dendl;
-
-      msgr->add_accept(sd);
+    entity_addr_t addr;
+    ConnectedSocket cli_socket;
+    int r = listen_socket.accept(&cli_socket, opts, &addr);
+    if (r == 0) {
+      ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << cli_socket.fd() << dendl;
+
+      msgr->add_accept(worker, std::move(cli_socket), addr);
      continue;
    } else {
-      if (errno == EINTR) {
+      if (r == -EINTR) {
        continue;
-      } else if (errno == EAGAIN) {
+      } else if (r == -EAGAIN) {
        break;
-      } else if (errno == EMFILE || errno == ENFILE) {
-        lderr(msgr->cct) << __func__ << " open file descriptions limit reached sd = " << sd
-                         << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+      } else if (r == -EMFILE || r == -ENFILE) {
+        lderr(msgr->cct) << __func__ << " open file descriptors limit reached sd = " << listen_socket.fd()
+                         << " errno " << r << " " << cpp_strerror(r) << dendl;
        break;
-      } else if (errno == ECONNABORTED) {
-        ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << sd
-                            << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+      } else if (r == -ECONNABORTED) {
+        ldout(msgr->cct, 0) << __func__ << " it was closed because of rst arrived sd = " << listen_socket.fd()
+                            << " errno " << r << " " << cpp_strerror(r) << dendl;
        continue;
      } else {
-        lderr(msgr->cct) << __func__ << " no incoming connection? sd = " << sd
-                         << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+        lderr(msgr->cct) << __func__ << " no incoming connection?"
+                         << " errno " << r << " " << cpp_strerror(r) << dendl;
        break;
      }
    }
  }
 }
 
@@ -288,208 +223,25 @@ void Processor::stop()
 {
  ldout(msgr->cct,10) << __func__ << dendl;
 
-  if (listen_sd >= 0) {
+  if (listen_socket) {
    worker->center.submit_to(worker->center.get_id(), [this]() {
-      worker->center.delete_file_event(listen_sd, EVENT_READABLE);
-      ::shutdown(listen_sd, SHUT_RDWR);
-      ::close(listen_sd);
-      listen_sd = -1;
-    });
-  }
-}
-
-void Worker::stop()
-{
-  ldout(cct, 10) << __func__ << dendl;
-  done = true;
-  center.wakeup();
-}
-
-class WorkerPool {
-  CephContext *cct;
-  vector<Worker*> workers;
-  vector<int> coreids;
-  // Used to indicate whether thread started
-  bool started;
-  Mutex barrier_lock;
-  Cond barrier_cond;
-  atomic_t barrier_count;
-  simple_spinlock_t pool_spin = SIMPLE_SPINLOCK_INITIALIZER;
-
-  class C_barrier : public EventCallback {
-    WorkerPool *pool;
-   public:
-    explicit C_barrier(WorkerPool *p): pool(p) {}
-    void do_request(int id) {
-      Mutex::Locker l(pool->barrier_lock);
-      pool->barrier_count.dec();
-      pool->barrier_cond.Signal();
-      delete this;
-    }
-  };
-  friend class C_barrier;
- public:
-  std::atomic_uint pending;
-  explicit WorkerPool(CephContext *c);
-  WorkerPool(const WorkerPool &) = delete;
-  WorkerPool& operator=(const WorkerPool &) = delete;
-  virtual ~WorkerPool();
-  void start();
-  Worker *get_worker();
-  int get_cpuid(int id) {
-    if (coreids.empty())
-      return -1;
-    return coreids[id % coreids.size()];
-  }
-  void barrier();
-};
-
-void *Worker::entry()
-{
-  ldout(cct, 10) << __func__ << " starting" << dendl;
-  if (cct->_conf->ms_async_set_affinity) {
-    int cid = pool->get_cpuid(id);
-    if (cid >= 0 && set_affinity(cid)) {
-      ldout(cct, 0) << __func__ << " sched_setaffinity failed: "
-                    << cpp_strerror(errno) << dendl;
-    }
-  }
-
-  center.set_owner();
-  pool->pending--;
-  while (!done) {
-    ldout(cct, 20) << __func__ << " calling event process" << dendl;
-
-    int r = center.process_events(EventMaxWaitUs);
-    if (r < 0) {
-      ldout(cct, 20) << __func__ << " process events failed: "
-                     << cpp_strerror(errno) << dendl;
-      // TODO do something?
-    }
-  }
-
-  return 0;
-}
-
-/*******************
- * WorkerPool
- *******************/
-WorkerPool::WorkerPool(CephContext *c): cct(c), started(false),
-  barrier_lock("WorkerPool::WorkerPool::barrier_lock"),
-  barrier_count(0), pending(0)
-{
-  assert(cct->_conf->ms_async_op_threads > 0);
-  // make sure user won't try to force some crazy number of worker threads
-  assert(cct->_conf->ms_async_max_op_threads >= cct->_conf->ms_async_op_threads &&
-         cct->_conf->ms_async_op_threads <= 32);
-  for (int i = 0; i < cct->_conf->ms_async_op_threads; ++i) {
-    Worker *w = new Worker(cct, this, i);
-    workers.push_back(w);
-  }
-  vector<string> corestrs;
-  get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs);
-  for (vector<string>::iterator it = corestrs.begin();
-       it != corestrs.end(); ++it) {
-    string err;
-    int coreid = strict_strtol(it->c_str(), 10, &err);
-    if (err == "")
-      coreids.push_back(coreid);
-    else
-      lderr(cct) << __func__ << " failed to parse " << *it << " in " << cct->_conf->ms_async_affinity_cores << dendl;
-  }
-
-}
-
-WorkerPool::~WorkerPool()
-{
-  for (uint64_t i = 0; i < workers.size(); ++i) {
-    if (workers[i]->is_started()) {
-      workers[i]->stop();
-      workers[i]->join();
-    }
-    delete workers[i];
-  }
-}
-
-void WorkerPool::start()
-{
-  if (!started) {
-    for (uint64_t i = 0; i < workers.size(); ++i) {
-      pending++;
-      workers[i]->create("ms_async_worker");
-    }
-    started = true;
+      worker->center.delete_file_event(listen_socket.fd(), EVENT_READABLE);
+      listen_socket.abort_accept();
+    }, false);
  }
-  while (pending)
-    usleep(50);
 }
 
-Worker* WorkerPool::get_worker()
-{
-  ldout(cct, 10) << __func__ << dendl;
-
-  // start with some reasonably large number
-  unsigned min_load = std::numeric_limits<int>::max();
-  Worker* current_best = nullptr;
-
-  simple_spin_lock(&pool_spin);
-  // find worker with least references
-  // tempting case is returning on references == 0, but in reality
-  // this will happen so rarely that there's no need for special case.
-  for (auto p = workers.begin(); p != workers.end(); ++p) {
-    unsigned worker_load = (*p)->references.load();
-    ldout(cct, 20) << __func__ << " Worker " << *p << " load: " << worker_load << dendl;
-    if (worker_load < min_load) {
-      current_best = *p;
-      min_load = worker_load;
-    }
-  }
-  // if minimum load exceeds amount of workers, make a new worker
-  // logic behind this is that we're not going to create new worker
-  // just because others have *some* load, we'll defer worker creation
-  // until others have *plenty* of load. This will cause new worker
-  // to get assigned to all new connections *unless* one or more
-  // of workers get their load reduced - in that case, this worker
-  // will be assigned to new connection.
-  // TODO: add more logic and heuristics, so connections known to be
-  // of light workload (heartbeat service, etc.) won't overshadow
-  // heavy workload (clients, etc).
-  if (!current_best || ((workers.size() < (unsigned)cct->_conf->ms_async_max_op_threads)
-      && (min_load > workers.size()))) {
-    ldout(cct, 20) << __func__ << " creating worker" << dendl;
-    current_best = new Worker(cct, this, workers.size());
-    workers.push_back(current_best);
-    pending++;
-    current_best->create("ms_async_worker");
-  } else {
-    ldout(cct, 20) << __func__ << " picked " << current_best
-                   << " as best worker with load " << min_load << dendl;
+struct StackSingleton {
+  std::shared_ptr<NetworkStack> stack;
+  StackSingleton(CephContext *c) {
+    stack = NetworkStack::create(c, c->_conf->ms_async_transport_type);
  }
-
-  ++current_best->references;
-  simple_spin_unlock(&pool_spin);
-
-  while (pending)
-    usleep(50);
-  assert(current_best);
-  return current_best;
-}
-
-void WorkerPool::barrier()
-{
-  ldout(cct, 10) << __func__ << " started." << dendl;
-  for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it) {
-    barrier_count.inc();
-    (*it)->center.dispatch_event_external(EventCallbackRef(new C_barrier(this)));
+  ~StackSingleton() {
+    stack->stop();
  }
-  ldout(cct, 10) << __func__ << " wait for " << barrier_count.read() << " barrier" << dendl;
-  Mutex::Locker l(barrier_lock);
-  while (barrier_count.read())
-    barrier_cond.Wait(barrier_lock);
+};
 
-  ldout(cct, 10) << __func__ << " end." << dendl;
-}
 
 class C_handle_reap : public EventCallback {
  AsyncMessenger *msgr;
@@ -509,7 +261,6 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
                               string mname, uint64_t _nonce, uint64_t features)
  : SimplePolicyMessenger(cct, name,mname, _nonce),
-    processor(this, cct, _nonce),
    dispatch_queue(cct, this, mname),
    lock("AsyncMessenger::lock"),
    nonce(_nonce), need_addr(true), did_bind(false),
@@ -517,13 +268,20 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
    cluster_protocol(0), stopped(true)
 {
  ceph_spin_init(&global_seq_lock);
-  // uniq name for CephContext to distinguish from other objects
-  cct->lookup_or_create_singleton_object<WorkerPool>(pool, "AsyncMessenger::WorkerPool");
-  local_worker = pool->get_worker();
+  StackSingleton *single;
+  cct->lookup_or_create_singleton_object<StackSingleton>(single, "AsyncMessenger::NetworkStack");
+  stack = single->stack.get();
+  stack->start();
+  local_worker = stack->get_worker();
  local_connection = new AsyncConnection(cct, this, &dispatch_queue, local_worker);
  local_features = features;
  init_local_connection();
  reap_handler = new C_handle_reap(this);
+  unsigned processor_num = 1;
+  if (stack->support_local_listen_table())
+    processor_num = stack->get_num_worker();
+  for (unsigned i = 0; i < processor_num; ++i)
+    processors.push_back(new Processor(this, stack->get_worker(i), cct, _nonce));
 }
 
 /**
@@ -535,16 +293,18 @@ AsyncMessenger::~AsyncMessenger()
  delete reap_handler;
  assert(!did_bind); // either we didn't bind or we shut down the Processor
  local_connection->mark_down();
+  for (auto &&p : processors)
+    delete p;
 }
 
 void AsyncMessenger::ready()
 {
  ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
+  stack->start();
 
  Mutex::Locker l(lock);
-  pool->start();
-  Worker *w = pool->get_worker();
-  processor.start(w);
+  for (auto &&p : processors)
+    p->start();
  dispatch_queue.start();
 }
 
@@ -552,17 +312,18 @@ int AsyncMessenger::shutdown()
 {
  ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
 
+  // done! clean up.
+  for (auto &&p : processors)
+    p->stop();
  mark_down_all();
  // break ref cycles on the loopback connection
  local_connection->set_priv(NULL);
-  // done! clean up.
-  processor.stop();
  did_bind = false;
  lock.Lock();
  stop_cond.Signal();
  stopped = true;
  lock.Unlock();
-  pool->barrier();
+  stack->drain();
  return 0;
 }
 
@@ -580,7 +341,25 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr)
 
  // bind to a socket
  set<int> avoid_ports;
-  int r = processor.bind(bind_addr, avoid_ports);
+  int r = 0;
+  unsigned i = 0;
+  for (auto &&p : processors) {
+    r = p->bind(bind_addr, avoid_ports);
+    if (r < 0) {
+      // Note: this is related to the local tcp listen table problem.
+      // The posix backend (the default kernel implementation) shares the
+      // listen table in the kernel, so all threads can naturally use the
+      // same listen table and only one thread needs to bind. But other
+      // backends (like dpdk) use a local listen table, so we need to
+      // bind/listen the tcp port for each worker. Hence, if the first
+      // worker fails to bind, it can be treated as a normal error (e.g.,
+      // the port is already in use) and handled as such. But if the first
+      // worker binds successfully and a later worker fails, that is
+      // unexpected and we need to assert here.
+      assert(i == 0);
+      break;
+    }
+    ++i;
+  }
  if (r >= 0)
    did_bind = true;
  return r;
@@ -591,12 +370,20 @@ int AsyncMessenger::rebind(const set<int>& avoid_ports)
  ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
  assert(did_bind);
 
-  processor.stop();
+  for (auto &&p : processors)
+    p->stop();
  mark_down_all();
-  int r = processor.rebind(avoid_ports);
-  if (r == 0) {
-    Worker *w = pool->get_worker();
-    processor.start(w);
+  unsigned i = 0;
+  int r = 0;
+  for (auto &&p : processors) {
+    r = p->rebind(avoid_ports);
+    if (r == 0) {
+      p->start();
+    } else {
+      assert(i == 0);
+      break;
+    }
+    i++;
  }
  return r;
 }
@@ -644,19 +431,20 @@ void AsyncMessenger::wait()
 
  // close all connections
  shutdown_connections(false);
-  pool->barrier();
+  stack->drain();
 
  ldout(cct, 10) << __func__ << ": done." << dendl;
  ldout(cct, 1) << __func__ << " complete." << dendl;
  started = false;
 }
 
-AsyncConnectionRef AsyncMessenger::add_accept(int sd)
+AsyncConnectionRef AsyncMessenger::add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr)
 {
  lock.Lock();
-  Worker *w = pool->get_worker();
+  if (!stack->support_local_listen_table())
+    w = stack->get_worker();
  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
-  conn->accept(sd);
+  conn->accept(std::move(cli_socket), addr);
  accepting_conns.insert(conn);
  lock.Unlock();
  return conn;
@@ -671,7 +459,7 @@ AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int
                << ", creating connection and registering" << dendl;
 
  // create connection
-  Worker *w = pool->get_worker();
+  Worker *w = stack->get_worker();
  AsyncConnectionRef conn = new AsyncConnection(cct, this, &dispatch_queue, w);
  conn->connect(addr, type);
  assert(!conns.count(addr));
diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h
index 8125cff2f91ce..47898f49d91d4 100644
--- a/src/msg/async/AsyncMessenger.h
+++ b/src/msg/async/AsyncMessenger.h
@@ -36,70 +36,9 @@ using namespace std;
 #include "include/assert.h"
 #include "AsyncConnection.h"
 #include "Event.h"
-#include "common/simple_spin.h"
 
 class AsyncMessenger;
-class WorkerPool;
-
-enum {
-  l_msgr_first = 94000,
-  l_msgr_recv_messages,
-  l_msgr_send_messages,
-  l_msgr_send_messages_inline,
-  l_msgr_recv_bytes,
-  l_msgr_send_bytes,
-  l_msgr_created_connections,
-  l_msgr_active_connections,
-  l_msgr_last,
-};
-
-
-class Worker : public Thread {
-  static const uint64_t InitEventNumber = 5000;
-  static const uint64_t EventMaxWaitUs = 30000000;
-  CephContext *cct;
-  WorkerPool *pool;
-  bool done;
-  int id;
-  PerfCounters *perf_logger;
-
- public:
-  EventCenter center;
-  std::atomic_uint references;
-  Worker(CephContext *c, WorkerPool *p, int i)
-    : cct(c), pool(p), done(false), id(i), perf_logger(NULL), center(c), references(0) {
-    center.init(InitEventNumber, i);
-    char name[128];
-    sprintf(name, "AsyncMessenger::Worker-%d", id);
-    // initialize perf_logger
-    PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
-
-    plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
-    plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
-    plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages");
-    plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
-    plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes");
-    plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
-    plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
-
-    perf_logger = plb.create_perf_counters();
-    cct->get_perfcounters_collection()->add(perf_logger);
-  }
-  ~Worker() {
-    if (perf_logger) {
-      cct->get_perfcounters_collection()->remove(perf_logger);
-      delete perf_logger;
-    }
-  }
-  void *entry();
-  void stop();
-  PerfCounters *get_perf_counter() { return perf_logger; }
-  void release_worker() {
-    int oldref = references.fetch_sub(1);
-    assert(oldref > 0);
-  }
-};
 
 /**
  * If the Messenger binds to a specific address, the Processor runs
@@ -109,20 +48,20 @@ class Processor {
  AsyncMessenger *msgr;
  NetHandler net;
  Worker *worker;
-  int listen_sd;
+  ServerSocket listen_socket;
  uint64_t nonce;
  EventCallbackRef listen_handler;
 
  class C_processor_accept;
 
 public:
-  Processor(AsyncMessenger *r, CephContext *c, uint64_t n);
+  Processor(AsyncMessenger *r, Worker *w, CephContext *c, uint64_t n);
  ~Processor() { delete listen_handler; };
 
  void stop();
  int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);
  int rebind(const set<int>& avoid_port);
-  void start(Worker *w);
+  void start();
  void accept();
 };
 
@@ -276,9 +215,8 @@ class AsyncMessenger : public SimplePolicyMessenger {
 private:
  static const uint64_t ReapDeadConnectionThreshold = 5;
 
-  WorkerPool *pool;
-
-  Processor processor;
+  NetworkStack *stack;
+  std::vector<Processor*> processors;
  friend class Processor;
  DispatchQueue dispatch_queue;
@@ -413,7 +351,7 @@ class AsyncMessenger : public SimplePolicyMessenger {
  }
 
  void learned_addr(const entity_addr_t &peer_addr_for_me);
-  AsyncConnectionRef add_accept(int sd);
+  AsyncConnectionRef add_accept(Worker *w, ConnectedSocket cli_socket, entity_addr_t &addr);
 
  /**
   * This wraps ms_deliver_get_authorizer. We use it for AsyncConnection.
diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc
index 1388ee9bd5a1b..1582dc5a16b34 100644
--- a/src/msg/async/Event.cc
+++ b/src/msg/async/Event.cc
@@ -39,14 +39,14 @@ class C_handle_notify : public EventCallback {
  C_handle_notify(EventCenter *c, CephContext *cc): center(c), cct(cc) {}
  void do_request(int fd_or_id) {
    char c[256];
+    int r = 0;
    do {
-      center->already_wakeup.set(0);
-      int r = read(fd_or_id, c, sizeof(c));
+      r = read(fd_or_id, c, sizeof(c));
      if (r < 0) {
-        ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl;
-        break;
+        if (errno != EAGAIN)
+          ldout(cct, 1) << __func__ << " read notify pipe failed: " << cpp_strerror(errno) << dendl;
      }
-    } while (center->already_wakeup.read());
+    } while (r > 0);
  }
 };
 
@@ -135,16 +135,17 @@ EventCenter::~EventCenter()
 
 void EventCenter::set_owner()
 {
-  cct->lookup_or_create_singleton_object<AssociatedCenters>(
-      global_centers, "AsyncMessenger::EventCenter::global_center");
-  assert(global_centers && !global_centers->centers[idx]);
-  global_centers->centers[idx] = this;
  owner = pthread_self();
  ldout(cct, 1) << __func__ << " idx=" << idx << " owner=" << owner << dendl;
-
-  notify_handler = new C_handle_notify(this, cct);
-  int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler);
-  assert(r == 0);
+  if (!global_centers) {
+    cct->lookup_or_create_singleton_object<AssociatedCenters>(
+        global_centers, "AsyncMessenger::EventCenter::global_center");
+    assert(global_centers);
+    global_centers->centers[idx] = this;
+    notify_handler = new C_handle_notify(this, cct);
+    int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler);
+    assert(r == 0);
+  }
 }
 
 int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
@@ -260,15 +261,15 @@ void EventCenter::delete_time_event(uint64_t id)
 
 void EventCenter::wakeup()
 {
-  ldout(cct, 1) << __func__ << dendl;
-  already_wakeup.compare_and_swap(0, 1);
-
-  char buf[1];
-  buf[0] = 'c';
-  // wake up "event_wait"
-  int n = write(notify_send_fd, buf, 1);
-  // FIXME ?
-  assert(n == 1);
+  ldout(cct, 1) << __func__ << dendl;
+
+  char buf = 'c';
+  // wake up "event_wait"
+  int n = write(notify_send_fd, &buf, sizeof(buf));
+  if (n < 0) {
+    ldout(cct, 1) << __func__ << " write notify pipe failed: " << cpp_strerror(errno) << dendl;
+    assert(0);
+  }
 }
 
 int EventCenter::process_time_events()
@@ -361,20 +362,16 @@ int EventCenter::process_events(int timeout_microseconds)
 
  if (external_num_events.load()) {
    external_lock.lock();
-    if (external_events.empty()) {
-      external_lock.unlock();
-    } else {
-      deque<EventCallbackRef> cur_process;
-      cur_process.swap(external_events);
-      external_num_events.store(0);
-      external_lock.unlock();
-      while (!cur_process.empty()) {
-        EventCallbackRef e = cur_process.front();
-        if (e)
-          e->do_request(0);
-        cur_process.pop_front();
-        numevents++;
-      }
+    deque<EventCallbackRef> cur_process;
+    cur_process.swap(external_events);
+    external_num_events.store(0);
+    external_lock.unlock();
+    while (!cur_process.empty()) {
+      EventCallbackRef e = cur_process.front();
+      ldout(cct, 20) << __func__ << " do " << e << dendl;
+      e->do_request(0);
+      cur_process.pop_front();
+      numevents++;
    }
  }
  return numevents;
@@ -384,10 +381,11 @@ void EventCenter::dispatch_event_external(EventCallbackRef e)
 {
  external_lock.lock();
  external_events.push_back(e);
+  bool wake = !external_num_events.load();
  uint64_t num = ++external_num_events;
  external_lock.unlock();
-  if (!in_thread())
+  if (!in_thread() && wake)
    wakeup();
 
-  ldout(cct, 10) << __func__ << " " << e << " pending " << num << dendl;
+  ldout(cct, 20) << __func__ << " " << e << " pending " << num << dendl;
 }
diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h
index a320489ceda2c..eb3397623c255 100644
--- a/src/msg/async/Event.h
+++ b/src/msg/async/Event.h
@@ -127,7 +127,7 @@ class EventCenter {
  NetHandler net;
  EventCallbackRef notify_handler;
  unsigned idx = 10000;
-  AssociatedCenters *global_centers;
+  AssociatedCenters *global_centers = nullptr;
 
  int process_time_events();
  FileEvent *_get_file_event(int fd) {
@@ -136,21 +136,18 @@ class EventCenter {
  }
 
 public:
-  atomic_t already_wakeup;
-
  explicit EventCenter(CephContext *c):
    cct(c), nevent(0),
    external_num_events(0),
    driver(NULL), time_event_next_id(1),
    notify_receive_fd(-1), notify_send_fd(-1), net(c),
-    notify_handler(NULL),
-    already_wakeup(0) {
-  }
+    notify_handler(NULL) { }
  ~EventCenter();
  ostream& _event_prefix(std::ostream *_dout);
 
  int init(int nevent, unsigned idx);
  void set_owner();
+  pthread_t get_owner() const { return owner; }
  unsigned get_id() const { return idx; }
 
  // Used by internal thread
diff --git a/src/msg/async/PosixStack.cc b/src/msg/async/PosixStack.cc
new file mode 100644
index 0000000000000..65d0bf64b77f0
--- /dev/null
+++ b/src/msg/async/PosixStack.cc
@@ -0,0 +1,372 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomai@xsky.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+#include "PosixStack.h"
+
+#include "include/buffer.h"
+#include "include/str_list.h"
+#include "include/sock_compat.h"
+#include "common/errno.h"
+#include "common/strtol.h"
+#include "common/dout.h"
+#include "include/assert.h"
+#include "common/simple_spin.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << "PosixStack "
+
+class PosixConnectedSocketImpl final : public ConnectedSocketImpl {
+  NetHandler &handler;
+  int _fd;
+  entity_addr_t sa;
+  bool connected;
+#if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
+  sigset_t sigpipe_mask;
+  bool sigpipe_pending;
+  bool sigpipe_unblock;
+#endif
+
+ public:
+  explicit PosixConnectedSocketImpl(NetHandler &h, const entity_addr_t &sa, int f, bool connected)
+      : handler(h), _fd(f), sa(sa), connected(connected) {}
+
+  virtual int is_connected() override {
+    if (connected)
+      return 1;
+
+    int r = handler.reconnect(sa, _fd);
+    if (r == 0) {
+      connected = true;
+      return 1;
+    } else if (r < 0) {
+      return r;
+    } else {
+      return 0;
+    }
+  }
+
+  virtual ssize_t zero_copy_read(bufferptr&) override {
+    return -EOPNOTSUPP;
+  }
+
+  virtual ssize_t read(char *buf, size_t len) override {
+    ssize_t r = ::read(_fd, buf, len);
+    if (r < 0)
+      r = -errno;
+    return r;
+  }
+
+  /*
+   SIGPIPE suppression - for platforms without SO_NOSIGPIPE or MSG_NOSIGNAL
+   http://krokisplace.blogspot.in/2010/02/suppressing-sigpipe-in-library.html
+   http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
+  */
+  static void suppress_sigpipe()
+  {
+  #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
+    /*
+      We want to ignore possible SIGPIPE that we can generate on write.
+      SIGPIPE is delivered *synchronously* and *only* to the thread
+      doing the write.  So if it is reported as already pending (which
+      means the thread blocks it), then we do nothing: if we generate
+      SIGPIPE, it will be merged with the pending one (there's no
+      queuing), and that suits us well.  If it is not pending, we block
+      it in this thread (and we avoid changing signal action, because it
+      is per-process).
+    */
+    sigset_t pending;
+    sigemptyset(&pending);
+    sigpending(&pending);
+    sigpipe_pending = sigismember(&pending, SIGPIPE);
+    if (!sigpipe_pending) {
+      sigset_t blocked;
+      sigemptyset(&blocked);
+      pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &blocked);
+
+      /* Maybe it was blocked already?  */
+      sigpipe_unblock = ! sigismember(&blocked, SIGPIPE);
+    }
+  #endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */
+  }
+
+  static void restore_sigpipe()
+  {
+  #if !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE)
+    /*
+      If SIGPIPE was pending already we do nothing.  Otherwise, if it
+      becomes pending (i.e., we generated it), then we sigwait() it (thus
+      clearing pending status).  Then we unblock SIGPIPE, but only if it
+      was us who blocked it.
+    */
+    if (!sigpipe_pending) {
+      sigset_t pending;
+      sigemptyset(&pending);
+      sigpending(&pending);
+      if (sigismember(&pending, SIGPIPE)) {
+        /*
+          Protect ourselves from a situation when SIGPIPE was sent
+          by the user to the whole process, and was delivered to
+          other thread before we had a chance to wait for it.
+ */ + static const struct timespec nowait = { 0, 0 }; + TEMP_FAILURE_RETRY(sigtimedwait(&sigpipe_mask, NULL, &nowait)); + } + + if (sigpipe_unblock) + pthread_sigmask(SIG_UNBLOCK, &sigpipe_mask, NULL); + } + #endif /* !defined(MSG_NOSIGNAL) && !defined(SO_NOSIGPIPE) */ + } + + // return the sent length + // < 0 means error occured + static ssize_t do_sendmsg(int fd, struct msghdr &msg, unsigned len, bool more) + { + suppress_sigpipe(); + + ssize_t sent = 0; + while (1) { + ssize_t r; + #if defined(MSG_NOSIGNAL) + r = ::sendmsg(fd, &msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); + #else + r = ::sendmsg(fd, &msg, (more ? MSG_MORE : 0)); + #endif /* defined(MSG_NOSIGNAL) */ + + if (r < 0) { + if (errno == EINTR) { + continue; + } else if (errno == EAGAIN) { + break; + } + return -errno; + } + + sent += r; + if (len == sent) break; + + while (r > 0) { + if (msg.msg_iov[0].iov_len <= (size_t)r) { + // drain this whole item + r -= msg.msg_iov[0].iov_len; + msg.msg_iov++; + msg.msg_iovlen--; + } else { + msg.msg_iov[0].iov_base = (char *)msg.msg_iov[0].iov_base + r; + msg.msg_iov[0].iov_len -= r; + break; + } + } + } + restore_sigpipe(); + return (ssize_t)sent; + } + + virtual ssize_t send(bufferlist &bl, bool more) { + ssize_t sent_bytes = 0; + std::list::const_iterator pb = bl.buffers().begin(); + uint64_t left_pbrs = bl.buffers().size(); + while (left_pbrs) { + struct msghdr msg; + struct iovec msgvec[IOV_MAX]; + uint64_t size = MIN(left_pbrs, IOV_MAX); + left_pbrs -= size; + memset(&msg, 0, sizeof(msg)); + msg.msg_iovlen = 0; + msg.msg_iov = msgvec; + unsigned msglen = 0; + while (size > 0) { + msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()); + msgvec[msg.msg_iovlen].iov_len = pb->length(); + msg.msg_iovlen++; + msglen += pb->length(); + ++pb; + size--; + } + + ssize_t r = do_sendmsg(_fd, msg, msglen, left_pbrs || more); + if (r < 0) + return r; + + // "r" is the remaining length + sent_bytes += r; + if (r < msglen) + break; + // only "r" == 0 continue + } + + if (sent_bytes) { + bufferlist swapped; + if (sent_bytes < bl.length()) { + bl.splice(sent_bytes, bl.length()-sent_bytes, &swapped); + bl.swap(swapped); + } else { + bl.clear(); + } + } + + return sent_bytes; + } + virtual void shutdown() { + ::shutdown(_fd, SHUT_RDWR); + } + virtual void close() { + ::close(_fd); + } + virtual int fd() const override { + return _fd; + } + friend class PosixServerSocketImpl; + friend class PosixNetworkStack; +}; + +class PosixServerSocketImpl : public ServerSocketImpl { + NetHandler &handler; + entity_addr_t sa; + int _fd; + + public: + explicit PosixServerSocketImpl(NetHandler &h, const entity_addr_t &sa, int f): handler(h), sa(sa), _fd(f) {} + virtual int accept(ConnectedSocket *sock, const SocketOptions &opts, entity_addr_t *out) override; + virtual void abort_accept() override { + ::close(_fd); + } + virtual int fd() const override { + return _fd; + } +}; + +int PosixServerSocketImpl::accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) { + assert(sock); + sockaddr_storage ss; + socklen_t slen = sizeof(ss); + int sd = ::accept(_fd, (sockaddr*)&ss, &slen); + if (sd < 0) { + return -errno; + } + + handler.set_close_on_exec(sd); + int r = handler.set_nonblock(sd); + if (r < 0) { + ::close(sd); + return -errno; + } + + r = handler.set_socket_options(sd, opt.nodelay, opt.rcbuf_size); + if (r < 0) { + ::close(sd); + return -errno; + } + handler.set_priority(sd, opt.priority); + + std::unique_ptr csi(new PosixConnectedSocketImpl(handler, *out, sd, true)); + *sock = 
ConnectedSocket(std::move(csi)); + if (out) + out->set_sockaddr((sockaddr*)&ss); + return 0; +} + +void PosixWorker::initialize() +{ +} + +int PosixWorker::listen(entity_addr_t &sa, const SocketOptions &opt, + ServerSocket *sock) +{ + int listen_sd = net.create_socket(sa.get_family(), true); + if (listen_sd < 0) { + return -errno; + } + + int r = net.set_nonblock(listen_sd); + if (r < 0) { + ::close(listen_sd); + return -errno; + } + + net.set_close_on_exec(listen_sd); + r = net.set_socket_options(listen_sd, opt.nodelay, opt.rcbuf_size); + if (r < 0) { + ::close(listen_sd); + return -errno; + } + + r = ::bind(listen_sd, sa.get_sockaddr(), sa.get_sockaddr_len()); + if (r < 0) { + r = -errno; + ldout(cct, 10) << __func__ << " unable to bind to " << sa.get_sockaddr() + << ": " << cpp_strerror(r) << dendl; + ::close(listen_sd); + return r; + } + + r = ::listen(listen_sd, 128); + if (r < 0) { + r = -errno; + lderr(cct) << __func__ << " unable to listen on " << sa << ": " << cpp_strerror(r) << dendl; + ::close(listen_sd); + return r; + } + + *sock = ServerSocket( + std::unique_ptr<ServerSocketImpl>( + new PosixServerSocketImpl(net, sa, listen_sd))); + return 0; +} + +int PosixWorker::connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) { + int sd; + + if (opts.nonblock) { + sd = net.nonblock_connect(addr); + } else { + sd = net.connect(addr); + } + + if (sd < 0) { + return sd; + } + + net.set_priority(sd, opts.priority); + *socket = ConnectedSocket( + std::unique_ptr<ConnectedSocketImpl>(new PosixConnectedSocketImpl(net, addr, sd, !opts.nonblock))); + return 0; +} + +PosixNetworkStack::PosixNetworkStack(CephContext *c, const string &t) + : NetworkStack(c, t) +{ + vector<string> corestrs; + get_str_vec(cct->_conf->ms_async_affinity_cores, corestrs); + for (auto & corestr : corestrs) { + string err; + int coreid = strict_strtol(corestr.c_str(), 10, &err); + if (err == "") + coreids.push_back(coreid); + else + lderr(cct) << __func__ << " failed to parse " << corestr << " in " << cct->_conf->ms_async_affinity_cores << dendl; + } +} diff --git a/src/msg/async/PosixStack.h b/src/msg/async/PosixStack.h new file mode 100644 index 0000000000000..149db320d8df2 --- /dev/null +++ b/src/msg/async/PosixStack.h @@ -0,0 +1,61 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING.
+ * + */ + +#ifndef CEPH_MSG_ASYNC_POSIXSTACK_H +#define CEPH_MSG_ASYNC_POSIXSTACK_H + +#include + +#include "msg/msg_types.h" +#include "msg/async/net_handler.h" + +#include "Stack.h" + +class PosixWorker : public Worker { + NetHandler net; + std::thread t; + virtual void initialize(); + public: + PosixWorker(CephContext *c, unsigned i) + : Worker(c, i), net(c) {} + virtual int listen(entity_addr_t &sa, const SocketOptions &opt, + ServerSocket *socks) override; + virtual int connect(const entity_addr_t &addr, const SocketOptions &opts, ConnectedSocket *socket) override; +}; + +class PosixNetworkStack : public NetworkStack { + vector coreids; + vector threads; + + public: + explicit PosixNetworkStack(CephContext *c, const string &t); + + int get_cpuid(int id) const { + if (coreids.empty()) + return -1; + return coreids[id % coreids.size()]; + } + virtual void spawn_worker(unsigned i, std::function &&func) override { + threads.resize(i+1); + threads[i] = std::move(std::thread(func)); + } + virtual void join_worker(unsigned i) override { + assert(threads.size() > i && threads[i].joinable()); + threads[i].join(); + } +}; + +#endif //CEPH_MSG_ASYNC_POSIXSTACK_H diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc new file mode 100644 index 0000000000000..25e1d5862f1a9 --- /dev/null +++ b/src/msg/async/Stack.cc @@ -0,0 +1,176 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSky + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "common/Cond.h" +#include "common/errno.h" +#include "PosixStack.h" + +#include "common/dout.h" +#include "include/assert.h" + +#define dout_subsys ceph_subsys_ms +#undef dout_prefix +#define dout_prefix *_dout << "stack " + +void NetworkStack::add_thread(unsigned i, std::function &thread) +{ + Worker *w = workers[i]; + thread = std::move( + [this, w]() { + const uint64_t EventMaxWaitUs = 30000000; + w->center.set_owner(); + ldout(cct, 10) << __func__ << " starting" << dendl; + w->initialize(); + w->init_done(); + while (!w->done) { + ldout(cct, 30) << __func__ << " calling event process" << dendl; + + int r = w->center.process_events(EventMaxWaitUs); + if (r < 0) { + ldout(cct, 20) << __func__ << " process events failed: " + << cpp_strerror(errno) << dendl; + // TODO do something? 
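+ // Note: process_events() returns a negative value only when the
+ // underlying event driver (epoll_wait/select) itself fails; errno
+ // is logged above and the loop simply retries, so a transient
+ // driver error does not silently kill the worker thread.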
+ } + } + w->reset(); + } + ); +} + +std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string &t) +{ + if (t == "posix") + return std::make_shared<PosixNetworkStack>(c, t); + + return nullptr; +} + +Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i) +{ + if (type == "posix") + return new PosixWorker(c, i); + return nullptr; +} + +NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c) +{ + const uint64_t InitEventNumber = 5000; + num_workers = cct->_conf->ms_async_op_threads; + for (unsigned i = 0; i < num_workers; ++i) { + Worker *w = create_worker(cct, type, i); + w->center.init(InitEventNumber, i); + workers.push_back(w); + } + cct->register_fork_watcher(this); +} + +void NetworkStack::start() +{ + pool_spin.lock(); + if (started) { + pool_spin.unlock(); + return ; + } + + for (unsigned i = 0; i < num_workers; ++i) { + if (workers[i]->is_init()) + continue; + std::function<void ()> thread; + add_thread(i, thread); + spawn_worker(i, std::move(thread)); + } + started = true; + pool_spin.unlock(); + + for (unsigned i = 0; i < num_workers; ++i) + workers[i]->wait_for_init(); +} + +Worker* NetworkStack::get_worker() +{ + ldout(cct, 10) << __func__ << dendl; + + // start with some reasonably large number + unsigned min_load = std::numeric_limits<unsigned>::max(); + Worker* current_best = nullptr; + + pool_spin.lock(); + // find worker with least references + // tempting case is returning on references == 0, but in reality + // this will happen so rarely that there's no need for special case. + for (unsigned i = 0; i < num_workers; ++i) { + unsigned worker_load = workers[i]->references.load(); + if (worker_load < min_load) { + current_best = workers[i]; + min_load = worker_load; + } + } + + pool_spin.unlock(); + assert(current_best); + ++current_best->references; + return current_best; +} + +void NetworkStack::stop() +{ + Spinlock::Locker l(pool_spin); + for (unsigned i = 0; i < num_workers; ++i) { + workers[i]->done = true; + workers[i]->center.wakeup(); + join_worker(i); + } + started = false; +} + +class C_drain : public EventCallback { + Mutex drain_lock; + Cond drain_cond; + std::atomic<size_t> drain_count; + + public: + explicit C_drain(size_t c) + : drain_lock("C_drain::drain_lock"), + drain_count(c) {} + void do_request(int id) { + Mutex::Locker l(drain_lock); + drain_count--; + drain_cond.Signal(); + } + void wait() { + Mutex::Locker l(drain_lock); + while (drain_count.load()) + drain_cond.Wait(drain_lock); + } +}; + +void NetworkStack::drain() +{ + ldout(cct, 10) << __func__ << " started." << dendl; + pthread_t cur = pthread_self(); + pool_spin.lock(); + C_drain drain(num_workers); + for (unsigned i = 0; i < num_workers; ++i) { + assert(cur != workers[i]->center.get_owner()); + workers[i]->center.dispatch_event_external(EventCallbackRef(&drain)); + } + pool_spin.unlock(); + drain.wait(); + ldout(cct, 10) << __func__ << " end." << dendl; +} diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h new file mode 100644 index 0000000000000..c397f942612be --- /dev/null +++ b/src/msg/async/Stack.h @@ -0,0 +1,339 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSKY + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation.
See file COPYING. + * + */ + +#ifndef CEPH_MSG_ASYNC_STACK_H +#define CEPH_MSG_ASYNC_STACK_H + +#include "include/Spinlock.h" +#include "common/perf_counters.h" +#include "common/simple_spin.h" +#include "msg/msg_types.h" +#include "msg/async/Event.h" + +class ConnectedSocketImpl { + public: + virtual ~ConnectedSocketImpl() {} + virtual int is_connected() = 0; + virtual ssize_t read(char*, size_t) = 0; + virtual ssize_t zero_copy_read(bufferptr&) = 0; + virtual ssize_t send(bufferlist &bl, bool more) = 0; + virtual void shutdown() = 0; + virtual void close() = 0; + virtual int fd() const = 0; +}; + +class ConnectedSocket; +struct SocketOptions { + bool nonblock = true; + bool nodelay = true; + int rcbuf_size = 0; + int priority = -1; +}; + +/// \cond internal +class ServerSocketImpl { + public: + virtual ~ServerSocketImpl() {} + virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) = 0; + virtual void abort_accept() = 0; + /// Get file descriptor + virtual int fd() const = 0; +}; +/// \endcond + +/// \addtogroup networking-module +/// @{ + +/// A TCP (or other stream-based protocol) connection. +/// +/// A \c ConnectedSocket represents a full-duplex stream between +/// two endpoints, a local endpoint and a remote endpoint. +class ConnectedSocket { + std::unique_ptr _csi; + + public: + /// Constructs a \c ConnectedSocket not corresponding to a connection + ConnectedSocket() {}; + /// \cond internal + explicit ConnectedSocket(std::unique_ptr csi) + : _csi(std::move(csi)) {} + /// \endcond + ~ConnectedSocket() { + if (_csi) + _csi->close(); + } + /// Moves a \c ConnectedSocket object. + ConnectedSocket(ConnectedSocket&& cs) = default; + /// Move-assigns a \c ConnectedSocket object. + ConnectedSocket& operator=(ConnectedSocket&& cs) = default; + + int is_connected() { + return _csi->is_connected(); + } + /// Read the input stream with copy. + /// + /// Copy an object returning data sent from the remote endpoint. + ssize_t read(char* buf, size_t len) { + return _csi->read(buf, len); + } + /// Gets the input stream. + /// + /// Gets an object returning data sent from the remote endpoint. + ssize_t zero_copy_read(bufferptr &data) { + return _csi->zero_copy_read(data); + } + /// Gets the output stream. + /// + /// Gets an object that sends data to the remote endpoint. + ssize_t send(bufferlist &bl, bool more) { + return _csi->send(bl, more); + } + /// Disables output to the socket. + /// + /// Current or future writes that have not been successfully flushed + /// will immediately fail with an error. This is useful to abort + /// operations on a socket that is not making progress due to a + /// peer failure. + void shutdown() { + return _csi->shutdown(); + } + /// Disables input from the socket. + /// + /// Current or future reads will immediately fail with an error. + /// This is useful to abort operations on a socket that is not making + /// progress due to a peer failure. + void close() { + _csi->close(); + _csi.reset(); + } + + /// Get file descriptor + int fd() const { + return _csi->fd(); + } + + explicit operator bool() const { + return _csi.get(); + } +}; +/// @} + +/// \addtogroup networking-module +/// @{ + +/// A listening socket, waiting to accept incoming network connections. 
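+/// A \c ServerSocket owns its implementation: destroying it aborts any
+/// accept still in progress, and moving it transfers that ownership.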
+class ServerSocket { + std::unique_ptr<ServerSocketImpl> _ssi; + public: + /// Constructs a \c ServerSocket not corresponding to a connection + ServerSocket() {} + /// \cond internal + explicit ServerSocket(std::unique_ptr<ServerSocketImpl> ssi) + : _ssi(std::move(ssi)) {} + ~ServerSocket() { + if (_ssi) + _ssi->abort_accept(); + } + /// \endcond + /// Moves a \c ServerSocket object. + ServerSocket(ServerSocket&& ss) = default; + /// Move-assigns a \c ServerSocket object. + ServerSocket& operator=(ServerSocket&& ss) = default; + + /// Accepts the next connection to successfully connect to this socket. + /// + /// \return a \ref ConnectedSocket representing the connection, and + /// a \ref entity_addr_t describing the remote endpoint. + int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) { + return _ssi->accept(sock, opt, out); + } + + /// Stops any \ref accept() in progress. + /// + /// Current and future \ref accept() calls will terminate immediately + /// with an error. + void abort_accept() { + _ssi->abort_accept(); + _ssi.reset(); + } + + /// Get file descriptor + int fd() const { + return _ssi->fd(); + } + + explicit operator bool() const { + return _ssi.get(); + } +}; +/// @} + +class NetworkStack; + +enum { + l_msgr_first = 94000, + l_msgr_recv_messages, + l_msgr_send_messages, + l_msgr_send_messages_inline, + l_msgr_recv_bytes, + l_msgr_send_bytes, + l_msgr_created_connections, + l_msgr_active_connections, + l_msgr_last, +}; + +class Worker { + std::mutex init_lock; + std::condition_variable init_cond; + bool init = false; + + public: + bool done = false; + + CephContext *cct; + PerfCounters *perf_logger; + unsigned id; + + std::atomic_uint references; + EventCenter center; + + Worker(const Worker&) = delete; + Worker& operator=(const Worker&) = delete; + + Worker(CephContext *c, unsigned i) + : cct(c), perf_logger(NULL), id(i), references(0), center(c) { + char name[128]; + sprintf(name, "AsyncMessenger::Worker-%u", id); + // initialize perf_logger + PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last); + + plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages"); + plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages"); + plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages"); + plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes"); + plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network sent bytes"); + plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number"); + plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number"); + + perf_logger = plb.create_perf_counters(); + cct->get_perfcounters_collection()->add(perf_logger); + } + virtual ~Worker() { + if (perf_logger) { + cct->get_perfcounters_collection()->remove(perf_logger); + delete perf_logger; + } + } + + virtual int listen(entity_addr_t &addr, + const SocketOptions &opts, ServerSocket *) = 0; + virtual int connect(const entity_addr_t &addr, + const SocketOptions &opts, ConnectedSocket *socket) = 0; + + virtual void initialize() {} + PerfCounters *get_perf_counter() { return perf_logger; } + void release_worker() { + int oldref = references.fetch_sub(1); + assert(oldref > 0); + } + void init_done() { + init_lock.lock(); + init = true; + init_cond.notify_all(); + init_lock.unlock(); + } + bool is_init() { + std::lock_guard<std::mutex> l(init_lock); + return init; + } + void
wait_for_init() { + std::unique_lock l(init_lock); + while (!init) + init_cond.wait(l); + } + void reset() { + init_lock.lock(); + init = false; + init_cond.notify_all(); + init_lock.unlock(); + done = false; + } +}; + +class NetworkStack : public CephContext::ForkWatcher { + std::string type; + unsigned num_workers = 0; + Spinlock pool_spin; + bool started = false; + + void add_thread(unsigned i, std::function &ts); + + protected: + CephContext *cct; + vector workers; + // Used to indicate whether thread started + + explicit NetworkStack(CephContext *c, const string &t); + public: + virtual ~NetworkStack() { + for (auto &&w : workers) + delete w; + } + + static std::shared_ptr create( + CephContext *c, const string &type); + + static Worker* create_worker( + CephContext *c, const string &t, unsigned i); + // backend need to override this method if supports zero copy read + virtual bool support_zero_copy_read() const { return false; } + // backend need to override this method if backend doesn't support shared + // listen table. + // For example, posix backend has in kernel global listen table. If one + // thread bind a port, other threads also aware this. + // But for dpdk backend, we maintain listen table in each thread. So we + // need to let each thread do binding port. + virtual bool support_local_listen_table() const { return false; } + + void start(); + void stop(); + virtual Worker *get_worker(); + Worker *get_worker(unsigned i) { + return workers[i]; + } + void drain(); + unsigned get_num_worker() const { + return num_workers; + } + + // direct is used in tests only + virtual void spawn_worker(unsigned i, std::function &&) = 0; + virtual void join_worker(unsigned i) = 0; + + virtual void handle_pre_fork() override { + stop(); + } + + virtual void handle_post_fork() override { + start(); + } + + private: + NetworkStack(const NetworkStack &); + NetworkStack& operator=(const NetworkStack &); +}; + +#endif //CEPH_MSG_ASYNC_STACK_H diff --git a/src/msg/async/net_handler.cc b/src/msg/async/net_handler.cc index d092a6609926f..c4c2bfe4b09ab 100644 --- a/src/msg/async/net_handler.cc +++ b/src/msg/async/net_handler.cc @@ -89,19 +89,20 @@ void NetHandler::set_close_on_exec(int sd) } } -void NetHandler::set_socket_options(int sd, bool nodelay, int size) +int NetHandler::set_socket_options(int sd, bool nodelay, int size) { + int r = 0; // disable Nagle algorithm? 
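+ // TCP_NODELAY disables Nagle's algorithm: small writes go to the wire
+ // immediately instead of being coalesced while waiting for ACKs,
+ // trading a few extra packets for lower per-message latency.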
if (nodelay) { int flag = 1; - int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); + r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); if (r < 0) { r = -errno; ldout(cct, 0) << "couldn't set TCP_NODELAY: " << cpp_strerror(r) << dendl; } } if (size) { - int r = ::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size)); + r = ::setsockopt(sd, SOL_SOCKET, SO_RCVBUF, (void*)&size, sizeof(size)); if (r < 0) { r = -errno; ldout(cct, 0) << "couldn't set SO_RCVBUF to " << size << ": " << cpp_strerror(r) << dendl; @@ -111,12 +112,13 @@ void NetHandler::set_socket_options(int sd, bool nodelay, int size) // block ESIGPIPE #ifdef SO_NOSIGPIPE int val = 1; - int r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val)); + r = ::setsockopt(sd, SOL_SOCKET, SO_NOSIGPIPE, (void*)&val, sizeof(val)); if (r) { r = -errno; ldout(cct,0) << "couldn't set SO_NOSIGPIPE: " << cpp_strerror(r) << dendl; } #endif + return r; } void NetHandler::set_priority(int sd, int prio) diff --git a/src/msg/async/net_handler.h b/src/msg/async/net_handler.h index eb43f54f7bb4f..311276dba8bab 100644 --- a/src/msg/async/net_handler.h +++ b/src/msg/async/net_handler.h @@ -20,16 +20,15 @@ namespace ceph { class NetHandler { - private: - int create_socket(int domain, bool reuse_addr=false); int generic_connect(const entity_addr_t& addr, bool nonblock); CephContext *cct; public: + int create_socket(int domain, bool reuse_addr=false); explicit NetHandler(CephContext *c): cct(c) {} int set_nonblock(int sd); void set_close_on_exec(int sd); - void set_socket_options(int sd, bool nodelay, int size); + int set_socket_options(int sd, bool nodelay, int size); int connect(const entity_addr_t &addr); /** diff --git a/src/test/Makefile-server.am b/src/test/Makefile-server.am index 29c09da349f94..85117028d7c40 100644 --- a/src/test/Makefile-server.am +++ b/src/test/Makefile-server.am @@ -8,6 +8,11 @@ ceph_test_msgr_LDADD = $(LIBOS) $(UNITTEST_LDADD) $(CEPH_GLOBAL) ceph_test_msgr_CXXFLAGS = $(UNITTEST_CXXFLAGS) bin_DEBUGPROGRAMS += ceph_test_msgr +ceph_test_async_networkstack_SOURCES = test/msgr/test_async_networkstack.cc +ceph_test_async_networkstack_LDADD = $(LIBOS) $(UNITTEST_LDADD) $(CEPH_GLOBAL) +ceph_test_async_networkstack_CXXFLAGS = $(UNITTEST_CXXFLAGS) +bin_DEBUGPROGRAMS += ceph_test_async_networkstack + ceph_test_trans_SOURCES = test/test_trans.cc ceph_test_trans_LDADD = $(LIBOS) $(CEPH_GLOBAL) bin_DEBUGPROGRAMS += ceph_test_trans diff --git a/src/test/msgr/CMakeLists.txt b/src/test/msgr/CMakeLists.txt index 4eb10acfa17a6..a792233ffc749 100644 --- a/src/test/msgr/CMakeLists.txt +++ b/src/test/msgr/CMakeLists.txt @@ -14,6 +14,14 @@ set_target_properties(ceph_test_msgr PROPERTIES COMPILE_FLAGS ${UNITTEST_CXX_FLAGS}) target_link_libraries(ceph_test_msgr os global ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS} ${UNITTEST_LIBS}) +# ceph_test_async_networkstack +add_executable(ceph_test_async_networkstack + test_async_networkstack.cc + ) +set_target_properties(ceph_test_async_networkstack PROPERTIES COMPILE_FLAGS + ${UNITTEST_CXX_FLAGS}) +target_link_libraries(ceph_test_async_networkstack global ${BLKID_LIBRARIES} ${CMAKE_DL_LIBS} ${UNITTEST_LIBS}) + #ceph_perf_msgr_server add_executable(ceph_perf_msgr_server perf_msgr_server.cc) set_target_properties(ceph_perf_msgr_server PROPERTIES COMPILE_FLAGS @@ -29,6 +37,7 @@ target_link_libraries(ceph_perf_msgr_client os global ${UNITTEST_LIBS}) install(TARGETS ceph_test_async_driver ceph_test_msgr + ceph_test_async_networkstack 
ceph_perf_msgr_server ceph_perf_msgr_client DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/src/test/msgr/test_async_networkstack.cc b/src/test/msgr/test_async_networkstack.cc new file mode 100644 index 0000000000000..fa14e81c6d6dc --- /dev/null +++ b/src/test/msgr/test_async_networkstack.cc @@ -0,0 +1,1045 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2016 XSky + * + * Author: Haomai Wang + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "acconfig.h" +#include "include/Context.h" +#include "common/ceph_argparse.h" +#include "global/global_init.h" + +#include "msg/async/Event.h" +#include "msg/async/Stack.h" + +#if GTEST_HAS_PARAM_TEST + +class NetworkWorkerTest : public ::testing::TestWithParam { + public: + std::shared_ptr stack; + string addr, port_addr; + + NetworkWorkerTest() {} + virtual void SetUp() { + cerr << __func__ << " start set up " << GetParam() << std::endl; + addr = "127.0.0.1:15000"; + port_addr = "127.0.0.1:15001"; + stack = NetworkStack::create(g_ceph_context, GetParam()); + stack->start(); + } + virtual void TearDown() { + stack->stop(); + } + string get_addr() const { + return addr; + } + string get_ip_different_port() const { + return port_addr; + } + string get_different_ip() const { + return "10.0.123.100:4323"; + } + EventCenter *get_center(unsigned i) { + return &stack->get_worker(i)->center; + } + Worker *get_worker(unsigned i) { + return stack->get_worker(i); + } + template + class C_dispatch : public EventCallback { + Worker *worker; + func f; + std::atomic_bool done; + public: + C_dispatch(Worker *w, func &&_f): worker(w), f(std::move(_f)), done(false) {} + void do_request(int id) { + f(worker); + done = true; + } + void wait() { + int us = 1000 * 1000 * 1000; + while (!done) { + ASSERT_TRUE(us > 0); + usleep(100); + us -= 100; + } + } + }; + template + void exec_events(func &&f) { + std::vector*> dis; + for (unsigned i = 0; i < stack->get_num_worker(); ++i) { + Worker *w = stack->get_worker(i); + C_dispatch *e = new C_dispatch(w, std::move(f)); + stack->get_worker(i)->center.dispatch_event_external(e); + dis.push_back(e); + } + + for (auto &&e : dis) { + e->wait(); + delete e; + } + } +}; + +class C_poll : public EventCallback { + EventCenter *center; + std::atomic woken; + static const int sleepus = 500; + + public: + C_poll(EventCenter *c): center(c), woken(false) {} + void do_request(int r) { + woken = true; + } + bool poll(int milliseconds) { + auto start = ceph::coarse_real_clock::now(g_ceph_context); + while (!woken) { + center->process_events(sleepus); + usleep(sleepus); + auto r = std::chrono::duration_cast( + ceph::coarse_real_clock::now(g_ceph_context) - start); + if (r >= std::chrono::milliseconds(milliseconds)) + break; + } + return woken; + } + void reset() { + woken = false; + } +}; + +TEST_P(NetworkWorkerTest, SimpleTest) { + entity_addr_t bind_addr; + ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); + std::atomic_bool accepted(false); + std::atomic_bool *accepted_p = &accepted; + + exec_events([this, accepted_p, bind_addr](Worker *worker) mutable { + entity_addr_t cli_addr; + SocketOptions options; + ServerSocket bind_socket; + EventCenter *center = 
&worker->center; + ssize_t r = 0; + if (stack->support_local_listen_table() || worker->id == 0) + r = worker->listen(bind_addr, options, &bind_socket); + ASSERT_EQ(0, r); + + ConnectedSocket cli_socket, srv_socket; + if (worker->id == 0) { + r = worker->connect(bind_addr, options, &cli_socket); + ASSERT_EQ(0, r); + } + + bool is_my_accept = false; + if (bind_socket) { + C_poll cb(center); + center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb); + if (cb.poll(500)) { + *accepted_p = true; + is_my_accept = true; + } + ASSERT_TRUE(*accepted_p); + center->delete_file_event(bind_socket.fd(), EVENT_READABLE); + } + + if (is_my_accept) { + r = bind_socket.accept(&srv_socket, options, &cli_addr); + ASSERT_EQ(0, r); + ASSERT_TRUE(srv_socket.fd() > 0); + } + + if (worker->id == 0) { + C_poll cb(center); + center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb); + r = cli_socket.is_connected(); + if (r == 0) { + ASSERT_EQ(true, cb.poll(500)); + r = cli_socket.is_connected(); + } + ASSERT_EQ(1, r); + center->delete_file_event(cli_socket.fd(), EVENT_READABLE); + } + + const char *message = "this is a new message"; + int len = strlen(message); + bufferlist bl; + bl.append(message, len); + if (worker->id == 0) { + r = cli_socket.send(bl, false); + ASSERT_EQ(len, r); + } + + char buf[1024]; + C_poll cb(center); + if (is_my_accept) { + center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb); + { + r = srv_socket.read(buf, sizeof(buf)); + while (r == -EAGAIN) { + ASSERT_TRUE(cb.poll(500)); + r = srv_socket.read(buf, sizeof(buf)); + } + ASSERT_EQ(len, r); + ASSERT_EQ(0, memcmp(buf, message, len)); + } + bind_socket.abort_accept(); + } + if (worker->id == 0) { + cli_socket.shutdown(); + // ack delay is 200 ms + } + + bl.clear(); + bl.append(message, len); + if (worker->id == 0) { + r = cli_socket.send(bl, false); + ASSERT_EQ(-EPIPE, r); + } + if (is_my_accept) { + cb.reset(); + ASSERT_TRUE(cb.poll(500)); + r = srv_socket.read(buf, sizeof(buf)); + if (r == -EAGAIN) { + cb.reset(); + ASSERT_TRUE(cb.poll(1000*500)); + r = srv_socket.read(buf, sizeof(buf)); + } + ASSERT_EQ(0, r); + center->delete_file_event(srv_socket.fd(), EVENT_READABLE); + srv_socket.close(); + } + }); +} + +TEST_P(NetworkWorkerTest, ConnectFailedTest) { + entity_addr_t bind_addr; + ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); + + exec_events([this, bind_addr](Worker *worker) mutable { + EventCenter *center = &worker->center; + entity_addr_t cli_addr; + SocketOptions options; + ServerSocket bind_socket; + int r = 0; + if (stack->support_local_listen_table() || worker->id == 0) + r = worker->listen(bind_addr, options, &bind_socket); + ASSERT_EQ(0, r); + + ConnectedSocket cli_socket1, cli_socket2; + if (worker->id == 0) { + ASSERT_TRUE(cli_addr.parse(get_ip_different_port().c_str())); + r = worker->connect(cli_addr, options, &cli_socket1); + ASSERT_EQ(0, r); + C_poll cb(center); + center->create_file_event(cli_socket1.fd(), EVENT_READABLE, &cb); + r = cli_socket1.is_connected(); + if (r == 0) { + ASSERT_TRUE(cb.poll(500)); + r = cli_socket1.is_connected(); + } + ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET); + } + + if (worker->id == 1) { + ASSERT_TRUE(cli_addr.parse(get_different_ip().c_str())); + r = worker->connect(cli_addr, options, &cli_socket2); + ASSERT_EQ(0, r); + C_poll cb(center); + center->create_file_event(cli_socket2.fd(), EVENT_READABLE, &cb); + r = cli_socket2.is_connected(); + if (r == 0) { + ASSERT_FALSE(cb.poll(500)); + r = cli_socket2.is_connected(); + } + ASSERT_TRUE(r != 1); + 
center->delete_file_event(cli_socket2.fd(), EVENT_READABLE); + } + }); +} + +TEST_P(NetworkWorkerTest, ListenTest) { + Worker *worker = get_worker(0); + entity_addr_t bind_addr; + ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); + SocketOptions options; + ServerSocket bind_socket1, bind_socket2; + int r = worker->listen(bind_addr, options, &bind_socket1); + ASSERT_EQ(0, r); + + r = worker->listen(bind_addr, options, &bind_socket2); + ASSERT_EQ(-EADDRINUSE, r); +} + +TEST_P(NetworkWorkerTest, AcceptAndCloseTest) { + entity_addr_t bind_addr; + ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); + std::atomic_bool accepted(false); + std::atomic_bool *accepted_p = &accepted; + std::atomic_int unbind_count(stack->get_num_worker()); + std::atomic_int *count_p = &unbind_count; + exec_events([this, bind_addr, accepted_p, count_p](Worker *worker) mutable { + SocketOptions options; + EventCenter *center = &worker->center; + entity_addr_t cli_addr; + int r = 0; + { + ServerSocket bind_socket; + if (stack->support_local_listen_table() || worker->id == 0) + r = worker->listen(bind_addr, options, &bind_socket); + ASSERT_EQ(0, r); + + ConnectedSocket srv_socket, cli_socket; + if (bind_socket) { + r = bind_socket.accept(&srv_socket, options, &cli_addr); + ASSERT_EQ(-EAGAIN, r); + } + + C_poll cb(center); + if (worker->id == 0) { + center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb); + r = worker->connect(bind_addr, options, &cli_socket); + ASSERT_EQ(0, r); + ASSERT_TRUE(cb.poll(500)); + } + + if (bind_socket) { + cb.reset(); + cb.poll(500); + ConnectedSocket srv_socket2; + do { + r = bind_socket.accept(&srv_socket2, options, &cli_addr); + usleep(100); + } while (r == -EAGAIN && !*accepted_p); + if (r == 0) + *accepted_p = true; + ASSERT_TRUE(*accepted_p); + // srv_socket2 closed + center->delete_file_event(bind_socket.fd(), EVENT_READABLE); + } + + if (worker->id == 0) { + char buf[100]; + cb.reset(); + center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb); + int i = 3; + while (!i--) { + ASSERT_TRUE(cb.poll(500)); + r = cli_socket.read(buf, sizeof(buf)); + if (r == 0) + break; + } + ASSERT_EQ(0, r); + center->delete_file_event(cli_socket.fd(), EVENT_READABLE); + } + + if (bind_socket) + center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb); + if (worker->id == 0) { + *accepted_p = false; + r = worker->connect(bind_addr, options, &cli_socket); + ASSERT_EQ(0, r); + cb.reset(); + ASSERT_TRUE(cb.poll(500)); + cli_socket.close(); + } + + if (bind_socket) { + do { + r = bind_socket.accept(&srv_socket, options, &cli_addr); + usleep(100); + } while (r == -EAGAIN && !*accepted_p); + if (r == 0) + *accepted_p = true; + ASSERT_TRUE(*accepted_p); + center->delete_file_event(bind_socket.fd(), EVENT_READABLE); + } + // unbind + } + + --*count_p; + while (*count_p > 0) + usleep(100); + + ConnectedSocket cli_socket; + r = worker->connect(bind_addr, options, &cli_socket); + ASSERT_EQ(0, r); + { + C_poll cb(center); + center->create_file_event(cli_socket.fd(), EVENT_READABLE, &cb); + r = cli_socket.is_connected(); + if (r == 0) { + ASSERT_TRUE(cb.poll(500)); + r = cli_socket.is_connected(); + } + ASSERT_TRUE(r == -ECONNREFUSED || r == -ECONNRESET); + } + }); +} + +TEST_P(NetworkWorkerTest, ComplexTest) { + entity_addr_t bind_addr; + std::atomic_bool accepted(false); + std::atomic_bool *accepted_p = &accepted; + std::atomic_bool done(false); + std::atomic_bool *done_p = &done; + ASSERT_TRUE(bind_addr.parse(get_addr().c_str())); + exec_events([this, bind_addr, accepted_p, done_p](Worker 
*worker) mutable { + entity_addr_t cli_addr; + EventCenter *center = &worker->center; + SocketOptions options; + ServerSocket bind_socket; + int r = 0; + if (stack->support_local_listen_table() || worker->id == 0) { + r = worker->listen(bind_addr, options, &bind_socket); + ASSERT_EQ(0, r); + } + ConnectedSocket cli_socket, srv_socket; + if (worker->id == 1) { + r = worker->connect(bind_addr, options, &cli_socket); + ASSERT_EQ(0, r); + } + + if (bind_socket) { + C_poll cb(center); + center->create_file_event(bind_socket.fd(), EVENT_READABLE, &cb); + if (cb.poll(500)) { + r = bind_socket.accept(&srv_socket, options, &cli_addr); + ASSERT_EQ(0, r); + *accepted_p = true; + } + ASSERT_TRUE(*accepted_p); + center->delete_file_event(bind_socket.fd(), EVENT_READABLE); + } + + if (worker->id == 1) { + C_poll cb(center); + center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb); + r = cli_socket.is_connected(); + if (r == 0) { + ASSERT_TRUE(cb.poll(500)); + r = cli_socket.is_connected(); + } + ASSERT_EQ(1, r); + center->delete_file_event(cli_socket.fd(), EVENT_READABLE); + } + + const size_t message_size = 10240; + size_t count = 100; + string message(message_size, '!'); + for (size_t i = 0; i < message_size; i += 100) + message[i] = ','; + size_t len = message_size * count; + C_poll cb(center); + if (worker->id == 1) + center->create_file_event(cli_socket.fd(), EVENT_WRITABLE, &cb); + if (srv_socket) + center->create_file_event(srv_socket.fd(), EVENT_READABLE, &cb); + size_t left = len; + len *= 2; + string read_string; + int again_count = 0; + int c = 2; + bufferlist bl; + for (size_t i = 0; i < count; ++i) + bl.push_back(bufferptr((char*)message.data(), message_size)); + while (!*done_p) { + again_count = 0; + if (worker->id == 1) { + if (c > 0) { + ssize_t r = 0; + usleep(100); + if (left > 0) { + r = cli_socket.send(bl, false); + ASSERT_TRUE(r >= 0 || r == -EAGAIN); + if (r > 0) + left -= r; + if (r == -EAGAIN) + ++again_count; + } + if (left == 0) { + --c; + left = message_size * count; + ASSERT_EQ(0U, bl.length()); + for (size_t i = 0; i < count; ++i) + bl.push_back(bufferptr((char*)message.data(), message_size)); + } + } + } + + if (srv_socket) { + char buf[1000]; + if (len > 0) { + r = srv_socket.read(buf, sizeof(buf)); + ASSERT_TRUE(r > 0 || r == -EAGAIN); + if (r > 0) { + read_string.append(buf, r); + len -= r; + } else if (r == -EAGAIN) { + ++again_count; + } + } + if (len == 0) { + for (size_t i = 0; i < read_string.size(); i += message_size) + ASSERT_EQ(0, memcmp(read_string.c_str()+i, message.c_str(), message_size)); + *done_p = true; + } + } + if (again_count) { + cb.reset(); + cb.poll(500); + } + } + if (worker->id == 1) + center->delete_file_event(cli_socket.fd(), EVENT_WRITABLE); + if (srv_socket) + center->delete_file_event(srv_socket.fd(), EVENT_READABLE); + + if (bind_socket) + bind_socket.abort_accept(); + if (srv_socket) + srv_socket.close(); + if (worker->id == 1) + cli_socket.close(); + }); +} + +class StressFactory { + struct Client; + struct Server; + struct ThreadData { + Worker *worker; + std::set clients; + std::set servers; + ~ThreadData() { + for (auto && i : clients) + delete i; + for (auto && i : servers) + delete i; + } + }; + + struct RandomString { + size_t slen; + vector strs; + std::random_device rd; + std::default_random_engine rng; + + RandomString(size_t s): slen(s), rng(rd()) {} + void prepare(size_t n) { + static const char alphabet[] = + "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "0123456789"; + + std::uniform_int_distribution<> 
dist( + 0, sizeof(alphabet) / sizeof(*alphabet) - 2); + + strs.reserve(n); + std::generate_n( + std::back_inserter(strs), strs.capacity(), [&] { + std::string str; + str.reserve(slen); + std::generate_n(std::back_inserter(str), slen, [&]() { + return alphabet[dist(rng)]; + }); + return str; + } + ); + } + std::string &get_random_string() { + std::uniform_int_distribution<> dist( + 0, strs.size() - 1); + return strs[dist(rng)]; + } + }; + struct Message { + size_t idx; + size_t len; + std::string content; + + explicit Message(RandomString &rs, size_t i, size_t l): idx(i) { + size_t slen = rs.slen; + len = std::max(slen, l); + + std::vector strs; + strs.reserve(len / slen); + std::generate_n( + std::back_inserter(strs), strs.capacity(), [&] { + return rs.get_random_string(); + } + ); + len = slen * strs.size(); + content.reserve(len); + for (auto &&s : strs) + content.append(s); + } + bool verify(const char *b, size_t len = 0) const { + return content.compare(0, len, b, 0, len) == 0; + } + }; + + template + class C_delete : public EventCallback { + T *ctxt; + public: + C_delete(T *c): ctxt(c) {} + void do_request(int id) { + delete ctxt; + delete this; + } + }; + + class Client { + StressFactory *factory; + EventCenter *center; + ConnectedSocket socket; + std::deque acking; + std::deque writings; + std::string buffer; + size_t index = 0; + size_t left; + bool write_enabled = false; + size_t read_offset = 0, write_offset = 0; + bool first = true; + bool dead = false; + StressFactory::Message homeless_message; + + class Client_read_handle : public EventCallback { + Client *c; + public: + Client_read_handle(Client *_c): c(_c) {} + void do_request(int id) { + c->do_read_request(); + } + } read_ctxt; + + class Client_write_handle : public EventCallback { + Client *c; + public: + Client_write_handle(Client *_c): c(_c) {} + void do_request(int id) { + c->do_write_request(); + } + } write_ctxt; + + public: + Client(StressFactory *f, EventCenter *cen, ConnectedSocket s, size_t c) + : factory(f), center(cen), socket(std::move(s)), left(c), homeless_message(factory->rs, -1, 1024), + read_ctxt(this), write_ctxt(this) { + center->create_file_event( + socket.fd(), EVENT_READABLE, &read_ctxt); + center->dispatch_event_external(&read_ctxt); + } + void close() { + ASSERT_FALSE(write_enabled); + dead = true; + socket.shutdown(); + center->delete_file_event(socket.fd(), EVENT_READABLE); + center->dispatch_event_external(new C_delete(this)); + } + + void do_read_request() { + if (dead) + return ; + ASSERT_TRUE(socket.is_connected() >= 0); + if (!socket.is_connected()) + return ; + ASSERT_TRUE(!acking.empty() || first); + if (first) { + first = false; + center->dispatch_event_external(&write_ctxt); + if (acking.empty()) + return ; + } + StressFactory::Message *m = acking.front(); + int r = 0; + if (buffer.empty()) + buffer.resize(m->len); + bool must_no = false; + while (true) { + r = socket.read((char*)buffer.data() + read_offset, + m->len - read_offset); + ASSERT_TRUE(r == -EAGAIN || r > 0); + if (r == -EAGAIN) + break; + read_offset += r; + + std::cerr << " client " << this << " receive " << m->idx << " len " << r << " content: " << std::endl; + ASSERT_FALSE(must_no); + if ((m->len - read_offset) == 0) { + ASSERT_TRUE(m->verify(buffer.data(), 0)); + delete m; + acking.pop_front(); + read_offset = 0; + buffer.clear(); + if (acking.empty()) { + m = &homeless_message; + must_no = true; + } else { + m = acking.front(); + buffer.resize(m->len); + } + } + } + if (acking.empty()) { + 
center->dispatch_event_external(&write_ctxt); + return ; + } + } + + void do_write_request() { + if (dead) + return ; + ASSERT_TRUE(socket.is_connected() > 0); + + while (left > 0 && factory->queue_depth > writings.size() + acking.size()) { + StressFactory::Message *m = new StressFactory::Message( + factory->rs, ++index, + factory->rd() % factory->max_message_length); + std::cerr << " client " << this << " generate message " << m->idx << " length " << m->len << std::endl; + ASSERT_EQ(m->len, m->content.size()); + writings.push_back(m); + --left; + --factory->message_left; + } + + while (!writings.empty()) { + StressFactory::Message *m = writings.front(); + bufferlist bl; + bl.append(m->content.data() + write_offset, m->content.size() - write_offset); + ssize_t r = socket.send(bl, false); + if (r == 0) + break; + std::cerr << " client " << this << " send " << m->idx << " len " << r << " content: " << std::endl; + ASSERT_TRUE(r >= 0); + write_offset += r; + if (write_offset == m->content.size()) { + write_offset = 0; + writings.pop_front(); + acking.push_back(m); + } + } + if (writings.empty() && write_enabled) { + center->delete_file_event(socket.fd(), EVENT_WRITABLE); + write_enabled = false; + } else if (!writings.empty() && !write_enabled) { + ASSERT_EQ(0, center->create_file_event( + socket.fd(), EVENT_WRITABLE, &write_ctxt)); + write_enabled = true; + } + } + + bool finish() const { + return left == 0 && acking.empty() && writings.empty(); + } + }; + friend class Client; + + class Server { + StressFactory *factory; + EventCenter *center; + ConnectedSocket socket; + std::deque buffers; + bool write_enabled = false; + bool dead = false; + + class Server_read_handle : public EventCallback { + Server *s; + public: + Server_read_handle(Server *_s): s(_s) {} + void do_request(int id) { + s->do_read_request(); + } + } read_ctxt; + + class Server_write_handle : public EventCallback { + Server *s; + public: + Server_write_handle(Server *_s): s(_s) {} + void do_request(int id) { + s->do_write_request(); + } + } write_ctxt; + + public: + Server(StressFactory *f, EventCenter *c, ConnectedSocket s): + factory(f), center(c), socket(std::move(s)), read_ctxt(this), write_ctxt(this) { + center->create_file_event(socket.fd(), EVENT_READABLE, &read_ctxt); + center->dispatch_event_external(&read_ctxt); + } + void close() { + ASSERT_FALSE(write_enabled); + socket.shutdown(); + center->delete_file_event(socket.fd(), EVENT_READABLE); + center->dispatch_event_external(new C_delete(this)); + } + void do_read_request() { + if (dead) + return ; + int r = 0; + while (true) { + char buf[4096]; + bufferptr data; + if (factory->zero_copy_read) { + r = socket.zero_copy_read(data); + } else { + r = socket.read(buf, sizeof(buf)); + } + ASSERT_TRUE(r == -EAGAIN || (r >= 0 && (size_t)r <= sizeof(buf))); + if (r == 0) { + ASSERT_TRUE(buffers.empty()); + dead = true; + return ; + } else if (r == -EAGAIN) + break; + if (factory->zero_copy_read) { + buffers.emplace_back(data.c_str(), 0, data.length()); + } else { + buffers.emplace_back(buf, 0, r); + } + std::cerr << " server " << this << " receive " << r << " content: " << std::endl; + } + if (!buffers.empty() && !write_enabled) + center->dispatch_event_external(&write_ctxt); + } + + void do_write_request() { + if (dead) + return ; + + while (!buffers.empty()) { + bufferlist bl; + auto it = buffers.begin(); + for (size_t i = 0; i < buffers.size(); ++i) { + bl.push_back(bufferptr((char*)it->data(), it->size())); + ++it; + } + + ssize_t r = socket.send(bl, false); + std::cerr 
<< " server " << this << " send " << r << std::endl; + if (r == 0) + break; + ASSERT_TRUE(r >= 0); + while (r > 0) { + ASSERT_TRUE(!buffers.empty()); + string &buffer = buffers.front(); + if (r >= (int)buffer.size()) { + r -= (int)buffer.size(); + buffers.pop_front(); + } else { + std::cerr << " server " << this << " sent " << r << std::endl; + buffer = buffer.substr(r, buffer.size()); + break; + } + } + } + if (buffers.empty()) { + if (write_enabled) { + center->delete_file_event(socket.fd(), EVENT_WRITABLE); + write_enabled = false; + } + } else if (!write_enabled) { + ASSERT_EQ(0, center->create_file_event( + socket.fd(), EVENT_WRITABLE, &write_ctxt)); + write_enabled = true; + } + } + + bool finish() { + return dead; + } + }; + friend class Server; + + class C_accept : public EventCallback { + StressFactory *factory; + ServerSocket bind_socket; + ThreadData *t_data; + + public: + C_accept(StressFactory *f, ServerSocket s, ThreadData *data) + : factory(f), bind_socket(std::move(s)), t_data(data) {} + void do_request(int id) { + while (true) { + entity_addr_t cli_addr; + ConnectedSocket srv_socket; + SocketOptions options; + int r = bind_socket.accept(&srv_socket, options, &cli_addr); + if (r == -EAGAIN) { + break; + } + ASSERT_EQ(0, r); + ASSERT_TRUE(srv_socket.fd() > 0); + Server *cb = new Server(factory, &t_data->worker->center, std::move(srv_socket)); + t_data->servers.insert(cb); + } + } + }; + friend class C_accept; + + public: + static const size_t min_client_send_messages = 100; + static const size_t max_client_send_messages = 1000; + std::shared_ptr stack; + RandomString rs; + std::random_device rd; + const size_t client_num, queue_depth, max_message_length; + atomic_int message_count, message_left; + entity_addr_t bind_addr; + bool zero_copy_read; + SocketOptions options; + + explicit StressFactory(std::shared_ptr s, const string &addr, + size_t cli, size_t qd, size_t mc, size_t l, bool zero_copy) + : stack(s), rs(128), client_num(cli), queue_depth(qd), + max_message_length(l), message_count(mc), message_left(mc), + zero_copy_read(zero_copy) { + bind_addr.parse(addr.c_str()); + rs.prepare(100); + } + ~StressFactory() { + } + + void add_client(ThreadData *t_data) { + static Mutex lock("add_client_lock"); + Mutex::Locker l(lock); + ConnectedSocket sock; + int r = t_data->worker->connect(bind_addr, options, &sock); + std::default_random_engine rng(rd()); + std::uniform_int_distribution<> dist( + min_client_send_messages, max_client_send_messages); + ASSERT_EQ(0, r); + int c = dist(rng); + if (c > message_count.load()) + c = message_count.load(); + Client *cb = new Client(this, &t_data->worker->center, std::move(sock), c); + t_data->clients.insert(cb); + message_count -= c; + } + + void drop_client(ThreadData *t_data, Client *c) { + c->close(); + ASSERT_EQ(1U, t_data->clients.erase(c)); + } + + void drop_server(ThreadData *t_data, Server *s) { + s->close(); + ASSERT_EQ(1U, t_data->servers.erase(s)); + } + + void start(Worker *worker) { + int r = 0; + ThreadData t_data; + t_data.worker = worker; + ServerSocket bind_socket; + if (stack->support_local_listen_table() || worker->id == 0) { + r = worker->listen(bind_addr, options, &bind_socket); + ASSERT_EQ(0, r); + } + C_accept *accept_handler = nullptr; + int bind_fd = 0; + if (bind_socket) { + bind_fd = bind_socket.fd(); + accept_handler = new C_accept(this, std::move(bind_socket), &t_data); + ASSERT_EQ(0, worker->center.create_file_event( + bind_fd, EVENT_READABLE, accept_handler)); + } + + int echo_throttle = message_count; + while 
(message_count > 0 || !t_data.clients.empty() || !t_data.servers.empty()) { + if (message_count > 0 && t_data.clients.size() < client_num && t_data.servers.size() < client_num) + add_client(&t_data); + for (auto &&c : t_data.clients) { + if (c->finish()) { + drop_client(&t_data, c); + break; + } + } + for (auto &&s : t_data.servers) { + if (s->finish()) { + drop_server(&t_data, s); + break; + } + } + + worker->center.process_events(1); + if (echo_throttle > message_left) { + std::cerr << " clients " << t_data.clients.size() << " servers " << t_data.servers.size() + << " message count " << message_left << std::endl; + echo_throttle -= 100; + } + } + if (bind_fd) + worker->center.delete_file_event(bind_fd, EVENT_READABLE); + delete accept_handler; + } +}; + +TEST_P(NetworkWorkerTest, StressTest) { + StressFactory factory(stack, get_addr(), 16, 16, 10000, 1024, false); + StressFactory *f = &factory; + exec_events([f](Worker *worker) mutable { + f->start(worker); + }); + ASSERT_EQ(0, factory.message_left); +} + + +INSTANTIATE_TEST_CASE_P( + NetworkStack, + NetworkWorkerTest, + ::testing::Values( + "posix" + ) +); + +#else + +// Google Test may not support value-parameterized tests with some +// compilers. If we use conditional compilation to compile out all +// code referring to the gtest_main library, MSVC linker will not link +// that library at all and consequently complain about missing entry +// point defined in that library (fatal error LNK1561: entry point +// must be defined). This dummy test keeps gtest_main linked in. +TEST(DummyTest, ValueParameterizedTestsAreNotSupportedOnThisPlatform) {} + +#endif + + +int main(int argc, char **argv) { + vector args; + argv_to_vec(argc, (const char **)argv, args); + + global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, CODE_ENVIRONMENT_UTILITY, 0); + common_init_finish(g_ceph_context); + + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +/* + * Local Variables: + * compile-command: "cd ../.. ; make ceph_test_async_networkstack && + * ./ceph_test_async_networkstack + * + * End: + */ diff --git a/src/test/msgr/test_msgr.cc b/src/test/msgr/test_msgr.cc index a94c77125b294..05fcbbe4ba437 100644 --- a/src/test/msgr/test_msgr.cc +++ b/src/test/msgr/test_msgr.cc @@ -175,6 +175,7 @@ class FakeDispatcher : public Dispatcher { s->put(); } got_remote_reset = true; + cond.Signal(); } void ms_fast_dispatch(Message *m) { Mutex::Locker l(lock); @@ -450,6 +451,7 @@ TEST_P(MessengerTest, StatefulTest) { // don't lose state ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); + srv_dispatcher.got_new = false; conn = client_msgr->get_connection(server_msgr->get_myinst()); { m = new MPing(); @@ -461,14 +463,26 @@ TEST_P(MessengerTest, StatefulTest) { } ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); server_conn = server_msgr->get_connection(client_msgr->get_myinst()); - ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); + { + Mutex::Locker l(srv_dispatcher.lock); + while (!srv_dispatcher.got_remote_reset) + srv_dispatcher.cond.Wait(srv_dispatcher.lock); + } // 2. 
test for client reconnect ASSERT_FALSE(cli_dispatcher.got_remote_reset); cli_dispatcher.got_connect = false; + cli_dispatcher.got_new = false; + cli_dispatcher.got_remote_reset = false; server_conn->mark_down(); ASSERT_FALSE(server_conn->is_connected()); // ensure client detect server socket closed + { + Mutex::Locker l(cli_dispatcher.lock); + while (!cli_dispatcher.got_remote_reset) + cli_dispatcher.cond.Wait(cli_dispatcher.lock); + cli_dispatcher.got_remote_reset = false; + } { Mutex::Locker l(cli_dispatcher.lock); while (!cli_dispatcher.got_connect) @@ -477,6 +491,7 @@ TEST_P(MessengerTest, StatefulTest) { } CHECK_AND_WAIT_TRUE(conn->is_connected()); ASSERT_TRUE(conn->is_connected()); + { m = new MPing(); ASSERT_EQ(conn->send_message(m), 0); @@ -487,10 +502,9 @@ TEST_P(MessengerTest, StatefulTest) { cli_dispatcher.got_new = false; } // resetcheck happen - ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); + ASSERT_EQ(static_cast(conn->get_priv())->get_count(), 1); server_conn = server_msgr->get_connection(client_msgr->get_myinst()); - ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); - ASSERT_TRUE(cli_dispatcher.got_remote_reset); + ASSERT_EQ(static_cast(server_conn->get_priv())->get_count(), 1); cli_dispatcher.got_remote_reset = false; server_msgr->shutdown(); @@ -529,6 +543,7 @@ TEST_P(MessengerTest, StatelessTest) { conn->mark_down(); ASSERT_FALSE(conn->is_connected()); + srv_dispatcher.got_new = false; conn = client_msgr->get_connection(server_msgr->get_myinst()); { m = new MPing(); @@ -541,7 +556,12 @@ TEST_P(MessengerTest, StatelessTest) { ASSERT_TRUE(static_cast(conn->get_priv())->get_count() == 1); ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst()); // server lose state - ASSERT_TRUE(static_cast(server_conn->get_priv())->get_count() == 1); + { + Mutex::Locker l(srv_dispatcher.lock); + while (!srv_dispatcher.got_new) + srv_dispatcher.cond.Wait(srv_dispatcher.lock); + } + ASSERT_EQ(static_cast(server_conn->get_priv())->get_count(), 1); // 2. test for client lossy server_conn->mark_down();
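
The tests above exercise a deliberately small surface: NetworkStack::create()/start(), Worker::listen()/connect(), and the ConnectedSocket/ServerSocket wrappers. A minimal end-to-end sketch of that flow, with error handling and event-center polling elided (illustrative only: a real caller registers EVENT_READABLE handlers as the tests do instead of spinning on -EAGAIN):

#include <cassert>
#include "msg/async/Stack.h"

// Illustrative only; assumes g_ceph_context from the usual global_init()
// test setup. One worker binds, a client connects, one buffer travels.
void stack_demo()
{
  std::shared_ptr<NetworkStack> stack =
      NetworkStack::create(g_ceph_context, "posix");
  stack->start();

  Worker *w = stack->get_worker(0);
  SocketOptions opts;                  // nonblock + nodelay by default
  entity_addr_t addr;
  addr.parse("127.0.0.1:15000");

  ServerSocket listener;
  assert(w->listen(addr, opts, &listener) == 0);

  ConnectedSocket cli, srv;
  assert(w->connect(addr, opts, &cli) == 0);

  entity_addr_t peer;
  while (listener.accept(&srv, opts, &peer) == -EAGAIN)
    ;                                  // sketch only; see C_poll above

  bufferlist bl;
  bl.append("ping", 4);
  cli.send(bl, false);                 // unsent bytes stay in bl on a short write

  stack->stop();
}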
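The Event.cc hunk at the top of this section is a small but real optimization: dispatch_event_external() now writes to the notify pipe only when the external queue goes from empty to non-empty, so a burst of dispatches costs one wakeup instead of one write() per event. The pattern, distilled into std primitives (hypothetical names; the real code lives in EventCenter):

#include <atomic>
#include <deque>
#include <functional>
#include <mutex>
#include <unistd.h>

// Distilled sketch of the coalesced wakeup: producers write to the
// pipe only on the empty -> non-empty edge of the queue.
struct ExternalQueue {
  std::mutex lock;
  std::deque<std::function<void()>> events;
  std::atomic<uint64_t> num{0};
  int notify_send_fd = -1;         // write end of the loop's wakeup pipe

  void dispatch(std::function<void()> cb) {
    lock.lock();
    events.push_back(std::move(cb));
    bool wake = (num.load() == 0);   // first event since the last drain?
    ++num;
    lock.unlock();
    if (wake) {
      char c = 'c';
      (void)::write(notify_send_fd, &c, 1);  // one byte is enough
    }
  }

  void drain() {                   // runs in the event-loop thread
    std::deque<std::function<void()>> cur;
    lock.lock();
    cur.swap(events);
    num.store(0);                  // under the lock: re-arms the wakeup edge
    lock.unlock();
    for (auto &cb : cur)
      cb();
  }
};

The producer-side emptiness check is reliable because drain() stores zero to the counter while still holding the lock, so the first dispatch after a drain always observes num == 0 and issues a fresh wakeup.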
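The subtlest part of do_sendmsg() above is resuming after a partial write: sendmsg() can consume a byte count that does not line up with iovec boundaries, so the inner loop drops fully-sent elements and bumps the base pointer of the first partially-sent one before retrying. That bookkeeping in isolation (a distilled sketch assuming POSIX msghdr semantics, not a drop-in for the patch's loop):

#include <stddef.h>
#include <sys/socket.h>

// Advance msg's iovec cursor past `sent` bytes after a short sendmsg().
static void advance_iov(struct msghdr *msg, size_t sent)
{
  while (sent > 0) {
    struct iovec *iov = &msg->msg_iov[0];
    if (iov->iov_len <= sent) {
      sent -= iov->iov_len;      // element fully written: drop it
      msg->msg_iov++;
      msg->msg_iovlen--;
    } else {
      iov->iov_base = (char *)iov->iov_base + sent;
      iov->iov_len -= sent;      // element partially written: shrink it
      sent = 0;
    }
  }
}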
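NetworkStack::drain() flushes all workers by dispatching the same callback to every event center and blocking until each loop has executed it; since external events run in FIFO order, everything queued before the barrier has completed once the barrier callback fires. The same barrier in miniature, using std primitives in place of Ceph's Mutex/Cond:

#include <condition_variable>
#include <mutex>

// Barrier flushed through N event loops: each loop calls done() once,
// and wait() returns only after all N have passed it.
class DrainBarrier {
  std::mutex m;
  std::condition_variable cv;
  size_t remaining;
 public:
  explicit DrainBarrier(size_t n) : remaining(n) {}
  void done() {                      // called from each event loop
    std::lock_guard<std::mutex> l(m);
    if (--remaining == 0)
      cv.notify_all();
  }
  void wait() {                      // called from the draining thread
    std::unique_lock<std::mutex> l(m);
    cv.wait(l, [this] { return remaining == 0; });
  }
};

This is also why drain() asserts that the caller owns none of the centers (via center.get_owner()): a worker waiting for a callback that only its own loop can run would deadlock.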
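Finally, NetworkStack registers itself as a fork watcher in its constructor because threads do not survive fork(): only the calling thread exists in the child. The handle_pre_fork()/handle_post_fork() overrides therefore map to stop() and start(), joining every worker before the fork and respawning them afterwards. From the daemonizing caller's point of view the sequence is effectively (illustrative sketch, error handling elided; the real hooks are invoked through the CephContext notification):

#include <memory>
#include <unistd.h>
#include "msg/async/Stack.h"

// Sketch: no stack threads may exist across the fork() call itself.
void daemonize_with_stack(std::shared_ptr<NetworkStack> stack)
{
  stack->stop();           // handle_pre_fork(): set done, wakeup(), join workers
  pid_t child = fork();
  if (child == 0)
    stack->start();        // handle_post_fork(): respawn + wait_for_init()
  // parent typically exits in a daemonize flow
}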