forked from STEllAR-GROUP/hpx
-
Notifications
You must be signed in to change notification settings - Fork 0
/
guided_pool_executor.hpp
255 lines (213 loc) · 8 KB
/
guided_pool_executor.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
// Copyright (c) 2017 John Biddiscombe
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#ifndef HPX_RUNTIME_THREADS_GUIDED_POOL_EXECUTOR
#define HPX_RUNTIME_THREADS_GUIDED_POOL_EXECUTOR
#include <hpx/async.hpp>
#include <hpx/runtime/threads/executors/pool_executor.hpp>
#include <hpx/runtime/threads/detail/thread_pool_base.hpp>
#include <hpx/util/thread_description.hpp>
#include <hpx/util/thread_specific_ptr.hpp>
#include <hpx/lcos/dataflow.hpp>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <hpx/config/warnings_prefix.hpp>
//
#include <typeinfo>
#ifdef __GNUG__
# include <cstdlib>
# include <cxxabi.h>
#endif
// ------------------------------------------------------------------
// helper to demangle type names
// ------------------------------------------------------------------
#ifdef __GNUG__
/// Turn a compiler-mangled type name (as returned by `typeid(T).name()`)
/// into a human readable one.
///
/// \param name  mangled name; may be null
/// \return the demangled name when demangling succeeds, the unmodified
///         input when it fails, and an empty string when \a name is null
///
/// Declared `inline` because this header defines the function; without it
/// every translation unit including the header would emit its own
/// definition and the program would fail to link.
inline std::string demangle(const char* name)
{
    // a null pointer can neither be demangled nor converted to std::string
    if (name == nullptr)
    {
        return std::string();
    }

    // some arbitrary value to eliminate the compiler warning
    // (the documented status codes of __cxa_demangle are 0, -1, -2 and -3)
    int status = -4;

    // __cxa_demangle hands back a malloc'ed buffer (or null on failure);
    // the unique_ptr releases it with std::free when we leave the function
    std::unique_ptr<char, void (*)(void*)> res{
        abi::__cxa_demangle(name, nullptr, nullptr, &status),
        std::free
    };

    // res is only guaranteed to hold a string when status reports success
    return (status == 0) ? res.get() : name;
}
#else
/// Fallback when not compiling with g++ (or a compatible compiler):
/// no demangling is attempted.
///
/// \param name  type name; may be null
/// \return a copy of \a name, or an empty string when \a name is null
inline std::string demangle(const char* name)
{
    if (name == nullptr)
    {
        return std::string();
    }
    return name;
}
#endif
// End of the argument list: nothing is left to print.
// Always returns an empty string.
inline std::string print_type()
{
    return std::string("");
}

// Returns the name of type T, passed through demangle().
template <typename Type>
inline std::string print_type()
{
    const std::type_info& info = typeid(Type);
    return demangle(info.name());
}

// Debug helper: writes the type name of every argument to std::cout, one
// per line (each line is flushed via std::endl), working through the
// argument list from left to right. Only the argument *types* are
// inspected; the values are never read.
// The result is whatever the zero-argument overload returns, i.e. an
// empty string.
template <typename Head, typename... Rest>
inline std::string print_type(Head&& head, Rest&&... tail)
{
    (void) head;    // only its type matters
    std::cout << print_type<Head>() << std::endl;
    return print_type(std::forward<Rest>(tail)...);
}
namespace hpx { namespace threads { namespace executors
{
// Holder for a static thread_specific_ptr to an hwloc_bitmap_ptr.
// Nothing else in this header refers to it.
struct bitmap_storage
{
// Empty tag type, used only as the second template argument of the
// thread_specific_ptr declared below.
struct tls_tag {};
// Declaration only: the definition of this static member is not part of
// this header.
// NOTE(review): presumably each thread sees its own pointer value, as the
// name thread_specific_ptr suggests - confirm against hpx/util.
static hpx::util::thread_specific_ptr<hwloc_bitmap_ptr, tls_tag> bitmap_storage_;
};
// --------------------------------------------------------------------
// Template type for a numa domain scheduling hint
// Primary template of the NUMA-domain hint: an empty placeholder.
// NOTE(review): pre_execution_domain_schedule invokes an instance of its
// hint type with the task arguments and stores the result in an int, so
// user code presumably specialises this template with a matching
// operator() - confirm against the callers.
template <typename... HintArgs>
struct HPX_EXPORT pool_numa_hint
{
};
// Template type for a core scheduling hint
// Primary template of the core hint: an empty placeholder. It is not used
// anywhere else in this header.
template <typename... HintArgs>
struct HPX_EXPORT pool_core_hint
{
};
// --------------------------------------------------------------------
// helper : numa domain scheduling and then execution
template <typename Executor, typename NumaFunction>
struct pre_execution_domain_schedule
{
Executor executor_;
NumaFunction numa_function_;
//
template <typename F, typename ... Ts>
auto operator()(F && f, Ts &&... ts) const
{
int domain = numa_function_(ts...);
std::cout << "The numa domain is " << domain << "\n";
print_type(ts...);
// now we must forward the task on to the correct dispatch function
typedef typename util::detail::invoke_deferred_result<F, Ts...>::type
result_type;
lcos::local::futures_factory<result_type()> p(
const_cast<Executor&>(executor_),
util::deferred_call(std::forward<F>(f), std::forward<Ts>(ts)...));
p.apply(
launch::async,
threads::thread_priority_default,
threads::thread_stacksize_default,
threads::thread_schedule_hint(domain));
return p.get_future();
}
};
// --------------------------------------------------------------------
struct HPX_EXPORT guided_pool_executor_base {
public:
guided_pool_executor_base(const std::string& pool_name)
: pool_executor_(pool_name)
{}
guided_pool_executor_base(const std::string& pool_name,
thread_stacksize stacksize)
: pool_executor_(pool_name, stacksize)
{}
guided_pool_executor_base(const std::string& pool_name,
thread_priority priority,
thread_stacksize stacksize = thread_stacksize_default)
: pool_executor_(pool_name, priority, stacksize)
{}
protected:
pool_executor pool_executor_;
};
// --------------------------------------------------------------------
// Primary template of the guided pool executor: intentionally empty.
// The usable executors are the partial specialisations on
// pool_numa_hint<...> defined in this header.
template <typename... Hints>
struct HPX_EXPORT guided_pool_executor
{
};
// --------------------------------------------------------------------
// this is a guided pool executor templated over a function type
// the function type should be the one used for async calls
template <>
template <typename R, typename...Args>
struct HPX_EXPORT guided_pool_executor<pool_numa_hint<R(*)(Args...)>>
: guided_pool_executor_base
{
public:
using guided_pool_executor_base::guided_pool_executor_base;
template <typename F, typename ... Ts>
hpx::future<
typename hpx::util::detail::invoke_deferred_result<F, Ts...>::type>
async_execute(F && f, Ts &&... ts)
{
// hold onto the function until all futures have become ready
// by using a dataflow operation, then call the scheduling hint
// before passing the task onwards to the real executor
return hpx::dataflow(
util::unwrapping(
pre_execution_domain_schedule<pool_executor,
pool_numa_hint<R(*)(Args...)>>{
pool_executor_, hint_
}
),
std::forward<F>(f), std::forward<Ts>(ts)...);
};
private:
pool_numa_hint<R(*)(Args...)> hint_;
};
// --------------------------------------------------------------------
// this is a guided pool executor templated over args only
// the args should be the same as those that would be called
// for an async function or continuation. This makes it possible to
// guide a lambda rather than a full function.
template <>
template <typename...Args>
struct HPX_EXPORT guided_pool_executor<pool_numa_hint<Args...>>
: guided_pool_executor_base
{
public:
using guided_pool_executor_base::guided_pool_executor_base;
template <typename F, typename ... Ts>
hpx::future<
typename hpx::util::detail::invoke_deferred_result<F, Ts...>::type>
async_execute(F && f, Ts &&... ts) const
{
// hold onto the function until all futures have become ready
// by using a dataflow operation, then call the scheduling hint
// before passing the task onwards to the real executor
return hpx::dataflow(
util::unwrapping(
pre_execution_domain_schedule<pool_executor,
pool_numa_hint<Args...>> {
pool_executor_, hint_
}
),
std::forward<F>(f), std::forward<Ts>(ts)...);
};
private:
pool_numa_hint<Args...> hint_;
};
}}}
namespace hpx { namespace parallel { namespace execution
{
    // Executor trait specialisations for every instantiation of
    // threads::executors::guided_pool_executor. The set below marks it as
    // a two-way executor only, with parallel execution category.

    // execution category: parallel
    template <typename Hint>
    struct executor_execution_category<
        threads::executors::guided_pool_executor<Hint>>
    {
        using type = parallel::execution::parallel_execution_tag;
    };

    // not a one-way executor
    template <typename Hint>
    struct is_one_way_executor<
        threads::executors::guided_pool_executor<Hint>>
      : std::false_type
    {
    };

    // a two-way executor
    template <typename Hint>
    struct is_two_way_executor<
        threads::executors::guided_pool_executor<Hint>>
      : std::true_type
    {
    };

    // not a bulk one-way executor
    template <typename Hint>
    struct is_bulk_one_way_executor<
        threads::executors::guided_pool_executor<Hint>>
      : std::false_type
    {
    };

    // not a bulk two-way executor
    template <typename Hint>
    struct is_bulk_two_way_executor<
        threads::executors::guided_pool_executor<Hint>>
      : std::false_type
    {
    };
}}}
#include <hpx/config/warnings_suffix.hpp>
#endif /*HPX_RUNTIME_THREADS_GUIDED_POOL_EXECUTOR*/