Skip to content

Commit

Permalink
Adding test to trigger problem reported in #2916
Browse files Browse the repository at this point in the history
- flyby: channel::close() now returns the number of elements left in the channel
  • Loading branch information
hkaiser committed Oct 18, 2017
1 parent 954818a commit 30a4c49
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 28 deletions.
20 changes: 10 additions & 10 deletions hpx/lcos/channel.hpp
Expand Up @@ -290,20 +290,20 @@ namespace hpx { namespace lcos
typedef typename lcos::server::channel<T>::close_action action_type;
hpx::apply(action_type(), this->get_id(), force_delete_entries);
}
hpx::future<void> close(
hpx::future<std::size_t> close(
launch::async_policy, bool force_delete_entries = false)
{
typedef typename lcos::server::channel<T>::close_action action_type;
return hpx::async(action_type(), this->get_id(), force_delete_entries);
}
void close(launch::sync_policy, bool force_delete_entries = false)
std::size_t close(launch::sync_policy, bool force_delete_entries = false)
{
typedef typename lcos::server::channel<T>::close_action action_type;
action_type()(this->get_id(), force_delete_entries);
return action_type()(this->get_id(), force_delete_entries);
}
void close(bool force_delete_entries = false)
std::size_t close(bool force_delete_entries = false)
{
close(launch::sync, force_delete_entries);
return close(launch::sync, force_delete_entries);
}

///////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -528,20 +528,20 @@ namespace hpx { namespace lcos
typedef typename lcos::server::channel<T>::close_action action_type;
hpx::apply(action_type(), this->get_id(), force_delete_entries);
}
hpx::future<void> close(
hpx::future<std::size_t> close(
launch::async_policy, bool force_delete_entries = false)
{
typedef typename lcos::server::channel<T>::close_action action_type;
return hpx::async(action_type(), this->get_id(), force_delete_entries);
}
void close(launch::sync_policy, bool force_delete_entries = false)
std::size_t close(launch::sync_policy, bool force_delete_entries = false)
{
typedef typename lcos::server::channel<T>::close_action action_type;
action_type()(this->get_id(), force_delete_entries);
return action_type()(this->get_id(), force_delete_entries);
}
void close(bool force_delete_entries = false)
std::size_t close(bool force_delete_entries = false)
{
close(launch::sync, force_delete_entries);
return close(launch::sync, force_delete_entries);
}
};
}}
Expand Down
31 changes: 17 additions & 14 deletions hpx/lcos/local/channel.hpp
Expand Up @@ -51,7 +51,7 @@ namespace hpx { namespace lcos { namespace local
virtual bool try_get(std::size_t generation,
hpx::future<T>* f = nullptr) = 0;
virtual hpx::future<void> set(std::size_t generation, T && t) = 0;
virtual void close(bool force_delete_entries = false) = 0;
virtual std::size_t close(bool force_delete_entries = false) = 0;

virtual bool requires_delete()
{
Expand Down Expand Up @@ -186,7 +186,7 @@ namespace hpx { namespace lcos { namespace local
return hpx::make_ready_future();
}

void close(bool force_delete_entries = false)
std::size_t close(bool force_delete_entries = false)
{
std::unique_lock<mutex_type> l(mtx_);
if(closed_)
Expand All @@ -195,13 +195,13 @@ namespace hpx { namespace lcos { namespace local
HPX_THROW_EXCEPTION(hpx::invalid_status,
"hpx::lcos::local::channel::close",
"attempting to close an already closed channel");
return;
return 0;
}

closed_ = true;

if (buffer_.empty())
return;
return 0;

std::exception_ptr e;

Expand All @@ -216,7 +216,7 @@ namespace hpx { namespace lcos { namespace local
// all pending requests which can't be satisfied have to be
// canceled at this point, force deleting possibly waiting
// requests
buffer_.cancel_waiting(e, force_delete_entries);
return buffer_.cancel_waiting(e, force_delete_entries);
}

private:
Expand Down Expand Up @@ -305,14 +305,16 @@ namespace hpx { namespace lcos { namespace local
}

template <typename Lock>
void cancel(std::exception_ptr const& e, Lock& l)
std::size_t cancel(std::exception_ptr const& e, Lock& l)
{
HPX_ASSERT_OWNS_LOCK(l);
if (pop_active_)
{
pop_.set_exception(e);
pop_active_ = false;
return 1;
}
return 0;
}

template <typename Lock>
Expand Down Expand Up @@ -454,7 +456,7 @@ namespace hpx { namespace lcos { namespace local
return buffer_.push(std::move(t), l);
}

void close(bool force_delete_entries = false)
std::size_t close(bool force_delete_entries = false)
{
std::unique_lock<mutex_type> l(mtx_);

Expand All @@ -464,13 +466,13 @@ namespace hpx { namespace lcos { namespace local
HPX_THROW_EXCEPTION(hpx::invalid_status,
"hpx::lcos::local::channel::close",
"attempting to close an already closed channel");
return;
return 0;
}

closed_ = true;

if (buffer_.is_empty(l) || !buffer_.has_pending_request(l))
return;
return 0;

// all pending requests which can't be satisfied have to be
// canceled at this point
Expand All @@ -482,7 +484,8 @@ namespace hpx { namespace lcos { namespace local
"hpx::lcos::local::close",
"canceled waiting on this entry"));
}
buffer_.cancel(std::move(e), l);

return buffer_.cancel(std::move(e), l);
}

void set_exception(std::exception_ptr e)
Expand Down Expand Up @@ -693,9 +696,9 @@ namespace hpx { namespace lcos { namespace local
return channel_->set(generation, std::move(val));
}

void close(bool force_delete_entries = false)
std::size_t close(bool force_delete_entries = false)
{
channel_->close(force_delete_entries);
return channel_->close(force_delete_entries);
}

///////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -958,9 +961,9 @@ namespace hpx { namespace lcos { namespace local
return channel_->set(generation, hpx::util::unused_type());
}

void close(bool force_delete_entries = false)
std::size_t close(bool force_delete_entries = false)
{
channel_->close(force_delete_entries);
return channel_->close(force_delete_entries);
}

///////////////////////////////////////////////////////////////////
Expand Down
12 changes: 10 additions & 2 deletions hpx/lcos/local/receive_buffer.hpp
Expand Up @@ -198,20 +198,24 @@ namespace hpx { namespace lcos { namespace local
return buffer_map_.empty();
}

void cancel_waiting(std::exception_ptr const& e,
// return the number of deleted buffer entries
std::size_t cancel_waiting(std::exception_ptr const& e,
bool force_delete_entries = false)
{
std::lock_guard<mutex_type> l(mtx_);

std::size_t count = 0;
iterator end = buffer_map_.end();
for (iterator it = buffer_map_.begin(); it != end; /**/)
{
iterator to_delete = it++;
if (to_delete->second->cancel(e) || force_delete_entries)
{
buffer_map_.erase(to_delete);
++count;
}
}
return count;
}

protected:
Expand Down Expand Up @@ -412,20 +416,24 @@ namespace hpx { namespace lcos { namespace local
return buffer_map_.empty();
}

void cancel_waiting(std::exception_ptr const& e,
// return the number of deleted buffer entries
std::size_t cancel_waiting(std::exception_ptr const& e,
bool force_delete_entries = false)
{
std::lock_guard<mutex_type> l(mtx_);

std::size_t count = 0;
iterator end = buffer_map_.end();
for (iterator it = buffer_map_.begin(); it != end; /**/)
{
iterator to_delete = it++;
if (to_delete->second->cancel(e) || force_delete_entries)
{
buffer_map_.erase(to_delete);
++count;
}
}
return count;
}

protected:
Expand Down
4 changes: 2 additions & 2 deletions hpx/lcos/server/channel.hpp
Expand Up @@ -104,9 +104,9 @@ namespace hpx { namespace lcos { namespace server
}
HPX_DEFINE_COMPONENT_ACTION(channel, set_generation);

void close(bool force_delete_entries)
std::size_t close(bool force_delete_entries)
{
channel_.close(force_delete_entries);
return channel_.close(force_delete_entries);
}
HPX_DEFINE_COMPONENT_ACTION(channel, close);

Expand Down
3 changes: 3 additions & 0 deletions tests/regressions/lcos/CMakeLists.txt
Expand Up @@ -14,6 +14,7 @@ set(tests
broadcast_unwrap_future_2885
broadcast_wait_for_2822
call_promise_get_gid_more_than_once
channel_2916
channel_not_empty_2890
channel_register_as_2722
dataflow_791
Expand Down Expand Up @@ -64,6 +65,8 @@ set(async_callback_with_bound_callback_PARAMETERS LOCALITIES 2)
set(async_callback_non_deduced_context_PARAMETERS THREADS_PER_LOCALITY 4)
set(broadcast_unwrap_future_2885_PARAMETERS LOCALITIES 2 THREADS_PER_LOCALITY 4)
set(broadcast_wait_for_2822_PARAMETERS LOCALITIES 2 THREADS_PER_LOCALITY 4)
set(channel_2916_FLAGS DEPENDENCIES iostreams_component)
set(channel_2916_PARAMETERS LOCALITIES 2 THREADS_PER_LOCALITY 4)
set(future_hang_on_get_629_PARAMETERS LOCALITIES 2 THREADS_PER_LOCALITY 2)
set(dataflow_future_swap2_FLAGS DEPENDENCIES iostreams_component)
set(dataflow_launch_775_PARAMETERS THREADS_PER_LOCALITY 2)
Expand Down
69 changes: 69 additions & 0 deletions tests/regressions/lcos/channel_2916.cpp
@@ -0,0 +1,69 @@
// Copyright (c) 2017 Igor Krivenko
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/hpx_init.hpp>
#include <hpx/hpx.hpp>
#include <hpx/util/lightweight_test.hpp>

#include <cstddef>

typedef hpx::naming::id_type locality_id_t;
HPX_REGISTER_CHANNEL(locality_id_t);

std::atomic<std::size_t> count(0);

int hpx_main()
{
// List of currently available resources
hpx::lcos::channel<locality_id_t> free_resources(hpx::find_here());

std::size_t os_thread_count = hpx::get_os_thread_count();

// At the beginning all threads on all localities are free
for (locality_id_t id : hpx::find_all_localities())
{
for (std::size_t i = 0; i != os_thread_count; ++i)
{
free_resources.set(id);
++count;
}
}

for (int i = 0; i < 1000; ++i)
{
// Ask for resources
hpx::shared_future<locality_id_t> target = free_resources.get();

// Do some work, once we have acquired resources
hpx::shared_future<int> result = target.then(
[](hpx::shared_future<locality_id_t> t)
-> hpx::shared_future<int>
{
--count;
return hpx::make_ready_future(0);
});

// Free resources
result.then(
[free_resources, target](hpx::shared_future<int>) mutable
{
++count;
free_resources.set(target.get());
});

result.get();
}

std::size_t remaining_count = free_resources.close(true);
HPX_TEST_EQ(remaining_count, count.load());

return hpx::finalize();
}

int main(int argc, char* argv[])
{
HPX_TEST_EQ(0, hpx::init(argc, argv));
return hpx::util::report_errors();
}

0 comments on commit 30a4c49

Please sign in to comment.