Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing the queue LCO and add example demonstrating its use #1627

Merged
merged 2 commits into from Jun 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/manual/build_system/cmake_variables.qbk
Expand Up @@ -69,6 +69,7 @@ The options are split into these categories:
] [/ Generic Options]

[#build_system.cmake_variables.Build_Targets][h3 Build Targets Options]
* [link build_system.cmake_variables.HPX_WITH_COMPILE_ONLY_TESTS HPX_WITH_COMPILE_ONLY_TESTS]
* [link build_system.cmake_variables.HPX_WITH_DEFAULT_TARGETS HPX_WITH_DEFAULT_TARGETS]
* [link build_system.cmake_variables.HPX_WITH_DOCUMENTATION HPX_WITH_DOCUMENTATION]
* [link build_system.cmake_variables.HPX_WITH_DOCUMENTATION_SINGLEPAGE HPX_WITH_DOCUMENTATION_SINGLEPAGE]
Expand All @@ -82,10 +83,11 @@ The options are split into these categories:
* [link build_system.cmake_variables.HPX_WITH_TOOLS HPX_WITH_TOOLS]

[variablelist
[[[#build_system.cmake_variables.HPX_WITH_COMPILE_ONLY_TESTS] `HPX_WITH_COMPILE_ONLY_TESTS:BOOL`][Create build system support for compile time only HPX tests (default ON)]]
[[[#build_system.cmake_variables.HPX_WITH_DEFAULT_TARGETS] `HPX_WITH_DEFAULT_TARGETS:BOOL`][Associate the core HPX library with the default build target (default: ON).]]
[[[#build_system.cmake_variables.HPX_WITH_DOCUMENTATION] `HPX_WITH_DOCUMENTATION:BOOL`][Build the HPX documentation (default OFF).]]
[[[#build_system.cmake_variables.HPX_WITH_DOCUMENTATION_SINGLEPAGE] `HPX_WITH_DOCUMENTATION_SINGLEPAGE:BOOL`][The HPX documentation should be build as a single page HTML (default OFF).]]
[[[#build_system.cmake_variables.HPX_WITH_EXAMPLES] `HPX_WITH_EXAMPLES:BOOL`][Build the HPX examples (default OFF)]]
[[[#build_system.cmake_variables.HPX_WITH_EXAMPLES] `HPX_WITH_EXAMPLES:BOOL`][Build the HPX examples (default ON)]]
[[[#build_system.cmake_variables.HPX_WITH_RUNTIME] `HPX_WITH_RUNTIME:BOOL`][Build HPX runtime (default: ON)]]
[[[#build_system.cmake_variables.HPX_WITH_TESTS] `HPX_WITH_TESTS:BOOL`][Build the HPX tests (default ON)]]
[[[#build_system.cmake_variables.HPX_WITH_TESTS_BENCHMARKS] `HPX_WITH_TESTS_BENCHMARKS:BOOL`][Build HPX benchmark tests (default: ON)]]
Expand Down
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Expand Up @@ -19,6 +19,7 @@ set(subdirs
jacobi_smp
nqueen
performance_counters
queue
quickstart
qt
random_mem_access
Expand Down
14 changes: 14 additions & 0 deletions examples/queue/CMakeLists.txt
@@ -0,0 +1,14 @@
# Copyright (c) 2015 Hartmut Kaiser
#
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

add_hpx_executable(queue_client
SOURCES queue_client.cpp
FOLDER "Examples/Quickstart/Queue")

add_hpx_pseudo_target(examples.queue.queue)
add_hpx_pseudo_dependencies(examples.queue examples.queue.queue)
add_hpx_pseudo_dependencies(examples.queue.queue queue_client_exe)


71 changes: 71 additions & 0 deletions examples/queue/queue_client.cpp
@@ -0,0 +1,71 @@
// Copyright (c) 2007-2015 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/hpx.hpp>
#include <hpx/hpx_init.hpp>

#include <hpx/include/lcos.hpp>

#include <iostream>

///////////////////////////////////////////////////////////////////////////////
typedef hpx::lcos::queue<int> queue_type;

typedef hpx::components::managed_component<
hpx::lcos::server::queue<int>
> queue_of_ints_type;

HPX_REGISTER_DERIVED_COMPONENT_FACTORY(
queue_of_ints_type, queue_of_ints_type,
"hpx::lcos::base_lco_with_value<int, int>");

///////////////////////////////////////////////////////////////////////////////
void worker(queue_type queue)
{
try {
// retrieve one value, will possibly throw
int value = queue.get_value_sync();
std::cout << value << std::endl;
}
catch (hpx::exception const& e) {
std::cout << e.what() << std::endl;
}
}

void break_queue(queue_type queue)
{
queue.abort_pending();
}

///////////////////////////////////////////////////////////////////////////////
int hpx_main(boost::program_options::variables_map &vm)
{
// Create a new queue of integers.
queue_type queue = hpx::new_<queue_type>(hpx::find_here());

// Create some threads waiting to pull elements from the queue.
for (int i = 0; i < 5; ++i)
hpx::apply(hpx::util::bind(&worker, queue));

// Add some values to the queue.
for (int i = 0; i < 5; ++i)
queue.set_value_sync(i);

// Create some threads waiting to pull elements from the queue, these
// requests will fail because of the abort_pending() invoked below.
for (int i = 0; i < 5; ++i)
hpx::apply(hpx::util::bind(&worker, queue));

hpx::apply(hpx::util::bind(&break_queue, queue));

hpx::finalize();
return 0;
}

///////////////////////////////////////////////////////////////////////////////
int main(int argc, char* argv[])
{
return hpx::init("queue_of_ints_client", argc, argv);
}
3 changes: 1 addition & 2 deletions hpx/lcos/local/counting_semaphore.hpp
Expand Up @@ -96,8 +96,6 @@ namespace hpx { namespace lcos { namespace local
}

/// \brief Signal the semaphore
///
///
void signal(boost::int64_t count = 1)
{
boost::unique_lock<mutex_type> l(mtx_);
Expand Down Expand Up @@ -131,6 +129,7 @@ namespace hpx { namespace lcos { namespace local
HPX_ASSERT(l.owns_lock());

mutex_type* mtx = l.mutex();

// release no more threads than we get resources
value_ += count;
for (boost::int64_t i = 0; value_ >= 0 && i < count; ++i)
Expand Down
50 changes: 35 additions & 15 deletions hpx/lcos/local/detail/condition_variable.hpp
Expand Up @@ -101,7 +101,8 @@ namespace hpx { namespace lcos { namespace local { namespace detail
return queue_.size();
}

// Return false if no more threads are waiting.
// Return false if no more threads are waiting (returns true if queue
// is non-empty).
template <typename Mutex>
bool notify_one(boost::unique_lock<Mutex> lock, error_code& ec = throws)
{
Expand All @@ -110,24 +111,27 @@ namespace hpx { namespace lcos { namespace local { namespace detail
if (!queue_.empty())
{
threads::thread_id_repr_type id = queue_.front().id_;

// remove item from queue before error handling
queue_.front().id_ = threads::invalid_thread_id_repr;
queue_.pop_front();

if (HPX_UNLIKELY(id == threads::invalid_thread_id_repr))
{
HPX_THROWS_IF(ec, null_thread_id,
"condition_variable::notify_one",
"NULL thread id encountered");
return false;
}
queue_.front().id_ = threads::invalid_thread_id_repr;
queue_.pop_front();

bool empty = queue_.empty();
bool not_empty = !queue_.empty();
lock.unlock();

threads::set_thread_state(threads::thread_id_type(
reinterpret_cast<threads::thread_data_base*>(id)),
threads::pending, threads::wait_timeout,
threads::thread_priority_default, ec);
if (!ec) return empty;
if (!ec) return not_empty;
}
return false;
}
Expand All @@ -145,15 +149,18 @@ namespace hpx { namespace lcos { namespace local { namespace detail
while (!queue.empty())
{
threads::thread_id_repr_type id = queue.front().id_;

// remove item from queue before error handling
queue.front().id_ = threads::invalid_thread_id_repr;
queue.pop_front();

if (HPX_UNLIKELY(id == threads::invalid_thread_id_repr))
{
HPX_THROWS_IF(ec, null_thread_id,
"condition_variable::notify_all",
"NULL thread id encountered");
return;
}
queue.front().id_ = threads::invalid_thread_id_repr;
queue.pop_front();

threads::set_thread_state(threads::thread_id_type(
reinterpret_cast<threads::thread_data_base*>(id)),
Expand All @@ -175,29 +182,42 @@ namespace hpx { namespace lcos { namespace local { namespace detail

while (!queue.empty())
{
threads::thread_id_type id(
reinterpret_cast<threads::thread_data_base*>(queue_.front().id_));
threads::thread_id_repr_type id = queue.front().id_;
queue.front().id_ = threads::invalid_thread_id_repr;
queue.pop_front();

if (HPX_UNLIKELY(id == threads::invalid_thread_id_repr))
{
LERR_(fatal)
<< "condition_variable::abort_all:"
<< " NULL thread id encountered";
continue;
}

// we know that the id is actually the pointer to the thread
threads::thread_id_type tid(
reinterpret_cast<threads::thread_data_base*>(id));

LERR_(fatal)
<< "condition_variable::abort_all:"
<< " pending thread: "
<< get_thread_state_name(threads::get_thread_state(id))
<< "(" << id << "): " << threads::get_thread_description(id);
<< get_thread_state_name(threads::get_thread_state(tid))
<< "(" << tid << "): "
<< threads::get_thread_description(tid);

// forcefully abort thread, do not throw
error_code ec(lightweight);
threads::set_thread_state(id, threads::pending,
threads::wait_abort, threads::thread_priority_default, ec);
threads::set_thread_state(tid,
threads::pending, threads::wait_abort,
threads::thread_priority_default, ec);
if (ec)
{
LERR_(fatal)
<< "condition_variable::abort_all:"
<< " could not abort thread: "
<< get_thread_state_name(threads::get_thread_state(id))
<< "(" << id << "): " << threads::get_thread_description(id);
<< get_thread_state_name(threads::get_thread_state(tid))
<< "(" << tid << "): "
<< threads::get_thread_description(tid);
}
}
}
Expand Down
92 changes: 44 additions & 48 deletions hpx/lcos/queue.hpp
@@ -1,4 +1,4 @@
// Copyright (c) 2007-2012 Hartmut Kaiser
// Copyright (c) 2007-2015 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
Expand All @@ -8,100 +8,96 @@

#include <hpx/exception.hpp>
#include <hpx/include/client.hpp>
#include <hpx/lcos/stubs/queue.hpp>
#include <hpx/lcos/server/queue.hpp>

///////////////////////////////////////////////////////////////////////////////
namespace hpx { namespace lcos
{
///////////////////////////////////////////////////////////////////////////
template <typename ValueType, typename RemoteType = ValueType>
class queue
: public components::client_base<queue<ValueType, RemoteType>,
lcos::stubs::queue<ValueType, RemoteType> >
: public components::client_base<
queue<ValueType, RemoteType>,
lcos::server::queue<ValueType, RemoteType>
>
{
typedef components::client_base<
queue, lcos::stubs::queue<ValueType, RemoteType> > base_type;
queue, lcos::server::queue<ValueType, RemoteType>
> base_type;

public:
queue()
{}

/// Create a client side representation for the existing
/// \a server#queue instance with the given global id \a gid.
queue(naming::id_type gid)
: base_type(gid)
queue(future<id_type> && gid)
: base_type(std::move(gid))
{}

///////////////////////////////////////////////////////////////////////
// exposed functionality of this component

lcos::future<ValueType>
get_value_async()
future<ValueType> get_value()
{
HPX_ASSERT(this->get_gid());
return this->base_type::get_value_async(this->get_gid());
}
typedef typename
lcos::base_lco_with_value<ValueType, RemoteType>::get_value_action
action_type;

lcos::future<void>
set_value_async(RemoteType const& val)
{
HPX_ASSERT(this->get_gid());
RemoteType tmp(val);
return this->base_type::set_value_async(this->get_gid(), std::move(tmp));
return hpx::async<action_type>(this->get_gid());
}

lcos::future<void>
abort_pending_async(boost::exception_ptr const& e)
future<void> set_value(RemoteType && val)
{
HPX_ASSERT(this->get_gid());
return this->base_type::abort_pending_async(this->get_gid(), e);
}
typedef typename
lcos::base_lco_with_value<ValueType, RemoteType>::set_value_action
action_type;

///////////////////////////////////////////////////////////////////////
ValueType get_value_sync()
{
HPX_ASSERT(this->get_gid());
return this->base_type::get_value_sync(this->get_gid());
return hpx::async<action_type>(this->get_gid(), std::move(val));
}

void set_value_sync(RemoteType const& val)
future<void> set_value(RemoteType val)
{
typedef typename
lcos::base_lco_with_value<ValueType, RemoteType>::set_value_action
action_type;

HPX_ASSERT(this->get_gid());
RemoteType tmp(val);
this->base_type::set_value_sync(this->get_gid(), std::move(tmp));
return hpx::async<action_type>(this->get_gid(), std::move(val));
}

void set_value_sync(RemoteType && val) //-V659
future<void> abort_pending()
{
typedef lcos::base_lco::set_exception_action action_type;

HPX_ASSERT(this->get_gid());
this->base_type::set_value_sync(this->get_gid(), val);
boost::exception_ptr exception =
hpx::detail::get_exception(
hpx::exception(hpx::no_success), "queue::abort_pending",
__FILE__, __LINE__);
return hpx::async<action_type>(this->get_gid(), exception);
}

void abort_pending_sync(boost::exception_ptr const& e)
///////////////////////////////////////////////////////////////////////
ValueType get_value_sync()
{
this->base_type::abort_pending_sync(this->get_gid(), e);
return get_value().get();
}

///////////////////////////////////////////////////////////////////////
void set_value(RemoteType const& val)
void set_value_sync(RemoteType const& val)
{
RemoteType tmp(val);
this->base_type::set_value(this->get_gid(), std::move(tmp));
set_value(val).get();
}
void set_value(RemoteType && val) //-V659

void set_value_sync(RemoteType && val) //-V659
{
this->base_type::set_value(this->get_gid(), val);
set_value(std::move(val)).get();
}

void abort_pending()
void abort_pending_sync()
{
try {
HPX_THROW_EXCEPTION(no_success, "queue::set_exception",
"interrupt all pending requests");
}
catch (...) {
this->base_type::abort_pending(this->get_gid(), boost::current_exception());
}
abort_pending().get();
}
};
}}
Expand Down