335 changes: 272 additions & 63 deletions doc/async_executors.qbk

Large diffs are not rendered by default.

223 changes: 223 additions & 0 deletions include/boost/thread/concurrent_queues/detail/sync_queue_base.hpp
@@ -0,0 +1,223 @@
#ifndef BOOST_THREAD_CONCURRENT_QUEUES_DETAIL_SYNC_QUEUE_BASE_HPP
#define BOOST_THREAD_CONCURRENT_QUEUES_DETAIL_SYNC_QUEUE_BASE_HPP

//////////////////////////////////////////////////////////////////////////////
//
// (C) Copyright Vicente J. Botet Escriba 2013-2014. Distributed under the Boost
// Software License, Version 1.0. (See accompanying file
// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// See http://www.boost.org/libs/thread for documentation.
//
//////////////////////////////////////////////////////////////////////////////

#include <boost/thread/detail/config.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/concurrent_queues/queue_op_status.hpp>

#include <boost/chrono/duration.hpp>
#include <boost/chrono/time_point.hpp>
#include <boost/chrono/system_clocks.hpp>
#include <boost/throw_exception.hpp>

#include <boost/config/abi_prefix.hpp>

namespace boost
{
namespace concurrent
{
namespace detail
{

template <class ValueType, class Queue>
class sync_queue_base
{
public:
typedef ValueType value_type;
typedef Queue underlying_queue_type;
typedef std::size_t size_type;
typedef queue_op_status op_status;

typedef typename chrono::steady_clock clock;
typedef typename clock::duration duration;
typedef typename clock::time_point time_point;

// Constructors/Assignment/Destructors
BOOST_THREAD_NO_COPYABLE(sync_queue_base)
inline sync_queue_base();
//template <typename Range>
//inline explicit sync_queue(Range range);
inline ~sync_queue_base();

// Observers
inline bool empty() const;
inline bool full() const;
inline size_type size() const;
inline bool closed() const;

// Modifiers
inline void close();

inline underlying_queue_type underlying_queue() {
lock_guard<mutex> lk(mtx_);
return boost::move(data_);
}

protected:
mutable mutex mtx_;
condition_variable not_empty_;
underlying_queue_type data_;
bool closed_;

inline bool empty(unique_lock<mutex>& ) const BOOST_NOEXCEPT
{
return data_.empty();
}
inline bool empty(lock_guard<mutex>& ) const BOOST_NOEXCEPT
{
return data_.empty();
}

inline size_type size(lock_guard<mutex>& ) const BOOST_NOEXCEPT
{
return data_.size();
}
inline bool closed(unique_lock<mutex>& lk) const;
inline bool closed(lock_guard<mutex>& lk) const;

inline void throw_if_closed(unique_lock<mutex>&);
inline void throw_if_closed(lock_guard<mutex>&);

inline void wait_until_not_empty(unique_lock<mutex>& lk);
inline bool wait_until_not_empty_or_closed(unique_lock<mutex>& lk);
inline queue_op_status wait_until_not_empty_until(unique_lock<mutex>& lk, time_point const&);

inline void notify_not_empty_if_needed(unique_lock<mutex>& )
{
not_empty_.notify_one();
}
inline void notify_not_empty_if_needed(lock_guard<mutex>& )
{
not_empty_.notify_one();
}

};

template <class ValueType, class Queue>
sync_queue_base<ValueType, Queue>::sync_queue_base() :
data_(), closed_(false)
{
BOOST_ASSERT(data_.empty());
}

template <class ValueType, class Queue>
sync_queue_base<ValueType, Queue>::~sync_queue_base()
{
}

template <class ValueType, class Queue>
void sync_queue_base<ValueType, Queue>::close()
{
{
lock_guard<mutex> lk(mtx_);
closed_ = true;
}
not_empty_.notify_all();
}

template <class ValueType, class Queue>
bool sync_queue_base<ValueType, Queue>::closed() const
{
lock_guard<mutex> lk(mtx_);
return closed(lk);
}
template <class ValueType, class Queue>
bool sync_queue_base<ValueType, Queue>::closed(unique_lock<mutex>&) const
{
return closed_;
}
template <class ValueType, class Queue>
bool sync_queue_base<ValueType, Queue>::closed(lock_guard<mutex>&) const
{
return closed_;
}

template <class ValueType, class Queue>
bool sync_queue_base<ValueType, Queue>::empty() const
{
lock_guard<mutex> lk(mtx_);
return empty(lk);
}
template <class ValueType, class Queue>
bool sync_queue_base<ValueType, Queue>::full() const
{
return false;
}

template <class ValueType, class Queue>
typename sync_queue_base<ValueType, Queue>::size_type sync_queue_base<ValueType, Queue>::size() const
{
lock_guard<mutex> lk(mtx_);
return size(lk);
}

template <class ValueType, class Queue>
void sync_queue_base<ValueType, Queue>::throw_if_closed(unique_lock<mutex>& lk)
{
if (closed(lk))
{
BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
}
}
template <class ValueType, class Queue>
void sync_queue_base<ValueType, Queue>::throw_if_closed(lock_guard<mutex>& lk)
{
if (closed(lk))
{
BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
}
}

template <class ValueType, class Queue>
void sync_queue_base<ValueType, Queue>::wait_until_not_empty(unique_lock<mutex>& lk)
{
for (;;)
{
if (! empty(lk)) break;
throw_if_closed(lk);
not_empty_.wait(lk);
}
}
template <class ValueType, class Queue>
bool sync_queue_base<ValueType, Queue>::wait_until_not_empty_or_closed(unique_lock<mutex>& lk)
{
for (;;)
{
if (! empty(lk)) break;
if (closed(lk)) return true;
not_empty_.wait(lk);
}
return false;
}

template <class ValueType, class Queue>
queue_op_status sync_queue_base<ValueType, Queue>::wait_until_not_empty_until(unique_lock<mutex>& lk, time_point const&tp)
{
for (;;)
{
if (! empty(lk)) return queue_op_status::success;
throw_if_closed(lk);
if (not_empty_.wait_until(lk, tp) == cv_status::timeout ) return queue_op_status::timeout;
}
}


} // detail
} // concurrent
} // boost

#include <boost/config/abi_suffix.hpp>

#endif
2 changes: 1 addition & 1 deletion include/boost/thread/concurrent_queues/queue_op_status.hpp
Expand Up @@ -22,7 +22,7 @@ namespace concurrent
{

BOOST_SCOPED_ENUM_DECLARE_BEGIN(queue_op_status)
{ success = 0, empty, full, closed, busy }
{ success = 0, empty, full, closed, busy, timeout, not_ready }
BOOST_SCOPED_ENUM_DECLARE_END(queue_op_status)

struct sync_queue_is_closed : std::exception
Expand Down
725 changes: 725 additions & 0 deletions include/boost/thread/concurrent_queues/sync_bounded_queue.hpp

Large diffs are not rendered by default.

362 changes: 362 additions & 0 deletions include/boost/thread/concurrent_queues/sync_priority_queue.hpp
@@ -0,0 +1,362 @@
// Copyright (C) 2014 Ian Forbed
// Copyright (C) 2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_THREAD_SYNC_PRIORITY_QUEUE
#define BOOST_THREAD_SYNC_PRIORITY_QUEUE

#include <boost/thread/detail/config.hpp>

#include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp>
#include <boost/thread/concurrent_queues/queue_op_status.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/csbl/vector.hpp>
#include <boost/thread/detail/move.hpp>
#include <boost/thread/mutex.hpp>

#include <boost/atomic.hpp>
#include <boost/chrono/duration.hpp>
#include <boost/chrono/time_point.hpp>

#include <exception>
#include <queue>
#include <utility>

#include <boost/config/abi_prefix.hpp>

namespace boost
{
namespace detail {

template <
class Type,
class Container = csbl::vector<Type>,
class Compare = std::less<Type>
>
class priority_queue
{
private:
std::vector<Type> _elements;
Compare _compare;
public:
explicit priority_queue(const Compare& compare = Compare())
: _elements(), _compare(compare)
{ }

std::size_t size() const
{
return _elements.size();
}

bool empty() const
{
return _elements.empty();
}

void push(Type const& element)
{
_elements.push_back(element);
std::push_heap(_elements.begin(), _elements.end(), _compare);
}
void push(BOOST_RV_REF(Type) element)
{
_elements.push_back(boost::move(element));
std::push_heap(_elements.begin(), _elements.end(), _compare);
}

Type pull()
{
std::pop_heap(_elements.begin(), _elements.end(), _compare);
Type result = boost::move(_elements.back());
_elements.pop_back();
return boost::move(result);
}

Type const& top()
{
return _elements.back();
}
};
}

namespace concurrent
{
template <class ValueType,
class Container = csbl::vector<ValueType>,
class Compare = std::less<typename Container::value_type> >
class sync_priority_queue
: public detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> >
{
typedef detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> > super;

public:
typedef ValueType value_type;
//typedef typename super::value_type value_type; // fixme
typedef typename super::underlying_queue_type underlying_queue_type;
typedef typename super::size_type size_type;
typedef typename super::op_status op_status;

typedef chrono::steady_clock clock;
protected:

public:
sync_priority_queue() {}

~sync_priority_queue()
{
if(!super::closed())
{
super::close();
}
}

void push(const ValueType& elem);
void push(BOOST_THREAD_RV_REF(ValueType) elem);

queue_op_status try_push(const ValueType& elem);
queue_op_status try_push(BOOST_THREAD_RV_REF(ValueType) elem);

ValueType pull();

void pull(ValueType&);

queue_op_status pull_until(const clock::time_point&, ValueType&);
queue_op_status pull_for(const clock::duration&, ValueType&);

queue_op_status try_pull(ValueType& elem);
queue_op_status wait_pull(ValueType& elem);
queue_op_status nonblocking_pull(ValueType&);

private:
void push(unique_lock<mutex>&, const ValueType& elem);
void push(lock_guard<mutex>&, const ValueType& elem);
void push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
void push(lock_guard<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);

queue_op_status try_push(unique_lock<mutex>&, const ValueType& elem);
queue_op_status try_push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);

ValueType pull(unique_lock<mutex>&);
ValueType pull(lock_guard<mutex>&);

void pull(unique_lock<mutex>&, ValueType&);
void pull(lock_guard<mutex>&, ValueType&);

queue_op_status try_pull(lock_guard<mutex>& lk, ValueType& elem);
queue_op_status try_pull(unique_lock<mutex>& lk, ValueType& elem);

queue_op_status wait_pull(unique_lock<mutex>& lk, ValueType& elem);

queue_op_status nonblocking_pull(unique_lock<mutex>& lk, ValueType&);

sync_priority_queue(const sync_priority_queue&);
sync_priority_queue& operator= (const sync_priority_queue&);
sync_priority_queue(BOOST_THREAD_RV_REF(sync_priority_queue));
sync_priority_queue& operator= (BOOST_THREAD_RV_REF(sync_priority_queue));
}; //end class


//////////////////////
template <class T, class Container,class Cmp>
void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, const T& elem)
{
super::throw_if_closed(lk);
super::data_.push(elem);
super::notify_not_empty_if_needed(lk);
}
template <class T, class Container,class Cmp>
void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, const T& elem)
{
super::throw_if_closed(lk);
super::data_.push(elem);
super::notify_not_empty_if_needed(lk);
}
template <class T, class Container,class Cmp>
void sync_priority_queue<T,Container,Cmp>::push(const T& elem)
{
lock_guard<mutex> lk(super::mtx_);
push(lk, elem);
}

//////////////////////
template <class T, class Container,class Cmp>
void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
{
super::throw_if_closed(lk);
super::data_.push(boost::move(elem));
super::notify_not_empty_if_needed(lk);
}
template <class T, class Container,class Cmp>
void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
{
super::throw_if_closed(lk);
super::data_.push(boost::move(elem));
super::notify_not_empty_if_needed(lk);
}
template <class T, class Container,class Cmp>
void sync_priority_queue<T,Container,Cmp>::push(BOOST_THREAD_RV_REF(T) elem)
{
lock_guard<mutex> lk(super::mtx_);
push(lk, boost::move(elem));
}

//////////////////////
template <class T, class Container,class Cmp>
queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(const T& elem)
{
lock_guard<mutex> lk(super::mtx_);
if (super::closed(lk)) return queue_op_status::closed;
push(lk, elem);
return queue_op_status::success;
}

//////////////////////
template <class T, class Container,class Cmp>
queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(BOOST_THREAD_RV_REF(T) elem)
{
lock_guard<mutex> lk(super::mtx_);
if (super::closed(lk)) return queue_op_status::closed;
push(lk, boost::move(elem));

return queue_op_status::success;
}

//////////////////////
template <class T,class Container, class Cmp>
T sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&)
{
return super::data_.pull();
}
template <class T,class Container, class Cmp>
T sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&)
{
return super::data_.pull();
}

template <class T,class Container, class Cmp>
T sync_priority_queue<T,Container,Cmp>::pull()
{
unique_lock<mutex> lk(super::mtx_);
super::wait_until_not_empty(lk);
return pull(lk);
}

//////////////////////
template <class T,class Container, class Cmp>
void sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&, T& elem)
{
elem = super::data_.pull();
}
template <class T,class Container, class Cmp>
void sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&, T& elem)
{
elem = super::data_.pull();
}

template <class T,class Container, class Cmp>
void sync_priority_queue<T,Container,Cmp>::pull(T& elem)
{
unique_lock<mutex> lk(super::mtx_);
super::wait_until_not_empty(lk);
pull(lk, elem);
}

//////////////////////
template <class T, class Cont,class Cmp>
queue_op_status
sync_priority_queue<T,Cont,Cmp>::pull_until(const clock::time_point& tp, T& elem)
{
unique_lock<mutex> lk(super::mtx_);
if (queue_op_status::timeout == super::wait_until_not_empty_until(lk, tp))
return queue_op_status::timeout;
pull(lk, elem);
return queue_op_status::success;
}

//////////////////////
template <class T, class Cont,class Cmp>
queue_op_status
sync_priority_queue<T,Cont,Cmp>::pull_for(const clock::duration& dura, T& elem)
{
return pull_until(clock::now() + dura, elem);
}

//////////////////////
template <class T, class Container,class Cmp>
queue_op_status
sync_priority_queue<T,Container,Cmp>::try_pull(unique_lock<mutex>& lk, T& elem)
{
if (super::empty(lk))
{
if (super::closed(lk)) return queue_op_status::closed;
return queue_op_status::empty;
}
pull(lk, elem);
return queue_op_status::success;
}

template <class T, class Container,class Cmp>
queue_op_status
sync_priority_queue<T,Container,Cmp>::try_pull(lock_guard<mutex>& lk, T& elem)
{
if (super::empty(lk))
{
if (super::closed(lk)) return queue_op_status::closed;
return queue_op_status::empty;
}
pull(lk, elem);
return queue_op_status::success;
}

template <class T, class Container,class Cmp>
queue_op_status
sync_priority_queue<T,Container,Cmp>::try_pull(T& elem)
{
lock_guard<mutex> lk(super::mtx_);
return try_pull(lk, elem);
}

//////////////////////
template <class T,class Container, class Cmp>
queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(unique_lock<mutex>& lk, T& elem)
{
if (super::empty(lk))
{
if (super::closed(lk)) return queue_op_status::closed;
}
bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
if (has_been_closed) return queue_op_status::closed;
pull(lk, elem);
return queue_op_status::success;
}

template <class T,class Container, class Cmp>
queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(T& elem)
{
unique_lock<mutex> lk(super::mtx_);
return wait_pull(lk, elem);
}

//////////////////////

template <class T,class Container, class Cmp>
queue_op_status sync_priority_queue<T,Container,Cmp>::nonblocking_pull(T& elem)
{
unique_lock<mutex> lk(super::mtx_, try_to_lock);
if (!lk.owns_lock()) return queue_op_status::busy;
return try_pull(lk, elem);
}



} //end concurrent namespace

using concurrent::sync_priority_queue;

} //end boost namespace
#include <boost/config/abi_suffix.hpp>

#endif
544 changes: 544 additions & 0 deletions include/boost/thread/concurrent_queues/sync_queue.hpp

Large diffs are not rendered by default.

431 changes: 431 additions & 0 deletions include/boost/thread/concurrent_queues/sync_timed_queue.hpp

Large diffs are not rendered by default.

16 changes: 5 additions & 11 deletions include/boost/thread/executors/basic_thread_pool.hpp
Expand Up @@ -36,7 +36,7 @@ namespace executors
typedef csbl::vector<thread_t> thread_vector;

/// the thread safe work queue
sync_queue<work > work_queue;
concurrent::sync_queue<work > work_queue;
/// A move aware vector
thread_vector threads;

Expand All @@ -48,22 +48,19 @@ namespace executors
*/
bool try_executing_one()
{
work task;
try
{
work task;
if (work_queue.try_pull_front(task) == queue_op_status::success)
{
task();
return true;
}
return false;
}
catch (std::exception& )
{
return false;
}
catch (...)
{
std::terminate();
return false;
}
}
Expand Down Expand Up @@ -95,12 +92,9 @@ namespace executors
task();
}
}
catch (std::exception& )
{
return;
}
catch (...)
{
std::terminate();
return;
}
}
Expand Down Expand Up @@ -134,7 +128,7 @@ namespace executors
*
* \b Throws: Whatever exception is thrown while initializing the needed resources.
*/
basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency())
basic_thread_pool(unsigned const thread_count = thread::hardware_concurrency()+1)
{
try
{
Expand Down
93 changes: 93 additions & 0 deletions include/boost/thread/executors/detail/scheduled_executor_base.hpp
@@ -0,0 +1,93 @@
// Copyright (C) 2014 Ian Forbed
// Copyright (C) 2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef SCHEDULED_EXECUTOR_HPP
#define SCHEDULED_EXECUTOR_HPP

#include <boost/atomic.hpp>
#include <boost/function.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/concurrent_queues/sync_timed_queue.hpp>
#include <boost/thread/executors/work.hpp>

namespace boost
{
namespace executors
{
namespace detail
{
class scheduled_executor_base
{
public:
typedef boost::function<void()> work;
//typedef executors::work work;
typedef chrono::steady_clock clock;
typedef clock::duration duration;
typedef clock::time_point time_point;
protected:
concurrent::sync_timed_queue<work> _workq;

scheduled_executor_base() {}
public:

~scheduled_executor_base()
{
if(!closed())
{
this->close();
}
}

void close()
{
_workq.close();
}

bool closed()
{
return _workq.closed();
}

void submit(work w)
{
_workq.push(w, clock::now());
}

void submit_at(work w, const time_point& tp)
{
_workq.push(w, tp);
}

void submit_after(work w, const duration& dura)
{
_workq.push(w, dura);
}

void loop()
{
try
{
for(;;)
{
work task;
queue_op_status st = _workq.wait_pull(task);
if (st == queue_op_status::closed) return;
task();
}
}
catch (...)
{
std::terminate();
return;
}
}
}; //end class

} //end detail namespace
} //end executors namespace
} //end boost namespace
#endif
54 changes: 47 additions & 7 deletions include/boost/thread/executors/inline_executor.hpp
Expand Up @@ -26,6 +26,7 @@ namespace executors
/// type-erasure to store the works to do
typedef executors::work work;
bool closed_;
mutable mutex mtx_;
/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
Expand Down Expand Up @@ -66,16 +67,22 @@ namespace executors
*/
void close()
{
lock_guard<mutex> lk(mtx_);
closed_ = true;
}

/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
bool closed(lock_guard<mutex>& )
{
return closed_;
}
bool closed()
{
lock_guard<mutex> lk(mtx_);
return closed(lk);
}

/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
Expand All @@ -93,21 +100,54 @@ namespace executors
template <typename Closure>
void submit(Closure & closure)
{
if (closed()) return;
closure();
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
}
try
{
closure();
}
catch (...)
{
std::terminate();
return;
}
}
#endif
void submit(void (*closure)())
{
if (closed()) return;
closure();
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
}
try
{
closure();
}
catch (...)
{
std::terminate();
return;
}
}

template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
if (closed()) return;
closure();
{
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
}
try
{
closure();
}
catch (...)
{
std::terminate();
return;
}
}

/**
Expand Down
35 changes: 15 additions & 20 deletions include/boost/thread/executors/loop_executor.hpp
Expand Up @@ -31,7 +31,7 @@ namespace executors
typedef executors::work work;
private:
/// the thread safe work queue
sync_queue<work > work_queue;
concurrent::sync_queue<work > work_queue;

public:
/**
Expand All @@ -51,12 +51,9 @@ namespace executors
}
return false;
}
catch (std::exception& )
{
return false;
}
catch (...)
{
std::terminate();
return false;
}
}
Expand All @@ -74,19 +71,7 @@ namespace executors
}


/**
* The main loop of the worker thread
*/
void worker_thread()
{
while (!closed())
{
schedule_one_or_yield();
}
while (try_executing_one())
{
}
}


public:
/// loop_executor is not copyable.
Expand All @@ -112,9 +97,19 @@ namespace executors
}

/**
* loop
* The main loop of the worker thread
*/
void loop() { worker_thread(); }
void loop()
{
while (!closed())
{
schedule_one_or_yield();
}
while (try_executing_one())
{
}
}

/**
* \b Effects: close the \c loop_executor for submissions.
* The loop will work until there is no more closures to run.
Expand Down
68 changes: 68 additions & 0 deletions include/boost/thread/executors/scheduled_thread_pool.hpp
@@ -0,0 +1,68 @@
// Copyright (C) 2014 Ian Forbed
// Copyright (C) 2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_THREAD_EXECUTORS_SCHEDULED_THREAD_POOL_HPP
#define BOOST_THREAD_EXECUTORS_SCHEDULED_THREAD_POOL_HPP

#include <boost/thread/executors/detail/scheduled_executor_base.hpp>

namespace boost
{
namespace executors
{

class scheduled_thread_pool : public detail::scheduled_executor_base
{
private:
thread_group _workers;
public:

scheduled_thread_pool(size_t num_threads) : super()
{
for(size_t i = 0; i < num_threads; i++)
{
_workers.create_thread(bind(&super::loop, this));
}
}

~scheduled_thread_pool()
{
this->close();
_workers.join_all();
}

private:
typedef detail::scheduled_executor_base super;
inline void loop();
}; //end class

void scheduled_thread_pool::loop()
{
try
{
for(;;)
{
super::work task;
queue_op_status st = super::_workq.wait_pull(task);
if (st == queue_op_status::closed) return;
task();
}
}
catch (...)
{
std::terminate();
return;
}
}

} //end executors namespace

using executors::scheduled_thread_pool;

} //end boost
#endif

259 changes: 259 additions & 0 deletions include/boost/thread/executors/scheduler.hpp
@@ -0,0 +1,259 @@
// Copyright (C) 2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_THREAD_EXECUTORS_SCHEDULER_HPP
#define BOOST_THREAD_EXECUTORS_SCHEDULER_HPP

#include <boost/thread/detail/config.hpp>
#include <boost/thread/executors/detail/scheduled_executor_base.hpp>

#include <boost/chrono/system_clocks.hpp>

#include <boost/config/abi_prefix.hpp>

namespace boost
{
namespace executors
{
/// Wraps the reference to an executor and a function to make a work that submit the function using the executor.
template <class Executor, class Function>
class resubmitter
{
public:
resubmitter(Executor& ex, Function funct) :
ex(ex),
funct(boost::move(funct))
{}

void operator()()
{
ex.submit(funct);
}

private:
Executor& ex;
Function funct;
};

/// resubmitter factory
template <class Executor, class Function>
resubmitter<Executor, typename decay<Function>::type>
resubmit(Executor& ex, BOOST_THREAD_FWD_REF(Function) funct) {
return resubmitter<Executor, typename decay<Function>::type >(ex, boost::move(funct));
}

/// Wraps references to a @c Scheduler and an @c Executor providing an @c Executor that
/// resubmit the function using the referenced Executor at a given @c time_point known at construction.
template <class Scheduler, class Executor>
class resubmit_at_executor
{
public:
typedef chrono::steady_clock clock;

resubmit_at_executor(Scheduler& sch, Executor& ex, clock::time_point const& tp) :
sch(sch),
ex(ex),
tp(tp),
is_closed(false)
{
}

~resubmit_at_executor()
{
close();
}

template <class Work>
void submit(BOOST_THREAD_FWD_REF(Work) w)
{
if (closed())
{
BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
}
sch.submit_at(resubmit(ex,boost::forward<Work>(w)), tp);
}

Executor& underlying_executor()
{
return ex;
}
Scheduler& underlying_scheduler()
{
return sch;
}

void close()
{
is_closed = true;
}

bool closed()
{
return is_closed || sch.closed() || ex.closed();
}

private:
Scheduler& sch;
Executor& ex;
clock::time_point tp;
bool is_closed;
};


/// Expression template helper storing a pair of references to an @c Scheduler and an @c Executor
/// It provides factory helper functions such as at/after that convert these a pair of @c Scheduler @c Executor
/// into an new @c Executor that submit the work using the referenced @c Executor at/after a specific time/duration
/// respectively, using the referenced @Scheduler.
template <class Scheduler, class Executor>
class scheduler_executor_wrapper
{
public:
typedef chrono::steady_clock clock;
typedef resubmit_at_executor<Scheduler, Executor> the_executor;

scheduler_executor_wrapper(Scheduler& sch, Executor& ex) :
sch(sch),
ex(ex)
{}

~scheduler_executor_wrapper()
{
}

Executor& underlying_executor()
{
return ex;
}
Scheduler& underlying_scheduler()
{
return sch;
}

template <class Duration>
the_executor after(Duration const& rel_time)
{
return at(clock::now() + rel_time );
}

the_executor at(clock::time_point const& abs_time)
{
return the_executor(sch, ex, abs_time);
}

private:
Scheduler& sch;
Executor& ex;
}; //end class

/// Wraps a reference to a @c Scheduler providing an @c Executor that
/// run the function at a given @c time_point known at construction.
template <class Scheduler>
class at_executor
{
public:
typedef chrono::steady_clock clock;

at_executor(Scheduler& sch, clock::time_point const& tp) :
sch(sch),
tp(tp),
is_closed(false)
{}

~at_executor()
{
close();
}

Scheduler& underlying_scheduler()
{
return sch;
}

void close()
{
is_closed = true;
}

bool closed()
{
return is_closed || sch.closed();
}

template <class Work>
void submit(BOOST_THREAD_FWD_REF(Work) w)
{
if (closed())
{
BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
}
sch.submit_at(boost::forward<Work>(w), tp);
}

template <class Executor>
resubmit_at_executor<Scheduler, Executor> on(Executor& ex)
{
return resubmit_at_executor<Scheduler, Executor>(sch, ex, tp);
}

private:
Scheduler& sch;
clock::time_point tp;
bool is_closed;
}; //end class

/// A @c Scheduler using a specific thread. Note that a Scheduler is not an Executor.
/// It provides factory helper functions such as at/after that convert a @c Scheduler into an @c Executor
/// that submit the work at/after a specific time/duration respectively.
class scheduler : public detail::scheduled_executor_base
{
public:
typedef chrono::steady_clock clock;
typedef clock::time_point time_point;

scheduler()
: super(),
thr(&super::loop, this) {}

~scheduler()
{
this->close();
thr.join();
}
template <class Ex>
scheduler_executor_wrapper<scheduler, Ex> on(Ex& ex)
{
return scheduler_executor_wrapper<scheduler, Ex>(*this, ex);
}

template <class Duration>
at_executor<scheduler> after(Duration const& rel_time)
{
return at(rel_time + clock::now());
}

at_executor<scheduler> at(time_point const& tp)
{
return at_executor<scheduler>(*this, tp);
}

private:
typedef detail::scheduled_executor_base super;
thread thr;
};


}
using executors::resubmitter;
using executors::resubmit;
using executors::resubmit_at_executor;
using executors::scheduler_executor_wrapper;
using executors::at_executor;
using executors::scheduler;
}

#include <boost/config/abi_suffix.hpp>

#endif
72 changes: 72 additions & 0 deletions include/boost/thread/executors/scheduling_adaptor.hpp
@@ -0,0 +1,72 @@
// Copyright (C) 2014 Ian Forbed
// Copyright (C) 2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_THREAD_EXECUTORS_SCHEDULING_ADAPTOR_HPP
#define BOOST_THREAD_EXECUTORS_SCHEDULING_ADAPTOR_HPP

#include <boost/thread/executors/detail/scheduled_executor_base.hpp>

namespace boost
{
namespace executors
{

template <typename Executor>
class scheduling_adpator : public detail::scheduled_executor_base
{
private:
Executor& _exec;
thread _scheduler;
public:

scheduling_adpator(Executor& ex)
: super(),
_exec(ex),
_scheduler(&scheduling_adpator::loop, this) {}

~scheduling_adpator()
{
this->close();
_scheduler.join();
}

Executor& underlying_executor()
{
return _exec;
}

private:
typedef detail::scheduled_executor_base super;
void loop();
}; //end class

template<typename Executor>
void scheduling_adpator<Executor>::loop()
{
try
{
for(;;)
{
super::work task;
queue_op_status st = super::_workq.wait_pull(task);
if (st == queue_op_status::closed) return;
_exec.submit(task);
}
}
catch (...)
{
std::terminate();
return;
}
}

} //end executors

using executors::scheduling_adpator;

} //end boost
#endif
18 changes: 5 additions & 13 deletions include/boost/thread/executors/serial_executor.hpp
Expand Up @@ -33,7 +33,7 @@ namespace executors
typedef scoped_thread<> thread_t;

/// the thread safe work queue
sync_queue<work > work_queue;
concurrent::sync_queue<work > work_queue;
generic_executor_ref ex;
thread_t thr;

Expand All @@ -43,7 +43,7 @@ namespace executors
try_executing_one_task(work& task, boost::promise<void> &p)
: task(task), p(p) {}
void operator()() {
task(); // if task() throws promise is not set but as the the program terminates and should terminate there is no need to use try-catch here.
task();
p.set_value();
}
};
Expand All @@ -52,7 +52,7 @@ namespace executors
* \par Returns
* The underlying executor wrapped on a generic executor reference.
*/
generic_executor_ref underlying_executor() BOOST_NOEXCEPT { return ex; }
generic_executor_ref& underlying_executor() BOOST_NOEXCEPT { return ex; }

/**
* Effects: try to execute one task.
Expand All @@ -69,22 +69,14 @@ namespace executors
boost::promise<void> p;
try_executing_one_task tmp(task,p);
ex.submit(tmp);
// ex.submit([&task, &p]()
// {
// task(); // if task() throws promise is not set but as the the program terminates and should terminate there is no need to use try-catch here.
// p.set_value();
// });
p.get_future().wait();
return true;
}
return false;
}
catch (std::exception& )
{
return false;
}
catch (...)
{
std::terminate();
return false;
}
}
Expand Down Expand Up @@ -136,7 +128,7 @@ namespace executors
*/
~serial_executor()
{
// signal to all the worker thread that there will be no more submissions.
// signal to the worker thread that there will be no more submissions.
close();
}

Expand Down
36 changes: 28 additions & 8 deletions include/boost/thread/executors/thread_executor.hpp
Expand Up @@ -15,6 +15,8 @@
#include <boost/thread/executors/work.hpp>
#include <boost/thread/executors/executor.hpp>
#include <boost/thread/thread_only.hpp>
#include <boost/thread/scoped_thread.hpp>
#include <boost/thread/csbl/vector.hpp>

#include <boost/config/abi_prefix.hpp>

Expand All @@ -28,6 +30,11 @@ namespace executors
/// type-erasure to store the works to do
typedef executors::work work;
bool closed_;
typedef scoped_thread<> thread_t;
typedef csbl::vector<thread_t> threads_type;
threads_type threads_;
mutable mutex mtx_;

/**
* Effects: try to execute one task.
* Returns: whether a task has been executed.
Expand All @@ -52,14 +59,15 @@ namespace executors
{
}
/**
* \b Effects: Destroys the inline executor.
* \b Effects: Waits for closures (if any) to complete, then joins and destroys the threads.
*
* \b Synchronization: The completion of all the closures happen before the completion of the \c thread_executor destructor.
*/
~thread_executor()
{
// signal to all the worker thread that there will be no more submissions.
close();
// all the scoped threads will join before destroying
}

/**
Expand All @@ -68,16 +76,22 @@ namespace executors
*/
void close()
{
lock_guard<mutex> lk(mtx_);
closed_ = true;
}

/**
* \b Returns: whether the pool is closed for submissions.
*/
bool closed()
bool closed(lock_guard<mutex>& )
{
return closed_;
}
bool closed()
{
lock_guard<mutex> lk(mtx_);
return closed(lk);
}

/**
* \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
Expand All @@ -95,24 +109,30 @@ namespace executors
template <typename Closure>
void submit(Closure & closure)
{
if (closed()) return;
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
threads_.reserve(threads_.size() + 1);
thread th(closure);
th.detach();
threads_.push_back(thread_t(boost::move(th)));
}
#endif
void submit(void (*closure)())
{
if (closed()) return;
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
threads_.reserve(threads_.size() + 1);
thread th(closure);
th.detach();
threads_.push_back(thread_t(boost::move(th)));
}

template <typename Closure>
void submit(BOOST_THREAD_FWD_REF(Closure) closure)
{
if (closed()) return;
lock_guard<mutex> lk(mtx_);
if (closed(lk)) BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
threads_.reserve(threads_.size() + 1);
thread th(boost::forward<Closure>(closure));
th.detach();
threads_.push_back(thread_t(boost::move(th)));
}

/**
Expand Down
708 changes: 1 addition & 707 deletions include/boost/thread/sync_bounded_queue.hpp

Large diffs are not rendered by default.

649 changes: 1 addition & 648 deletions include/boost/thread/sync_queue.hpp

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions test/Jamfile.v2
Expand Up @@ -682,6 +682,25 @@ rule thread-compile ( sources : reqs * : name )
[ thread-run2-noit ./sync/mutual_exclusion/sync_bounded_queue/multi_thread_pass.cpp : sync_bounded_q_multi_thread_p ]
;

test-suite ts_sync_pq
:
[ thread-run2-noit ./sync/mutual_exclusion/sync_pq/pq_single_thread_pass.cpp : sync_pq_single_thread_p ]
[ thread-run2-noit ./sync/mutual_exclusion/sync_pq/pq_multi_thread_pass.cpp : sync_pq_multi_thread_p ]
;

test-suite ts_sync_tq
:
[ thread-run2-noit ./sync/mutual_exclusion/sync_pq/tq_single_thread_pass.cpp : sync_tq_single_thread_p ]
#[ thread-run2-noit ./sync/mutual_exclusion/sync_pq/tq_multi_thread_pass.cpp : sync_tq_multi_thread_p ]
;

test-suite ts_scheduler
:
[ thread-run2-noit ./test_scheduled_tp.cpp : test_scheduled_tp_p ]
[ thread-run2-noit ./test_scheduling_adaptor.cpp : test_scheduling_adaptor_p ]
[ thread-run2-noit ./test_scheduler.cpp : test_scheduler_p ]
;

test-suite ts_queue_views
:
[ thread-run2-noit ./sync/mutual_exclusion/queue_views/single_thread_pass.cpp : queue_views__single_thread_p ]
Expand Down
215 changes: 215 additions & 0 deletions test/sync/mutual_exclusion/sync_pq/pq_multi_thread_pass.cpp
@@ -0,0 +1,215 @@
// Copyright (C) 2014 Ian Forbed
// Copyright (C) 2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/config.hpp>
#if ! defined BOOST_NO_CXX11_DECLTYPE
#define BOOST_RESULT_OF_USE_DECLTYPE
#endif

#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_EXECUTORS

#include <exception>

#include <boost/thread/thread.hpp>
#include <boost/thread/barrier.hpp>
#include <boost/thread/concurrent_queues/sync_priority_queue.hpp>

#include <boost/core/lightweight_test.hpp>

typedef boost::concurrent::sync_priority_queue<int> sync_pq;

int call_pull(sync_pq* q, boost::barrier* go)
{
go->wait();
return q->pull();

}

void call_push(sync_pq* q, boost::barrier* go, int val)
{
go->wait();
q->push(val);
}

void test_pull(const int n)
{
sync_pq pq;
BOOST_TEST(pq.empty());
for(int i = 0; i < n; i++)
{
pq.push(i);
}
BOOST_TEST(!pq.empty());
BOOST_TEST_EQ(pq.size(), n);
pq.close();
BOOST_TEST(pq.closed());
boost::barrier b(n);
boost::thread_group tg;
for(int i = 0; i < n; i++)
{
tg.create_thread(boost::bind(call_pull, &pq, &b));
}
tg.join_all();
BOOST_TEST(pq.empty());
}

void test_push(const int n)
{
sync_pq pq;
BOOST_TEST(pq.empty());

boost::barrier b(n);
boost::thread_group tg;
for(int i = 0; i < n; i++)
{
tg.create_thread(boost::bind(call_push, &pq, &b, i));
}
tg.join_all();
BOOST_TEST(!pq.empty());
BOOST_TEST_EQ(pq.size(), n);
}

void test_both(const int n)
{
sync_pq pq;
BOOST_TEST(pq.empty());

boost::barrier b(2*n);
boost::thread_group tg;
for(int i = 0; i < n; i++)
{
tg.create_thread(boost::bind(call_pull, &pq, &b));
tg.create_thread(boost::bind(call_push, &pq, &b, i));
}
tg.join_all();
BOOST_TEST(pq.empty());
BOOST_TEST_EQ(pq.size(), 0);
}

void push_range(sync_pq* q, const int begin, const int end)
{
for(int i = begin; i < end; i++)
q->push(i);
}

void atomic_pull(sync_pq* q, boost::atomic<int>* sum)
{
while(1)
{
try{
const int val = q->pull();
sum->fetch_add(val);
}
catch(std::exception& e ){
break;
}
}
}

/**
* This test computes the sum of the first N integers upto $limit using
* $n threads for the push operation and $n threads for the pull and count
* operation. The push operation push a range of numbers on the queue while
* the pull operation pull from the queue and increments an atomic int.
* At the end of execution the value of atomic<int> $sum should be the same
* as n*(n+1)/2 as this is the closed form solution to this problem.
*/
void compute_sum(const int n)
{
const int limit = 1000;
sync_pq pq;
BOOST_TEST(pq.empty());
boost::atomic<int> sum(0);
boost::thread_group tg1;
boost::thread_group tg2;
for(int i = 0; i < n; i++)
{
tg1.create_thread(boost::bind(push_range, &pq, i*(limit/n)+1, (i+1)*(limit/n)+1));
tg2.create_thread(boost::bind(atomic_pull, &pq, &sum));
}
tg1.join_all();
pq.close(); //Wait until all enqueuing is done before closing.
BOOST_TEST(pq.closed());
tg2.join_all();
BOOST_TEST(pq.empty());
BOOST_TEST_EQ(sum.load(), limit*(limit+1)/2);
}

void move_between_queues(sync_pq* q1, sync_pq* q2)
{
while(1){
try{
const int val = q1->pull();
q2->push(val);
}
catch(std::exception& e){
break;
}
}
}

/**
* This test computes the sum of the first N integers upto $limit by moving
* numbers between 2 sync_priority_queues. A range of numbers are pushed onto
* one queue by $n threads while $n threads pull from this queue and push onto
* another sync_pq. At the end the main thread ensures the the values in the
* second queue are in proper order and then sums all the values from this
* queue. The sum should match n*(n+1)/2, the closed form solution to this
* problem.
*/
void sum_with_moving(const int n)
{
const int limit = 1000;
sync_pq pq1;
sync_pq pq2;
BOOST_TEST(pq1.empty());
BOOST_TEST(pq2.empty());
boost::thread_group tg1;
boost::thread_group tg2;
for(int i = 0; i < n; i++)
{
tg1.create_thread(boost::bind(push_range, &pq1, i*(limit/n)+1, (i+1)*(limit/n)+1));
tg2.create_thread(boost::bind(move_between_queues, &pq1, &pq2));
}
tg1.join_all();
pq1.close(); //Wait until all enqueuing is done before closing.
BOOST_TEST(pq1.closed());
tg2.join_all();
BOOST_TEST(pq1.empty());
BOOST_TEST(!pq2.empty());
int sum = 0;
for(int i = 1000; i > 0; i--){
const int val = pq2.pull();
BOOST_TEST_EQ(i,val);
sum += val;
}
BOOST_TEST(pq2.empty());
BOOST_TEST_EQ(sum, limit*(limit+1)/2);
}

int main()
{
for(int i = 1; i <= 64; i *= 2)
{
test_pull(i);
test_push(i);
test_both(i);
}
//These numbers must divide 1000
compute_sum(1);
compute_sum(4);
compute_sum(10);
compute_sum(25);
compute_sum(50);
sum_with_moving(1);
sum_with_moving(4);
sum_with_moving(10);
sum_with_moving(25);
sum_with_moving(50);
return boost::report_errors();
}
429 changes: 429 additions & 0 deletions test/sync/mutual_exclusion/sync_pq/pq_single_thread_pass.cpp

Large diffs are not rendered by default.

155 changes: 155 additions & 0 deletions test/sync/mutual_exclusion/sync_pq/tq_single_thread_pass.cpp
@@ -0,0 +1,155 @@
// Copyright (C) 2014 Ian Forbed
// Copyright (C) 2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/config.hpp>
#if ! defined BOOST_NO_CXX11_DECLTYPE
#define BOOST_RESULT_OF_USE_DECLTYPE
#endif

#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_EXECUTORS

#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include <boost/function.hpp>
#include <boost/thread/concurrent_queues/sync_timed_queue.hpp>
#include <boost/thread/executors/work.hpp>

#include <boost/core/lightweight_test.hpp>

using namespace boost::chrono;

typedef boost::concurrent::sync_timed_queue<int> sync_tq;

void test_all()
{
sync_tq pq;
BOOST_TEST(pq.empty());
BOOST_TEST(!pq.closed());
BOOST_TEST_EQ(pq.size(), 0);

for(int i = 1; i <= 5; i++){
pq.push(i, milliseconds(i*100));
BOOST_TEST(!pq.empty());
BOOST_TEST_EQ(pq.size(), i);
}

for(int i = 6; i <= 10; i++){
pq.push(i,steady_clock::now() + milliseconds(i*100));
BOOST_TEST(!pq.empty());
BOOST_TEST_EQ(pq.size(), i);
}

for(int i = 1; i <= 10; i++){
int val = pq.pull();
BOOST_TEST_EQ(val, i);
}

int val;
boost::queue_op_status st = pq.nonblocking_pull(val);
BOOST_TEST(boost::queue_op_status::empty == st);

BOOST_TEST(pq.empty());
pq.close();
BOOST_TEST(pq.closed());
}

void test_all_with_try()
{
sync_tq pq;
BOOST_TEST(pq.empty());
BOOST_TEST(!pq.closed());
BOOST_TEST_EQ(pq.size(), 0);

for(int i = 1; i <= 5; i++){
boost::queue_op_status st = pq.try_push(i, milliseconds(i*100));
BOOST_TEST(st == boost::queue_op_status::success );
BOOST_TEST(!pq.empty());
BOOST_TEST_EQ(pq.size(), i);
}

for(int i = 6; i <= 10; i++){
boost::queue_op_status st = pq.try_push(i,steady_clock::now() + milliseconds(i*100));
BOOST_TEST(st == boost::queue_op_status::success );
BOOST_TEST(!pq.empty());
BOOST_TEST_EQ(pq.size(), i);
}

for(int i = 1; i <= 10; i++){
int val=0;
boost::queue_op_status st = pq.wait_pull(val);
BOOST_TEST(st == boost::queue_op_status::success );
BOOST_TEST_EQ(val, i);
}

int val;
boost::queue_op_status st = pq.nonblocking_pull(val);
BOOST_TEST(st == boost::queue_op_status::empty );

BOOST_TEST(pq.empty());
pq.close();
BOOST_TEST(pq.closed());
}

void func(steady_clock::time_point pushed, steady_clock::duration dur)
{
BOOST_TEST(pushed + dur <= steady_clock::now());
}
void func2()
{
BOOST_TEST(false);
}

/**
* This test ensures that when items come of the front of the queue
* that at least $dur has elapsed.
*/
void test_deque_times()
{
boost::concurrent::sync_timed_queue<boost::function<void()> > tq;
for(int i = 0; i < 10; i++)
{
steady_clock::duration d = milliseconds(i*100);
boost::function<void()> fn = boost::bind(func, steady_clock::now(), d);
tq.push(fn, d);
}
while(!tq.empty())
{
boost::function<void()> fn = tq.pull();
fn();
}
}

/**
* This test ensures that when items come of the front of the queue
* that at least $dur has elapsed.
*/
#if 0
void test_deque_times2()
{
boost::concurrent::sync_timed_queue<boost::executors::work> tq;
for(int i = 0; i < 10; i++)
{
steady_clock::duration d = milliseconds(i*100);
tq.push(func2, d);
}
while(!tq.empty())
{
boost::executors::work fn = tq.pull();
fn();
}
}
#endif

int main()
{
test_all();
test_all_with_try();
test_deque_times();
//test_deque_times2(); // rt fails
return boost::report_errors();
}
Expand Up @@ -72,7 +72,7 @@ void test_bind() {
BOOST_TEST(c == 345);
}

#if defined(BOOST_NO_VARIADIC_TEMPLATES)
#if defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES)
void test_bind_non_const() {
std::cout << "c++11 variadic templates disabled" << std::endl;
}
Expand Down
5 changes: 5 additions & 0 deletions test/test_10340.cpp
@@ -1,3 +1,8 @@
// Copyright (C) 2014 Vicente Botet
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#define BOOST_THREAD_VERSION 4

#include <boost/thread/future.hpp>
Expand Down
5 changes: 5 additions & 0 deletions test/test_9192.cpp
@@ -1,3 +1,8 @@
// Copyright (C) 2014 Vicente Botet
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <boost/thread.hpp>
Expand Down
7 changes: 6 additions & 1 deletion test/test_9303.cpp
@@ -1,3 +1,8 @@
// Copyright (C) 2014 Vicente Botet
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#define BOOST_THREAD_VERSION 4
#include <iostream>
#include <fstream>
Expand Down Expand Up @@ -81,7 +86,7 @@

#if defined EXAMPLE_3
//! Doesn't compile in C++03.
//! error: variable ‘boost::packaged_task<std::basic_string<char>(std::basic_string<char>&)> example’ has initializer but incomplete type
//! error: variable âboost::packaged_task<std::basic_string<char>(std::basic_string<char>&)> exampleâ has initializer but incomplete type

{
boost::packaged_task<std::string(std::string&)> example(string_with_params);
Expand Down
4 changes: 4 additions & 0 deletions test/test_9711.cpp
@@ -1,3 +1,7 @@
// Copyright (C) 2014 Vicente Botet
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#define BOOST_THREAD_PROVIDES_FUTURE
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
Expand Down
89 changes: 89 additions & 0 deletions test/test_scheduled_tp.cpp
@@ -0,0 +1,89 @@
// Copyright (C) 2014 Ian Forbed
// Copyright (C) 2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/config.hpp>
#if ! defined BOOST_NO_CXX11_DECLTYPE
#define BOOST_RESULT_OF_USE_DECLTYPE
#endif

#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_EXECUTORS

#include <boost/bind.hpp>
#include <boost/chrono.hpp>
#include <boost/chrono/chrono_io.hpp>
#include <boost/function.hpp>
#include <boost/thread/executors/scheduled_thread_pool.hpp>
#include <iostream>

#include <boost/core/lightweight_test.hpp>

using namespace boost::chrono;

typedef boost::scheduled_thread_pool scheduled_tp;

void fn(int x)
{
std::cout << x << std::endl;
}

void func(steady_clock::time_point pushed, steady_clock::duration dur)
{
BOOST_TEST(pushed + dur < steady_clock::now());
}

void test_timing(const int n)
{
//This function should take n seconds to execute.
boost::scheduled_thread_pool se(4);

for(int i = 1; i <= n; i++)
{
se.submit_after(boost::bind(fn,i), milliseconds(i*100));
}
boost::this_thread::sleep_for(boost::chrono::seconds(10));
//dtor is called here so all task will have to be executed before we return
}

void test_deque_timing()
{
boost::scheduled_thread_pool se(4);
for(int i = 0; i < 10; i++)
{
steady_clock::duration d = milliseconds(i*100);
boost::function<void()> fn = boost::bind(func,steady_clock::now(),d);
se.submit_after(fn,d);
}
}

void test_deque_multi(const int n)
{
scheduled_tp se(4);
boost::thread_group tg;
for(int i = 0; i < n; i++)
{
steady_clock::duration d = milliseconds(i*100);
boost::function<void()> fn = boost::bind(func,steady_clock::now(),d);
tg.create_thread(boost::bind(boost::mem_fn(&scheduled_tp::submit_after), &se, fn, d));
}
tg.join_all();
//dtor is called here so execution will block untill all the closures
//have been completed.
}

int main()
{
steady_clock::time_point start = steady_clock::now();
test_timing(5);
steady_clock::duration diff = steady_clock::now() - start;
BOOST_TEST(diff > milliseconds(500));
test_deque_timing();
test_deque_multi(4);
test_deque_multi(8);
test_deque_multi(16);
return boost::report_errors();
}
81 changes: 81 additions & 0 deletions test/test_scheduler.cpp
@@ -0,0 +1,81 @@
// Copyright (C) 2014 Ian Forbed
// Copyright (C) 2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/config.hpp>
#if ! defined BOOST_NO_CXX11_DECLTYPE
#define BOOST_RESULT_OF_USE_DECLTYPE
#endif

#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_EXECUTORS

#include <boost/thread/executors/scheduler.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp>
#include <boost/chrono/chrono_io.hpp>
#include <iostream>

#include <boost/core/lightweight_test.hpp>

using namespace boost::chrono;


typedef boost::executors::basic_thread_pool thread_pool;

void fn(int x)
{
//std::cout << "[" << __LINE__ << "] " << steady_clock::now() << std::endl;
std::cout << x << std::endl;
}

void test_scheduler(const int n, boost::scheduler& sch)
{
for(int i = 1; i <= n; i++)
{
sch.submit_after(boost::bind(fn,i), seconds(i));
sch.submit_after(boost::bind(fn,i), milliseconds(i*100));
}
}

void test_after(const int n, boost::scheduler& sch)
{
for(int i = 1; i <= n; i++)
{
sch.after(seconds(i)).submit(boost::bind(fn,i));
sch.after(milliseconds(i*100)).submit(boost::bind(fn,i));
}
}

void test_at(const int n, boost::scheduler& sch)
{
for(int i = 1; i <= n; i++)
{
sch.at(steady_clock::now()+seconds(i)).submit(boost::bind(fn,i));
sch.at(steady_clock::now()+milliseconds(i*100)).submit(boost::bind(fn,i));
}
}

void test_on(const int n, boost::scheduler& sch, thread_pool& tp)
{
for(int i = 1; i <= n; i++)
{
sch.on(tp).after(seconds(i)).submit(boost::bind(fn,i));
sch.on(tp).after(milliseconds(i*100)).submit(boost::bind(fn,i));
}
}

int main()
{
thread_pool tp(4);
boost::scheduler sch;
test_scheduler(5, sch);
test_after(5, sch);
test_at(5, sch);
test_on(5, sch, tp);
boost::this_thread::sleep_for(boost::chrono::seconds(10));

return boost::report_errors();
}
54 changes: 54 additions & 0 deletions test/test_scheduling_adaptor.cpp
@@ -0,0 +1,54 @@
// Copyright (C) 2014 Ian Forbed
// Copyright (C) 2014 Vicente J. Botet Escriba
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#include <boost/config.hpp>
#if ! defined BOOST_NO_CXX11_DECLTYPE
#define BOOST_RESULT_OF_USE_DECLTYPE
#endif

#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_EXECUTORS

#include <boost/function.hpp>
#include <boost/thread/executors/executor.hpp>
#include <boost/thread/executors/basic_thread_pool.hpp>
#include <boost/thread/executors/scheduling_adaptor.hpp>
#include <boost/chrono/chrono_io.hpp>

#include <boost/core/lightweight_test.hpp>

using namespace boost::chrono;


typedef boost::executors::basic_thread_pool thread_pool;

void fn(int x)
{
//std::cout << "[" << __LINE__ << "] " << steady_clock::now() << std::endl;
std::cout << x << std::endl;
}

void test_timing(const int n)
{
thread_pool tp(4);
boost::scheduling_adpator<thread_pool> sa(tp);
for(int i = 1; i <= n; i++)
{
sa.submit_after(boost::bind(fn,i),seconds(i));
sa.submit_after(boost::bind(fn,i), milliseconds(i*100));
}
boost::this_thread::sleep_for(boost::chrono::seconds(10));
}

int main()
{
steady_clock::time_point start = steady_clock::now();
test_timing(5);
steady_clock::duration diff = steady_clock::now() - start;
BOOST_TEST(diff > seconds(5));
return boost::report_errors();
}
2 changes: 2 additions & 0 deletions test/winrt_init.cpp
@@ -1,3 +1,5 @@
// Copyright (C) 2014 Vicente Botet
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

Expand Down