Permalink
Browse files

take some load off select() by reading chunks from pipe

  • Loading branch information...
1 parent 81634aa commit 7fe66f90732d26f9d7659d068ad059e6e0c8e6c7 @Neverlord Neverlord committed Aug 20, 2012
Showing with 79 additions and 61 deletions.
  1. +70 −60 src/middleman.cpp
  2. +9 −1 src/network_manager.cpp
View
@@ -555,74 +555,84 @@ class middleman_overseer : public network_channel {
bool continue_reading() {
DEBUG("middleman_overseer::continue_reading");
- uint32_t dummy;
- if (::read(read_handle(), &dummy, sizeof(dummy)) != sizeof(dummy)) {
- CPPA_CRITICAL("cannot read from pipe");
- }
- atomic_thread_fence(memory_order_seq_cst);
- unique_ptr<middleman_message> msg(m_queue.try_pop());
- if (!msg) { CPPA_CRITICAL("nullptr dequeued"); }
- switch (msg->type) {
- case middleman_message_type::add_peer: {
- DEBUG("middleman_overseer: add_peer: "
- << to_string(*(msg->new_peer.second)));
- auto& new_peer = msg->new_peer;
- auto& io_ptrs = new_peer.first;
- peer_connection_ptr peer;
- peer.reset(new peer_connection(parent(),
- io_ptrs.first,
- io_ptrs.second,
- new_peer.second));
- parent()->add_channel_ptr(peer);
- parent()->add_peer(*(new_peer.second), peer);
- break;
+ static constexpr size_t num_dummies = 256;
+ uint8_t dummies[num_dummies];
+ auto read_result = ::read(read_handle(), dummies, num_dummies);
+ if (read_result < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ // try again later
+ return true;
}
- case middleman_message_type::publish: {
- DEBUG("middleman_overseer: publish");
- auto& ptrs = msg->new_published_actor;
- parent()->add_channel<peer_acceptor>(ptrs.second->id(),
- move(ptrs.first));
- break;
+ else {
+ CPPA_CRITICAL("cannot read from pipe");
}
- case middleman_message_type::unpublish: {
- if (msg->published_actor) {
- DEBUG("middleman_overseer: unpublish actor id "
- << msg->published_actor->id());
- auto channel = parent()->acceptor_of(msg->published_actor);
- if (channel) {
- parent()->erase(channel);
- }
+ }
+ atomic_thread_fence(memory_order_seq_cst);
+ for (int i = 0; i < read_result; ++i) {
+ unique_ptr<middleman_message> msg(m_queue.try_pop());
+ if (!msg) { CPPA_CRITICAL("nullptr dequeued"); }
+ switch (msg->type) {
+ case middleman_message_type::add_peer: {
+ DEBUG("middleman_overseer: add_peer: "
+ << to_string(*(msg->new_peer.second)));
+ auto& new_peer = msg->new_peer;
+ auto& io_ptrs = new_peer.first;
+ peer_connection_ptr peer;
+ peer.reset(new peer_connection(parent(),
+ io_ptrs.first,
+ io_ptrs.second,
+ new_peer.second));
+ parent()->add_channel_ptr(peer);
+ parent()->add_peer(*(new_peer.second), peer);
+ break;
}
- break;
- }
- case middleman_message_type::outgoing_message: {
- DEBUG("middleman_overseer: outgoing_message");
- auto& target_peer = msg->out_msg.first;
- auto& out_msg = msg->out_msg.second;
- CPPA_REQUIRE(target_peer != nullptr);
- auto peer = parent()->peer(*target_peer);
- if (!peer) {
- DEBUG("message to an unknown peer: " << to_string(out_msg));
+ case middleman_message_type::publish: {
+ DEBUG("middleman_overseer: publish");
+ auto& ptrs = msg->new_published_actor;
+ parent()->add_channel<peer_acceptor>(ptrs.second->id(),
+ move(ptrs.first));
break;
}
- DEBUG("--> " << to_string(out_msg));
- auto had_unwritten_data = peer->has_unwritten_data();
- try {
- peer->write(out_msg);
- if (!had_unwritten_data && peer->has_unwritten_data()) {
- parent()->continue_writing(peer);
+ case middleman_message_type::unpublish: {
+ if (msg->published_actor) {
+ DEBUG("middleman_overseer: unpublish actor id "
+ << msg->published_actor->id());
+ auto channel = parent()->acceptor_of(msg->published_actor);
+ if (channel) {
+ parent()->erase(channel);
+ }
+ }
+ break;
+ }
+ case middleman_message_type::outgoing_message: {
+ DEBUG("middleman_overseer: outgoing_message");
+ auto& target_peer = msg->out_msg.first;
+ auto& out_msg = msg->out_msg.second;
+ CPPA_REQUIRE(target_peer != nullptr);
+ auto peer = parent()->peer(*target_peer);
+ if (!peer) {
+ DEBUG("message to an unknown peer: " << to_string(out_msg));
+ break;
+ }
+ DEBUG("--> " << to_string(out_msg));
+ auto had_unwritten_data = peer->has_unwritten_data();
+ try {
+ peer->write(out_msg);
+ if (!had_unwritten_data && peer->has_unwritten_data()) {
+ parent()->continue_writing(peer);
+ }
}
+ catch (exception& e) {
+ DEBUG("peer disconnected: " << e.what());
+ parent()->erase(peer);
+ }
+ break;
}
- catch (exception& e) {
- DEBUG("peer disconnected: " << e.what());
- parent()->erase(peer);
+ case middleman_message_type::shutdown: {
+ DEBUG("middleman: shutdown");
+ parent()->quit();
+ break;
}
- break;
- }
- case middleman_message_type::shutdown: {
- DEBUG("middleman: shutdown");
- parent()->quit();
- break;
}
}
return true;
View
@@ -65,6 +65,14 @@ struct network_manager_impl : network_manager {
}
// store pipe read handle in local variables for lambda expression
int pipe_fd0 = pipe_fd[0];
+ // set read handle to nonblocking
+ auto flags = fcntl(pipe_fd0, F_GETFL, 0);
+ if (flags == -1) {
+ throw network_error("unable to read socket flags");
+ }
+ if (fcntl(pipe_fd0, F_SETFL, flags | O_NONBLOCK) < 0) {
+ CPPA_CRITICAL("unable to set pipe read handle to nonblock");
+ }
// start threads
m_middleman_thread = std::thread([this, pipe_fd0] {
middleman_loop(pipe_fd0, this->m_middleman_queue);
@@ -82,7 +90,7 @@ struct network_manager_impl : network_manager {
void send_to_middleman(std::unique_ptr<middleman_message> msg) {
m_middleman_queue._push_back(msg.release());
std::atomic_thread_fence(std::memory_order_seq_cst);
- std::uint32_t dummy = 0;
+ std::uint8_t dummy = 0;
if (write(pipe_fd[1], &dummy, sizeof(dummy)) != sizeof(dummy)) {
CPPA_CRITICAL("cannot write to pipe");
}

0 comments on commit 7fe66f9

Please sign in to comment.