Skip to content

Commit

Permalink
use poll rather than select in middleman (based on the patch by A…
Browse files Browse the repository at this point in the history
…rtemGr), relates #71
  • Loading branch information
Neverlord committed Oct 1, 2012
1 parent 2671d2c commit 7ef8d40
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 58 deletions.
28 changes: 19 additions & 9 deletions ChangeLog.md
@@ -1,22 +1,26 @@
Version 0.4.2 Version 0.4.2
-------------


2012-10-1 __2012-10-1__


- Bugfix: evaluate `errno` whenever select() fails and handle errors properly - Bugfix: evaluate `errno` whenever select() fails and handle errors properly
- Refactored announce - Refactored announce
* accept recursive containers, e.g., vector<vector<double>> * accept recursive containers, e.g., vector<vector<double>>
* allow user-defined types as members of announced types * allow user-defined types as members of announced types
* all-new, policy-based implementation * all-new, policy-based implementation
- Use `poll` rather than `select` in middleman (based on the patch by ArtemGr)


Version 0.4.1 Version 0.4.1
-------------


2012-08-22 __2012-08-22__


- Bugfix: shutdown() caused segfault if no scheduler or middleman was started - Bugfix: shutdown() caused segfault if no scheduler or middleman was started


Version 0.4 Version 0.4
-----------


2012-08-20 __2012-08-20__


- New network layer implementation - New network layer implementation
- Added acceptor and input/output stream interfaces - Added acceptor and input/output stream interfaces
Expand All @@ -32,8 +36,9 @@ Version 0.4
- Do not send more than one response message with `reply` - Do not send more than one response message with `reply`


Version 0.3.3 Version 0.3.3
-------------


2012-08-09 __2012-08-09__


- Bugfix: serialize message id for synchronous messaging - Bugfix: serialize message id for synchronous messaging
- Added macro to unit_testing/CMakeLists.txt for less verbose CMake setup - Added macro to unit_testing/CMakeLists.txt for less verbose CMake setup
Expand All @@ -46,21 +51,24 @@ Version 0.3.3
- Added "delayed_send_tuple" and "delayed_reply_tuple" - Added "delayed_send_tuple" and "delayed_reply_tuple"


Version 0.3.2 Version 0.3.2
-------------


2012-07-30 __2012-07-30__


- Bugfix: added 'bool' to the list of announced types - Bugfix: added 'bool' to the list of announced types


Version 0.3.1 Version 0.3.1
-------------


2012-07-27 __2012-07-27__


- Bugfix: always return from a synchronous handler if a timeout occurs - Bugfix: always return from a synchronous handler if a timeout occurs
- Bugfix: request next timeout after timeout handler invocation if needed - Bugfix: request next timeout after timeout handler invocation if needed


Version 0.3 Version 0.3
-----------


2012-07-25 __2012-07-25__


- Implemented synchronous messages - Implemented synchronous messages
- The function become() no longer accepts pointers - The function become() no longer accepts pointers
Expand All @@ -72,15 +80,17 @@ Version 0.3
- Group subscriptions are no longer attachables - Group subscriptions are no longer attachables


Version 0.2.1 Version 0.2.1
-------------


2012-07-02 __2012-07-02__


- More efficient behavior implementation - More efficient behavior implementation
- Relaxed definition of become() to accept const lvalue references as well - Relaxed definition of become() to accept const lvalue references as well


Version 0.2 Version 0.2
-----------


2012-06-29 __2012-06-29__


- Removed become_void() [use quit() instead] - Removed become_void() [use quit() instead]
- Renamed "future_send()" to "delayed_send()" - Renamed "future_send()" to "delayed_send()"
Expand Down
129 changes: 80 additions & 49 deletions src/middleman.cpp
Expand Up @@ -35,6 +35,11 @@
#include <sstream> #include <sstream>
#include <iostream> #include <iostream>


#ifdef CPPA_WINDOWS
#else
# include <poll.h>
#endif

#include "cppa/on.hpp" #include "cppa/on.hpp"
#include "cppa/actor.hpp" #include "cppa/actor.hpp"
#include "cppa/match.hpp" #include "cppa/match.hpp"
Expand Down Expand Up @@ -441,7 +446,7 @@ bool peer_connection::continue_reading() {
binary_deserializer bd(m_rd_buf.data(), m_rd_buf.size()); binary_deserializer bd(m_rd_buf.data(), m_rd_buf.size());
m_meta_msg->deserialize(&msg, &bd); m_meta_msg->deserialize(&msg, &bd);
auto& content = msg.content(); auto& content = msg.content();
DEBUG("<-- " << to_string(msg)); //DEBUG("<-- " << to_string(msg));
match(content) ( match(content) (
// monitor messages are sent automatically whenever // monitor messages are sent automatically whenever
// actor_proxy_cache creates a new proxy // actor_proxy_cache creates a new proxy
Expand Down Expand Up @@ -670,7 +675,7 @@ class middleman_overseer : public network_channel {
DEBUG("message to an unknown peer: " << to_string(out_msg)); DEBUG("message to an unknown peer: " << to_string(out_msg));
break; break;
} }
DEBUG("--> " << to_string(out_msg)); //DEBUG("--> " << to_string(out_msg));
auto had_unwritten_data = peer->has_unwritten_data(); auto had_unwritten_data = peer->has_unwritten_data();
try { try {
peer->write(out_msg); peer->write(out_msg);
Expand Down Expand Up @@ -702,19 +707,18 @@ class middleman_overseer : public network_channel {


void middleman::operator()(int pipe_fd, middleman_queue& queue) { void middleman::operator()(int pipe_fd, middleman_queue& queue) {
DEBUG("pself: " << to_string(*m_pself)); DEBUG("pself: " << to_string(*m_pself));
int maxfd = 0; std::vector<pollfd> pollset;
fd_set rdset;
fd_set wrset;
fd_set* wrset_ptr = nullptr;
m_channels.emplace_back(new middleman_overseer(this, pipe_fd, queue)); m_channels.emplace_back(new middleman_overseer(this, pipe_fd, queue));
auto update_fd_sets = [&] { auto update_fd_sets = [&] {
FD_ZERO(&rdset); pollset.clear();
maxfd = 0;
CPPA_REQUIRE(m_channels.size() > 0); CPPA_REQUIRE(m_channels.size() > 0);
// add all read handles of all channels (POLLIN)
for (auto& channel : m_channels) { for (auto& channel : m_channels) {
auto fd = channel->read_handle(); pollfd pfd;
maxfd = max(maxfd, fd); pfd.fd = channel->read_handle();
FD_SET(fd, &rdset); pfd.events = POLLIN;
pfd.revents = 0;
pollset.push_back(pfd);
} }
// check consistency of m_peers_with_unwritten_data // check consistency of m_peers_with_unwritten_data
if (!m_peers_with_unwritten_data.empty()) { if (!m_peers_with_unwritten_data.empty()) {
Expand All @@ -727,18 +731,14 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
else ++i; else ++i;
} }
} }
if (m_peers_with_unwritten_data.empty()) { // add all write handles of all peers with unwritten data (POLLOUT)
if (wrset_ptr) wrset_ptr = nullptr; for (auto& peer : m_peers_with_unwritten_data) {
struct pollfd pfd;
pfd.fd = peer->write_handle();
pfd.events = POLLOUT;
pfd.revents = 0;
pollset.push_back(pfd);
} }
else {
for (auto& peer : m_peers_with_unwritten_data) {
auto fd = peer->write_handle();
maxfd = max(maxfd, fd);
FD_SET(fd, &wrset);
}
wrset_ptr = &wrset;
}
CPPA_REQUIRE(maxfd > 0);
}; };
auto continue_reading = [&](const network_channel_ptr& ch) { auto continue_reading = [&](const network_channel_ptr& ch) {
bool erase_channel = false; bool erase_channel = false;
Expand All @@ -757,21 +757,19 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
erase_channel = true; erase_channel = true;
} }
if (erase_channel) { if (erase_channel) {
DEBUG("erase worker"); DEBUG("erase worker (read failed)");
m_erased_channels.insert(ch); m_erased_channels.insert(ch);
} }
}; };
auto continue_writing = [&](const peer_connection_ptr& peer) { auto continue_writing = [&](const peer_connection_ptr& peer) {
bool erase_channel = false; bool erase_channel = false;
try { try { erase_channel = !peer->continue_writing(); }
erase_channel = !peer->continue_writing();
}
catch (exception& e) { catch (exception& e) {
DEBUG(demangle(typeid(e).name()) << ": " << e.what()); DEBUG(demangle(typeid(e).name()) << ": " << e.what());
erase_channel = true; erase_channel = true;
} }
if (erase_channel) { if (erase_channel) {
DEBUG("erase worker"); DEBUG("erase worker (write failed)");
m_erased_channels.insert(peer); m_erased_channels.insert(peer);
} }
}; };
Expand Down Expand Up @@ -799,17 +797,16 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
}; };
do { do {
update_fd_sets(); update_fd_sets();
//DEBUG("select()"); int presult;
int sresult;
do { do {
DEBUG("select() on " DEBUG("poll() on "
<< (m_peers_with_unwritten_data.size() + m_channels.size()) << (m_peers_with_unwritten_data.size() + m_channels.size())
<< " sockets"); << " sockets");
sresult = select(maxfd + 1, &rdset, wrset_ptr, nullptr, nullptr); presult = poll (pollset.data(), pollset.size(), -1);
DEBUG("select() returned " << sresult); DEBUG("poll() returned " << presult);
if (sresult < 0) { if (presult < 0) {
// try again or die hard // try again or die hard
sresult = 0; presult = 0;
switch (errno) { switch (errno) {
// a signal was caught // a signal was caught
case EINTR: { case EINTR: {
Expand All @@ -819,9 +816,7 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
// nfds is negative or the value // nfds is negative or the value
// contained within timeout is invalid // contained within timeout is invalid
case EINVAL: { case EINVAL: {
if ((maxfd + 1) < 0) { CPPA_CRITICAL("poll EINVAL");
CPPA_CRITICAL("overflow: maxfd + 1 > 0");
}
break; break;
} }
case ENOMEM: { case ENOMEM: {
Expand All @@ -831,6 +826,13 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
//this_thread::yield(); //this_thread::yield();
break; break;
} }
// array given as argument was not contained
// in the calling program's address space
case EFAULT: {
// must not happen
CPPA_CRITICAL("poll EFAULT");
break;
}
case EBADF: { case EBADF: {
// this really shouldn't happen // this really shouldn't happen
// try IO on each single socket and rebuild rd_set // try IO on each single socket and rebuild rd_set
Expand All @@ -851,20 +853,49 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
} }
} }
} }
while (sresult == 0); while (presult == 0);
# ifdef CPPA_LINUX
# define POLL_ERR_MASK (POLLRDHUP | POLLERR | POLLHUP | POLLNVAL)
# else
# define POLL_ERR_MASK (POLLERR | POLLHUP | POLLNVAL)
# endif
//DEBUG("continue reading ..."); //DEBUG("continue reading ...");
{ // iterate over all channels and remove channels as needed // iterate over all channels and remove channels as needed
for (auto& ch : m_channels) { for (auto& pfd : pollset) {
if (FD_ISSET(ch->read_handle(), &rdset)) { if (pfd.revents != 0) {
continue_reading(ch); DEBUG("fd " << pfd.fd << "; read revents: " << pfd.revents);
auto ch_end = end(m_channels);
// check wheter pfd belongs to a read handle
auto ch = find_if(begin(m_channels), ch_end,
[&](const network_channel_ptr& ptr) {
return pfd.fd == ptr->read_handle();
});
if (ch != ch_end) {
if (pfd.revents & POLL_ERR_MASK) {
// remove socket on error
m_erased_channels.insert(*ch);
}
else if (pfd.revents & (POLLIN | POLLPRI)) {
// read some if possible
continue_reading(*ch);
}
} }
} // check wheter pfd belongs to a write handle (can be both!)
} auto pc_end = end(m_peers_with_unwritten_data);
if (wrset_ptr) { // iterate over peers with unwritten data auto pc = find_if(begin(m_peers_with_unwritten_data), pc_end,
DEBUG("continue writing ..."); [&](const peer_connection_ptr& ptr) {
for (auto& peer : m_peers_with_unwritten_data) { return pfd.fd == ptr->write_handle();
if (FD_ISSET(peer->write_handle(), &wrset)) { });
continue_writing(peer); if (pc != pc_end) {
if (pfd.revents & POLL_ERR_MASK) {
// remove socket on error
m_erased_channels.insert(*pc);
m_peers_with_unwritten_data.erase(*pc);
}
else if (pfd.revents & POLLOUT) {
// write some if possible
continue_writing(*pc);
}
} }
} }
} }
Expand Down

0 comments on commit 7ef8d40

Please sign in to comment.