forked from STEllAR-GROUP/hpx
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pool_executor.cpp
186 lines (158 loc) · 6.64 KB
/
pool_executor.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
// Copyright (c) 2017 Shoshana Jakobovits
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#include <hpx/exception.hpp>
#include <hpx/runtime/threads/executors/pool_executor.hpp>
#include <hpx/runtime/threads/threadmanager.hpp>
#include <hpx/util/assert.hpp>
#include <hpx/util/bind.hpp>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
namespace hpx { namespace threads { namespace executors
{
namespace detail
{
pool_executor::pool_executor(
std::string const& pool_name)
: pool_(hpx::threads::get_thread_manager().get_pool(pool_name))
, stacksize_(thread_stacksize_default)
, priority_(thread_priority_default)
{}
pool_executor::pool_executor(const std::string& pool_name,
thread_stacksize stacksize)
: pool_(hpx::threads::get_thread_manager().get_pool(pool_name))
, stacksize_(stacksize)
, priority_(thread_priority_default)
{}
pool_executor::pool_executor(const std::string& pool_name,
thread_priority priority, thread_stacksize stacksize)
: pool_(hpx::threads::get_thread_manager().get_pool(pool_name))
, stacksize_(stacksize)
, priority_(priority)
{}
threads::thread_result_type
pool_executor::thread_function_nullary(closure_type func)
{
// execute the actual thread function
func();
return threads::thread_result_type(threads::terminated, nullptr);
}
// Return the requested policy element
std::size_t pool_executor::get_policy_element(
threads::detail::executor_parameter p, error_code& ec) const
{
//! FIXME what is this supposed to do??
HPX_THROWS_IF(ec, bad_parameter,
"pool_executor::get_policy_element",
"requested value of invalid policy element");
return std::size_t(-1);
}
// Schedule the specified function for execution in this executor.
// Depending on the subclass implementation, this may block in some
// situations.
void pool_executor::add(closure_type&& f,
util::thread_description const& desc,
threads::thread_state_enum initial_state, bool run_now,
threads::thread_stacksize stacksize,
threads::thread_schedule_hint schedulehint,
error_code& ec)
{
std::cout << "pool executor received hint " << schedulehint << std::endl;
// create a new thread
thread_init_data data(
util::bind(
util::one_shot(
&pool_executor::thread_function_nullary),
std::move(f)),
desc);
if (stacksize == threads::thread_stacksize_default)
stacksize = stacksize_;
data.stacksize = threads::get_stack_size(stacksize);
data.priority = priority_;
threads::thread_id_type id = threads::invalid_thread_id;
pool_.create_thread(data, id, initial_state, run_now, ec);
if (ec)
return;
HPX_ASSERT(invalid_thread_id != id || !run_now);
if (&ec != &throws)
ec = make_success_code();
}
// Schedule given function for execution in this executor no sooner
// than time abs_time. This call never blocks, and may violate
// bounds on the executor's queue size.
void pool_executor::add_at(
util::steady_clock::time_point const& abs_time,
closure_type&& f, util::thread_description const& desc,
threads::thread_stacksize stacksize, error_code& ec)
{
// create a new suspended thread
thread_init_data data(
util::bind(
util::one_shot(
&pool_executor::thread_function_nullary),
std::move(f)),
desc);
if (stacksize == threads::thread_stacksize_default)
stacksize = stacksize_;
data.stacksize = threads::get_stack_size(stacksize);
data.priority = priority_;
threads::thread_id_type id = threads::invalid_thread_id;
pool_.create_thread(data, id, suspended, true, ec);
if (ec)
return;
HPX_ASSERT(invalid_thread_id != id); // would throw otherwise
// now schedule new thread for execution
pool_.set_state(abs_time, id, pending, wait_timeout,
thread_priority_normal, ec);
if (ec)
return;
if (&ec != &throws)
ec = make_success_code();
}
// Schedule given function for execution in this executor no sooner
// than time rel_time from now. This call never blocks, and may
// violate bounds on the executor's queue size.
void pool_executor::add_after(
util::steady_clock::duration const& rel_time, closure_type&& f,
util::thread_description const& desc,
threads::thread_stacksize stacksize, error_code& ec)
{
return add_at(util::steady_clock::now() + rel_time,
std::move(f), desc, stacksize, ec);
}
// Return an estimate of the number of waiting tasks.
std::uint64_t pool_executor::num_pending_closures(
error_code& ec) const
{
if (&ec != &throws)
ec = make_success_code();
std::lock_guard<mutex_type> lk(mtx_);
return pool_.get_thread_count(
unknown, thread_priority_default, std::size_t(-1), false);
}
// Reset internal (round robin) thread distribution scheme
void pool_executor::reset_thread_distribution()
{
pool_.reset_thread_distribution();
}
}
}}}
namespace hpx { namespace threads { namespace executors
{
pool_executor::pool_executor(
const std::string& pool_name)
: scheduled_executor(new detail::pool_executor(pool_name))
{}
pool_executor::pool_executor(const std::string& pool_name,
thread_stacksize stacksize)
: scheduled_executor(new detail::pool_executor(pool_name, stacksize))
{}
pool_executor::pool_executor(const std::string& pool_name,
thread_priority priority, thread_stacksize stacksize)
: scheduled_executor(
new detail::pool_executor(pool_name, priority, stacksize))
{}
}}}