Skip to content

Commit

Permalink
Fixing shutdown when parcels are still in flight
Browse files Browse the repository at this point in the history
 - Making sure a receiver connection in the TCP parcelports waits until
   the connection has been completed to get completely closed
 - Make sure that no parcels are still in flight when starting the termination
   detection

This completely fixes #2334
  • Loading branch information
Thomas Heller committed Oct 7, 2016
1 parent 94582f5 commit bad1145
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 4 deletions.
15 changes: 15 additions & 0 deletions hpx/plugins/parcelport/tcp/receiver.hpp
Expand Up @@ -58,6 +58,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace tcp
, parcelport_(parcelport)
, timer_()
, mtx_()
, operation_in_flight_(0)
{}

~receiver()
Expand Down Expand Up @@ -128,6 +129,13 @@ namespace hpx { namespace parcelset { namespace policies { namespace tcp
socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
socket_.close(ec); // close the socket to give it back to the OS
}

while(operation_in_flight_ != 0)
{
if(threads::get_self_ptr())
hpx::this_thread::suspend(hpx::threads::pending,
"tcp::reveiver::shutdown");
}
}

private:
Expand All @@ -137,13 +145,15 @@ namespace hpx { namespace parcelset { namespace policies { namespace tcp
void handle_read_header(boost::system::error_code const& e,
std::size_t bytes_transferred, Handler handler)
{
HPX_ASSERT(operation_in_flight_ == 0);
if (e) {
handler(e);

// Issue a read operation to read the next parcel.
// async_read(handler);
}
else {
++operation_in_flight_;
// Determine the length of the serialized data.
std::uint64_t inbound_size = buffer_.size_;

Expand Down Expand Up @@ -231,6 +241,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace tcp
{
if (e) {
handler(e);
--operation_in_flight_;

// Issue a read operation to read the next parcel.
// async_read(handler);
Expand Down Expand Up @@ -288,6 +299,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace tcp
{
if (e) {
handler(e);
--operation_in_flight_;

// Issue a read operation to read the next parcel.
// async_read(handler);
Expand Down Expand Up @@ -330,8 +342,10 @@ namespace hpx { namespace parcelset { namespace policies { namespace tcp
void handle_write_ack(boost::system::error_code const& e,
Handler handler)
{
HPX_ASSERT(operation_in_flight_ != 0);
// Inform caller that data has been received ok.
handler(e);
--operation_in_flight_;

// Issue a read operation to read the next parcel.
if (!e)
Expand All @@ -355,6 +369,7 @@ namespace hpx { namespace parcelset { namespace policies { namespace tcp
util::high_resolution_timer timer_;

mutex_type mtx_;
hpx::util::atomic_count operation_in_flight_;
};
}}}}

Expand Down
2 changes: 2 additions & 0 deletions hpx/runtime/parcelset/parcelhandler.hpp
Expand Up @@ -108,6 +108,8 @@ namespace hpx { namespace parcelset

void initialize(naming::resolver_client &resolver, applier::applier *applier);

void flush_parcels();

/// \brief Stop all parcel ports associated with this parcelhandler
void stop(bool blocking = true);

Expand Down
2 changes: 2 additions & 0 deletions hpx/runtime/parcelset/parcelport.hpp
Expand Up @@ -93,6 +93,8 @@ namespace hpx { namespace parcelset
/// the routine returns immediately.
virtual bool run(bool blocking = true) = 0;

virtual void flush_parcels() = 0;

/// Stop the parcelport I/O thread pool.
///
/// \param blocking [in] If blocking is set to \a false the routine will
Expand Down
15 changes: 12 additions & 3 deletions hpx/runtime/parcelset/parcelport_impl.hpp
Expand Up @@ -19,6 +19,7 @@
#include <hpx/runtime/parcelset/parcelport.hpp>
#include <hpx/runtime/threads/thread.hpp>
#include <hpx/throw_exception.hpp>
#include <hpx/util/atomic_count.hpp>
#include <hpx/util/bind.hpp>
#include <hpx/util/connection_cache.hpp>
#include <hpx/util/io_service_pool.hpp>
Expand Down Expand Up @@ -173,18 +174,26 @@ namespace hpx { namespace parcelset
return success;
}

void stop(bool blocking = true)
void flush_parcels()
{
do_background_work(0);

// make sure no more work is pending, wait for service pool to get
// empty
while(operations_in_flight_ != 0)

while(operations_in_flight_ != 0 || get_pending_parcels_count(false) != 0)
{
if(threads::get_self_ptr())
{
hpx::this_thread::suspend(hpx::threads::pending,
"parcelport_impl::stop");
}
}
}

void stop(bool blocking = true)
{
flush_parcels();

io_service_pool_.stop();
if (blocking) {
Expand Down Expand Up @@ -942,7 +951,7 @@ namespace hpx { namespace parcelset
typedef hpx::lcos::local::spinlock mutex_type;

int archive_flags_;
boost::atomic<std::size_t> operations_in_flight_;
hpx::util::atomic_count operations_in_flight_;

boost::atomic<std::size_t> num_thread_;
std::size_t const max_background_thread_;
Expand Down
4 changes: 4 additions & 0 deletions src/runtime/components/server/runtime_support_server.cpp
Expand Up @@ -716,8 +716,10 @@ namespace hpx { namespace components { namespace server
{
applier::applier& appl = hpx::applier::get_applier();
naming::resolver_client& agas_client = appl.get_agas_client();
parcelset::parcelhandler& ph = appl.get_parcel_handler();

agas_client.start_shutdown();
ph.flush_parcels();

std::uint32_t locality_id = get_locality_id();

Expand Down Expand Up @@ -755,7 +757,9 @@ namespace hpx { namespace components { namespace server
threads::threadmanager_base& tm = appl.get_thread_manager();

while (tm.get_thread_count() > 1)
{
this_thread::yield();
}

return 0;
}
Expand Down
12 changes: 12 additions & 0 deletions src/runtime/parcelset/parcelhandler.cpp
Expand Up @@ -277,6 +277,18 @@ namespace hpx { namespace parcelset
return did_some_work;
}

void parcelhandler::flush_parcels()
{
// now flush all parcel ports to be shut down
for (pports_type::value_type& pp : pports_)
{
if(pp.first > 0)
{
pp.second->flush_parcels();
}
}
}

void parcelhandler::stop(bool blocking)
{
// now stop all parcel ports
Expand Down
8 changes: 7 additions & 1 deletion src/runtime/parcelset/parcelport.cpp
Expand Up @@ -207,7 +207,13 @@ namespace hpx { namespace parcelset
std::int64_t parcelport::get_pending_parcels_count(bool /*reset*/)
{
std::lock_guard<lcos::local::spinlock> l(mtx_);
return pending_parcels_.size();
std::int64_t count = 0;
for (auto && p : pending_parcels_)
{
count += hpx::util::get<0>(p.second).size();
HPX_ASSERT(hpx::util::get<0>(p.second).size() == hpx::util::get<1>(p.second).size());
}
return count;
}

///////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit bad1145

Please sign in to comment.