-
-
Notifications
You must be signed in to change notification settings - Fork 427
/
thread_executor_traits.hpp
296 lines (264 loc) · 11.8 KB
/
thread_executor_traits.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
// Copyright (c) 2007-2017 Hartmut Kaiser
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
/// \file parallel/executors/thread_executor_traits.hpp
#if !defined(HPX_PARALLEL_THREAD_EXECUTOR_TRAITS_AUG_07_2015_0826AM)
#define HPX_PARALLEL_THREAD_EXECUTOR_TRAITS_AUG_07_2015_0826AM
#include <hpx/config.hpp>
#if defined(HPX_HAVE_EXECUTOR_COMPATIBILITY)
#include <hpx/apply.hpp>
#include <hpx/async.hpp>
#include <hpx/lcos/when_all.hpp>
#include <hpx/parallel/algorithms/detail/predicates.hpp>
#include <hpx/parallel/executors/executor_traits.hpp>
#include <hpx/runtime/threads/thread_executor.hpp>
#include <hpx/traits/is_launch_policy.hpp>
#include <hpx/util/decay.hpp>
#include <hpx/util/deferred_call.hpp>
#include <hpx/util/range.hpp>
#include <hpx/util/unwrap.hpp>
#include <algorithm>
#include <cstddef>
#include <functional>
#include <type_traits>
#include <utility>
#include <vector>
namespace hpx { namespace parallel { inline namespace v3
{
///////////////////////////////////////////////////////////////////////////
/// Specialization for executor_traits for types which conform to
/// traits::is_threads_executor<Executor>
template <typename Executor>
struct executor_traits<Executor,
typename std::enable_if<
hpx::traits::is_threads_executor<Executor>::value
>::type>
{
/// The type of the executor associated with this instance of
/// \a executor_traits
typedef Executor executor_type;
/// The category of agents created by the bulk-form execute() and
/// async_execute(). All threads::executors create parallel execution
/// agents
///
typedef parallel_execution_tag execution_category;
/// The type of future returned by async_execute(). All
/// threads::executors return hpx::future<T>.
///
template <typename T>
struct future
{
typedef hpx::future<T> type;
};
/// \brief Singleton form of asynchronous fire & forget execution agent
/// creation.
///
/// This asynchronously (fire & forget) creates a single function
/// invocation f() using the associated executor. All
/// threads::executors invoke hpx::apply(sched, f).
///
/// \param sched [in] The executor object to use for scheduling of the
/// function \a f.
/// \param f [in] The function which will be scheduled using the
/// given executor.
/// \param ts... [in] Additional arguments to use to invoke \a f.
///
template <typename Executor_, typename F, typename ... Ts>
static void post(Executor_ && sched, F && f, Ts &&... ts)
{
hpx::apply(std::forward<Executor_>(sched), std::forward<F>(f),
std::forward<Ts>(ts)...);
}
/// \brief Singleton form of asynchronous execution agent creation.
///
/// This asynchronously creates a single function invocation f() using
/// the associated executor. All threads::executors invoke
/// hpx::async(sched, f).
///
/// \param sched [in] The executor object to use for scheduling of the
/// function \a f.
/// \param f [in] The function which will be scheduled using the
/// given executor.
/// \param ts... [in] Additional arguments to use to invoke \a f.
///
/// \returns f(ts...)'s result through a future
///
template <typename Executor_, typename F, typename ... Ts>
static hpx::future<
typename hpx::util::detail::invoke_deferred_result<F, Ts...>::type
>
async_execute(Executor_ && sched, F && f, Ts &&... ts)
{
return hpx::async(std::forward<Executor_>(sched),
std::forward<F>(f), std::forward<Ts>(ts)...);
}
/// \brief Singleton form of synchronous execution agent creation.
///
/// This synchronously creates a single function invocation f() using
/// the associated executor. The execution of the supplied function
/// synchronizes with the caller. All threads::executors invoke
/// hpx::async(sched, f).get().
///
/// \param sched [in] The executor object to use for scheduling of the
/// function \a f.
/// \param f [in] The function which will be scheduled using the
/// given executor.
/// \param ts... [in] Additional arguments to use to invoke \a f.
///
/// \returns f(ts...)'s result through a future
///
template <typename Executor_, typename F, typename ... Ts>
static typename hpx::util::detail::invoke_deferred_result<F, Ts...>::type
execute(Executor_ && sched, F && f, Ts &&... ts)
{
return hpx::async(std::forward<Executor_>(sched),
std::forward<F>(f), std::forward<Ts>(ts)...).get();
}
/// \brief Bulk form of asynchronous execution agent creation
///
/// This asynchronously creates a group of function invocations f(i)
/// whose ordering is given by the execution_category associated with
/// the executor.
///
/// Here \a i takes on all values in the index space implied by shape.
/// All exceptions thrown by invocations of f(i) are reported in a
/// manner consistent with parallel algorithm execution through the
/// returned future.
///
/// \param sched [in] The executor object to use for scheduling of the
/// function \a f.
/// \param f [in] The function which will be scheduled using the
/// given executor.
/// \param shape [in] The shape objects which defines the iteration
/// boundaries for the arguments to be passed to \a f.
/// \param ts... [in] Additional arguments to use to invoke \a f.
///
/// \returns The return type of \a executor_type::async_execute if
/// defined by \a executor_type. Otherwise a vector
/// of futures holding the returned value of each invocation
/// of \a f.
///
template <typename Executor_, typename F, typename Shape,
typename ... Ts>
static std::vector<hpx::future<
typename detail::bulk_async_execute_result<F, Shape, Ts...>::type
> >
bulk_async_execute(Executor_ && sched, F && f, Shape const& shape,
Ts &&... ts)
{
std::vector<hpx::future<
typename detail::bulk_async_execute_result<
F, Shape, Ts...
>::type
> > results;
std::size_t size = hpx::util::size(shape);
results.resize(size);
static std::size_t num_tasks =
(std::min)(std::size_t(128), hpx::get_os_thread_count());
spawn(sched, results, 0, size, num_tasks, f, hpx::util::begin(shape),
ts...).get();
return results;
}
/// \cond NOINTERNAL
template <typename Executor_, typename Result, typename F,
typename Iter, typename ... Ts>
static hpx::future<void>
spawn(Executor_& sched, std::vector<hpx::future<Result> >& results,
std::size_t base, std::size_t size, std::size_t num_tasks,
F const& func, Iter it, Ts const&... ts)
{
const std::size_t num_spread = 4;
if (size > num_tasks)
{
// spawn hierarchical tasks
std::size_t chunk_size = (size + num_spread) / num_spread - 1;
chunk_size = (std::max)(chunk_size, num_tasks);
std::vector<hpx::future<void> > tasks;
tasks.reserve(num_spread);
hpx::future<void> (*spawn_func)(
Executor_&, std::vector<hpx::future<Result> >&,
std::size_t, std::size_t, std::size_t, F const&, Iter,
Ts const&...
) = &executor_traits::spawn;
while (size != 0)
{
std::size_t curr_chunk_size = (std::min)(chunk_size, size);
hpx::future<void> f = hpx::async(
spawn_func, std::ref(sched), std::ref(results), base,
curr_chunk_size, num_tasks, std::ref(func), it,
std::ref(ts)...);
tasks.push_back(std::move(f));
base += curr_chunk_size;
it = hpx::parallel::v1::detail::next(it, curr_chunk_size);
size -= curr_chunk_size;
}
HPX_ASSERT(size == 0);
return hpx::when_all(tasks);
}
// spawn all tasks sequentially
HPX_ASSERT(base + size <= results.size());
for (std::size_t i = 0; i != size; ++i, ++it)
{
results[base + i] = hpx::async(sched, func, *it, ts...);
}
return hpx::make_ready_future();
}
/// \endcond
/// \brief Bulk form of synchronous execution agent creation
///
/// This synchronously creates a group of function invocations f(i)
/// whose ordering is given by the execution_category associated with
/// the executor. The function synchronizes the execution of all
/// scheduled functions with the caller.
///
/// Here \a i takes on all values in the index space implied by shape.
/// All exceptions thrown by invocations of f(i) are reported in a
/// manner consistent with parallel algorithm execution through the
/// returned future.
///
/// \param sched [in] The executor object to use for scheduling of the
/// function \a f.
/// \param f [in] The function which will be scheduled using the
/// given executor.
/// \param shape [in] The shape objects which defines the iteration
/// boundaries for the arguments to be passed to \a f.
/// \param ts... [in] Additional arguments to use to invoke \a f.
///
/// \returns The return type of \a executor_type::execute if defined
/// by \a executor_type. Otherwise a vector holding the
/// returned value of each invocation of \a f except when
/// \a f returns void, which case void is returned.
///
template <typename Executor_, typename F, typename Shape,
typename ... Ts>
static typename detail::bulk_execute_result<F, Shape, Ts...>::type
bulk_execute(Executor_ && sched, F && f, Shape const& shape,
Ts &&... ts)
{
std::vector<hpx::future<
typename detail::bulk_async_execute_result<
F, Shape, Ts...
>::type
> > results;
for (auto const& elem: shape)
{
results.push_back(hpx::async(sched, std::forward<F>(f),
elem, ts...));
}
return hpx::util::unwrap(results);
}
/// Retrieve whether this executor has operations pending or not.
/// All threads::executors invoke sched.num_pending_closures().
///
/// \param sched [in] The executor object to use for querying the
/// number of pending tasks.
///
template <typename Executor_>
static bool has_pending_closures(Executor_ && sched)
{
return sched.num_pending_closures() != 0;
}
};
}}}
#endif
#endif