// Copyright 2017 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef HIGHWAYHASH_DATA_PARALLEL_H_
#define HIGHWAYHASH_DATA_PARALLEL_H_

// Portable C++11 alternative to OpenMP for data-parallel computations:
// provides low-overhead ThreadPool, plus PerThread with support for reduction.

#include <stdio.h>

#include <algorithm>  // find_if
#include <atomic>
#include <condition_variable> //NOLINT
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <memory>
#include <mutex> //NOLINT
#include <thread> //NOLINT
#include <utility>
#include <vector>

#define DATA_PARALLEL_CHECK(condition)                           \
  while (!(condition)) {                                         \
    printf("data_parallel check failed at line %d\n", __LINE__); \
    abort();                                                     \
  }

namespace highwayhash {

// Highly scalable thread pool, especially suitable for data-parallel
// computations in the fork-join model, where clients need to know when all
// tasks have completed.
//
// Thread pools usually store small numbers of heterogeneous tasks in a queue.
// When tasks are identical or differ only by an integer input parameter, it is
// much faster to store just one function of an integer parameter and call it
// for each value.
//
// This thread pool can efficiently load-balance millions of tasks using an
// atomic counter, thus avoiding per-task syscalls. With 48 hyperthreads and
// 1M tasks that add to an atomic counter, overall runtime is 10-20x higher
// when using std::async, and up to 200x for a queue-based ThreadPool.
//
// Usage:
//   ThreadPool pool;
//   pool.Run(0, 1000000, [](const int i) { Func1(i); });
//   // When Run returns, all of its tasks have finished.
//
//   pool.RunTasks({Func2, Func3, Func4});
//   // The destructor waits until all worker threads have exited cleanly.
class ThreadPool {
 public:
  // Starts the given number of worker threads and blocks until they are ready.
  // "num_threads" defaults to one per hyperthread.
  explicit ThreadPool(
      const int num_threads = std::thread::hardware_concurrency())
      : num_threads_(num_threads) {
    DATA_PARALLEL_CHECK(num_threads_ > 0);
    threads_.reserve(num_threads_);
    for (int i = 0; i < num_threads_; ++i) {
      threads_.emplace_back(ThreadFunc, this);
    }
    padding_[0] = 0;  // avoid unused member warning.
    WorkersReadyBarrier();
  }

  ThreadPool(const ThreadPool&) = delete;
  ThreadPool& operator=(const ThreadPool&) = delete;

  // Waits for all threads to exit.
  ~ThreadPool() {
    StartWorkers(kWorkerExit);
    for (std::thread& thread : threads_) {
      thread.join();
    }
  }

  // Runs func(i) on worker thread(s) for every i in [begin, end).
  // Not thread-safe - no two calls to Run and RunTasks may overlap.
  // Subsequent calls will reuse the same threads.
  //
  // Precondition: 0 <= begin <= end.
  template <class Func>
  void Run(const int begin, const int end, const Func& func) {
    DATA_PARALLEL_CHECK(0 <= begin && begin <= end);
    if (begin == end) {
      return;
    }
    const WorkerCommand worker_command = (WorkerCommand(end) << 32) + begin;
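    // Example: begin = 2, end = 5 encodes as 0x0000000500000002; RunRange
    // recovers begin from the low 32 bits and end from the high 32 bits.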
    // Ensure the inputs do not result in a reserved command.
    DATA_PARALLEL_CHECK(worker_command != kWorkerWait);
    DATA_PARALLEL_CHECK(worker_command != kWorkerExit);
    // If Func is large (many captures), this will allocate memory, but it is
    // still slower to use a std::ref wrapper.
    task_ = func;
    num_reserved_.store(0);
    StartWorkers(worker_command);
    WorkersReadyBarrier();
  }

  // Runs each task (closure, typically a lambda function) on worker thread(s).
  // Not thread-safe - no two calls to Run and RunTasks may overlap.
  // Subsequent calls will reuse the same threads.
  //
  // This is a more conventional interface for heterogeneous tasks that may be
  // independent/unrelated.
  void RunTasks(const std::vector<std::function<void(void)>>& tasks) {
    Run(0, static_cast<int>(tasks.size()),
        [&tasks](const int i) { tasks[i](); });
  }

  // Statically (and deterministically) splits [begin, end) into ranges and
  // calls "func" for each of them. Useful when "func" involves some overhead
  // (e.g. for PerThread::Get or random seeding) that should be amortized over
  // a range of values. "func" is void(int chunk, uint32_t begin, uint32_t end).
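  //
  // Example (a sketch; "Rng" and "Process" are hypothetical):
  //   pool.RunRanges(0, n, [](const int chunk, const uint32_t begin,
  //                           const uint32_t end) {
  //     Rng rng(chunk);  // seeded once per chunk rather than once per i
  //     for (uint32_t i = begin; i < end; ++i) Process(i, rng);
  //   });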
  template <class Func>
  void RunRanges(const uint32_t begin, const uint32_t end, const Func& func) {
    const uint32_t length = end - begin;
    // Use constant rather than num_threads_ for machine-independent splitting.
    const uint32_t chunk = std::max(1U, (length + 127) / 128);
    std::vector<std::pair<uint32_t, uint32_t>> ranges;  // begin/end
    ranges.reserve(length / chunk + 1);
    for (uint32_t i = 0; i < length; i += chunk) {
      ranges.emplace_back(begin + i, begin + std::min(i + chunk, length));
    }
    Run(0, static_cast<int>(ranges.size()), [&ranges, func](const int i) {
      func(i, ranges[i].first, ranges[i].second);
    });
  }

 private:
  // After construction and between calls to Run, workers are "ready", i.e.
  // waiting on worker_start_cv_. They are "started" by sending a "command"
  // and notifying all worker_start_cv_ waiters. (That is why all workers
  // must be ready/waiting - otherwise, the notification will not reach all of
  // them and the main thread waits in vain for them to report readiness.)
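  //
  // Handshake sketch (paraphrasing the code below): StartWorkers stores the
  // command and wakes all waiting workers; each worker runs RunRange, loops
  // back, and increments workers_ready_; the last one notifies
  // workers_ready_cv_, at which point WorkersReadyBarrier in the main thread
  // returns and resets the count to zero.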
  using WorkerCommand = uint64_t;

  // Special values; all others encode the begin/end parameters.
  static constexpr WorkerCommand kWorkerWait = 0;
  static constexpr WorkerCommand kWorkerExit = ~0ULL;

  void WorkersReadyBarrier() {
    std::unique_lock<std::mutex> lock(mutex_);
    workers_ready_cv_.wait(
        lock, [this]() { return workers_ready_ == num_threads_; });
    workers_ready_ = 0;
  }

  // Precondition: all workers are ready.
  void StartWorkers(const WorkerCommand worker_command) {
    std::unique_lock<std::mutex> lock(mutex_);
    worker_start_command_ = worker_command;
    // Workers will need this lock, so release it before they wake up.
    lock.unlock();
    worker_start_cv_.notify_all();
  }

  // Attempts to reserve and perform some work from the global range of tasks,
  // which is encoded within "command". Returns after all tasks are reserved.
  static void RunRange(ThreadPool* self, const WorkerCommand command) {
    const int begin = command & 0xFFFFFFFF;
    const int end = command >> 32;
    const int num_tasks = end - begin;
    // OpenMP introduced several "schedule" strategies:
    // "single" (static assignment of exactly one chunk per thread): slower.
    // "dynamic" (allocates k tasks at a time): competitive for well-chosen k.
    // "guided" (allocates k tasks, decreases k): computing k = remaining/n
    // is faster than halving k each iteration. We prefer this strategy
    // because it avoids user-specified parameters.
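    //
    // Worked example (illustrative numbers): with 4 threads and 100 tasks,
    // the first reservation takes max(100 / (4 * 2), 1) = 12 tasks, the next
    // 88 / 8 = 11, and so on; chunk sizes shrink toward 1 as work runs out,
    // so threads that finish early keep finding small pieces of work.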
    for (;;) {
      const int num_reserved = self->num_reserved_.load();
      const int num_remaining = num_tasks - num_reserved;
      const int my_size =
          std::max(num_remaining / (self->num_threads_ * 2), 1);
      const int my_begin = begin + self->num_reserved_.fetch_add(my_size);
      const int my_end = std::min(my_begin + my_size, begin + num_tasks);
      // Another thread already reserved the last task.
      if (my_begin >= my_end) {
        break;
      }
      for (int i = my_begin; i < my_end; ++i) {
        self->task_(i);
      }
    }
  }

  static void ThreadFunc(ThreadPool* self) {
    // Until kWorkerExit command received:
    for (;;) {
      std::unique_lock<std::mutex> lock(self->mutex_);
      // Notify main thread that this thread is ready.
      if (++self->workers_ready_ == self->num_threads_) {
        self->workers_ready_cv_.notify_one();
      }
    RESUME_WAIT:
      // Wait for a command.
      self->worker_start_cv_.wait(lock);
      const WorkerCommand command = self->worker_start_command_;
      switch (command) {
        case kWorkerWait:    // spurious wakeup:
          goto RESUME_WAIT;  // lock still held, avoid incrementing ready.
        case kWorkerExit:
          return;  // exits thread
      }
      lock.unlock();
      RunRange(self, command);
    }
  }

  const int num_threads_;
  // Unmodified after ctor, but cannot be const because we call thread::join().
  std::vector<std::thread> threads_;

  std::mutex mutex_;  // guards both cv and their variables.
  std::condition_variable workers_ready_cv_;
  int workers_ready_ = 0;
  std::condition_variable worker_start_cv_;
  // Initialized to kWorkerWait so that a spurious wakeup before the first
  // StartWorkers call reads a defined value and resumes waiting.
  WorkerCommand worker_start_command_ = kWorkerWait;

  // Written by main thread, read by workers (after mutex lock/unlock).
  std::function<void(int)> task_;

  // Updated by workers; alignment/padding avoids false sharing.
  alignas(64) std::atomic<int> num_reserved_{0};
  int padding_[15];
};

// Thread-local storage with support for reduction (combining into one result).
// The "T" type must be unique to the call site because the list of threads'
// copies is a static member. (With knowledge of the underlying threads, we
// could eliminate this list and T allocations, but that is difficult to
// arrange and we prefer this to be usable independently of ThreadPool.)
//
// Usage:
//   for (int i = 0; i < N; ++i) {
//     // in each thread:
//     T& my_copy = PerThread<T>::Get();
//     my_copy.Modify();
//
//     // single-threaded:
//     T& combined = PerThread<T>::Reduce();
//     Use(combined);
//     PerThread<T>::Destroy();
//   }
//
// T is duck-typed and implements the following interface:
//
//   // Returns true if T is default-initialized or Destroy was called without
//   // any subsequent re-initialization.
//   bool IsNull() const;
//
//   // Releases any resources. Postcondition: IsNull() == true.
//   void Destroy();
//
//   // Merges in data from "victim". Precondition: !IsNull() && !victim.IsNull().
//   void Assimilate(const T& victim);
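//
// Example T (a sketch, not part of this interface; "Counters" is
// hypothetical):
//   struct Counters {
//     bool IsNull() const { return bins.empty(); }
//     void Destroy() { bins.clear(); }
//     void Assimilate(const Counters& victim) {
//       for (size_t i = 0; i < bins.size(); ++i) bins[i] += victim.bins[i];
//     }
//     std::vector<uint64_t> bins;  // resized by the caller after Get().
//   };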
template <class T>
class PerThread {
 public:
  // Returns reference to this thread's T instance (dynamically allocated,
  // so its address is unique). Callers are responsible for any initialization
  // beyond the default ctor.
  static T& Get() {
    static thread_local T* t;
    if (t == nullptr) {
      t = new T;
      static std::mutex mutex;
      std::lock_guard<std::mutex> lock(mutex);
      Threads().push_back(t);
    }
    return *t;
  }

  // Returns vector of all per-thread T. Used inside Reduce() or by clients
  // that require direct access to T instead of Assimilating them.
  // Function wrapper avoids separate static member variable definition.
  static std::vector<T*>& Threads() {
    static std::vector<T*> threads;
    return threads;
  }

  // Returns the first non-null T after assimilating all other threads' T
  // into it. Precondition: at least one non-null T exists (caller must have
  // called Get() and initialized the result).
  static T& Reduce() {
    std::vector<T*>& threads = Threads();
    // Find first non-null T
    const auto it = std::find_if(threads.begin(), threads.end(),
                                 [](const T* t) { return !t->IsNull(); });
    if (it == threads.end()) {
      abort();
    }
    T* const first = *it;
    for (const T* t : threads) {
      if (t != first && !t->IsNull()) {
        first->Assimilate(*t);
      }
    }
    return *first;
  }

  // Calls each thread's T::Destroy to release resources and/or prepare for
  // reuse by the same threads/ThreadPool. Note that all T remain allocated
  // (we need thread-independent pointers for iterating over each thread's T,
  // and deleting them would leave dangling pointers in each thread, which is
  // unacceptable because the same thread may call Get() again later.)
  static void Destroy() {
    for (T* t : Threads()) {
      t->Destroy();
    }
  }
};
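
// Example combining ThreadPool and PerThread (a sketch; "Counters", "kBins",
// "Process", and "n" are hypothetical):
//   ThreadPool pool;
//   pool.RunRanges(0, n, [](const int chunk, const uint32_t begin,
//                           const uint32_t end) {
//     Counters& counters = PerThread<Counters>::Get();
//     if (counters.IsNull()) counters.bins.resize(kBins);
//     for (uint32_t i = begin; i < end; ++i) Process(i, &counters);
//   });
//   Counters& combined = PerThread<Counters>::Reduce();
//   Use(combined);
//   PerThread<Counters>::Destroy();
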
} // namespace highwayhash
#endif // HIGHWAYHASH_DATA_PARALLEL_H_