diff --git a/src/middleman.cpp b/src/middleman.cpp index 606bb203a4..da80877188 100644 --- a/src/middleman.cpp +++ b/src/middleman.cpp @@ -555,74 +555,84 @@ class middleman_overseer : public network_channel { bool continue_reading() { DEBUG("middleman_overseer::continue_reading"); - uint32_t dummy; - if (::read(read_handle(), &dummy, sizeof(dummy)) != sizeof(dummy)) { - CPPA_CRITICAL("cannot read from pipe"); - } - atomic_thread_fence(memory_order_seq_cst); - unique_ptr msg(m_queue.try_pop()); - if (!msg) { CPPA_CRITICAL("nullptr dequeued"); } - switch (msg->type) { - case middleman_message_type::add_peer: { - DEBUG("middleman_overseer: add_peer: " - << to_string(*(msg->new_peer.second))); - auto& new_peer = msg->new_peer; - auto& io_ptrs = new_peer.first; - peer_connection_ptr peer; - peer.reset(new peer_connection(parent(), - io_ptrs.first, - io_ptrs.second, - new_peer.second)); - parent()->add_channel_ptr(peer); - parent()->add_peer(*(new_peer.second), peer); - break; + static constexpr size_t num_dummies = 256; + uint8_t dummies[num_dummies]; + auto read_result = ::read(read_handle(), dummies, num_dummies); + if (read_result < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // try again later + return true; } - case middleman_message_type::publish: { - DEBUG("middleman_overseer: publish"); - auto& ptrs = msg->new_published_actor; - parent()->add_channel(ptrs.second->id(), - move(ptrs.first)); - break; + else { + CPPA_CRITICAL("cannot read from pipe"); } - case middleman_message_type::unpublish: { - if (msg->published_actor) { - DEBUG("middleman_overseer: unpublish actor id " - << msg->published_actor->id()); - auto channel = parent()->acceptor_of(msg->published_actor); - if (channel) { - parent()->erase(channel); - } + } + atomic_thread_fence(memory_order_seq_cst); + for (int i = 0; i < read_result; ++i) { + unique_ptr msg(m_queue.try_pop()); + if (!msg) { CPPA_CRITICAL("nullptr dequeued"); } + switch (msg->type) { + case middleman_message_type::add_peer: { + DEBUG("middleman_overseer: add_peer: " + << to_string(*(msg->new_peer.second))); + auto& new_peer = msg->new_peer; + auto& io_ptrs = new_peer.first; + peer_connection_ptr peer; + peer.reset(new peer_connection(parent(), + io_ptrs.first, + io_ptrs.second, + new_peer.second)); + parent()->add_channel_ptr(peer); + parent()->add_peer(*(new_peer.second), peer); + break; } - break; - } - case middleman_message_type::outgoing_message: { - DEBUG("middleman_overseer: outgoing_message"); - auto& target_peer = msg->out_msg.first; - auto& out_msg = msg->out_msg.second; - CPPA_REQUIRE(target_peer != nullptr); - auto peer = parent()->peer(*target_peer); - if (!peer) { - DEBUG("message to an unknown peer: " << to_string(out_msg)); + case middleman_message_type::publish: { + DEBUG("middleman_overseer: publish"); + auto& ptrs = msg->new_published_actor; + parent()->add_channel(ptrs.second->id(), + move(ptrs.first)); break; } - DEBUG("--> " << to_string(out_msg)); - auto had_unwritten_data = peer->has_unwritten_data(); - try { - peer->write(out_msg); - if (!had_unwritten_data && peer->has_unwritten_data()) { - parent()->continue_writing(peer); + case middleman_message_type::unpublish: { + if (msg->published_actor) { + DEBUG("middleman_overseer: unpublish actor id " + << msg->published_actor->id()); + auto channel = parent()->acceptor_of(msg->published_actor); + if (channel) { + parent()->erase(channel); + } + } + break; + } + case middleman_message_type::outgoing_message: { + DEBUG("middleman_overseer: outgoing_message"); + auto& target_peer = msg->out_msg.first; + auto& out_msg = msg->out_msg.second; + CPPA_REQUIRE(target_peer != nullptr); + auto peer = parent()->peer(*target_peer); + if (!peer) { + DEBUG("message to an unknown peer: " << to_string(out_msg)); + break; + } + DEBUG("--> " << to_string(out_msg)); + auto had_unwritten_data = peer->has_unwritten_data(); + try { + peer->write(out_msg); + if (!had_unwritten_data && peer->has_unwritten_data()) { + parent()->continue_writing(peer); + } } + catch (exception& e) { + DEBUG("peer disconnected: " << e.what()); + parent()->erase(peer); + } + break; } - catch (exception& e) { - DEBUG("peer disconnected: " << e.what()); - parent()->erase(peer); + case middleman_message_type::shutdown: { + DEBUG("middleman: shutdown"); + parent()->quit(); + break; } - break; - } - case middleman_message_type::shutdown: { - DEBUG("middleman: shutdown"); - parent()->quit(); - break; } } return true; diff --git a/src/network_manager.cpp b/src/network_manager.cpp index a291742219..349159eda0 100644 --- a/src/network_manager.cpp +++ b/src/network_manager.cpp @@ -65,6 +65,14 @@ struct network_manager_impl : network_manager { } // store pipe read handle in local variables for lambda expression int pipe_fd0 = pipe_fd[0]; + // set read handle to nonblocking + auto flags = fcntl(pipe_fd0, F_GETFL, 0); + if (flags == -1) { + throw network_error("unable to read socket flags"); + } + if (fcntl(pipe_fd0, F_SETFL, flags | O_NONBLOCK) < 0) { + CPPA_CRITICAL("unable to set pipe read handle to nonblock"); + } // start threads m_middleman_thread = std::thread([this, pipe_fd0] { middleman_loop(pipe_fd0, this->m_middleman_queue); @@ -82,7 +90,7 @@ struct network_manager_impl : network_manager { void send_to_middleman(std::unique_ptr msg) { m_middleman_queue._push_back(msg.release()); std::atomic_thread_fence(std::memory_order_seq_cst); - std::uint32_t dummy = 0; + std::uint8_t dummy = 0; if (write(pipe_fd[1], &dummy, sizeof(dummy)) != sizeof(dummy)) { CPPA_CRITICAL("cannot write to pipe"); }