Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
ARROW-2479: [C++] Add ThreadPool class
* A ThreadPool class with future-returning task submission, and the ability to change number of worker threads on-the-fly * Tests for the ThreadPool class, including stress tests * A singleton thread pool for cpu-bound tasks, configured based on hardware capacity * A public API to change global thread pool capacity * Migrated the Arrow codebase to using the global thread pool (except APIs taking a `nthreads`, see below) Remaining open question: * [ ] what do we do with APIs that take a user-facing `nthreads` argument? (the Pandas conversion routines, which are able to convert/copy different columns in parallel) Author: Antoine Pitrou <antoine@python.org> Closes #1953 from pitrou/ARROW-2479-threadpool and squashes the following commits: cea94b4 <Antoine Pitrou> Fix typo 1a96830 <Antoine Pitrou> Explicitly expose std::__once_call* in SO files. 154860c <Antoine Pitrou> Adjust symbols.map ab41c7c <Antoine Pitrou> Use global thread pool in Plasma 60f1c62 <Antoine Pitrou> Add process-global thread pool ad8fa41 <Antoine Pitrou> ARROW-2479: Add ThreadPool class
- Loading branch information
Showing
9 changed files
with
654 additions
and
35 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,275 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
#include <algorithm> | ||
#include <chrono> | ||
#include <functional> | ||
#include <thread> | ||
#include <vector> | ||
|
||
#include "arrow/test-util.h" | ||
#include "arrow/util/macros.h" | ||
#include "arrow/util/thread-pool.h" | ||
|
||
namespace arrow { | ||
namespace internal { | ||
|
||
static void sleep_for(double seconds) { | ||
std::this_thread::sleep_for( | ||
std::chrono::nanoseconds(static_cast<int64_t>(seconds * 1e9))); | ||
} | ||
|
||
static void busy_wait(double seconds, std::function<bool()> predicate) { | ||
const double period = 0.001; | ||
for (int i = 0; !predicate() && i * period < seconds; ++i) { | ||
sleep_for(period); | ||
} | ||
} | ||
|
||
template <typename T> | ||
static void task_add(T x, T y, T* out) { | ||
*out = x + y; | ||
} | ||
|
||
template <typename T> | ||
static void task_slow_add(double seconds, T x, T y, T* out) { | ||
sleep_for(seconds); | ||
*out = x + y; | ||
} | ||
|
||
typedef std::function<void(int, int, int*)> AddTaskFunc; | ||
|
||
template <typename T> | ||
static T add(T x, T y) { | ||
return x + y; | ||
} | ||
|
||
template <typename T> | ||
static T slow_add(double seconds, T x, T y) { | ||
sleep_for(seconds); | ||
return x + y; | ||
} | ||
|
||
template <typename T> | ||
static T inplace_add(T& x, T y) { | ||
return x += y; | ||
} | ||
|
||
// A class to spawn "add" tasks to a pool and check the results when done | ||
|
||
class AddTester { | ||
public: | ||
explicit AddTester(int nadds) : nadds(nadds), xs(nadds), ys(nadds), outs(nadds, -1) { | ||
int x = 0, y = 0; | ||
std::generate(xs.begin(), xs.end(), [&] { | ||
++x; | ||
return x; | ||
}); | ||
std::generate(ys.begin(), ys.end(), [&] { | ||
y += 10; | ||
return y; | ||
}); | ||
} | ||
|
||
AddTester(AddTester&&) = default; | ||
|
||
void SpawnTasks(ThreadPool* pool, AddTaskFunc add_func) { | ||
for (int i = 0; i < nadds; ++i) { | ||
ASSERT_OK(pool->Spawn([=] { add_func(xs[i], ys[i], &outs[i]); })); | ||
} | ||
} | ||
|
||
void CheckResults() { | ||
for (int i = 0; i < nadds; ++i) { | ||
ASSERT_EQ(outs[i], (i + 1) * 11); | ||
} | ||
} | ||
|
||
void CheckNotAllComputed() { | ||
for (int i = 0; i < nadds; ++i) { | ||
if (outs[i] == -1) { | ||
return; | ||
} | ||
} | ||
ASSERT_TRUE(0) << "all values were computed"; | ||
} | ||
|
||
private: | ||
ARROW_DISALLOW_COPY_AND_ASSIGN(AddTester); | ||
|
||
int nadds; | ||
std::vector<int> xs; | ||
std::vector<int> ys; | ||
std::vector<int> outs; | ||
}; | ||
|
||
class TestThreadPool : public ::testing::Test { | ||
public: | ||
void TearDown() { | ||
fflush(stdout); | ||
fflush(stderr); | ||
} | ||
|
||
std::shared_ptr<ThreadPool> MakeThreadPool() { return MakeThreadPool(4); } | ||
|
||
std::shared_ptr<ThreadPool> MakeThreadPool(size_t threads) { | ||
std::shared_ptr<ThreadPool> pool; | ||
Status st = ThreadPool::Make(threads, &pool); | ||
return pool; | ||
} | ||
|
||
void SpawnAdds(ThreadPool* pool, int nadds, AddTaskFunc add_func) { | ||
AddTester add_tester(nadds); | ||
add_tester.SpawnTasks(pool, add_func); | ||
ASSERT_OK(pool->Shutdown()); | ||
add_tester.CheckResults(); | ||
} | ||
|
||
void SpawnAddsThreaded(ThreadPool* pool, int nthreads, int nadds, | ||
AddTaskFunc add_func) { | ||
// Same as SpawnAdds, but do the task spawning from multiple threads | ||
std::vector<AddTester> add_testers; | ||
std::vector<std::thread> threads; | ||
for (int i = 0; i < nthreads; ++i) { | ||
add_testers.emplace_back(nadds); | ||
} | ||
for (auto& add_tester : add_testers) { | ||
threads.emplace_back([&] { add_tester.SpawnTasks(pool, add_func); }); | ||
} | ||
for (auto& thread : threads) { | ||
thread.join(); | ||
} | ||
ASSERT_OK(pool->Shutdown()); | ||
for (auto& add_tester : add_testers) { | ||
add_tester.CheckResults(); | ||
} | ||
} | ||
}; | ||
|
||
TEST_F(TestThreadPool, ConstructDestruct) { | ||
// Stress shutdown-at-destruction logic | ||
for (size_t threads : {1, 2, 3, 8, 32, 70}) { | ||
auto pool = this->MakeThreadPool(threads); | ||
} | ||
} | ||
|
||
// Correctness and stress tests using Spawn() and Shutdown() | ||
|
||
TEST_F(TestThreadPool, Spawn) { | ||
auto pool = this->MakeThreadPool(3); | ||
SpawnAdds(pool.get(), 7, task_add<int>); | ||
} | ||
|
||
TEST_F(TestThreadPool, StressSpawn) { | ||
auto pool = this->MakeThreadPool(30); | ||
SpawnAdds(pool.get(), 1000, task_add<int>); | ||
} | ||
|
||
TEST_F(TestThreadPool, StressSpawnThreaded) { | ||
auto pool = this->MakeThreadPool(30); | ||
SpawnAddsThreaded(pool.get(), 20, 100, task_add<int>); | ||
} | ||
|
||
TEST_F(TestThreadPool, SpawnSlow) { | ||
// This checks that Shutdown() waits for all tasks to finish | ||
auto pool = this->MakeThreadPool(2); | ||
SpawnAdds(pool.get(), 7, [](int x, int y, int* out) { | ||
return task_slow_add(0.02 /* seconds */, x, y, out); | ||
}); | ||
} | ||
|
||
TEST_F(TestThreadPool, StressSpawnSlow) { | ||
auto pool = this->MakeThreadPool(30); | ||
SpawnAdds(pool.get(), 1000, [](int x, int y, int* out) { | ||
return task_slow_add(0.002 /* seconds */, x, y, out); | ||
}); | ||
} | ||
|
||
TEST_F(TestThreadPool, StressSpawnSlowThreaded) { | ||
auto pool = this->MakeThreadPool(30); | ||
SpawnAddsThreaded(pool.get(), 20, 100, [](int x, int y, int* out) { | ||
return task_slow_add(0.002 /* seconds */, x, y, out); | ||
}); | ||
} | ||
|
||
TEST_F(TestThreadPool, QuickShutdown) { | ||
AddTester add_tester(100); | ||
{ | ||
auto pool = this->MakeThreadPool(3); | ||
add_tester.SpawnTasks(pool.get(), [](int x, int y, int* out) { | ||
return task_slow_add(0.02 /* seconds */, x, y, out); | ||
}); | ||
ASSERT_OK(pool->Shutdown(false /* wait */)); | ||
add_tester.CheckNotAllComputed(); | ||
} | ||
add_tester.CheckNotAllComputed(); | ||
} | ||
|
||
TEST_F(TestThreadPool, SetCapacity) { | ||
auto pool = this->MakeThreadPool(3); | ||
ASSERT_EQ(pool->GetCapacity(), 3); | ||
ASSERT_OK(pool->SetCapacity(5)); | ||
ASSERT_EQ(pool->GetCapacity(), 5); | ||
ASSERT_OK(pool->SetCapacity(2)); | ||
// Wait for workers to wake up and secede | ||
busy_wait(0.5, [&] { return pool->GetCapacity() == 2; }); | ||
ASSERT_EQ(pool->GetCapacity(), 2); | ||
ASSERT_OK(pool->SetCapacity(5)); | ||
ASSERT_EQ(pool->GetCapacity(), 5); | ||
// Downsize while tasks are pending | ||
for (int i = 0; i < 10; ++i) { | ||
ASSERT_OK(pool->Spawn(std::bind(sleep_for, 0.01 /* seconds */))); | ||
} | ||
ASSERT_OK(pool->SetCapacity(2)); | ||
busy_wait(0.5, [&] { return pool->GetCapacity() == 2; }); | ||
ASSERT_EQ(pool->GetCapacity(), 2); | ||
// Ensure nothing got stuck | ||
ASSERT_OK(pool->Shutdown()); | ||
} | ||
|
||
// Test Submit() functionality | ||
|
||
TEST_F(TestThreadPool, Submit) { | ||
auto pool = this->MakeThreadPool(3); | ||
{ | ||
auto fut = pool->Submit(add<int>, 4, 5); | ||
ASSERT_EQ(fut.get(), 9); | ||
} | ||
{ | ||
auto fut = pool->Submit(add<std::string>, "foo", "bar"); | ||
ASSERT_EQ(fut.get(), "foobar"); | ||
} | ||
{ | ||
auto fut = pool->Submit(slow_add<int>, 0.01 /* seconds */, 4, 5); | ||
ASSERT_EQ(fut.get(), 9); | ||
} | ||
{ | ||
// Reference passing | ||
std::string s = "foo"; | ||
auto fut = pool->Submit(inplace_add<std::string>, std::ref(s), "bar"); | ||
ASSERT_EQ(fut.get(), "foobar"); | ||
ASSERT_EQ(s, "foobar"); | ||
} | ||
{ | ||
// `void` return type | ||
auto fut = pool->Submit(sleep_for, 0.001); | ||
fut.get(); | ||
} | ||
} | ||
|
||
} // namespace internal | ||
} // namespace arrow |
Oops, something went wrong.