diff --git a/ChangeLog.md b/ChangeLog.md index 1c562d7f2c..90cdf757af 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -1,22 +1,26 @@ Version 0.4.2 +------------- -2012-10-1 +__2012-10-1__ - Bugfix: evaluate `errno` whenever select() fails and handle errors properly - Refactored announce * accept recursive containers, e.g., vector> * allow user-defined types as members of announced types * all-new, policy-based implementation +- Use `poll` rather than `select` in middleman (based on the patch by ArtemGr) Version 0.4.1 +------------- -2012-08-22 +__2012-08-22__ - Bugfix: shutdown() caused segfault if no scheduler or middleman was started Version 0.4 +----------- -2012-08-20 +__2012-08-20__ - New network layer implementation - Added acceptor and input/output stream interfaces @@ -32,8 +36,9 @@ Version 0.4 - Do not send more than one response message with `reply` Version 0.3.3 +------------- -2012-08-09 +__2012-08-09__ - Bugfix: serialize message id for synchronous messaging - Added macro to unit_testing/CMakeLists.txt for less verbose CMake setup @@ -46,21 +51,24 @@ Version 0.3.3 - Added "delayed_send_tuple" and "delayed_reply_tuple" Version 0.3.2 +------------- -2012-07-30 +__2012-07-30__ - Bugfix: added 'bool' to the list of announced types Version 0.3.1 +------------- -2012-07-27 +__2012-07-27__ - Bugfix: always return from a synchronous handler if a timeout occurs - Bugfix: request next timeout after timeout handler invocation if needed Version 0.3 +----------- -2012-07-25 +__2012-07-25__ - Implemented synchronous messages - The function become() no longer accepts pointers @@ -72,15 +80,17 @@ Version 0.3 - Group subscriptions are no longer attachables Version 0.2.1 +------------- -2012-07-02 +__2012-07-02__ - More efficient behavior implementation - Relaxed definition of become() to accept const lvalue references as well Version 0.2 +----------- -2012-06-29 +__2012-06-29__ - Removed become_void() [use quit() instead] - Renamed "future_send()" to "delayed_send()" 
diff --git a/src/middleman.cpp b/src/middleman.cpp index 825fbf6591..78582afc3e 100644 --- a/src/middleman.cpp +++ b/src/middleman.cpp @@ -35,6 +35,11 @@ #include #include +#ifdef CPPA_WINDOWS +#else +# include +#endif + #include "cppa/on.hpp" #include "cppa/actor.hpp" #include "cppa/match.hpp" @@ -441,7 +446,7 @@ bool peer_connection::continue_reading() { binary_deserializer bd(m_rd_buf.data(), m_rd_buf.size()); m_meta_msg->deserialize(&msg, &bd); auto& content = msg.content(); - DEBUG("<-- " << to_string(msg)); + //DEBUG("<-- " << to_string(msg)); match(content) ( // monitor messages are sent automatically whenever // actor_proxy_cache creates a new proxy @@ -670,7 +675,7 @@ class middleman_overseer : public network_channel { DEBUG("message to an unknown peer: " << to_string(out_msg)); break; } - DEBUG("--> " << to_string(out_msg)); + //DEBUG("--> " << to_string(out_msg)); auto had_unwritten_data = peer->has_unwritten_data(); try { peer->write(out_msg); @@ -702,19 +707,18 @@ class middleman_overseer : public network_channel { void middleman::operator()(int pipe_fd, middleman_queue& queue) { DEBUG("pself: " << to_string(*m_pself)); - int maxfd = 0; - fd_set rdset; - fd_set wrset; - fd_set* wrset_ptr = nullptr; + std::vector pollset; m_channels.emplace_back(new middleman_overseer(this, pipe_fd, queue)); auto update_fd_sets = [&] { - FD_ZERO(&rdset); - maxfd = 0; + pollset.clear(); CPPA_REQUIRE(m_channels.size() > 0); + // add all read handles of all channels (POLLIN) for (auto& channel : m_channels) { - auto fd = channel->read_handle(); - maxfd = max(maxfd, fd); - FD_SET(fd, &rdset); + pollfd pfd; + pfd.fd = channel->read_handle(); + pfd.events = POLLIN; + pfd.revents = 0; + pollset.push_back(pfd); } // check consistency of m_peers_with_unwritten_data if (!m_peers_with_unwritten_data.empty()) { @@ -727,18 +731,14 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) { else ++i; } } - if (m_peers_with_unwritten_data.empty()) { - if (wrset_ptr) wrset_ptr 
= nullptr; + // add all write handles of all peers with unwritten data (POLLOUT) + for (auto& peer : m_peers_with_unwritten_data) { + struct pollfd pfd; + pfd.fd = peer->write_handle(); + pfd.events = POLLOUT; + pfd.revents = 0; + pollset.push_back(pfd); } - else { - for (auto& peer : m_peers_with_unwritten_data) { - auto fd = peer->write_handle(); - maxfd = max(maxfd, fd); - FD_SET(fd, &wrset); - } - wrset_ptr = &wrset; - } - CPPA_REQUIRE(maxfd > 0); }; auto continue_reading = [&](const network_channel_ptr& ch) { bool erase_channel = false; @@ -757,21 +757,19 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) { erase_channel = true; } if (erase_channel) { - DEBUG("erase worker"); + DEBUG("erase worker (read failed)"); m_erased_channels.insert(ch); } }; auto continue_writing = [&](const peer_connection_ptr& peer) { bool erase_channel = false; - try { - erase_channel = !peer->continue_writing(); - } + try { erase_channel = !peer->continue_writing(); } catch (exception& e) { DEBUG(demangle(typeid(e).name()) << ": " << e.what()); erase_channel = true; } if (erase_channel) { - DEBUG("erase worker"); + DEBUG("erase worker (write failed)"); m_erased_channels.insert(peer); } }; @@ -799,17 +797,16 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) { }; do { update_fd_sets(); - //DEBUG("select()"); - int sresult; + int presult; do { - DEBUG("select() on " + DEBUG("poll() on " << (m_peers_with_unwritten_data.size() + m_channels.size()) << " sockets"); - sresult = select(maxfd + 1, &rdset, wrset_ptr, nullptr, nullptr); - DEBUG("select() returned " << sresult); - if (sresult < 0) { + presult = poll (pollset.data(), pollset.size(), -1); + DEBUG("poll() returned " << presult); + if (presult < 0) { // try again or die hard - sresult = 0; + presult = 0; switch (errno) { // a signal was caught case EINTR: { @@ -819,9 +816,7 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) { // nfds is negative or the value // contained within 
timeout is invalid case EINVAL: { - if ((maxfd + 1) < 0) { - CPPA_CRITICAL("overflow: maxfd + 1 > 0"); - } + CPPA_CRITICAL("poll EINVAL"); break; } case ENOMEM: { @@ -831,6 +826,13 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) { //this_thread::yield(); break; } + // array given as argument was not contained + // in the calling program's address space + case EFAULT: { + // must not happen + CPPA_CRITICAL("poll EFAULT"); + break; + } case EBADF: { // this really shouldn't happen // try IO on each single socket and rebuild rd_set @@ -851,20 +853,49 @@ } } } - while (sresult == 0); + while (presult == 0); +# ifdef CPPA_LINUX +# define POLL_ERR_MASK (POLLRDHUP | POLLERR | POLLHUP | POLLNVAL) +# else +# define POLL_ERR_MASK (POLLERR | POLLHUP | POLLNVAL) +# endif //DEBUG("continue reading ..."); - { // iterate over all channels and remove channels as needed - for (auto& ch : m_channels) { - if (FD_ISSET(ch->read_handle(), &rdset)) { - continue_reading(ch); + // iterate over all channels and remove channels as needed + for (auto& pfd : pollset) { + if (pfd.revents != 0) { + DEBUG("fd " << pfd.fd << "; read revents: " << pfd.revents); + auto ch_end = end(m_channels); + // check whether pfd belongs to a read handle + auto ch = find_if(begin(m_channels), ch_end, + [&](const network_channel_ptr& ptr) { + return pfd.fd == ptr->read_handle(); + }); + if (ch != ch_end) { + if (pfd.revents & POLL_ERR_MASK) { + // remove socket on error + m_erased_channels.insert(*ch); + } + else if (pfd.revents & (POLLIN | POLLPRI)) { + // read some if possible + continue_reading(*ch); + } } - } - } - if (wrset_ptr) { // iterate over peers with unwritten data - DEBUG("continue writing ..."); - for (auto& peer : m_peers_with_unwritten_data) { - if (FD_ISSET(peer->write_handle(), &wrset)) { - continue_writing(peer); + // check whether pfd belongs to a write handle (can be both!) 
+ auto pc_end = end(m_peers_with_unwritten_data); + auto pc = find_if(begin(m_peers_with_unwritten_data), pc_end, + [&](const peer_connection_ptr& ptr) { + return pfd.fd == ptr->write_handle(); + }); + if (pc != pc_end) { + if (pfd.revents & POLL_ERR_MASK) { + // remove socket on error + m_erased_channels.insert(*pc); + m_peers_with_unwritten_data.erase(*pc); + } + else if (pfd.revents & POLLOUT) { + // write some if possible + continue_writing(*pc); + } } } }