diff --git a/hpx/runtime/parcelset/policies/ibverbs/acceptor.hpp b/hpx/runtime/parcelset/policies/ibverbs/acceptor.hpp index cefa1277a93c..d3b7191c87f3 100644 --- a/hpx/runtime/parcelset/policies/ibverbs/acceptor.hpp +++ b/hpx/runtime/parcelset/policies/ibverbs/acceptor.hpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -30,232 +31,10 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs { /////////////////////////////////////////////////////////////////////////// - // - struct basic_acceptor_options - { - template - struct option - { - /* - option(T const& num) : val_(num) {} - T val_; - */ - }; - - /* - typedef option msg_num; - typedef option manage; - */ - }; - - /////////////////////////////////////////////////////////////////////////// - template - class basic_acceptor - : public boost::asio::basic_io_object, - public basic_acceptor_options - { - public: - explicit basic_acceptor(boost::asio::io_service &io_service) - : boost::asio::basic_io_object(io_service) - { - } - - void open(boost::system::error_code &ec = boost::system::throws) - { - this->service.open(this->implementation, ec); - } - - void close(boost::system::error_code &ec = boost::system::throws) - { - this->service.close(this->implementation, ec); - } - - void bind( - boost::asio::ip::tcp::endpoint const & ep, - boost::system::error_code &ec = boost::system::throws) - { - this->service.bind(this->implementation, ep, ec); - } - - // synchronous and asynchronous accept - template - void accept(basic_context & conn, - boost::system::error_code &ec = boost::system::throws) - { - return this->service.accept(this->implementation, conn, ec); - } - - template - void async_accept(basic_context & conn, Handler handler) - { - this->service.async_accept(this->implementation, conn, handler); - } - }; - - /////////////////////////////////////////////////////////////////////////// - namespace detail - { - /////////////////////////////////////////////////////////////////////// - template - class accept_operation - : public boost::enable_shared_from_this< - accept_operation > - { - typedef boost::shared_ptr implementation_type; - - public: - accept_operation(implementation_type &impl, - boost::asio::io_service &io_service, - basic_context & conn, Handler handler) - : impl_(impl), - io_service_(io_service), - conn_(conn), - handler_(handler) - {} - - void call() const - { - implementation_type impl = impl_.lock(); - if (impl) - { - boost::system::error_code ec; - if (!impl->get_connect_request(conn_, ec) && !ec) { - // repost this function - io_service_.post(boost::bind( - &accept_operation::call, this->shared_from_this())); - } - else { - io_service_.post(boost::bind( - &accept_operation::try_accept, this->shared_from_this())); - } - } - else - { - io_service_.post( - boost::asio::detail::bind_handler(handler_, - boost::asio::error::operation_aborted)); - } - } - - private: - void try_accept() const - { - implementation_type impl = impl_.lock(); - if (impl) - { - boost::system::error_code ec; - if (!impl->try_accept(conn_, ec) && !ec) { - // repost this function - io_service_.post(boost::bind( - &accept_operation::try_accept, this->shared_from_this())); - } - else { - io_service_.post( - boost::asio::detail::bind_handler(handler_, ec)); - } - } - else - { - io_service_.post( - boost::asio::detail::bind_handler(handler_, - boost::asio::error::operation_aborted)); - } - } - - boost::weak_ptr impl_; - boost::asio::io_service &io_service_; - basic_context & conn_; - Handler handler_; - }; - } - - /////////////////////////////////////////////////////////////////////////// - template - class basic_acceptor_service - : public boost::asio::io_service::service - { - public: - static boost::asio::io_service::id id; - - explicit basic_acceptor_service(boost::asio::io_service &io_service) - : boost::asio::io_service::service(io_service) - {} - - ~basic_acceptor_service() - {} - - typedef boost::shared_ptr implementation_type; - - void construct(implementation_type &impl) - { - impl.reset(new Implementation()); - } - - void destroy(implementation_type &impl) - { - impl->destroy(); - impl.reset(); - } - - void open(implementation_type &impl, boost::system::error_code &ec) - { - impl->open(ec); - boost::asio::detail::throw_error(ec); - } - - void close(implementation_type &impl, boost::system::error_code &ec) - { - impl->close(ec); - boost::asio::detail::throw_error(ec); - } - - void bind(implementation_type &impl, - boost::asio::ip::tcp::endpoint const & ep, - boost::system::error_code &ec) - { - impl->bind(ep, ec); - boost::asio::detail::throw_error(ec); - } - - // synchronous and asynchronous accept - template - void accept(implementation_type &impl, - basic_context & conn, boost::system::error_code &ec) - { - while (!impl->try_get_connect_request(conn, ec) && !ec) - /* just wait for operation to succeed */; - while (!impl->try_accept(conn, ec) && !ec) - /* just wait for operation to succeed */; - boost::asio::detail::throw_error(ec); - } - - template - void async_accept(implementation_type &impl, - basic_context & conn, Handler handler) - { - typedef detail::accept_operation - operation_type; - - boost::shared_ptr op( - boost::make_shared( - impl, this->get_io_service(), conn, handler)); - - this->get_io_service().post(boost::bind(&operation_type::call, op)); - } - - private: - void shutdown_service() - {} - }; - - template - boost::asio::io_service::id basic_acceptor_service::id; - - /////////////////////////////////////////////////////////////////////////// - class acceptor_impl + class acceptor { public: - acceptor_impl() + acceptor() : event_channel_(0), listener_(0), executing_operation_(0), @@ -263,7 +42,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs close_operation_(false) {} - ~acceptor_impl() + ~acceptor() { boost::system::error_code ec; close(ec); @@ -317,7 +96,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs return; } - std::string host = ep.address().to_v4().to_string(); + std::string host = ep.address().to_string(); std::string port = boost::lexical_cast(ep.port()); addrinfo *addr; @@ -424,18 +203,20 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs return get_next_event(event_channel_, event, this, ec); } - template - bool get_connect_request(basic_context & conn, boost::system::error_code &ec) + template + boost::shared_ptr accept( + Parcelport & parcelport, boost::system::error_code &ec) { + boost::shared_ptr rcv; if (close_operation_.load() || !event_channel_) { HPX_IBVERBS_THROWS_IF(ec, boost::asio::error::not_connected); - return false; + return rcv; } rdma_cm_event event; if(!get_next_rdma_event(event, ec)) { - return false; + return rcv; } if(event.event == RDMA_CM_EVENT_CONNECT_REQUEST) @@ -445,53 +226,48 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs cm_params.initiator_depth = cm_params.responder_resources = 1; cm_params.rnr_retry_count = 7; // infinite retry - conn.build_connection(event.id, ec); + rcv.reset(new receiver(parcelport)); + rcv->context().build_connection(event.id, ec); if(ec) { - return false; + rcv.reset(); + return rcv; } - conn.on_preconnect(event.id, ec); + rcv->context().on_preconnect(event.id, ec); if(ec) { - return false; + rcv.reset(); + return rcv; } rdma_accept(event.id, &cm_params); - return true; - } - return false; - } - - template - bool try_accept(basic_context & conn, boost::system::error_code &ec) - { - if (close_operation_.load() || !event_channel_) { - HPX_IBVERBS_THROWS_IF(ec, boost::asio::error::not_connected); - return false; + pending_recv_list.push_back(std::make_pair(event, rcv)); + rcv.reset(); + return rcv; } - rdma_cm_event event; - if(!get_next_rdma_event(event, ec)) - { - return false; - } - if(ec) - { - return false; - } if(event.event == RDMA_CM_EVENT_ESTABLISHED) { - conn.on_connection(event.id, ec); - if(ec) + for(pending_recv_list_type::iterator it = pending_recv_list.begin(); + it != pending_recv_list.end();) { - return false; + if(it->first.id == event.id) + { + rcv = it->second; + rcv->context().on_connection(event.id, ec); + it = pending_recv_list.erase(it); + break; + } + else + { + ++it; + } } - HPX_IBVERBS_RESET_EC(ec); - return true; + HPX_ASSERT(rcv); } - return false; + return rcv; } void on_disconnect(rdma_cm_id * id) @@ -505,12 +281,13 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs boost::atomic executing_operation_; boost::atomic aborted_; boost::atomic close_operation_; - }; - /////////////////////////////////////////////////////////////////////////// - typedef basic_acceptor< - basic_acceptor_service - > acceptor; + typedef + std::list > > + pending_recv_list_type; + + pending_recv_list_type pending_recv_list; + }; }}}} #endif diff --git a/hpx/runtime/parcelset/policies/ibverbs/connection_handler.hpp b/hpx/runtime/parcelset/policies/ibverbs/connection_handler.hpp index a271a5a27f56..e9bc521c8a96 100644 --- a/hpx/runtime/parcelset/policies/ibverbs/connection_handler.hpp +++ b/hpx/runtime/parcelset/policies/ibverbs/connection_handler.hpp @@ -27,7 +27,7 @@ namespace hpx { namespace parcelset { { typedef policies::ibverbs::sender connection_type; typedef boost::mpl::false_ send_early_parcel; - typedef boost::mpl::false_ do_background_work; + typedef boost::mpl::true_ do_background_work; typedef boost::mpl::false_ do_enable_parcel_handling; static const char * name() @@ -68,6 +68,8 @@ namespace hpx { namespace parcelset { /// Stop the handling of connections. void do_stop(); + void background_work(); + /// Retrieve the type of the locality represented by this parcelport connection_type get_type() const { @@ -79,20 +81,27 @@ namespace hpx { namespace parcelset { boost::shared_ptr create_connection( naming::locality const& l, error_code& ec); + + void add_sender(boost::shared_ptr const& sender_connection); private: // helper functions for receiving parcels - void handle_accept(boost::system::error_code const& e, - boost::shared_ptr); - void handle_read_completion(boost::system::error_code const& e, - boost::shared_ptr); + void handle_messages(); /// Acceptor used to listen for incoming connections. - acceptor* acceptor_; - - /// The list of accepted connections - typedef std::set > accepted_connections_set; - accepted_connections_set accepted_connections_; + acceptor acceptor_; + + typedef std::list > receivers_type; + receivers_type receivers_; + + hpx::lcos::local::spinlock senders_mtx_; + typedef std::list > senders_type; + senders_type senders_; + + boost::atomic stopped_; + boost::atomic handling_messages_; + + bool use_io_pool_; }; }} }} diff --git a/hpx/runtime/parcelset/policies/ibverbs/context.hpp b/hpx/runtime/parcelset/policies/ibverbs/context.hpp index 7e458091f1db..ba371e48981e 100644 --- a/hpx/runtime/parcelset/policies/ibverbs/context.hpp +++ b/hpx/runtime/parcelset/policies/ibverbs/context.hpp @@ -12,13 +12,11 @@ #include #include #include -#include #include #include #include #include -#include #include #include #include @@ -37,558 +35,6 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs { - struct basic_context_options - { - }; - - template - struct basic_context - : public boost::asio::basic_io_object, - public basic_context_options - { - public: - - basic_context( - boost::asio::io_service &io_service - ) - : boost::asio::basic_io_object(io_service) - { - } - - void create(boost::system::error_code &ec = boost::system::throws) - { - this->service.create(this->implementation, ec); - } - - void open(boost::system::error_code &ec = boost::system::throws) - { - this->service.open(this->implementation, ec); - } - - void close(boost::system::error_code &ec = boost::system::throws) - { - this->service.close(this->implementation, ec); - } - - void shutdown(boost::system::error_code &ec = boost::system::throws) - { - this->service.shutdown(this->implementation, ec); - } - - void bind( - boost::asio::ip::tcp::endpoint const & ep, - boost::system::error_code &ec = boost::system::throws) - { - this->service.bind(this->implementation, ep, ec); - } - - char * set_buffer_size(std::size_t buffer_size, boost::system::error_code &ec) - { - return this->service.set_buffer_size(this->implementation, buffer_size, ec); - } - - void build_connection(rdma_cm_id * id, boost::system::error_code &ec) - { - this->service.build_connection(this->implementation, id, ec); - } - - rdma_cm_id *conn_id() - { - return this->service.conn_id(this->implementation); - } - - void on_preconnect(rdma_cm_id * id, boost::system::error_code &ec) - { - this->service.on_preconnect(this->implementation, id, ec); - } - - void on_connection(rdma_cm_id * id, boost::system::error_code &ec) - { - this->service.on_connection(this->implementation, id, ec); - } - - void on_completion(ibv_wc * wc, boost::system::error_code &ec) - { - this->service.on_completion(this->implementation, wc, ec); - } - - void on_disconnect(rdma_cm_id * id) - { - this->service.on_disconnect(this->implementation, id); - } - - // synchronous and asynchronous connect - void connect( - boost::asio::ip::tcp::endpoint const & there, - boost::system::error_code &ec = boost::system::throws) - { - this->service.connect(this->implementation, there, ec); - } - - template - void async_connect( - boost::asio::ip::tcp::endpoint const & there, - Handler handler) - { - this->service.async_connect(this->implementation, there, handler); - } - - // synchronous and asynchronous read/write/read_ack/write_ack - void read(std::vector& data, - boost::system::error_code &ec = boost::system::throws) - { - this->service.read(this->implementation, data, ec); - } - - std::size_t write(data_buffer const& data, - boost::system::error_code &ec = boost::system::throws) - { - return this->service.write(this->implementation, data, ec); - } - - template - void async_read(data_buffer & data, Handler handler) - { - this->service.async_read(this->implementation, data, handler); - } - - void read_ack(boost::system::error_code &ec = boost::system::throws) - { - this->service.read_ack(this->implementation, ec); - } - - template - void async_read_ack(Handler handler) - { - this->service.async_read_ack(this->implementation, handler); - } - - template - void async_write(data_buffer const & data, Handler handler) - { - this->service.async_write(this->implementation, data, handler); - } - - template - void async_write_ack(Handler handler) - { - this->service.async_write_ack(this->implementation, handler); - } - }; - - /////////////////////////////////////////////////////////////////////////// - namespace detail - { - template - class connect_operation - { - typedef boost::shared_ptr implementation_type; - - public: - connect_operation(implementation_type &impl, - boost::asio::io_service &io_service, - boost::asio::ip::tcp::endpoint const & there, Handler handler) - : impl_(impl), - io_service_(io_service), - there_(there), - handler_(handler) - {} - - void operator()() const - { - implementation_type impl = impl_.lock(); - if (impl) - { - boost::system::error_code ec; - impl->connect(there_, ec); - io_service_.post(boost::asio::detail::bind_handler( - handler_, ec)); - } - else - { - io_service_.post(boost::asio::detail::bind_handler( - handler_, boost::asio::error::operation_aborted)); - } - } - - private: - boost::weak_ptr impl_; - boost::asio::io_service &io_service_; - boost::asio::ip::tcp::endpoint const& there_; - Handler handler_; - }; - - /////////////////////////////////////////////////////////////////////// - template - class read_operation - : public boost::enable_shared_from_this< - read_operation > - { - typedef boost::shared_ptr implementation_type; - - public: - read_operation(implementation_type &impl, - boost::asio::io_service &io_service, - data_buffer & data, Handler handler) - : impl_(impl), - io_service_(io_service), - data_(data), - handler_(handler) - {} - - void call() const - { - implementation_type impl = impl_.lock(); - if (impl) - { - boost::system::error_code ec; - - std::size_t size = impl->try_read_data(data_, ec); - if (size == 0 && !ec) { - // repost this handler - io_service_.post(boost::bind( - &read_operation::call, this->shared_from_this())); - } - else if(!ec) { - // post completion handler - io_service_.post(boost::asio::detail::bind_handler( - handler_, ec, size)); - /* - handler_(ec, size); - */ - } - } - else - { - io_service_.post(boost::asio::detail::bind_handler( - handler_, boost::asio::error::operation_aborted, 0)); - } - } - - private: - - boost::weak_ptr impl_; - boost::asio::io_service &io_service_; - data_buffer & data_; - Handler handler_; - }; - - /////////////////////////////////////////////////////////////////////// - template - class read_ack_operation - : public boost::enable_shared_from_this< - read_ack_operation > - { - typedef boost::shared_ptr implementation_type; - - public: - read_ack_operation(implementation_type &impl, - boost::asio::io_service &io_service, - Handler handler) - : impl_(impl), - io_service_(io_service), - handler_(handler) - {} - - void call() const - { - implementation_type impl = impl_.lock(); - if (impl) - { - boost::system::error_code ec; - - bool ack = impl->try_read_ack(ec); - if (!ack && !ec) { - // repost this handler - io_service_.post(boost::bind( - &read_ack_operation::call, this->shared_from_this())); - } - else if(!ec) { - // post completion handler - io_service_.post(boost::asio::detail::bind_handler( - handler_, ec)); - /* - handler_(ec); - */ - } - } - else - { - io_service_.post(boost::asio::detail::bind_handler( - handler_, boost::asio::error::operation_aborted)); - } - } - - private: - - boost::weak_ptr impl_; - boost::asio::io_service &io_service_; - Handler handler_; - }; - - /////////////////////////////////////////////////////////////////////// - template - class write_operation - { - typedef boost::shared_ptr implementation_type; - - public: - write_operation(implementation_type &impl, - boost::asio::io_service &io_service, - data_buffer const& data, Handler handler) - : impl_(impl), - io_service_(io_service), - data_(data), - handler_(handler) - {} - - void operator()() const - { - implementation_type impl = impl_.lock(); - if (impl) - { - boost::system::error_code ec; - std::size_t size = impl->write(data_, ec); - io_service_.post(boost::asio::detail::bind_handler( - handler_, ec, size)); - /* - handler_(ec, size); - */ - } - else - { - io_service_.post(boost::asio::detail::bind_handler( - handler_, boost::asio::error::operation_aborted, 0)); - } - } - - private: - boost::weak_ptr impl_; - boost::asio::io_service &io_service_; - data_buffer const& data_; - Handler handler_; - }; - - /////////////////////////////////////////////////////////////////////// - template - class write_ack_operation - { - typedef boost::shared_ptr implementation_type; - - public: - write_ack_operation(implementation_type &impl, - boost::asio::io_service &io_service, - Handler handler) - : impl_(impl), - io_service_(io_service), - handler_(handler) - {} - - void operator()() const - { - implementation_type impl = impl_.lock(); - if (impl) - { - boost::system::error_code ec; - impl->write_ack(ec); - io_service_.post(boost::asio::detail::bind_handler( - handler_, ec)); - /* - handler_(ec); - */ - } - else - { - io_service_.post(boost::asio::detail::bind_handler( - handler_, boost::asio::error::operation_aborted)); - } - } - - private: - boost::weak_ptr impl_; - boost::asio::io_service &io_service_; - Handler handler_; - }; - } - - /////////////////////////////////////////////////////////////////////////// - template - class basic_context_service - : public boost::asio::io_service::service - { - public: - typedef HPX_STD_FUNCTION callback_function; - - static boost::asio::io_service::id id; - - explicit basic_context_service(boost::asio::io_service &io_service) - : boost::asio::io_service::service(io_service) - {} - - ~basic_context_service() - {} - - typedef boost::shared_ptr implementation_type; - - /////////////////////////////////////////////////////////////////////// - void construct(implementation_type &impl) - { - impl.reset(new Implementation()); - } - - void destroy(implementation_type &impl) - { - impl->destroy(); - impl.reset(); - } - - /////////////////////////////////////////////////////////////////////// - void create(implementation_type &impl, boost::system::error_code &ec) - { - impl->create(false, ec); // create only - } - - void open(implementation_type &impl, boost::system::error_code &ec) - { - impl->open(ec); - } - - void close(implementation_type &impl, boost::system::error_code &ec) - { - impl->close(ec); - } - - void shutdown(implementation_type &impl, boost::system::error_code &ec) - { - impl->shutdown(ec); - } - - char * set_buffer_size(implementation_type &impl, std::size_t buffer_size, boost::system::error_code &ec) - { - return impl->set_buffer_size(buffer_size, ec); - } - - void build_connection(implementation_type &impl, rdma_cm_id * id, boost::system::error_code &ec) - { - impl->build_connection(id, ec); - } - - rdma_cm_id *conn_id(implementation_type &impl) - { - return impl->conn_id(); - } - - void on_preconnect(implementation_type &impl, rdma_cm_id * id, boost::system::error_code &ec) - { - return impl->on_preconnect(id, ec); - } - - void on_connection(implementation_type &impl, rdma_cm_id * id, boost::system::error_code &ec) - { - return impl->on_connection(id, ec); - } - - void on_completion(implementation_type &impl, ibv_wc * wc, boost::system::error_code &ec) - { - return impl->on_completion(wc, ec); - } - - void on_disconnect(implementation_type &impl, rdma_cm_id * id) - { - return impl->on_disconnect(id); - } - - void bind(implementation_type &impl, - boost::asio::ip::tcp::endpoint const & ep, - boost::system::error_code &ec) - { - impl->bind(ep, ec); - } - - // synchronous and asynchronous connect - void connect(implementation_type &impl, - boost::asio::ip::tcp::endpoint const & there, - boost::system::error_code &ec) - { - impl->connect(there, ec); - } - - template - void async_connect(implementation_type &impl, - boost::asio::ip::tcp::endpoint const & there, - Handler handler) - { - this->get_io_service().post( - detail::connect_operation( - impl, this->get_io_service(), there, handler)); - } - - std::size_t write(implementation_type &impl, data_buffer const& data, - boost::system::error_code &ec) - { - return impl->write(data, ec); - } - - template - void async_read(implementation_type &impl, data_buffer & data, - Handler handler) - { - typedef detail::read_operation operation_type; - - boost::shared_ptr op( - boost::make_shared( - impl, this->get_io_service(), data, handler)); - - this->get_io_service().post(boost::bind( - &operation_type::call, op)); - } - - void read_ack(implementation_type & impl, boost::system::error_code &ec) - { - while(!impl->try_read_ack(ec)) - ; - } - - template - void async_read_ack(implementation_type &impl, - Handler handler) - { - typedef detail::read_ack_operation operation_type; - - boost::shared_ptr op( - boost::make_shared( - impl, this->get_io_service(), handler)); - - this->get_io_service().post(boost::bind( - &operation_type::call, op)); - } - - template - void async_write(implementation_type &impl, data_buffer const& data, - Handler handler) - { - this->get_io_service().post( - detail::write_operation( - impl, this->get_io_service(), data, handler)); - } - - template - void async_write_ack(implementation_type &impl, Handler handler) - { - this->get_io_service().post( - detail::write_ack_operation( - impl, this->get_io_service(), handler)); - } - - - private: - void shutdown_service() - {} - }; - - template - boost::asio::io_service::id basic_context_service::id; - template struct next_wc; @@ -1426,13 +872,9 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs }; /////////////////////////////////////////////////////////////////////////// - typedef basic_context< - basic_context_service > - > client_context; + typedef context_impl client_context; - typedef basic_context< - basic_context_service > - > server_context; + typedef context_impl server_context; }}}} #endif diff --git a/hpx/runtime/parcelset/policies/ibverbs/ibverbs_errors.hpp b/hpx/runtime/parcelset/policies/ibverbs/ibverbs_errors.hpp index dae1cbfc0f69..1958138b0893 100644 --- a/hpx/runtime/parcelset/policies/ibverbs/ibverbs_errors.hpp +++ b/hpx/runtime/parcelset/policies/ibverbs/ibverbs_errors.hpp @@ -67,6 +67,10 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs else boost::asio::detail::throw_error(code); \ /**/ +#define HPX_IBVERBS_THROWS(code) \ + boost::asio::detail::throw_error(code) \ +/**/ + #define HPX_IBVERBS_RESET_EC(ec) \ if (&ec != &boost::system::throws) ec = boost::system::error_code(); \ /**/ diff --git a/hpx/runtime/parcelset/policies/ibverbs/receiver.hpp b/hpx/runtime/parcelset/policies/ibverbs/receiver.hpp index 9b81bc735eea..20fd88449ca0 100644 --- a/hpx/runtime/parcelset/policies/ibverbs/receiver.hpp +++ b/hpx/runtime/parcelset/policies/ibverbs/receiver.hpp @@ -4,6 +4,10 @@ // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +#ifndef HPX_PARCELSET_POLICIES_IBVERBS_RECEIVER_HPP +#define HPX_PARCELSET_POLICIES_IBVERBS_RECEIVER_HPP + + #include #include #include @@ -28,13 +32,17 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs class receiver : public parcelport_connection > { + typedef bool(receiver::*next_function_type)(); public: /// Construct a listening parcelport_connection with the given io_service. - receiver(boost::asio::io_service& io_service, - connection_handler& parcelport) - : context_(io_service), - parcelport_(parcelport) + receiver(connection_handler& parcelport) + : parcelport_(parcelport) { + boost::system::error_code ec; + std::string buffer_size_str = get_config_entry("hpx.parcel.ibverbs.buffer_size", "4096"); + + buffer_size_ = boost::lexical_cast(buffer_size_str); + mr_buffer_ = context_.set_buffer_size(buffer_size_, ec); } ~receiver() @@ -48,14 +56,8 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs { if(!buffer_ || (buffer_ && !buffer_->parcels_decoded_)) { - boost::system::error_code ec; - std::string buffer_size_str = get_config_entry("hpx.parcel.ibverbs.buffer_size", "4096"); - - std::size_t buffer_size = boost::lexical_cast(buffer_size_str); - char * mr_buffer = context_.set_buffer_size(buffer_size, ec); - buffer_ = boost::shared_ptr(new parcel_buffer_type()); - buffer_->data_.set_mr_buffer(mr_buffer, buffer_size); + buffer_->data_.set_mr_buffer(mr_buffer_, buffer_size_); } return buffer_; } @@ -64,10 +66,10 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs server_context& context() { return context_; } /// Asynchronously read a data structure from the socket. - template - void async_read(Handler handler) + void async_read() { buffer_ = get_buffer(); + buffer_->clear(); // Store the time of the begin of the read operation buffer_->data_point_.time_ = timer_.elapsed_nanoseconds(); @@ -75,62 +77,54 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs buffer_->data_point_.bytes_ = 0; buffer_->data_point_.num_parcels_ = 0; - // Issue a read operation to read the parcel priority and size. - void (receiver::*f)(boost::system::error_code const&, - boost::tuple) - = &receiver::handle_read_data; - - context_.async_read(buffer_->data_, - boost::bind(f, shared_from_this(), - boost::asio::placeholders::error, - boost::make_tuple(handler))); + next(&receiver::read_data); } - - private: - /// Handle a completed read of message data. - template - void handle_read_data(boost::system::error_code const& e, - boost::tuple handler) + + bool done(connection_handler & pp) { - if (e) { - boost::get<0>(handler)(e); - - // Issue a read operation to read the next parcel. - async_read(boost::get<0>(handler)); - } - else { - // complete data point and pass it along + HPX_ASSERT(next_ != 0); + if(((*this).*next_)()) + { + // take measurement of overall receive time buffer_->data_point_.time_ = timer_.elapsed_nanoseconds() - buffer_->data_point_.time_; - // now send acknowledgment message - void (receiver::*f)(boost::system::error_code const&, - boost::tuple) - = &receiver::handle_write_ack; - // decode the received parcels. - decode_parcels(parcelport_, shared_from_this(), buffer_); + decode_parcels(pp, shared_from_this(), buffer_); + return true; + } + return false; + } - // acknowledge to have received the parcel - context_.async_write_ack( - boost::bind(f, shared_from_this(), - boost::asio::placeholders::error, handler)); + private: + bool read_data() + { + std::size_t size = context_.try_read_data(buffer_->data_, boost::system::throws); + if(size == 0) + { + return next(&receiver::read_data); } + return next(&receiver::write_ack); } - template - void handle_write_ack(boost::system::error_code const& e, - boost::tuple handler) + bool write_ack() { - // Inform caller that data has been received ok. - boost::get<0>(handler)(e); + context_.write_ack(boost::system::throws); + next(0); + return true; + } - // Issue a read operation to handle the next parcel. - async_read(boost::get<0>(handler)); + bool next(next_function_type f) + { + next_ = f; + return false; } /// Data window for the receiver. server_context context_; + std::size_t buffer_size_; + char * mr_buffer_; + next_function_type next_; /// The handler used to process the incoming request. connection_handler& parcelport_; @@ -146,3 +140,5 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs return lhs.get() < rhs.get(); } }}}} + +#endif diff --git a/hpx/runtime/parcelset/policies/ibverbs/sender.hpp b/hpx/runtime/parcelset/policies/ibverbs/sender.hpp index 682a7aa864f3..05846c1f0525 100644 --- a/hpx/runtime/parcelset/policies/ibverbs/sender.hpp +++ b/hpx/runtime/parcelset/policies/ibverbs/sender.hpp @@ -20,16 +20,37 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs { + class connection_handler; + void add_sender(connection_handler & handler, + boost::shared_ptr const& sender_connection); + class sender : public parcelset::parcelport_connection { + typedef bool(sender::*next_function_type)(); public: - sender(boost::asio::io_service& io_service, - naming::locality const& there, + typedef + HPX_STD_FUNCTION + handler_function_type; + typedef + HPX_STD_FUNCTION< + void( + boost::system::error_code const & + , naming::locality const& + , boost::shared_ptr + ) + > + postprocess_function_type; + + sender(connection_handler & handler, naming::locality const& there, performance_counters::parcels::gatherer& parcels_sent) - : context_(io_service), - there_(there), parcels_sent_(parcels_sent) + : parcelport_(handler), there_(there), parcels_sent_(parcels_sent) { + boost::system::error_code ec; + std::string buffer_size_str = get_config_entry("hpx.parcel.ibverbs.buffer_size", "4096"); + + buffer_size_ = boost::lexical_cast(buffer_size_str); + mr_buffer_ = context_.set_buffer_size(buffer_size_, ec); } ~sender() @@ -57,14 +78,8 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs { if(!buffer_ || (buffer_ && !buffer_->parcels_decoded_)) { - boost::system::error_code ec; - std::string buffer_size_str = get_config_entry("hpx.parcel.ibverbs.buffer_size", "4096"); - - std::size_t buffer_size = boost::lexical_cast(buffer_size_str); - char * mr_buffer = context_.set_buffer_size(buffer_size, ec); - buffer_ = boost::shared_ptr(new parcel_buffer_type()); - buffer_->data_.set_mr_buffer(mr_buffer, buffer_size); + buffer_->data_.set_mr_buffer(mr_buffer_, buffer_size_); } return buffer_; } @@ -76,59 +91,74 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs /// Increment sends and begin timer. buffer_->data_point_.time_ = timer_.elapsed_nanoseconds(); - void (sender::*f)(boost::system::error_code const&, std::size_t, - boost::tuple) - = &sender::handle_write; - - context_.async_write(buffer_->data_, - boost::bind(f, shared_from_this(), - boost::asio::placeholders::error, ::_2, - boost::make_tuple(handler, parcel_postprocess))); + handler_ = handler; + postprocess_ = parcel_postprocess; + + next(&sender::send_data); + add_sender(parcelport_, shared_from_this()); + } + + bool done() + { + next_function_type f = 0; + { + hpx::lcos::local::spinlock::scoped_lock l(mtx_); + f = next_; + } + if(f != 0) + { + if(((*this).*f)()) + { + error_code ec; + handler_(ec, buffer_->data_.size()); + buffer_->data_point_.time_ = timer_.elapsed_nanoseconds() + - buffer_->data_point_.time_; + parcels_sent_.add_data(buffer_->data_point_); + postprocess_function_type pp; + std::swap(pp, postprocess_); + pp(ec, there_, shared_from_this()); + return true; + } + } + return false; } private: - /// handle completed write operation - template - void handle_write(boost::system::error_code const& e, std::size_t bytes, - boost::tuple handler) + bool send_data() { - // just call initial handler - boost::get<0>(handler)(e, bytes); - - // complete data point and push back onto gatherer - buffer_->data_point_.time_ = - timer_.elapsed_nanoseconds() - buffer_->data_point_.time_; - parcels_sent_.add_data(buffer_->data_point_); - - // now we can give this connection back to the cache - buffer_->clear(); - - buffer_->data_point_.bytes_ = 0; - buffer_->data_point_.time_ = 0; - buffer_->data_point_.serialization_time_ = 0; - buffer_->data_point_.num_parcels_ = 0; - - // now handle the acknowledgement byte which is sent by the receiver - void (sender::*f)(boost::system::error_code const&, - boost::tuple) - = &sender::handle_read_ack; + context_.write(buffer_->data_, boost::system::throws); + return next(&sender::read_ack); + } - context_.async_read_ack(boost::bind(f, shared_from_this(), - boost::asio::placeholders::error, handler)); + bool read_ack() + { + if(context_.try_read_ack(boost::system::throws)) + { + next(0); + return true; + } + return next(&sender::read_ack); } - template - void handle_read_ack(boost::system::error_code const& e, - boost::tuple handler) + bool next(next_function_type f) { - // Call post-processing handler, which will send remaining pending - // parcels. Pass along the connection so it can be reused if more - // parcels have to be sent. - boost::get<1>(handler)(e, there_, shared_from_this()); + hpx::lcos::local::spinlock::scoped_lock l(mtx_); + next_ = f; + return false; } /// Context for the parcelport_connection. client_context context_; + std::size_t buffer_size_; + char * mr_buffer_; + + hpx::lcos::local::spinlock mtx_; + next_function_type next_; + + handler_function_type handler_; + postprocess_function_type postprocess_; + + connection_handler & parcelport_; /// the other (receiving) end of this connection naming::locality there_; diff --git a/src/runtime/parcelset/policies/ibverbs/connection_handler_ibverbs.cpp b/src/runtime/parcelset/policies/ibverbs/connection_handler_ibverbs.cpp index 5f2c9261229a..7713c7a06d4d 100644 --- a/src/runtime/parcelset/policies/ibverbs/connection_handler_ibverbs.cpp +++ b/src/runtime/parcelset/policies/ibverbs/connection_handler_ibverbs.cpp @@ -34,7 +34,9 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs using namespace boost::assign; lines += - "buffer_size = ${HPX_PARCEL_IBVERBS_BUFFER_SIZE:65536}" + "buffer_size = ${HPX_PARCEL_IBVERBS_BUFFER_SIZE:65536}", + "io_pool_size = 1", + "use_io_pool = 1" ; return lines; @@ -44,27 +46,29 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs HPX_STD_FUNCTION const& on_start_thread, HPX_STD_FUNCTION const& on_stop_thread) : base_type(ini, on_start_thread, on_stop_thread) - , acceptor_(0) + , stopped_(false) + , handling_messages_(false) + , use_io_pool_(true) { // we never do zero copy optimization for this parcelport allow_zero_copy_optimizations_ = false; + + std::string use_io_pool = + ini.get_entry("hpx.parcel.mpi.use_io_pool", "1"); + if(boost::lexical_cast(use_io_pool) == 0) + { + use_io_pool_ = false; + } } connection_handler::~connection_handler() { - if (NULL != acceptor_) { - boost::system::error_code ec; - acceptor_->close(ec); - delete acceptor_; - acceptor_ = 0; - } + boost::system::error_code ec; + acceptor_.close(ec); } bool connection_handler::do_run() { - if (NULL == acceptor_) - acceptor_ = new acceptor(io_service_pool_.get_io_service(0)); - // initialize network std::size_t tried = 0; exception_list errors; @@ -74,19 +78,11 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs it != end; ++it, ++tried) { try { - boost::shared_ptr conn( - new receiver( - io_service_pool_.get_io_service(), *this)); - conn->get_buffer(); - boost::asio::ip::tcp::endpoint ep = *it; - - acceptor_->bind(ep); - - acceptor_->async_accept(conn->context(), - boost::bind(&connection_handler::handle_accept, - this, - boost::asio::placeholders::error, conn)); + + acceptor_.bind(ep, boost::system::throws); + ++tried; + break; } catch (boost::system::system_error const& e) { errors.add(e); // store all errors @@ -100,31 +96,51 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs "ibverbs::connection_handler::run", errors.get_message()); return false; } + background_work(); return true; } void connection_handler::do_stop() { + // Mark stopped state + stopped_ = true; + // Wait until message handler returns + std::size_t k = 0; + + // cancel all pending accept operations + boost::system::error_code ec; + acceptor_.close(ec); + + while(handling_messages_) { - // cancel all pending read operations, close those sockets - lcos::local::spinlock::scoped_lock l(mtx_); - BOOST_FOREACH(boost::shared_ptr c, accepted_connections_) - { - boost::system::error_code ec; - server_context& ctx = c->context(); - ctx.shutdown(ec); // shut down connection - ctx.close(ec); // close the data window to give it back to the OS - } - accepted_connections_.clear(); + hpx::lcos::local::spinlock::yield(k); + ++k; } + } - // cancel all pending accept operations - if (NULL != acceptor_) + // Make sure all pending requests are handled + void connection_handler::background_work() + { + if (stopped_) + return; + + // Atomically set handling_messages_ to true, if another work item hasn't + // started executing before us. + bool false_ = false; + if (!handling_messages_.compare_exchange_strong(false_, true)) + return; + + if(!hpx::is_starting() && !use_io_pool_) { - boost::system::error_code ec; - acceptor_->close(ec); - delete acceptor_; - acceptor_ = NULL; + hpx::applier::register_thread_nullary( + HPX_STD_BIND(&connection_handler::handle_messages, this), + "mpi::connection_handler::handle_messages", + threads::pending, true, threads::thread_priority_critical); + } + else + { + boost::asio::io_service& io_service = io_service_pool_.get_io_service(); + io_service.post(HPX_STD_BIND(&connection_handler::handle_messages, this)); } } @@ -137,8 +153,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs naming::locality const& l, error_code& ec) { boost::asio::io_service& io_service = io_service_pool_.get_io_service(); - boost::shared_ptr sender_connection(new sender( - io_service, l, parcels_sent_)); + boost::shared_ptr sender_connection(new sender(*this, l, parcels_sent_)); // Connect to the target locality, retry if needed boost::system::error_code error = boost::asio::error::try_again; @@ -153,8 +168,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs boost::asio::ip::tcp::endpoint const& ep = *it; client_context& ctx = sender_connection->context(); - sender_connection->get_buffer(parcel(), 0); - ctx.close(); + ctx.close(ec); ctx.connect(ep, error); if (!error) break; @@ -174,7 +188,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs } } catch (boost::system::system_error const& e) { - sender_connection->context().close(); + sender_connection->context().close(ec); sender_connection.reset(); HPX_THROWS_IF(ec, network_error, @@ -184,7 +198,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs } if (error) { - sender_connection->context().close(); + sender_connection->context().close(ec); sender_connection.reset(); hpx::util::osstream strm; @@ -199,66 +213,128 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs if (&ec != &throws) ec = make_success_code(); - + return sender_connection; } - // accepted new incoming connection - void connection_handler::handle_accept(boost::system::error_code const & e, - boost::shared_ptr conn) + void connection_handler::add_sender( + boost::shared_ptr const& sender_connection) { - if (!e) { - // handle this incoming parcel - boost::shared_ptr c(conn); // hold on to receiver_conn + hpx::lcos::local::spinlock::scoped_lock l(senders_mtx_); + senders_.push_back(sender_connection); + } - // create new connection waiting for next incoming parcel - conn.reset(new receiver( - io_service_pool_.get_io_service(), *this)); - conn->get_buffer(); + void add_sender(connection_handler & handler, + boost::shared_ptr const& sender_connection) + { + handler.add_sender(sender_connection); + } - acceptor_->async_accept(conn->context(), - boost::bind(&connection_handler::handle_accept, - this, - boost::asio::placeholders::error, conn)); + namespace detail + { + struct handling_messages + { + handling_messages(boost::atomic& handling_messages_flag) + : handling_messages_(handling_messages_flag) + {} + + ~handling_messages() { - // keep track of all the accepted connections - lcos::local::spinlock::scoped_lock l(mtx_); - accepted_connections_.insert(c); + handling_messages_.store(false); } - // now accept the incoming connection by starting to read from the - // context - c->async_read(boost::bind(&connection_handler::handle_read_completion, - this, boost::asio::placeholders::error, c)); - } - else { - // remove this connection from the list of known connections - lcos::local::spinlock::scoped_lock l(mtx_); - accepted_connections_.erase(conn); - } + boost::atomic& handling_messages_; + }; } - // Handle completion of a read operation. - void connection_handler::handle_read_completion( - boost::system::error_code const& e, - boost::shared_ptr receiver_conn) + void connection_handler::handle_messages() { - if (!e) return; - - if (e != boost::asio::error::operation_aborted && - e != boost::asio::error::eof) + detail::handling_messages hm(handling_messages_); // reset on exit + + bool bootstrapping = hpx::is_starting(); + bool has_work = true; + std::size_t k = 0; + + hpx::util::high_resolution_timer t; + // We let the message handling loop spin for another 2 seconds to avoid the + // costs involved with posting it to asio + while(bootstrapping || (!stopped_ && has_work) || (!has_work && t.elapsed() < 2.0)) { - LPT_(error) - << "handle read operation completion: error: " - << e.message(); + // handle all sends ... + { + hpx::lcos::local::spinlock::scoped_lock l(senders_mtx_); + for( + senders_type::iterator it = senders_.begin(); + !stopped_ && enable_parcel_handling_ && it != senders_.end(); + /**/) + { + if((*it)->done()) + { + it = senders_.erase(it); + } + else + { + ++it; + } + } + has_work = !senders_.empty(); + } + // handle all receives ... + for( + receivers_type::iterator it = receivers_.begin(); + !stopped_ && enable_parcel_handling_ && it != receivers_.end(); + /**/) + { + try + { + if((*it)->done(*this)) + { + (*it)->async_read(); + } + } + catch(boost::system::system_error const& e) + { + if(e.code() == boost::asio::error::eof + || e.code() == boost::asio::error::operation_aborted) + { + it = receivers_.erase(it); + continue; + } + throw; + } + ++it; + } + + // handle all accepts ... + boost::shared_ptr rcv = acceptor_.accept(*this, boost::system::throws); + if(rcv) + { + rcv->async_read(); + receivers_.push_back(rcv); + } + + if (bootstrapping) + bootstrapping = hpx::is_starting(); + + if(has_work) + { + t.restart(); + k = 0; + } + else + { + if(enable_parcel_handling_) + { + hpx::lcos::local::spinlock::yield(k); + ++k; + } + } } -// if (e != boost::asio::error::eof) + + if(stopped_ == true) { - // remove this connection from the list of known connections - lcos::local::spinlock::scoped_lock l(mtx_); - accepted_connections_.erase(receiver_conn); } } }}}}