Skip to content

Commit

Permalink
Refactor of ibverbs parcelport complete (first step to fix #839)
Browse files Browse the repository at this point in the history
  • Loading branch information
sithhell authored and hkaiser committed Mar 19, 2014
1 parent b2aa3a7 commit 50c5e8f
Show file tree
Hide file tree
Showing 7 changed files with 362 additions and 1,028 deletions.
307 changes: 42 additions & 265 deletions hpx/runtime/parcelset/policies/ibverbs/acceptor.hpp
Expand Up @@ -10,6 +10,7 @@
#include <hpx/hpx_fwd.hpp>
#include <hpx/runtime/parcelset/policies/ibverbs/ibverbs_errors.hpp>
#include <hpx/runtime/parcelset/policies/ibverbs/context.hpp>
#include <hpx/runtime/parcelset/policies/ibverbs/receiver.hpp>
#include <hpx/runtime/parcelset/policies/ibverbs/helper.hpp>
#include <hpx/util/io_service_pool.hpp>

Expand All @@ -30,240 +31,18 @@
namespace hpx { namespace parcelset { namespace policies { namespace ibverbs
{
///////////////////////////////////////////////////////////////////////////
//
struct basic_acceptor_options
{
template <typename T>
struct option
{
/*
option(T const& num) : val_(num) {}
T val_;
*/
};

/*
typedef option<std::size_t> msg_num;
typedef option<bool> manage;
*/
};

///////////////////////////////////////////////////////////////////////////
template <typename Service>
class basic_acceptor
: public boost::asio::basic_io_object<Service>,
public basic_acceptor_options
{
public:
explicit basic_acceptor(boost::asio::io_service &io_service)
: boost::asio::basic_io_object<Service>(io_service)
{
}

void open(boost::system::error_code &ec = boost::system::throws)
{
this->service.open(this->implementation, ec);
}

void close(boost::system::error_code &ec = boost::system::throws)
{
this->service.close(this->implementation, ec);
}

void bind(
boost::asio::ip::tcp::endpoint const & ep,
boost::system::error_code &ec = boost::system::throws)
{
this->service.bind(this->implementation, ep, ec);
}

// synchronous and asynchronous accept
template <typename Service_>
void accept(basic_context<Service_> & conn,
boost::system::error_code &ec = boost::system::throws)
{
return this->service.accept(this->implementation, conn, ec);
}

template <typename Service_, typename Handler>
void async_accept(basic_context<Service_> & conn, Handler handler)
{
this->service.async_accept(this->implementation, conn, handler);
}
};

///////////////////////////////////////////////////////////////////////////
namespace detail
{
///////////////////////////////////////////////////////////////////////
template <typename Handler, typename Implementation, typename Service>
class accept_operation
: public boost::enable_shared_from_this<
accept_operation<Handler, Implementation, Service> >
{
typedef boost::shared_ptr<Implementation> implementation_type;

public:
accept_operation(implementation_type &impl,
boost::asio::io_service &io_service,
basic_context<Service> & conn, Handler handler)
: impl_(impl),
io_service_(io_service),
conn_(conn),
handler_(handler)
{}

void call() const
{
implementation_type impl = impl_.lock();
if (impl)
{
boost::system::error_code ec;
if (!impl->get_connect_request(conn_, ec) && !ec) {
// repost this function
io_service_.post(boost::bind(
&accept_operation::call, this->shared_from_this()));
}
else {
io_service_.post(boost::bind(
&accept_operation::try_accept, this->shared_from_this()));
}
}
else
{
io_service_.post(
boost::asio::detail::bind_handler(handler_,
boost::asio::error::operation_aborted));
}
}

private:
void try_accept() const
{
implementation_type impl = impl_.lock();
if (impl)
{
boost::system::error_code ec;
if (!impl->try_accept(conn_, ec) && !ec) {
// repost this function
io_service_.post(boost::bind(
&accept_operation::try_accept, this->shared_from_this()));
}
else {
io_service_.post(
boost::asio::detail::bind_handler(handler_, ec));
}
}
else
{
io_service_.post(
boost::asio::detail::bind_handler(handler_,
boost::asio::error::operation_aborted));
}
}

boost::weak_ptr<Implementation> impl_;
boost::asio::io_service &io_service_;
basic_context<Service> & conn_;
Handler handler_;
};
}

///////////////////////////////////////////////////////////////////////////
template <typename Implementation>
class basic_acceptor_service
: public boost::asio::io_service::service
{
public:
static boost::asio::io_service::id id;

explicit basic_acceptor_service(boost::asio::io_service &io_service)
: boost::asio::io_service::service(io_service)
{}

~basic_acceptor_service()
{}

typedef boost::shared_ptr<Implementation> implementation_type;

void construct(implementation_type &impl)
{
impl.reset(new Implementation());
}

void destroy(implementation_type &impl)
{
impl->destroy();
impl.reset();
}

void open(implementation_type &impl, boost::system::error_code &ec)
{
impl->open(ec);
boost::asio::detail::throw_error(ec);
}

void close(implementation_type &impl, boost::system::error_code &ec)
{
impl->close(ec);
boost::asio::detail::throw_error(ec);
}

void bind(implementation_type &impl,
boost::asio::ip::tcp::endpoint const & ep,
boost::system::error_code &ec)
{
impl->bind(ep, ec);
boost::asio::detail::throw_error(ec);
}

// synchronous and asynchronous accept
template <typename Service>
void accept(implementation_type &impl,
basic_context<Service> & conn, boost::system::error_code &ec)
{
while (!impl->try_get_connect_request(conn, ec) && !ec)
/* just wait for operation to succeed */;
while (!impl->try_accept(conn, ec) && !ec)
/* just wait for operation to succeed */;
boost::asio::detail::throw_error(ec);
}

template <typename Service, typename Handler>
void async_accept(implementation_type &impl,
basic_context<Service> & conn, Handler handler)
{
typedef detail::accept_operation<Handler, Implementation, Service>
operation_type;

boost::shared_ptr<operation_type> op(
boost::make_shared<operation_type>(
impl, this->get_io_service(), conn, handler));

this->get_io_service().post(boost::bind(&operation_type::call, op));
}

private:
void shutdown_service()
{}
};

template <typename Implementation>
boost::asio::io_service::id basic_acceptor_service<Implementation>::id;

///////////////////////////////////////////////////////////////////////////
class acceptor_impl
class acceptor
{
public:
acceptor_impl()
acceptor()
: event_channel_(0),
listener_(0),
executing_operation_(0),
aborted_(false),
close_operation_(false)
{}

~acceptor_impl()
~acceptor()
{
boost::system::error_code ec;
close(ec);
Expand Down Expand Up @@ -317,7 +96,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs
return;
}

std::string host = ep.address().to_v4().to_string();
std::string host = ep.address().to_string();
std::string port = boost::lexical_cast<std::string>(ep.port());

addrinfo *addr;
Expand Down Expand Up @@ -424,18 +203,20 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs
return get_next_event(event_channel_, event, this, ec);
}

template <typename Service>
bool get_connect_request(basic_context<Service> & conn, boost::system::error_code &ec)
template <typename Parcelport>
boost::shared_ptr<receiver> accept(
Parcelport & parcelport, boost::system::error_code &ec)
{
boost::shared_ptr<receiver> rcv;
if (close_operation_.load() || !event_channel_) {
HPX_IBVERBS_THROWS_IF(ec, boost::asio::error::not_connected);
return false;
return rcv;
}

rdma_cm_event event;
if(!get_next_rdma_event(event, ec))
{
return false;
return rcv;
}

if(event.event == RDMA_CM_EVENT_CONNECT_REQUEST)
Expand All @@ -445,53 +226,48 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs
cm_params.initiator_depth = cm_params.responder_resources = 1;
cm_params.rnr_retry_count = 7; // infinite retry

conn.build_connection(event.id, ec);
rcv.reset(new receiver(parcelport));
rcv->context().build_connection(event.id, ec);
if(ec)
{
return false;
rcv.reset();
return rcv;
}

conn.on_preconnect(event.id, ec);
rcv->context().on_preconnect(event.id, ec);
if(ec)
{
return false;
rcv.reset();
return rcv;
}

rdma_accept(event.id, &cm_params);
return true;
}
return false;
}

template <typename Service>
bool try_accept(basic_context<Service> & conn, boost::system::error_code &ec)
{
if (close_operation_.load() || !event_channel_) {
HPX_IBVERBS_THROWS_IF(ec, boost::asio::error::not_connected);
return false;
pending_recv_list.push_back(std::make_pair(event, rcv));
rcv.reset();
return rcv;
}

rdma_cm_event event;
if(!get_next_rdma_event(event, ec))
{
return false;
}
if(ec)
{
return false;
}
if(event.event == RDMA_CM_EVENT_ESTABLISHED)
{
conn.on_connection(event.id, ec);
if(ec)
for(pending_recv_list_type::iterator it = pending_recv_list.begin();
it != pending_recv_list.end();)
{
return false;
if(it->first.id == event.id)
{
rcv = it->second;
rcv->context().on_connection(event.id, ec);
it = pending_recv_list.erase(it);
break;
}
else
{
++it;
}
}
HPX_IBVERBS_RESET_EC(ec);
return true;
HPX_ASSERT(rcv);
}

return false;
return rcv;
}

void on_disconnect(rdma_cm_id * id)
Expand All @@ -505,12 +281,13 @@ namespace hpx { namespace parcelset { namespace policies { namespace ibverbs
boost::atomic<boost::uint16_t> executing_operation_;
boost::atomic<bool> aborted_;
boost::atomic<bool> close_operation_;
};

///////////////////////////////////////////////////////////////////////////
typedef basic_acceptor<
basic_acceptor_service<acceptor_impl>
> acceptor;
typedef
std::list<std::pair<rdma_cm_event, boost::shared_ptr<receiver> > >
pending_recv_list_type;

pending_recv_list_type pending_recv_list;
};
}}}}

#endif
Expand Down

0 comments on commit 50c5e8f

Please sign in to comment.