Initial import

commit 1a4a658bbff471514d1d54e7d5ed4121b7eee053 0 parents
@Amanieu authored
13 .gitignore
@@ -0,0 +1,13 @@
+# Compiled Object files
+*.slo
+*.lo
+*.o
+
+# Compiled Dynamic libraries
+*.so
+*.dylib
+
+# Compiled Static libraries
+*.lai
+*.la
+*.a
19 LICENSE
@@ -0,0 +1,19 @@
+Copyright (c) 2013 Amanieu d'Antras
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
390 README.md
@@ -0,0 +1,390 @@
+Async++
+=======
+
+Async++ is a lightweight concurrency framework for C++11. The concept was inspired by the [Microsoft PPL library](http://msdn.microsoft.com/en-us/library/dd492418.aspx) and the [N3428](http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2012/n3428.pdf) C++ standard proposal.
+
+Building and installing
+-----------------------
+
+You need a C++11 compiler to build Async++. As of this writing, only GCC 4.7 on Linux is known to support enough of C++11 to be usable.
+Clang is mostly supported, but it has library issues with both libc++ and libstdc++ that cause errors at compile time or run time.
+Mac OS X is probably supported, but hasn't been tested.
+Windows is not supported because C++11 isn't fully implemented yet by either GCC or MSVC.
+
+Async++ can be built as a static or shared library using the `build-static.sh` and `build-shared.sh` scripts.
+
+To use it in a program, include `<async++.h>` and link with `libasync++`:
+```
+g++ -std=c++11 -pthread test.cpp -c -o test.o -I${LIBASYNC_PATH}/include
+g++ -std=c++11 -pthread test.o -o test -L${LIBASYNC_PATH} -lasync++
+```
+
+You can control the number of threads spawned in the thread pool by setting the `LIBASYNC_NUM_THREADS` environment variable. By default `std::thread::hardware_concurrency()` is used, which reports the number of hardware threads (cores) on the system.
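+
+For example, assuming a POSIX shell and the `test` binary built above, the thread pool can be limited to 4 threads like this:
+```
+LIBASYNC_NUM_THREADS=4 ./test
+```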
+
+Basic usage
+-----------
+
+### Task objects
+
+Task objects are the central component of Async++. The `task<T>` class represents a value of type `T` which may or may not have been produced yet. `T` can be void, in which case the task simply acts as a signal to indicate whether an event has occurred.
+
+The most common way to create a task is to use the `spawn()` function, which will run a given function asynchronously on a thread pool and return a task which is linked to the result of the function.
+
+The result of a task can be obtained using the `get()` member function, which will wait for the task to complete if the value is not yet ready. Note that a task object can only be read once: After `get()` is called, the task object is cleared. If you want to call `get()` multiple times, use a `shared_task<T>` instead.
+
+You can wait for a task to complete without retrieving its value (and without clearing the task object) by calling the `wait()` member function. You can also poll whether the task has completed by calling the `ready()` member function, which doesn't block.
+
+It is also possible to create a task already containing a predefined value, using the `make_task()` function. In that case the task is already considered to have completed and no waiting is necessary.
+
+Example:
+```C++
+// Create a task which runs asynchronously
+auto my_task = async::spawn([] {
+ return 42;
+});
+
+// Do other stuff while the task is not finished
+while (!my_task.ready()) {
+ // Do stuff...
+}
+
+// Wait for the task to complete without getting the result
+my_task.wait();
+
+// Wait for the task to complete and get the result
+int answer = my_task.get();
+
+// Create a task with a preset value
+auto my_task2 = async::make_task(42);
+
+// Print the value stored in the task
+std::cout << my_task2.get() << std::endl;
+```
+
+### Continuations
+
+Blocking on a task is not always desirable, since it is hard to reliably predict when the task will complete. As an alternative, it is possible to schedule a function to run immediately after a task finishes by using the `then()` member function.
+
+The function passed must take one parameter, which is the result of the parent task, unless the parent task has a `void` result, in which case the continuation function takes no parameters. It is also possible to use a function that takes a reference to the parent task rather than its result as its parameter, which is useful when dealing with exceptions.
+
+Because `then()` returns a new task object linked to the continuation function, it is possible to chain continuations by calling `then()` on the returned task object.
+
+As with `get()`, the task object is cleared once `then()` is called. Use a `shared_task<T>` to add multiple continuations to a task.
+
+Example:
+```C++
+// Spawn a task
+auto t1 = async::spawn([] {
+ return 42;
+});
+
+// Chain a value-based continuation
+auto t2 = t1.then([](int result) {
+ return result;
+});
+
+// Chain a task-based continuation
+t2.then([](task<int> parent) {
+ std::cout << parent.get() << std::endl;
+});
+
+// Equivalent code with direct chaining
+async::spawn([] {
+ return 42;
+}).then([](int result) {
+ return result;
+}).then([](task<int> parent) {
+ std::cout << parent.get() << std::endl;
+});
+```
+
+### Composition
+
+The `when_any()` function returns a task which contains the result of the first task of a set to complete. The returned task is of type `async::task<std::pair<size_t, T>>` (for `void` tasks the type is `async::task<size_t>`) where the first element of the pair is the index of the task which completed and the second element is the result of that task. The tasks can be passed directly as parameters or through a range of iterators, but they must all have the same result type.
+
+The `when_all()` function returns a task which contains the results of all the tasks in a set. There are two forms of this function:
+- If the set of tasks is given directly as parameters, the returned task is of type `async::task<std::tuple<T...>>` where `T...` are the result types of the passed tasks (`void` types are replaced with `async::void_`).
+- If the set of tasks is given as a range or pair of iterators, the returned task is of type `async::task<std::vector<T>>` (for `void` tasks the type is `async::task<void>`) where `T` is the result type of the tasks in the range, which must be identical for all tasks.
+
+Example:
+```C++
+// Using when_any to find task which finishes first
+async::task<char> tasks[] = {
+ async::spawn([] {return 'A';}),
+ async::spawn([] {return 'B';}),
+ async::spawn([] {return 'C';})
+};
+async::when_any(tasks).then([](std::pair<size_t, char> result) {
+ std::cout << "Task " << result.first << " finished first with value "
+ << result.second << std::endl;
+});
+
+// Using when_all to combine results of multiple tasks
+auto a = async::spawn([] {return std::string("Hello ");});
+auto b = async::spawn([] {return std::string("World!");});
+async::when_all(a, b).then([](std::tuple<std::string, std::string> result) {
+ std::cout << std::get<0>(result) << std::get<1>(result) << std::endl;
+});
+
+// Alternative way to use when_all with ranges
+async::task<std::string> tasks2[] = {
+ async::spawn([] {return std::string("Hello ");}),
+ async::spawn([] {return std::string("World!");})
+};
+async::when_all(tasks2).then([](std::vector<std::string> result) {
+ std::cout << result[0] << result[1] << std::endl;
+});
+
+// Output:
+// Task 0 finished first with value A
+// Hello World!
+// Hello World!
+```
+
+### Exceptions
+
+Exceptions thrown in a task function are rethrown when that task's `get()` function is called. If a task has continuations, exceptions are propagated to the continuations in different ways depending on the type of continuations:
+- Value-based continuations, which take the result type of the previous task as parameter, are not executed if the parent task throws. Instead the parent's exception is directly copied to the result of the continuation.
+- Task-based continuations, which take a reference to the parent task as parameter, are executed even if the parent task throws. The parent's exception can be handled by calling `get()` on the parent task.
+
+The task returned by `when_any()` will contain an exception if the first task to finish threw one. Exceptions thrown by later tasks are ignored.
+
+The task returned by `when_all()` will contain an exception if any of its tasks throws. Note that this may cause the returned task to appear finished while some of the other tasks have not yet finished.
+
+```C++
+async::spawn([] {
+ throw std::runtime_error("Some error");
+}).then([] {
+ // This is not executed because it is a value-based continuation
+}).then([](task<void> t) {
+ // The exception from the parent task is propagated through value-based
+ // continuations and caught in task-based continuations.
+ try {
+ t.get();
+ } catch (const std::runtime_error& e) {
+ std::cout << e.what() << std::endl;
+ }
+});
+```
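+
+For example, a minimal sketch of handling an exception from a combined task with a task-based continuation:
+```C++
+auto good = async::spawn([] { return 1; });
+auto bad = async::spawn([]() -> int { throw std::runtime_error("Oops"); });
+async::when_all(good, bad).then([](async::task<std::tuple<int, int>> parent) {
+    try {
+        // get() rethrows the exception stored in the combined task
+        parent.get();
+    } catch (const std::runtime_error& e) {
+        std::cout << e.what() << std::endl;
+    }
+});
+```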
+
+### Cancellation
+
+Async++ does not provide built-in cancellation support; instead, it provides tools that allow you to specify *interruption points* where your task can be safely canceled. The `cancellation_token` class defines a simple boolean flag which indicates whether work should be canceled. You can then use the `interruption_point()` function with a cancellation token to throw a `task_canceled` exception if the token has been set.
+
+Example:
+```C++
+// Create a token
+async::cancellation_token c;
+
+auto t = async::spawn([&c] { // Capture a reference to the token
+ // Throw an exception if the task has been canceled
+ async::interruption_point(c);
+
+ // This is equivalent to the following:
+ // if (c.is_canceled())
+ // async::cancel_current_task(); // throws async::task_canceled
+});
+
+// Set the token to cancel work
+c.cancel();
+
+// Because the task and c.cancel() are executed concurrently, the token may or
+// may not be canceled by the time the task reaches the interruption point. So
+// depending on which comes first, this may throw async::task_canceled.
+t.get();
+```
+
+### Shared tasks
+
+Normal `task<T>` objects are single-use: once `get()` or `then()` is called, they become empty, and any further operation on them is an error except assigning a new task to them (from `spawn()` or from another task object). In order to use a task multiple times, it is possible to convert it to a shared task by using the `share()` member function. This causes the original task to become empty, but returns a `shared_task<T>` which can have its result retrieved multiple times and multiple continuations added to it. It is also copyable and assignable, as opposed to the basic task class which is move-only. The downside of shared tasks is that they involve extra overhead due to reference counting, and task results are copied instead of moved when using `get()` or `then()`.
+
+Example:
+```C++
+// Parent task, note the use of .share() to get a shared task
+auto t = async::spawn([] {
+ std::cout << "Parent task" << std::endl;
+}).share();
+
+// First child, using value-based continuation
+t.then([] {
+ std::cout << "Child task A" << std::endl;
+});
+
+// Second child, using task-based continuation
+t.then([](async::shared_task<void>) {
+ std::cout << "Child task B" << std::endl;
+});
+```
+
+Advanced usage
+--------------
+
+### Task unwrapping
+
+Sometimes it is necessary for a task to wait for another task to complete before returning a value. For example, a task might be waiting for file I/O (wrapped in a task) to complete before it can indicate it has completed. While it would be possible to call `get()` or `wait()` on the inner task, this would cause the thread to block while waiting for the task to complete.
+
+This problem can be solved using task unwrapping: when a task function returns a task object, instead of using that task object as the result, the inner task will "replace" the outer task. This means that the outer task will complete when the inner task finishes, and it will acquire the result of the inner task.
+
+Example:
+```C++
+// The outer task is task<int>, and its result is set when the inner task completes
+async::spawn([] {
+ std::cout << "Outer task" << std::endl;
+ // Return a task<int>
+ return async::spawn([] {
+ std::cout << "Inner task" << std::endl;
+ return 42;
+ });
+}).then([](int result) {
+ std::cout << "Continuation task" << std::endl;
+ std::cout << result << std::endl;
+});
+
+// Output:
+// Outer task
+// Inner task
+// Continuation task
+// 42
+```
+
+### Event tasks
+
+Sometimes it is necessary to wait for an external event to happen, or for some combination of tasks to complete. Async++ allows you to define custom task objects, which can be set to any value or exception arbitrarily, by using the `event_task<T>` class. You can retrieve a task object associated with the event by calling the `get_task()` member function. The task can be set using the `set()` member function, or it can be set to an exception using the `cancel()` or `set_exception()` functions. All set functions return a bool to indicate whether the value was successfully set: they will return false if a value has already been set, since an event can only be set once.
+
+Example:
+```C++
+// Create an event
+async::event_task<int> e;
+
+// Get a task associated with the event
+auto t = e.get_task();
+
+// Add a continuation to the task
+t.then([](int result) {
+ std::cout << result << std::endl;
+});
+
+// Set the event value, which will cause the continuation to run
+e.set(42);
+
+// To set an exception:
+// e.cancel();
+// e.set_exception(std::make_exception_ptr(async::task_canceled()));
+// These are equivalent, but cancel() is slightly more efficient
+```
+
+### Local tasks
+
+Sometimes maximum performance is necessary and not all of the features provided by Async++ are required. The `local_task<F>` class provides a non-copyable, non-movable task type which resides entirely on the stack. It requires no memory allocations but only supports a restricted set of operations: it doesn't support continuations or composition, the only operations allowed on it are `get()`, `wait()` and `ready()`, and it has an implicit `wait()` in its destructor. Because it is non-movable, it must be captured directly from the return value of `local_spawn()` by rvalue-reference, as shown in the example.
+
+Example:
+```C++
+auto&& t = async::local_spawn([] {
+ std::cout << "Local task" << std::endl;
+});
+```
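+
+As a rough usage sketch, the result of a local task can still be retrieved with `get()`, which blocks until the task has finished:
+```C++
+// local_spawn() returns a non-movable task, so bind it by rvalue-reference
+auto&& t2 = async::local_spawn([] {
+    return 42;
+});
+
+// get() waits for the task to run and returns its result
+std::cout << t2.get() << std::endl;
+```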
+
+### Task scheduling
+
+By default Async++ uses a work-stealing scheduler with a thread pool whose size is given by the `LIBASYNC_NUM_THREADS` environment variable (or `std::thread::hardware_concurrency()` if it is not set). The scheduler is initialized on first use, and is destroyed at program exit while ensuring that all currently running tasks finish executing before the program exits.
+
+While this scheduler is sufficient for the majority of workloads, sometimes more control over scheduling is needed. Async++ allows you to provide a scheduler parameter to `spawn()`, `local_spawn()` and `then()` before the task function. Three schedulers are provided:
+- `default_scheduler()`: This scheduler will run tasks in a work stealing thread pool, and is the scheduler used by default.
+- `inline_scheduler()`: This scheduler will run tasks immediately in the current thread.
+- `thread_scheduler()`: This scheduler will spawn a new thread to run tasks in.
+
+Example:
+```C++
+// Spawn a task using a scheduler
+auto t = async::spawn(default_scheduler(), [] {
+ std::cout << "Running in default scheduler!" << std::endl;
+});
+
+// Spawn a continuation using a scheduler
+t.then(inline_scheduler(), [] {
+ std::cout << "Running a continuation in inline scheduler!" << std::endl;
+});
+
+// Spawn a local task using a scheduler
+auto&& t2 = async::local_spawn(thread_scheduler(), [] {
+ std::cout << "Running a local task in thread scheduler!" << std::endl;
+});
+```
+
+Note that while `default_scheduler()` will wait for all currently running tasks to complete at program exit, `thread_scheduler()` does not. You should therefore always call `get()` or `wait()` on a task spawned in a separate thread.
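+
+For example, a minimal sketch of waiting on such a task before exiting:
+```C++
+// The task runs in its own thread, which is not joined automatically at
+// program exit, so wait for it explicitly
+auto bg = async::spawn(async::thread_scheduler(), [] {
+    std::cout << "Background work" << std::endl;
+});
+bg.wait();
+```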
+
+It is possible to override the scheduler used when none is specified by setting the `LIBASYNC_DEFAULT_SCHEDULER` macro before including `<async++.h>`.
+
+Example:
+```C++
+// Forward declare custom scheduler
+class my_scheduler_impl;
+my_scheduler_impl& my_scheduler();
+
+// Override default scheduler
+#define LIBASYNC_DEFAULT_SCHEDULER my_scheduler()
+
+// Include async++.h
+#include <async++.h>
+
+// This will use the custom scheduler
+auto t = async::spawn([] {
+ std::cout << "Running in custom scheduler!" << std::endl;
+});
+```
+
+### Custom schedulers
+
+You can also define your own task scheduler by creating a class which implements the `async::scheduler` interface, which consists of a single function: `schedule()`. You can use the example below as a template.
+
+Example:
+```C++
+// Define custom scheduler
+class my_scheduler_impl: public async::scheduler {
+private:
+ // Initialize your scheduler in the constructor, called on first use.
+ my_scheduler_impl() {}
+
+ // Destroy your scheduler in the destructor, called during application exit.
+ ~my_scheduler_impl() {}
+
+ // Only allow construction and destruction from my_scheduler()
+ friend my_scheduler_impl& my_scheduler();
+
+public:
+ // Function called to schedule a task. This must either guarantee the task
+ // will be executed at some point in the future, or throw an exception
+ // and not execute the task.
+ virtual void schedule(async::task_handle t) override final
+ {
+ // In this example we run the task directly inline, but other approaches
+ // could include queuing the task for execution in a thread pool.
+ t.run();
+ }
+};
+
+// Singleton access to my_scheduler, initialized on first use and destroyed on
+// program exit.
+my_scheduler_impl& my_scheduler()
+{
+ // C++11 guarantees thread-safe initialization, so we don't need to worry
+ // about race conditions during initialization.
+ static my_scheduler_impl sched;
+ return sched;
+}
+
+// Spawn a task using the scheduler
+auto t = async::spawn(my_scheduler(), [] {
+ std::cout << "Running in custom scheduler!" << std::endl;
+});
+
+// Spawn a continuation using the scheduler
+t.then(my_scheduler(), [] {
+ std::cout << "Running a continuation in custom scheduler!" << std::endl;
+});
+
+// Spawn a local task using the scheduler
+auto&& t2 = async::local_spawn(my_scheduler(), [] {
+ std::cout << "Running a local task in custom scheduler!" << std::endl;
+});
+```
2  build-shared.sh
@@ -0,0 +1,2 @@
+#!/bin/sh
+g++ -std=c++11 -Wall -Wextra -pedantic -pthread -Iinclude src/scheduler.cpp -fvisibility=hidden -flto -O3 -fpic -DLIBASYNC_BUILD -shared -o libasync++.so
3  build-static.sh
@@ -0,0 +1,3 @@
+#!/bin/sh
+g++ -std=c++11 -Wall -Wextra -pedantic -pthread -Iinclude src/scheduler.cpp -fvisibility=hidden -O3 -DLIBASYNC_BUILD -DLIBASYNC_STATIC -c -o scheduler.o
+ar rcs libasync++.a scheduler.o
77 include/async++.h
@@ -0,0 +1,77 @@
+// Copyright (c) 2013 Amanieu d'Antras
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+#ifndef ASYNCXX_H_
+#define ASYNCXX_H_
+
+#include <algorithm>
+#include <atomic>
+#include <exception>
+#include <memory>
+#include <mutex>
+#include <stdexcept>
+#include <thread>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+// Export declaration to make symbols visible for dll/so
+#ifdef LIBASYNC_STATIC
+# define LIBASYNC_EXPORT
+#else
+# ifdef _WIN32
+# ifdef LIBASYNC_BUILD
+# define LIBASYNC_EXPORT __declspec(dllexport)
+# else
+# define LIBASYNC_EXPORT __declspec(dllimport)
+# endif
+# else
+# define LIBASYNC_EXPORT __attribute__((visibility("default")))
+# endif
+#endif
+
+// Set this to override the default scheduler for newly created tasks. The
+// original can still be accessed through async::default_scheduler().
+#ifndef LIBASYNC_DEFAULT_SCHEDULER
+# define LIBASYNC_DEFAULT_SCHEDULER async::default_scheduler()
+#endif
+
+// Some forward declarations
+namespace async {
+
+template<typename Result> class task;
+template<typename Result> class shared_task;
+template<typename Result> class event_task;
+
+// Exception thrown by cancel_current_task()
+struct LIBASYNC_EXPORT task_canceled {};
+
+} // namespace async
+
+#include "async++/spinlock.h"
+#include "async++/traits.h"
+#include "async++/ref_count.h"
+#include "async++/scheduler_fwd.h"
+#include "async++/task_base.h"
+#include "async++/scheduler.h"
+#include "async++/task.h"
+#include "async++/when_all_any.h"
+#include "async++/cancel.h"
+
+#endif
67 include/async++/cancel.h
@@ -0,0 +1,67 @@
+// Copyright (c) 2013 Amanieu d'Antras
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+#ifndef ASYNCXX_H_
+# error "Do not include this header directly, include <async++.h> instead."
+#endif
+
+namespace async {
+
+// A flag which can be used to request cancellation
+class cancellation_token {
+ std::atomic<bool> state{false};
+
+public:
+ // Non-copyable and non-movable
+ cancellation_token() = default;
+ cancellation_token(const cancellation_token&) = delete;
+ cancellation_token(cancellation_token&&) = delete;
+ cancellation_token& operator=(const cancellation_token&) = delete;
+ cancellation_token& operator=(cancellation_token&&) = delete;
+
+ bool is_canceled() const
+ {
+ bool s = state.load(std::memory_order_relaxed);
+ if (s)
+ std::atomic_thread_fence(std::memory_order_acquire);
+ return s;
+ }
+
+ void cancel()
+ {
+ state.store(true, std::memory_order_release);
+ }
+};
+
+// Cancel the current task by throwing a task_canceled exception. This is
+// preferred to another exception type because it is handled more efficiently.
+inline void cancel_current_task()
+{
+ throw task_canceled();
+}
+
+// Interruption point, calls cancel_current_task if the specified token is set.
+inline void interruption_point(const cancellation_token& token)
+{
+ if (token.is_canceled())
+ cancel_current_task();
+}
+
+} // namespace async
121 include/async++/ref_count.h
@@ -0,0 +1,121 @@
+// Copyright (c) 2013 Amanieu d'Antras
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+#ifndef ASYNCXX_H_
+# error "Do not include this header directly, include <async++.h> instead."
+#endif
+
+namespace async {
+namespace detail {
+
+// Reference-counted object base class
+template<typename T> struct ref_count_base {
+ std::atomic<unsigned int> ref_count;
+
+ // By default the reference count is initialized to 1
+ explicit ref_count_base(unsigned int count = 1): ref_count(count) {}
+
+ void add_ref(unsigned int count = 1)
+ {
+ ref_count.fetch_add(count, std::memory_order_relaxed);
+ }
+ void release(unsigned int count = 1)
+ {
+ if (ref_count.fetch_sub(count, std::memory_order_release) == count) {
+ std::atomic_thread_fence(std::memory_order_acquire);
+ delete static_cast<T*>(this);
+ }
+ }
+ void add_ref_unlocked()
+ {
+ ref_count.store(ref_count.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed);
+ }
+};
+
+// Pointer to reference counted object, based on boost::intrusive_ptr
+template<typename T> class ref_count_ptr {
+ T* p;
+
+public:
+ // Note that this doesn't increment the reference count, instead it takes
+ // ownership of a pointer which you already own a reference to.
+ explicit ref_count_ptr(T* t): p(t) {}
+
+ ref_count_ptr(): p(nullptr) {}
+ ref_count_ptr(std::nullptr_t): p(nullptr) {}
+ ref_count_ptr(const ref_count_ptr& other)
+ : p(other.p)
+ {
+ if (p)
+ p->add_ref();
+ }
+ ref_count_ptr(ref_count_ptr&& other) noexcept
+ : p(other.p)
+ {
+ other.p = nullptr;
+ }
+ ref_count_ptr& operator=(std::nullptr_t)
+ {
+ if (p)
+ p->release();
+ p = nullptr;
+ return *this;
+ }
+ ref_count_ptr& operator=(const ref_count_ptr& other)
+ {
+ if (p)
+ p->release();
+ p = other.p;
+ if (p)
+ p->add_ref();
+ return *this;
+ }
+ ref_count_ptr& operator=(ref_count_ptr&& other) noexcept
+ {
+ std::swap(p, other.p);
+ return *this;
+ }
+ ~ref_count_ptr()
+ {
+ if (p)
+ p->release();
+ }
+
+ T& operator*() const
+ {
+ return *p;
+ }
+ T* operator->() const
+ {
+ return p;
+ }
+ T* get() const
+ {
+ return p;
+ }
+
+ explicit operator bool() const
+ {
+ return p != nullptr;
+ }
+};
+
+} // namespace detail
+} // namespace async
110 include/async++/scheduler.h
@@ -0,0 +1,110 @@
+// Copyright (c) 2013 Amanieu d'Antras
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+#ifndef ASYNCXX_H_
+# error "Do not include this header directly, include <async++.h> instead."
+#endif
+
+namespace async {
+
+// Task handle used in scheduler, acts as a unique_ptr to a task object
+class task_handle {
+ detail::task_ptr handle;
+
+ // Allow access from schedule_task
+ template<typename Sched> friend void detail::schedule_task(Sched& sched, detail::task_ptr t);
+
+public:
+ task_handle() = default;
+ task_handle(const task_handle&) = delete;
+ task_handle(task_handle&&) = default;
+ task_handle& operator=(const task_handle&) = delete;
+ task_handle& operator=(task_handle&&) = default;
+
+ explicit operator bool() const
+ {
+ return static_cast<bool>(handle);
+ }
+
+ // Run the task and release the handle
+ void run()
+ {
+ handle->execute();
+ handle = nullptr;
+ }
+};
+
+// Scheduler interface
+class scheduler {
+public:
+ // Schedule a task for execution. Failure can be indicated by throwing, but
+ // then the task must not be executed.
+ virtual void schedule(task_handle t) = 0;
+};
+
+namespace detail {
+
+// Scheduler implementations
+class default_scheduler_impl: public scheduler {
+public:
+ default_scheduler_impl();
+ ~default_scheduler_impl();
+ LIBASYNC_EXPORT virtual void schedule(task_handle t) override final;
+};
+class inline_scheduler_impl: public scheduler {
+public:
+ virtual void schedule(task_handle t) override final
+ {
+ t.run();
+ }
+};
+class thread_scheduler_impl: public scheduler {
+public:
+ virtual void schedule(task_handle t) override final
+ {
+ // Detach the thread so it can keep running after the temporary
+ // std::thread object is destroyed
+ std::thread([](task_handle t) {
+ t.run();
+ }, std::move(t)).detach();
+ }
+};
+
+// Schedule a task for execution using its scheduler
+template<typename Sched> void schedule_task(Sched& sched, task_ptr t)
+{
+ task_handle handle;
+ handle.handle = std::move(t);
+ sched.schedule(std::move(handle));
+}
+
+} // namespace detail
+
+inline detail::inline_scheduler_impl& inline_scheduler()
+{
+ static detail::inline_scheduler_impl sched;
+ return sched;
+}
+
+inline detail::thread_scheduler_impl& thread_scheduler()
+{
+ static detail::thread_scheduler_impl sched;
+ return sched;
+}
+
+} // namespace async
61 include/async++/scheduler_fwd.h
@@ -0,0 +1,61 @@
+// Copyright (c) 2013 Amanieu d'Antras
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+#ifndef ASYNCXX_H_
+# error "Do not include this header directly, include <async++.h> instead."
+#endif
+
+namespace async {
+namespace detail {
+
+// Predefined scheduler implementations
+class default_scheduler_impl;
+class inline_scheduler_impl;
+class thread_scheduler_impl;
+
+// Reference counted pointer to task data
+struct task_base;
+typedef ref_count_ptr<task_base> task_ptr;
+
+// Helper function to schedule a task using a scheduler
+template<typename Sched> void schedule_task(Sched& sched, task_ptr t);
+
+// Wait for the given task to finish. If the calling thread is from the default
+// scheduler then it will run other tasks while waiting.
+LIBASYNC_EXPORT void wait_for_task(task_base* wait_task);
+
+} // namespace detail
+
+// Scheduler interface
+class scheduler;
+
+// Run a task in a thread pool. This scheduler will wait for all tasks to finish
+// at program exit.
+LIBASYNC_EXPORT detail::default_scheduler_impl& default_scheduler();
+
+// Run a task directly
+detail::inline_scheduler_impl& inline_scheduler();
+
+// Run a task in a separate thread. Note that this scheduler does not wait for
+// threads to finish at process exit. You must ensure that all threads finish
+// before ending the process.
+detail::thread_scheduler_impl& thread_scheduler();
+
+} // namespace async
79 include/async++/spinlock.h
@@ -0,0 +1,79 @@
+// Copyright (c) 2013 Amanieu d'Antras
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+#ifndef ASYNCXX_H_
+# error "Do not include this header directly, include <async++.h> instead."
+#endif
+
+// SSE intrinsics for _mm_pause
+#if defined(__SSE__) || _M_IX86_FP > 0
+#include <xmmintrin.h>
+#endif
+
+namespace async {
+namespace detail {
+
+// Spinlock with same interface as std::mutex
+class spinlock {
+public:
+ // Non-copyable and non-movable
+ spinlock() = default;
+ spinlock(const spinlock&) = delete;
+ spinlock(spinlock&&) = delete;
+ spinlock& operator=(const spinlock&) = delete;
+ spinlock& operator=(spinlock&&) = delete;
+
+ void lock()
+ {
+ while (!try_lock()) {
+ // If direct locking fails then spin using load() only
+ // before trying again. This saves bus traffic since the
+ // spinlock is already in the cache.
+ while (locked.load(std::memory_order_relaxed))
+ spin_pause();
+ }
+ }
+
+ bool try_lock()
+ {
+ bool expected = false;
+ return locked.compare_exchange_strong(expected, true, std::memory_order_acquire, std::memory_order_relaxed);
+ }
+
+ void unlock()
+ {
+ locked.store(false, std::memory_order_release);
+ }
+
+ // Pause for use in spinloops. On hyperthreaded CPUs, this yields to the other
+ // hardware thread. Otherwise it is simply a no-op.
+ static void spin_pause()
+ {
+#if defined(__SSE__) || _M_IX86_FP > 0
+ _mm_pause();
+#endif
+ }
+
+private:
+ std::atomic<bool> locked{false};
+};
+
+} // namespace detail
+} // namespace async
464 include/async++/task.h
@@ -0,0 +1,464 @@
+// Copyright (c) 2013 Amanieu d'Antras
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+#ifndef ASYNCXX_H_
+# error "Do not include this header directly, include <async++.h> instead."
+#endif
+
+namespace async {
+namespace detail {
+
+// Common code for task and shared_task
+template<typename Result> class basic_task {
+protected:
+ // Reference counted internal task object
+ detail::task_ptr internal_task;
+
+ // Real result type, with void turned into fake_void
+ typedef typename void_to_fake_void<Result>::type internal_result;
+
+ // Type-specific task object
+ typedef task_result<internal_result> internal_task_type;
+
+ // Friend access
+ template<typename T> friend class basic_task;
+ template<typename T> friend typename T::internal_task_type* get_internal_task(const T& t);
+
+public:
+ // Task result type
+ typedef Result result_type;
+
+ explicit operator bool() const
+ {
+ return static_cast<bool>(internal_task);
+ }
+
+ // Query whether the task has finished executing
+ bool ready() const
+ {
+ if (internal_task->state.load(std::memory_order_relaxed) >= task_state::TASK_COMPLETED) {
+ std::atomic_thread_fence(std::memory_order_acquire);
+ return true;
+ } else
+ return false;
+ }
+
+ // Wait for the task to complete
+ void wait() const
+ {
+ // Catch use of uninitialized task objects
+ if (!internal_task)
+ throw std::invalid_argument("Use of empty task object");
+
+ internal_task->wait();
+ }
+
+protected:
+ // Common code for get()
+ void get_internal() const
+ {
+ // Catch use of uninitialized task objects
+ if (!internal_task)
+ throw std::invalid_argument("Use of empty task object");
+
+ // If the task was canceled, throw the associated exception
+ internal_task->wait_and_throw();
+ }
+
+ // Common code for then()
+ template<typename Sched, typename Func, typename Parent> typename continuation_traits<Parent, Func>::task_type then_internal(Sched& sched, Func&& f, Parent&& parent) const
+ {
+ // Catch use of uninitialized task objects
+ if (!internal_task)
+ throw std::invalid_argument("Use of empty task object");
+
+ // Save a copy of internal_task because it might get moved into exec_func
+ task_base* my_internal = internal_task.get();
+
+ // Create continuation
+ typedef typename continuation_traits<Parent, Func>::task_type::internal_result cont_internal_result;
+ typedef continuation_exec_func<Parent, cont_internal_result, Func, continuation_traits<Parent, Func>::is_value_cont::value, is_task<typename continuation_traits<Parent, Func>::result_type>::value> exec_func;
+ typename continuation_traits<Parent, Func>::task_type cont;
+ cont.internal_task = task_ptr(new task_func<exec_func, cont_internal_result>(exec_func(std::forward<Func>(f), std::forward<Parent>(parent))));
+
+ // Set continuation parameters
+ cont.internal_task->sched = std::addressof(sched);
+ cont.internal_task->always_cont = !continuation_traits<Parent, Func>::is_value_cont::value;
+
+ // Add the continuation to this task
+ // Avoid an expensive ref-count modification since the task isn't shared yet
+ cont.internal_task->add_ref_unlocked();
+ my_internal->add_continuation(task_ptr(cont.internal_task.get()));
+
+ return cont;
+ }
+};
+
+} // namespace detail
+
+template<typename Result> class task: public detail::basic_task<Result> {
+ // Friend access for make_task, spawn and event_task::get_task
+ template<typename T> friend task<typename std::decay<T>::type> make_task(T&& value);
+ friend task<void> make_task();
+ template<typename Sched, typename Func> friend task<typename detail::remove_task<decltype(std::declval<Func>()())>::type> spawn(Sched& sched, Func&& f);
+ friend class event_task<Result>;
+
+public:
+ // Movable but not copyable
+ task() = default;
+ task(const task&) = delete;
+ task(task&&) = default;
+ task& operator=(const task&) = delete;
+ task& operator=(task&&) = default;
+
+ // Get the result of the task
+ template<typename T = Result, typename = typename std::enable_if<std::is_void<T>::value>::type>
+ void get()
+ {
+ this->get_internal();
+ this->internal_task = nullptr;
+ }
+ template<typename T = Result, typename = typename std::enable_if<!std::is_void<T>::value>::type>
+ T get()
+ {
+ // Release internal_task after this function, but make sure we copy the result over first
+ this->get_internal();
+ detail::task_ptr my_internal = std::move(this->internal_task);
+ return static_cast<typename task::internal_task_type*>(my_internal.get())->get_result(*this);
+ }
+
+ // Add a continuation to the task
+ template<typename Sched, typename Func> auto then(Sched& sched, Func&& f) -> decltype(this->then_internal(sched, std::forward<Func>(f), std::move(*this)))
+ {
+ auto result = this->then_internal(sched, std::forward<Func>(f), std::move(*this));
+ this->internal_task = nullptr;
+ return result;
+ }
+ template<typename Func> auto then(Func&& f) -> decltype(this->then(LIBASYNC_DEFAULT_SCHEDULER, std::forward<Func>(f)))
+ {
+ return then(LIBASYNC_DEFAULT_SCHEDULER, std::forward<Func>(f));
+ }
+
+ // Create a shared_task from this task
+ shared_task<Result> share()
+ {
+ shared_task<Result> out;
+ out.internal_task = std::move(this->internal_task);
+ return out;
+ }
+};
+
+template<typename Result> class shared_task: public detail::basic_task<Result> {
+ // Friend access for task::share
+ friend class task<Result>;
+
+public:
+ // Movable and copyable
+ shared_task() = default;
+ shared_task(const shared_task&) = default;
+ shared_task(shared_task&&) = default;
+ shared_task& operator=(const shared_task&) = default;
+ shared_task& operator=(shared_task&&) = default;
+
+ // Get the result of the task
+ template<typename T = Result, typename = typename std::enable_if<std::is_void<T>::value>::type>
+ void get() const
+ {
+ this->get_internal();
+ }
+ template<typename T = Result, typename = typename std::enable_if<!std::is_void<T>::value>::type>
+ const T& get() const
+ {
+ this->get_internal();
+ return detail::get_internal_task(*this)->get_result(*this);
+ }
+
+ // Add a continuation to the task
+ template<typename Sched, typename Func> auto then(Sched& sched, Func&& f) const -> decltype(this->then_internal(sched, std::forward<Func>(f), *this))
+ {
+ return this->then_internal(sched, std::forward<Func>(f), *this);
+ }
+ template<typename Func> auto then(Func&& f) const -> decltype(this->then(LIBASYNC_DEFAULT_SCHEDULER, std::forward<Func>(f)))
+ {
+ return then(LIBASYNC_DEFAULT_SCHEDULER, std::forward<Func>(f));
+ }
+};
+
+// Special task type which can be triggered manually rather than when a function executes.
+template<typename Result> class event_task {
+ // Reference counted internal task object
+ detail::task_ptr internal_task;
+
+ // Real result type, with void turned into fake_void
+ typedef typename detail::void_to_fake_void<Result>::type internal_result;
+
+ // Type-specific task object
+ typedef detail::task_result<internal_result> internal_task_type;
+
+public:
+ // Movable but not copyable
+ event_task(const event_task&) = delete;
+ event_task(event_task&&) = default;
+ event_task& operator=(const event_task&) = delete;
+ event_task& operator=(event_task&& other)
+ {
+ // Make sure the destructor is called on the previous value
+ std::swap(internal_task, other.internal_task);
+ return *this;
+ }
+
+ // Main constructor
+ event_task()
+ : internal_task(new internal_task_type) {}
+
+ // Cancel events if they are destroyed before they are set
+ ~event_task()
+ {
+ // This has no effect if a result is already set
+ if (internal_task)
+ cancel();
+ }
+
+ // Get a task linked to this event
+ task<Result> get_task() const
+ {
+ // Catch use of uninitialized task objects
+ if (!internal_task)
+ throw std::invalid_argument("Use of empty event_task object");
+
+ // Make sure this is only called once (ref_count == 1)
+ unsigned int expected = 1;
+ if (!internal_task->ref_count.compare_exchange_strong(expected, 2, std::memory_order_relaxed, std::memory_order_relaxed))
+ throw std::invalid_argument("event_task::get_task() called more than once");
+
+ // Ref count is now 2, no need to increment it again
+ task<Result> out;
+ out.internal_task = detail::task_ptr(internal_task.get());
+ return out;
+ }
+
+ // Set the result of the task, mark it as completed and run its continuations
+ template<typename T = Result, typename = typename std::enable_if<std::is_void<T>::value>::type> bool set()
+ {
+ return set_internal(detail::fake_void());
+ }
+ template<typename T = Result> bool set(const typename std::enable_if<!std::is_void<T>::value, Result>::type& result) const
+ {
+ return set_internal(result);
+ }
+ template<typename T = Result> bool set(typename std::enable_if<!std::is_void<T>::value && !std::is_reference<T>::value, Result>::type&& result) const
+ {
+ return set_internal(std::move(result));
+ }
+
+ // Cancel the event with an exception and cancel continuations
+ bool set_exception(std::exception_ptr except) const
+ {
+ // Catch use of uninitialized task objects
+ if (!internal_task)
+ throw std::invalid_argument("Use of empty event_task object");
+
+ // Only allow setting the value once
+ detail::task_state expected = detail::task_state::TASK_PENDING;
+ if (!internal_task->state.compare_exchange_strong(expected, detail::task_state::TASK_LOCKED, std::memory_order_relaxed, std::memory_order_relaxed))
+ return false;
+
+ // Cancel the task
+ internal_task->task_base::cancel(std::move(except));
+ return true;
+ }
+
+ // Cancel the event as if with cancel_current_task
+ bool cancel() const
+ {
+ return set_exception(nullptr);
+ }
+
+private:
+ // Common code for set()
+ template<typename T> bool set_internal(T&& result) const
+ {
+ // Catch use of uninitialized task objects
+ if (!internal_task)
+ throw std::invalid_argument("Use of empty event_task object");
+
+ // Only allow setting the value once
+ detail::task_state expected = detail::task_state::TASK_PENDING;
+ if (!internal_task->state.compare_exchange_strong(expected, detail::task_state::TASK_LOCKED, std::memory_order_relaxed, std::memory_order_relaxed))
+ return false;
+
+ try {
+ // Store the result and finish
+ static_cast<internal_task_type*>(internal_task.get())->set_result(std::forward<T>(result));
+ internal_task->finish();
+ } catch (...) {
+ // If the copy/move constructor of the result threw, save the exception.
+ // We could also return the exception to the caller, but this would
+ // cause race conditions.
+ internal_task->cancel(std::current_exception());
+ }
+ return true;
+ }
+};
+
+// Task type returned by local_spawn()
+template<typename Func> class local_task {
+ // Task result type
+ typedef typename detail::remove_task<decltype(std::declval<Func>()())>::type result_type;
+ typedef typename detail::void_to_fake_void<result_type>::type internal_result;
+
+ // Task execution function type
+ typedef detail::root_exec_func<internal_result, Func, detail::is_task<decltype(std::declval<Func>()())>::value> exec_func;
+
+ // Task object embedded directly. The ref-count is initialized to 1 so it
+ // will never be freed using delete, only in destructor.
+ detail::task_func<exec_func, internal_result> internal_task;
+
+ // Friend access for local_spawn
+ template<typename Sched, typename F> friend local_task<F> local_spawn(Sched& sched, F&& f);
+ template<typename F> friend local_task<F> local_spawn(F&& f);
+
+ // Constructor, used by local_spawn
+ template<typename Sched> local_task(Sched& sched, Func&& f)
+ : internal_task(exec_func(std::forward<Func>(f)))
+ {
+ // Avoid an expensive ref-count modification since the task isn't shared yet
+ internal_task.add_ref_unlocked();
+ detail::schedule_task(sched, detail::task_ptr(&internal_task));
+ }
+
+public:
+ // Non-movable and non-copyable
+ local_task(const local_task&) = delete;
+ local_task(local_task&&) = delete;
+ local_task& operator=(const local_task&) = delete;
+ local_task& operator=(local_task&&) = delete;
+
+ // Wait for the task to complete when destroying
+ ~local_task()
+ {
+ wait();
+
+ // Now spin until the reference count drops to 1, since other threads
+ // may still have a reference to the task.
+ while (internal_task.ref_count.load(std::memory_order_relaxed) != 1)
+ detail::spinlock::spin_pause();
+ std::atomic_thread_fence(std::memory_order_acquire);
+ }
+
+ // Query whether the task has finished executing
+ bool ready() const
+ {
+ if (internal_task.state.load(std::memory_order_relaxed) >= detail::task_state::TASK_COMPLETED) {
+ std::atomic_thread_fence(std::memory_order_acquire);
+ return true;
+ } else
+ return false;
+ }
+
+ // Wait for the task to complete
+ void wait()
+ {
+ internal_task.wait();
+ }
+
+ // Get the result of the task
+ template<typename T = result_type, typename = typename std::enable_if<std::is_void<T>::value>::type>
+ void get()
+ {
+ internal_task.wait_and_throw();
+ }
+ template<typename T = result_type, typename = typename std::enable_if<!std::is_void<T>::value>::type>
+ T get()
+ {
+ internal_task.wait_and_throw();
+ return internal_task.get_result(task<result_type>());
+ }
+};
+
+// Spawn a function asynchronously
+template<typename Sched, typename Func>
+task<typename detail::remove_task<decltype(std::declval<Func>()())>::type> spawn(Sched& sched, Func&& f)
+{
+ // Make sure the function type is callable
+ static_assert(detail::is_callable<Func()>::value, "Invalid function type passed to spawn()");
+
+ // Create task
+ typedef typename detail::void_to_fake_void<typename detail::remove_task<decltype(std::declval<Func>()())>::type>::type internal_result;
+ typedef detail::root_exec_func<internal_result, Func, detail::is_task<decltype(std::declval<Func>()())>::value> exec_func;
+ task<typename detail::remove_task<decltype(std::declval<Func>()())>::type> out;
+ out.internal_task = detail::task_ptr(new detail::task_func<exec_func, internal_result>(exec_func(std::forward<Func>(f))));
+
+ // Avoid an expensive ref-count modification since the task isn't shared yet
+ out.internal_task->add_ref_unlocked();
+ detail::schedule_task(sched, detail::task_ptr(out.internal_task.get()));
+
+ return out;
+}
+template<typename Func>
+auto spawn(Func&& f) -> decltype(async::spawn(LIBASYNC_DEFAULT_SCHEDULER, std::forward<Func>(f)))
+{
+ return async::spawn(LIBASYNC_DEFAULT_SCHEDULER, std::forward<Func>(f));
+}
+
+// Create a completed task containing a value
+template<typename T> task<typename std::decay<T>::type> make_task(T&& value)
+{
+ task<typename std::decay<T>::type> out;
+
+ out.internal_task = detail::task_ptr(new detail::task_result<typename std::decay<T>::type>);
+ detail::get_internal_task(out)->set_result(std::forward<T>(value));
+ out.internal_task->state.store(detail::task_state::TASK_COMPLETED, std::memory_order_relaxed);
+
+ return out;
+}
+inline task<void> make_task()
+{
+ task<void> out;
+
+ out.internal_task = detail::task_ptr(new detail::task_result<detail::fake_void>);
+ out.internal_task->state.store(detail::task_state::TASK_COMPLETED, std::memory_order_relaxed);
+
+ return out;
+}
+
+// Spawn a very limited task which is restricted to the current function and
+// joins on destruction. Because local_task is not movable, the result must
+// be captured in a reference, like this:
+// auto&& x = local_spawn(...);
+template<typename Sched, typename Func>
+#ifdef __GNUC__
+__attribute__((warn_unused_result))
+#endif
+local_task<Func> local_spawn(Sched& sched, Func&& f)
+{
+ // Since local_task is not movable, we construct it in-place and let the
+ // caller extend the lifetime of the returned object using a reference.
+ return {sched, std::forward<Func>(f)};
+}
+template<typename Func>
+#ifdef __GNUC__
+__attribute__((warn_unused_result))
+#endif
+local_task<Func> local_spawn(Func&& f)
+{
+ return {LIBASYNC_DEFAULT_SCHEDULER, std::forward<Func>(f)};
+}
+
+} // namespace async
473 include/async++/task_base.h
@@ -0,0 +1,473 @@
+// Copyright (c) 2013 Amanieu d'Antras
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+#ifndef ASYNCXX_H_
+# error "Do not include this header directly, include <async++.h> instead."
+#endif
+
+namespace async {
+namespace detail {
+
+// Task states
+enum class task_state: unsigned char {
+ TASK_PENDING, // Task has not completed yet
+ TASK_LOCKED, // Task is locked (used by event_task to prevent double set)
+ TASK_COMPLETED, // Task has finished execution and a result is available
+ TASK_CANCELED // Task has been canceled and an exception is available
+};
+
+// Continuation vector optimized for single continuations. Only supports a
+// minimal set of operations.
+class continuation_vector {
+public:
+ task_ptr* begin()
+ {
+ if (count > 1)
+ return data.vector_data.get();
+ else
+ return &data.inline_data;
+ }
+ task_ptr* end()
+ {
+ return begin() + count;
+ }
+
+ void push_back(task_ptr t)
+ {
+ // First try to insert the continuation inline
+ if (count == 0) {
+ data.inline_data = std::move(t);
+ count = 1;
+ }
+
+ // Check if we need to go from an inline continuation to a vector
+ else if (count == 1) {
+ std::unique_ptr<task_ptr[]> ptr(new task_ptr[2]);
+ ptr[0] = std::move(data.inline_data);
+ ptr[1] = std::move(t);
+ data.inline_data.~task_ptr();
+ new(&data.vector_data) std::unique_ptr<task_ptr[]>(std::move(ptr));
+ count = 2;
+ }
+
+ // Check if the vector needs to be grown (size is a power of 2)
+ else if ((count & (count - 1)) == 0) {
+ std::unique_ptr<task_ptr[]> ptr(new task_ptr[count * 2]);
+ std::move(data.vector_data.get(), data.vector_data.get() + count, ptr.get());
+ ptr[count++] = std::move(t);
+ data.vector_data = std::move(ptr);
+ }
+
+ // Otherwise there is spare capacity (the capacity is the lowest power of
+ // 2 which is >= count), so just append the continuation
+ else
+ data.vector_data[count++] = std::move(t);
+ }
+
+ void clear()
+ {
+ if (count > 1) {
+ data.vector_data.~unique_ptr();
+ new(&data.inline_data) task_ptr;
+ } else
+ data.inline_data = nullptr;
+ count = 0;
+ }
+
+ size_t size() const
+ {
+ return count;
+ }
+
+ ~continuation_vector()
+ {
+ if (count > 1)
+ data.vector_data.~unique_ptr();
+ else
+ data.inline_data.~task_ptr();
+ }
+
+private:
+ size_t count{0};
+
+ union data_union {
+ data_union(): inline_data() {}
+
+ // Destruction is handled by the parent class
+ ~data_union() {}
+
+ // Inline continuation used for common case (only one continuation)
+ task_ptr inline_data;
+
+ // Vector of continuations. The capacity is the lowest power of 2
+ // which is >= count.
+ std::unique_ptr<task_ptr[]> vector_data;
+ } data;
+};
+
+// Type-generic base task object
+struct task_base: public ref_count_base<task_base> {
+ // Task state
+ std::atomic<task_state> state{task_state::TASK_PENDING};
+
+ // Whether this task should be run even if the parent was canceled
+ bool always_cont;
+
+ // Vector of continuations and lock protecting it
+ spinlock lock;
+ continuation_vector continuations;
+
+ // Scheduler to be used to schedule this task
+ scheduler* sched;
+
+ // Exception associated with the task if it was canceled
+ std::exception_ptr except;
+
+ // Virtual destructor so result gets destroyed properly
+ virtual ~task_base() {}
+
+ // Execution function called by the task scheduler. A default implementation
+ // is provided for event_task.
+ virtual void execute() {}
+
+ // Run a single continuation
+ void run_continuation(task_ptr&& cont, bool cancel)
+ {
+ // Handle continuations that run even if the parent task is canceled
+ if (!cancel || cont->always_cont) {
+ scheduler& s = *cont->sched;
+ schedule_task(s, std::move(cont));
+ } else
+ cont->cancel(except);
+ }
+
+ // Run all of the task's continuations after it has completed or been canceled.
+ // The list of continuations is then emptied.
+ template<bool cancel> void run_continuations()
+ {
+ // Wait for any threads which may be adding a continuation. The lock is
+ // not needed afterwards because any future continuations are run
+ // directly instead of being added to the continuation list.
+ {
+ std::lock_guard<spinlock> locked(lock);
+ }
+
+ // Early exit for common case of zero continuations
+ if (continuations.size() == 0)
+ return;
+
+ for (auto& i: continuations)
+ run_continuation(std::move(i), cancel);
+ continuations.clear();
+ }
+
+ // Add a continuation to this task
+ void add_continuation(task_ptr cont)
+ {
+ // Check for task completion
+ task_state current_state = state.load(std::memory_order_relaxed);
+ if (current_state < task_state::TASK_COMPLETED) {
+ std::lock_guard<spinlock> locked(lock);
+
+ // If the task has not finished yet, add the continuation to it
+ current_state = state.load(std::memory_order_relaxed);
+ if (current_state < task_state::TASK_COMPLETED) {
+ continuations.push_back(cont);
+ return;
+ }
+ }
+
+ // Otherwise run the continuation directly
+ std::atomic_thread_fence(std::memory_order_acquire);
+ run_continuation(std::move(cont), current_state == task_state::TASK_CANCELED);
+ }
+
+ // Cancel the task with an exception. This function is virtual so that
+ // the associated function object can be freed (it is not needed anymore).
+ virtual void cancel(std::exception_ptr cancel_exception)
+ {
+ except = std::move(cancel_exception);
+ state.store(task_state::TASK_CANCELED, std::memory_order_release);
+ run_continuations<true>();
+ }
+
+ // Finish the task after it has been executed and the result set
+ void finish()
+ {
+ state.store(task_state::TASK_COMPLETED, std::memory_order_release);
+ run_continuations<false>();
+ }
+
+ // Wait for the task to finish executing
+ task_state wait()
+ {
+ task_state s = state.load(std::memory_order_relaxed);
+ if (s < task_state::TASK_COMPLETED) {
+ wait_for_task(this);
+ s = state.load(std::memory_order_relaxed);
+ }
+ std::atomic_thread_fence(std::memory_order_acquire);
+ return s;
+ }
+
+ // Wait and throw the exception if the task was canceled
+ void wait_and_throw()
+ {
+ if (wait() == task_state::TASK_CANCELED) {
+ if (except)
+ std::rethrow_exception(except);
+ else
+ throw task_canceled();
+ }
+ }
+};
+
+// Result type-specific task object
+template<typename Result> struct task_result: public task_base {
+ typename std::aligned_storage<sizeof(Result), alignof(Result)>::type result;
+
+ template<typename T> void set_result(T&& t)
+ {
+ new(&result) Result(std::forward<T>(t));
+ }
+
+ // Return a result using an lvalue or rvalue reference depending on the task
+ // type. The task parameter is not used; it is only there for overload resolution.
+ template<typename T> Result&& get_result(const task<T>&)
+ {
+ return std::move(*reinterpret_cast<Result*>(&result));
+ }
+ template<typename T> const Result& get_result(const shared_task<T>&)
+ {
+ return *reinterpret_cast<Result*>(&result);
+ }
+
+ virtual ~task_result()
+ {
+ // Result is only present if the task completed successfully
+ if (state.load(std::memory_order_relaxed) == task_state::TASK_COMPLETED)
+ reinterpret_cast<Result*>(&result)->~Result();
+ }
+};
+
+// Specialization for references
+template<typename Result> struct task_result<Result&>: public task_base {
+ // Store as pointer internally
+ Result* result;
+
+ void set_result(Result& obj)
+ {
+ result = std::addressof(obj);
+ }
+
+ template<typename T> Result& get_result(const task<T>&)
+ {
+ return *result;
+ }
+ template<typename T> Result& get_result(const shared_task<T>&)
+ {
+ return *result;
+ }
+};
+
+// Specialization for void
+template<> struct task_result<fake_void>: public task_base {
+ void set_result(fake_void) {}
+
+ // Get the result as fake_void so that it can be passed to set_result and
+ // continuations
+ template<typename T> fake_void get_result(const task<T>&)
+ {
+ return fake_void();
+ }
+ template<typename T> fake_void get_result(const shared_task<T>&)
+ {
+ return fake_void();
+ }
+};
+
+// Class to hold a function object and initialize/destroy it
+template<typename Func, typename = void> struct func_holder {
+ typename std::aligned_storage<sizeof(Func), alignof(Func)>::type func;
+
+ Func& get_func()
+ {
+ return *reinterpret_cast<Func*>(&func);
+ }
+ void init_func(Func&& f)
+ {
+ new(&func) Func(std::move(f));
+ }
+ void destroy_func()
+ {
+ get_func().~Func();
+ }
+};
+
+// Specialization for empty function objects
+template<typename Func> struct func_holder<Func, typename std::enable_if<std::is_empty<Func>::value>::type> {
+ Func& get_func()
+ {
+ return *reinterpret_cast<Func*>(this);
+ }
+ void init_func(Func&& f)
+ {
+ new(this) Func(std::move(f));
+ }
+ void destroy_func()
+ {
+ get_func().~Func();
+ }
+};
+
+// Task object with an associated function object
+// Using private inheritance so empty Func doesn't take up space
+template<typename Func, typename Result> struct task_func: public task_result<Result>, private func_holder<Func> {
+ explicit task_func(Func&& f)
+ {
+ this->init_func(std::move(f));
+ }
+
+ // Execution function called by the scheduler
+ virtual void execute() override final
+ {
+ try {
+ // Dispatch to execution function
+ this->get_func()(this);
+
+ // If we successfully ran, destroy the function object so that it
+ // can release any references (shared_ptr) it holds. Behaviour is
+ // undefined if the destructor throws.
+ this->destroy_func();
+ } catch (const task_canceled&) {
+ // Optimize task_canceled by encoding it as a null exception_ptr
+ cancel(nullptr);
+ } catch (...) {
+ cancel(std::current_exception());
+ }
+ }
+
+ // Destroy the function when being canceled
+ virtual void cancel(std::exception_ptr cancel_exception) override final
+ {
+ this->destroy_func();
+ task_base::cancel(std::move(cancel_exception));
+ }
+};
+
+// Helper function to access the internal_task member of a task object. This
+// avoids having to declare half of the functions in the detail namespace as
+// friends. Also, internal_task is downcast to the appropriate task_result<>.
+template<typename Task> typename Task::internal_task_type* get_internal_task(const Task& t)
+{
+ return static_cast<typename Task::internal_task_type*>(t.internal_task.get());
+}
+
+// Common code for task unwrapping
+template<typename Result, typename Func, typename Child>
+void unwrapped_finish(task_base* parent_base, Child child_task)
+{
+ struct unwrap_func {
+ unwrap_func(task_ptr t): parent_task(std::move(t)) {}
+ void operator()(Child child_task) const
+ {
+ // Forward completion state and result to parent task
+ try {
+ if (get_internal_task(child_task)->state.load(std::memory_order_relaxed) == task_state::TASK_COMPLETED) {
+ static_cast<task_result<Result>*>(this->parent_task.get())->set_result(get_internal_task(child_task)->get_result(child_task));
+ this->parent_task->finish();
+ } else
+ static_cast<task_func<Func, Result>*>(this->parent_task.get())->cancel(get_internal_task(child_task)->except);
+ } catch (...) {
+ // If the copy/move constructor of the result threw, propagate the exception
+ static_cast<task_func<Func, Result>*>(this->parent_task.get())->cancel(std::current_exception());
+ }
+ }
+ task_ptr parent_task;
+ };
+
+ // Save a reference to the parent in the continuation
+ parent_base->add_ref();
+ child_task.then(inline_scheduler(), unwrap_func(task_ptr(parent_base)));
+}
+
+// Execution functions for root tasks:
+// - With and without task unwrapping
+template<typename Result, typename Func, bool Unwrap> struct root_exec_func: private std::decay<Func>::type {
+ typedef typename std::decay<Func>::type func_type;
+ root_exec_func(Func&& f): func_type(std::forward<Func>(f)) {}
+ void operator()(task_base* t)
+ {
+ static_cast<task_result<Result>*>(t)->set_result(invoke_fakevoid(std::move(*static_cast<func_type*>(this))));
+ t->finish();
+ }
+};
+template<typename Result, typename Func> struct root_exec_func<Result, Func, true>: private std::decay<Func>::type {
+ typedef typename std::decay<Func>::type func_type;
+ root_exec_func(Func&& f): func_type(std::forward<Func>(f)) {}
+ void operator()(task_base* t)
+ {
+ unwrapped_finish<Result, root_exec_func>(t, std::move(*static_cast<func_type*>(this))());
+ }
+};
+
+// Execution functions for continuation tasks:
+// - With and without task unwrapping
+// - For value-based and task-based continuations
+template<typename Parent, typename Result, typename Func, bool ValueCont, bool Unwrap> struct continuation_exec_func: private std::decay<Func>::type {
+ typedef typename std::decay<Func>::type func_type;
+ continuation_exec_func(Func&& f, Parent&& p): func_type(std::forward<Func>(f)), parent(std::forward<Parent>(p)) {}
+ void operator()(task_base* t)
+ {
+ static_cast<task_result<Result>*>(t)->set_result(invoke_fakevoid([this]{return invoke_fakevoid_param(std::move(*static_cast<func_type*>(this)), std::move(this->parent));}));
+ t->finish();
+ }
+ typename std::decay<Parent>::type parent;
+};
+template<typename Parent, typename Result, typename Func> struct continuation_exec_func<Parent, Result, Func, true, false>: private std::decay<Func>::type {
+ typedef typename std::decay<Func>::type func_type;
+ continuation_exec_func(Func&& f, Parent&& p): func_type(std::forward<Func>(f)), parent(std::forward<Parent>(p)) {}
+ void operator()(task_base* t)
+ {
+ auto&& result = get_internal_task(parent)->get_result(parent);
+ static_cast<task_result<Result>*>(t)->set_result(invoke_fakevoid([this, &result]{return invoke_fakevoid_param(std::move(*static_cast<func_type*>(this)), std::forward<decltype(result)>(result));}));
+ t->finish();
+ }
+ typename std::decay<Parent>::type parent;
+};
+template<typename Parent, typename Result, typename Func> struct continuation_exec_func<Parent, Result, Func, false, true>: private std::decay<Func>::type {
+ typedef typename std::decay<Func>::type func_type;
+ continuation_exec_func(Func&& f, Parent&& p): func_type(std::forward<Func>(f)), parent(std::forward<Parent>(p)) {}
+ void operator()(task_base* t)
+ {
+ typedef typename detail::void_to_fake_void<typename Parent::result_type>::type internal_result;
+ unwrapped_finish<internal_result, continuation_exec_func>(t, invoke_fakevoid_param(std::move(*static_cast<func_type*>(this)), std::move(parent)));
+ }
+ typename std::decay<Parent>::type parent;
+};
+template<typename Parent, typename Result, typename Func> struct continuation_exec_func<Parent, Result, Func, true, true>: private std::decay<Func>::type {
+ typedef typename std::decay<Func>::type func_type;
+ continuation_exec_func(Func&& f, Parent&& p): func_type(std::forward<Func>(f)), parent(std::forward<Parent>(p)) {}
+ void operator()(task_base* t)
+ {
+ auto&& result = get_internal_task(parent)->get_result(parent);
+ typedef typename detail::void_to_fake_void<typename Parent::result_type>::type internal_result;
+ unwrapped_finish<internal_result, continuation_exec_func>(t, invoke_fakevoid_param(std::move(*static_cast<func_type*>(this)), std::forward<decltype(result)>(result)));
+ }
+ typename std::decay<Parent>::type parent;
+};
+
+} // namespace detail
+} // namespace async
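The pairing of `add_continuation()` and `run_continuations()` above is the core synchronization in `task_base`: a continuation is either registered under the spinlock before the task finishes, or run directly once the state has moved to `TASK_COMPLETED`/`TASK_CANCELED`. Below is a minimal, simplified sketch of that handshake using only standard library types (`std::mutex` and `std::function` instead of the library's spinlock and `task_ptr`); the `mini_task` name and structure are made up for illustration and are not part of this commit.
```
#include <atomic>
#include <functional>
#include <mutex>
#include <vector>

// Simplified model of the task_base completion/continuation handshake
struct mini_task {
    std::atomic<bool> finished{false};
    std::mutex lock;
    std::vector<std::function<void()>> continuations;

    void add_continuation(std::function<void()> cont)
    {
        // Fast path: if the task is still pending, register under the lock
        if (!finished.load(std::memory_order_acquire)) {
            std::lock_guard<std::mutex> guard(lock);
            if (!finished.load(std::memory_order_acquire)) {
                continuations.push_back(std::move(cont));
                return; // finish() will run it later
            }
        }

        // Otherwise the task already finished: run the continuation inline
        cont();
    }

    void finish()
    {
        finished.store(true, std::memory_order_release);

        // Wait for any thread currently inside add_continuation(); after this
        // point new continuations take the inline path above.
        { std::lock_guard<std::mutex> guard(lock); }

        for (auto& c: continuations)
            c();
        continuations.clear();
    }
};
```
Either the continuation is pushed before `finish()` acquires the (empty) lock section, in which case `finish()` runs it, or the registering thread re-checks after `finish()` and runs it inline; no continuation can be lost or run twice.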
106 include/async++/traits.h
@@ -0,0 +1,106 @@
+// Copyright (c) 2013 Amanieu d'Antras
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+#ifndef ASYNCXX_H_
+# error "Do not include this header directly, include <async++.h> instead."
+#endif
+
+namespace async {
+namespace detail {
+
+// Pseudo-void type: it takes up no space but can be moved and copied
+struct fake_void {};
+template<typename T> using void_to_fake_void = std::conditional<std::is_void<T>::value, fake_void, T>;
+
+// Check if type is a task type, used to detect task unwraping
+template<typename T> struct is_task: public std::false_type {};
+template<typename T> struct is_task<task<T>>: public std::true_type {};
+template<typename T> struct is_task<const task<T>>: public std::true_type {};
+template<typename T> struct is_task<shared_task<T>>: public std::true_type {};
+template<typename T> struct is_task<const shared_task<T>>: public std::true_type {};
+
+// Extract the result type of a task if T is a task, otherwise just return T
+template<typename T> struct remove_task {
+ typedef T type;
+};
+template<typename T> struct remove_task<task<T>> {
+ typedef T type;
+};
+template<typename T> struct remove_task<const task<T>> {
+ typedef T type;
+};
+template<typename T> struct remove_task<shared_task<T>> {
+ typedef T type;
+};
+template<typename T> struct remove_task<const shared_task<T>> {
+ typedef T type;
+};
+
+// Check if a type is callable with the given arguments
+template<typename Func, typename... Args, typename = decltype(std::declval<Func>()(std::declval<Args>()...))> std::true_type is_callable_helper(int);
+template<typename Func, typename... Args> std::false_type is_callable_helper(...);
+template<typename T> struct is_callable;
+template<typename Func, typename... Args> struct is_callable<Func(Args...)>: public decltype(is_callable_helper<Func, Args...>(0)) {};
+
+// Wrapper to run a function object and transform void returns into fake_void
+template<typename Func, typename = typename std::enable_if<!std::is_void<decltype(std::declval<Func>()())>::value>::type>
+decltype(std::declval<Func>()()) invoke_fakevoid(Func&& f)
+{
+ return std::forward<Func>(f)();
+}
+template<typename Func, typename = typename std::enable_if<std::is_void<decltype(std::declval<Func>()())>::value>::type>
+fake_void invoke_fakevoid(Func&& f)
+{
+ std::forward<Func>(f)();
+ return fake_void();
+}
+
+// Wrapper to run a continuation function with an optional parameter
+template<typename Func, typename Param> auto invoke_fakevoid_param(Func&& f, Param&& p) -> decltype(std::forward<Func>(f)(std::forward<Param>(p)))
+{
+ return std::forward<Func>(f)(std::forward<Param>(p));
+}
+template<typename Func> auto invoke_fakevoid_param(Func&& f, fake_void) -> decltype(std::forward<Func>(f)())
+{
+ return std::forward<Func>(f)();
+}
+
+// Various properties of a continuation function
+template<typename Func, typename Parent, typename = decltype(std::declval<Func>()(std::declval<Parent>().get()))>
+std::true_type is_value_cont_helper(Func&&, const Parent&, int, int);
+template<typename Func, typename = decltype(std::declval<Func>()())>
+std::true_type is_value_cont_helper(Func&&, const task<void>&, int, int);
+template<typename Func, typename = decltype(std::declval<Func>()())>
+std::true_type is_value_cont_helper(Func&&, const shared_task<void>&, int, int);
+template<typename Func, typename Parent, typename = decltype(std::declval<Func>()(std::declval<Parent>()))>
+std::false_type is_value_cont_helper(Func&&, const Parent&, int, ...);
+template<typename Func, typename Parent>
+void is_value_cont_helper(Func&&, const Parent&, ...);
+template<typename Parent, typename Func>
+struct continuation_traits {
+ typedef decltype(is_value_cont_helper(std::declval<Func>(), std::declval<Parent>(), 0, 0)) is_value_cont;
+ static_assert(!std::is_void<is_value_cont>::value, "Parameter type for continuation function is invalid for parent task type");
+ typedef typename std::conditional<is_value_cont::value, typename void_to_fake_void<decltype(std::declval<Parent>().get())>::type, Parent>::type param_type;
+ typedef decltype(invoke_fakevoid_param(std::declval<Func>(), std::declval<param_type>())) result_type;
+ typedef task<typename remove_task<result_type>::type> task_type;
+};
+
+} // namespace detail
+} // namespace async
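The `fake_void` machinery above is what lets the rest of the library treat void-returning and value-returning callables uniformly. The following standalone sketch duplicates the relevant names so it compiles without the library; it is illustration only and not code from this commit.
```
#include <type_traits>
#include <utility>

struct fake_void {};

// Non-void callables pass their result through unchanged...
template<typename Func,
         typename = typename std::enable_if<!std::is_void<decltype(std::declval<Func>()())>::value>::type>
auto invoke_fakevoid(Func&& f) -> decltype(std::forward<Func>(f)())
{
    return std::forward<Func>(f)();
}

// ...while void callables are mapped to a dummy fake_void value, so the
// caller always has "something" to store in a result slot.
template<typename Func,
         typename = typename std::enable_if<std::is_void<decltype(std::declval<Func>()())>::value>::type>
fake_void invoke_fakevoid(Func&& f)
{
    std::forward<Func>(f)();
    return fake_void();
}

int main()
{
    auto a = invoke_fakevoid([] { return 42; }); // a is int
    auto b = invoke_fakevoid([] {});             // b is fake_void
    static_assert(std::is_same<decltype(b), fake_void>::value, "void mapped to fake_void");
    return a - 42;
}
```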
297 include/async++/when_all_any.h
@@ -0,0 +1,297 @@
+// Copyright (c) 2013 Amanieu d'Antras
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+#ifndef ASYNCXX_H_
+# error "Do not include this header directly, include <async++.h> instead."
+#endif
+
+namespace async {
+namespace detail {
+
+// when_all shared state for ranges
+template<typename T> struct when_all_state_range: public ref_count_base<when_all_state_range<T>> {
+ typedef std::vector<T> task_type;
+ task_type results;
+ event_task<task_type> event;
+
+ when_all_state_range(int count)
+ : ref_count_base<when_all_state_range<T>>(count), results(count) {}
+
+ // When all references are dropped, signal the event
+ ~when_all_state_range()
+ {
+ event.set(std::move(results));
+ }
+
+ template<typename U> void set(int i, U&& u)
+ {
+ results[i] = std::forward<U>(u);
+ }
+
+ static task<task_type> empty_range()
+ {
+ return async::make_task(task_type());
+ }
+};
+template<> struct when_all_state_range<void>: public ref_count_base<when_all_state_range<void>> {
+ typedef void task_type;
+ event_task<void> event;
+
+ when_all_state_range(int count)
+ : ref_count_base(count) {}
+
+ // When all references are dropped, signal the event
+ ~when_all_state_range()
+ {
+ event.set();
+ }
+
+ void set(int, fake_void) {}
+
+ static task<task_type> empty_range()
+ {
+ return make_task();
+ }
+};
+
+// when_all shared state for variadic arguments
+template<typename Tuple> struct when_all_state_variadic: public ref_count_base<when_all_state_variadic<Tuple>> {
+ Tuple results;
+ event_task<Tuple> event;
+
+ when_all_state_variadic()
+ : ref_count_base<when_all_state_variadic<Tuple>>(std::tuple_size<Tuple>::value) {}
+
+ // When all references are dropped, signal the event
+ ~when_all_state_variadic()
+ {
+ event.set(std::move(results));
+ }
+};
+
+// when_any shared state
+template<typename T> struct when_any_state: public ref_count_base<when_any_state<T>> {
+ typedef std::pair<size_t, T> task_type;
+ event_task<task_type> event;
+
+ when_any_state(int count)
+ : ref_count_base<when_any_state<T>>(count) {}
+
+ template<typename U> void set(size_t i, U&& u)
+ {
+ event.set(std::make_pair(i, std::forward<U>(u)));
+ }
+};
+template<> struct when_any_state<void>: public ref_count_base<when_any_state<void>> {
+ typedef size_t task_type;
+ event_task<task_type> event;
+
+ when_any_state(int count)
+ : ref_count_base(count) {}
+
+ void set(size_t i, fake_void)
+ {
+ event.set(i);
+ }
+};
+
+// Internal implementation of when_all for variadic arguments
+template<int index, typename State> void when_all_variadic(when_all_state_variadic<State>*) {}
+template<int index, typename State, typename First, typename... T> void when_all_variadic(when_all_state_variadic<State>* state_ptr, First&& first, T&&... tasks)
+{
+ // Add a continuation to the task
+ try {
+ first.then(inline_scheduler(), [state_ptr](typename std::decay<First>::type t) {
+ detail::ref_count_ptr<when_all_state_variadic<State>> state(state_ptr);
+ try {
+ if (detail::get_internal_task(t)->state.load(std::memory_order_relaxed) == detail::task_state::TASK_COMPLETED)
+ std::get<index>(state->results) = detail::get_internal_task(t)->get_result(t);
+ else
+ state->event.set_exception(detail::get_internal_task(t)->except);
+ } catch (...) {
+ // If the assignment of the result threw, propagate the exception
+ state->event.set_exception(std::current_exception());
+ }
+ });
+ } catch (...) {
+ // Make sure we don't leak memory if then() throws
+ state_ptr->release(sizeof...(T) + 1);
+ throw;
+ }
+
+ // Add continuations to rest of tasks
+ detail::when_all_variadic<index + 1>(state_ptr, std::forward<T>(tasks)...);
+}
+
+// Internal implementation of when_any for variadic arguments
+template<int index, typename State> void when_any_variadic(when_any_state<State>*) {}
+template<int index, typename State, typename First, typename... T> void when_any_variadic(when_any_state<State>* state_ptr, First&& first, T&&... tasks)
+{
+ // Add a continuation to the task
+ try {
+ first.then(inline_scheduler(), [state_ptr](typename std::decay<First>::type t) {
+ detail::ref_count_ptr<when_any_state<State>> state(state_ptr);
+ try {
+ if (detail::get_internal_task(t)->state.load(std::memory_order_relaxed) == detail::task_state::TASK_COMPLETED)
+ state->set(index, detail::get_internal_task(t)->get_result(t));
+ else
+ state->event.set_exception(detail::get_internal_task(t)->except);
+ } catch (...) {
+ // If the copy/move constructor of the result threw, propagate the exception
+ state->event.set_exception(std::current_exception());
+ }
+ });
+ } catch (...) {
+ // Make sure we don't leak memory if then() throws
+ state_ptr->release(sizeof...(T) + 1);
+ throw;
+ }
+
+ // Add continuations to rest of tasks
+ detail::when_any_variadic<index + 1>(state_ptr, std::forward<T>(tasks)...);
+}
+
+} // namespace detail
+
+// Alias for fake_void, used in variadic when_all
+typedef detail::fake_void void_;
+
+// Combine a set of tasks into one task which is signaled when all specified tasks finish
+template<typename Iter> task<typename detail::when_all_state_range<typename std::iterator_traits<Iter>::value_type::result_type>::task_type> when_all(Iter begin, Iter end)
+{
+ typedef typename std::iterator_traits<Iter>::value_type task_type;
+ typedef typename task_type::result_type result_type;
+
+ // Handle empty range
+ if (begin == end)
+ return detail::when_all_state_range<result_type>::empty_range();
+
+ // Create shared state
+ auto state_ptr = new detail::when_all_state_range<result_type>(std::distance(begin, end));
+ auto out = state_ptr->event.get_task();
+
+ // Add a continuation to each task to store its result in the shared state.
+ // The continuation that drops the final reference triggers the state
+ // destructor, which sets the event result.
+ for (size_t i = 0; begin != end; i++, ++begin) {
+ try {
+ (*begin).then(inline_scheduler(), [state_ptr, i](task_type t) {
+ detail::ref_count_ptr<detail::when_all_state_range<result_type>> state(state_ptr);
+ try {
+ if (detail::get_internal_task(t)->state.load(std::memory_order_relaxed) == detail::task_state::TASK_COMPLETED)
+ state->set(i, detail::get_internal_task(t)->get_result(t));
+ else
+ state->event.set_exception(detail::get_internal_task(t)->except);
+ } catch (...) {
+ // If the assignment of the result threw, propagate the exception
+ state->event.set_exception(std::current_exception());
+ }
+ });
+ } catch (...) {
+ // Make sure we don't leak memory if then() throws
+ state_ptr->release(std::distance(begin, end));
+ throw;
+ }
+ }
+
+ return out;
+}
+
+// Combine a set of tasks into one task which is signaled when one of the tasks finishes
+template<typename Iter> task<typename detail::when_any_state<typename std::iterator_traits<Iter>::value_type::result_type>::task_type> when_any(Iter begin, Iter end)
+{
+ typedef typename std::iterator_traits<Iter>::value_type task_type;
+ typedef typename task_type::result_type result_type;
+
+ // Handle empty range
+ if (begin == end)
+ throw std::invalid_argument("when_any called with empty range");
+
+ // Create shared state
+ auto* state_ptr = new detail::when_any_state<result_type>(std::distance(begin, end));
+ auto out = state_ptr->event.get_task();
+
+ // Add a continuation to each task to set the event. First one wins.
+ for (size_t i = 0; begin != end; i++, ++begin) {
+ try {
+ (*begin).then(inline_scheduler(), [state_ptr, i](task_type t) {
+ detail::ref_count_ptr<detail::when_any_state<result_type>> state(state_ptr);
+ try {
+ if (detail::get_internal_task(t)->state.load(std::memory_order_relaxed) == detail::task_state::TASK_COMPLETED)
+ state->set(i, detail::get_internal_task(t)->get_result(t));
+ else
+ state->event.set_exception(detail::get_internal_task(t)->except);
+ } catch (...) {
+ // If the copy/move constructor of the result threw, propagate the exception
+ state->event.set_exception(std::current_exception());
+ }
+ });
+ } catch (...) {
+ // Make sure we don't leak memory if then() throws
+ state_ptr->release(std::distance(begin, end));
+ throw;
+ }
+ }
+
+ return out;
+}
+
+// when_all wrapper accepting ranges
+template<typename T> auto when_all(T&& tasks) -> decltype(async::when_all(std::begin(std::forward<T>(tasks)), std::end(std::forward<T>(tasks))))
+{
+ return async::when_all(std::begin(std::forward<T>(tasks)), std::end(std::forward<T>(tasks)));
+}
+
+// when_any wrapper accepting ranges
+template<typename T> auto when_any(T&& tasks) -> decltype(async::when_any(std::begin(std::forward<T>(tasks)), std::end(std::forward<T>(tasks))))
+{
+ return async::when_any(std::begin(std::forward<T>(tasks)), std::end(std::forward<T>(tasks)));
+}
+
+// when_all with variadic arguments
+template<typename First, typename... T> task<std::tuple<typename detail::void_to_fake_void<typename std::decay<First>::type::result_type>::type, typename detail::void_to_fake_void<typename std::decay<T>::type::result_type>::type...>> when_all(First&& first, T&&... tasks)
+{
+ typedef std::tuple<typename detail::void_to_fake_void<typename std::decay<First>::type::result_type>::type, typename detail::void_to_fake_void<typename std::decay<T>::type::result_type>::type...> result_type;
+
+ // Create shared state
+ auto state = new detail::when_all_state_variadic<result_type>;
+ auto out = state->event.get_task();
+
+ // Add continuations to the tasks
+ detail::when_all_variadic<0>(state, std::forward<First>(first), std::forward<T>(tasks)...);
+
+ return out;
+}
+
+// when_any with variadic arguments
+template<typename First, typename... T> task<typename detail::when_any_state<typename std::decay<First>::type::result_type>::task_type> when_any(First&& first, T&&... tasks)
+{
+ typedef typename std::decay<First>::type::result_type result_type;
+
+ // Create shared state
+ auto state = new detail::when_any_state<result_type>(sizeof...(tasks) + 1);
+ auto out = state->event.get_task();
+
+ // Add continuations to the tasks
+ detail::when_any_variadic<0>(state, std::forward<First>(first), std::forward<T>(tasks)...);
+
+ return out;
+}
+
+} // namespace async
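Based on the signatures above, here is a hypothetical usage sketch of the two combinators. It assumes `async::spawn()` and `task::get()` from the rest of the library and is not code taken from this commit.
```
#include <async++.h>
#include <cstddef>
#include <iostream>
#include <string>
#include <tuple>
#include <utility>

int main()
{
    auto t1 = async::spawn([] { return 42; });
    auto t2 = async::spawn([] { return std::string("hello"); });

    // Variadic when_all yields a task<std::tuple<int, std::string>>
    std::tuple<int, std::string> both = async::when_all(t1, t2).get();
    std::cout << std::get<0>(both) << " " << std::get<1>(both) << std::endl;

    // when_any requires all tasks to have the same result type; the result is
    // a pair of (index of the first task to finish, its result)
    auto t3 = async::spawn([] { return 1; });
    auto t4 = async::spawn([] { return 2; });
    std::pair<std::size_t, int> winner = async::when_any(t3, t4).get();
    std::cout << "task " << winner.first << " finished first with "
              << winner.second << std::endl;
}
```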
62 src/aligned_alloc.h
@@ -0,0 +1,62 @@
+// Copyright (c) 2013 Amanieu d'Antras
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+#ifdef _WIN32
+# include <malloc.h>
+# ifdef __MINGW32__
+# define _aligned_malloc __mingw_aligned_malloc
+# define _aligned_free __mingw_aligned_free
+# endif
+#else
+# include <stdlib.h>
+#endif
+
+namespace async {
+namespace detail {
+
+// Allocate an aligned block of memory
+inline void* aligned_alloc(size_t size, size_t align)
+{
+#ifdef _WIN32
+ void* ptr = _aligned_malloc(size, align);
+ if (!ptr)
+ throw std::bad_alloc();
+ return ptr;
+#else
+ void* result;
+ if (posix_memalign(&result, align, size))
+ throw std::bad_alloc();
+ else
+ return result;
+#endif
+}
+
+// Free an aligned block of memory
+inline void aligned_free(void* addr)
+{
+#ifdef _WIN32
+ _aligned_free(addr);
+#else
+ free(addr);
+#endif
+}
+
+} // namespace detail
+} // namespace async
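These wrappers are internal to `src/`, but as an illustration (not part of the commit, and assuming the header above is visible) they pair with placement new as shown below; the `padded_counter` type is made up for the example.
```
#include <atomic>
#include <new>

// A made-up type that we want to sit on its own 64-byte cache line
struct padded_counter {
    std::atomic<int> value;
    char padding[64 - sizeof(std::atomic<int>)];
};

void example()
{
    void* mem = async::detail::aligned_alloc(sizeof(padded_counter), 64);
    padded_counter* p = new(mem) padded_counter; // placement new
    p->value.store(0);

    // ... use p ...

    p->~padded_counter();                 // manual destruction
    async::detail::aligned_free(mem);     // matching free
}
```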
116 src/auto_reset_event.h
@@ -0,0 +1,116 @@
+// Copyright (c) 2013 Amanieu d'Antras
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+#ifdef __linux__
+# include <unistd.h>
+# include <sys/syscall.h>
+# include <linux/futex.h>
+#else
+# include <mutex>
+# include <condition_variable>
+#endif
+
+namespace async {
+namespace detail {
+
+// Windows-style auto-reset event, essentially a semaphore with a max
+// value of 1.
+// This implementation makes one assumption:
+// - Only the thread owning the event calls reset() and wait()
+// - Therefore at most one thread can be waiting on an event at any time
+#ifdef __linux__
+// Linux-specific implementation using futex
+class auto_reset_event {
+public:
+ void wait()
+ {
+ // If futex_val goes below 0, sleep
+ if (futex_val.fetch_sub(1, std::memory_order_relaxed) <= 0) {
+ int ret;
+ do {
+ // Possible results:
+ // - Success => we were woken up, so return
+ // - EWOULDBLOCK => futex_val is not -1 anymore, so return
+ // - EINTR => spurious wakeup, try again
+ ret = syscall(SYS_futex, reinterpret_cast<int*>(&futex_val), FUTEX_WAIT_PRIVATE, -1, NULL);
+ } while (ret == -1 && errno == EINTR);
+ } else
+ std::atomic_thread_fence(std::memory_order_acquire);
+ }
+
+ void reset()
+ {
+ futex_val.store(0, std::memory_order_relaxed);
+ }
+
+ void signal()
+ {
+ // Increment futex_val, but don't go above 1
+ int val = futex_val.load(std::memory_order_relaxed);
+ do {
+ if (val > 0)
+ return;
+ } while (!futex_val.compare_exchange_weak(val, val + 1, std::memory_order_release, std::memory_order_relaxed));
+
+ // Wake up a sleeping thread if futex_val was negative
+ if (val < 0)
+ syscall(SYS_futex, reinterpret_cast<int*>(&futex_val), FUTEX_WAKE_PRIVATE, 1);
+ }
+
+private:
+ // Valid values:
+ // 1 = set
+ // 0 = not set
+ // -1 = not set and sleeping thread
+ std::atomic<int> futex_val{0};
+};
+#else
+// Generic implementation using std::mutex and std::condition_variable
+class auto_reset_event {
+public:
+ void wait()
+ {
+ std::unique_lock<std::mutex> lock(m);
+ while (!signaled)
+ c.wait(lock);
+ signaled = false;
+ }
+
+ void reset()
+ {
+ signaled = false;
+ }
+
+ void signal()
+ {
+ std::lock_guard<std::mutex> lock(m);
+ signaled = true;
+ c.notify_one();
+ }
+
+private:
+ std::mutex m;
+ std::condition_variable c;
+ bool signaled{false};
+};
+#endif
+
+} // namespace detail
+} // namespace async
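A minimal illustration (not part of the commit) of the intended usage, respecting the restriction that only the owning thread calls `wait()` and `reset()`:
```
#include <thread>

void example()
{
    async::detail::auto_reset_event event;

    std::thread producer([&event] {
        // ... produce some work ...
        event.signal();   // wake the owner (at most one waiter)
    });

    event.wait();         // returns once signal() has been called
    producer.join();
}
```
If `signal()` runs first, the event stores the pending wakeup (value 1) and the subsequent `wait()` returns immediately; if `wait()` runs first, the owning thread sleeps until the futex (or condition variable) is signaled.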
72 src/fifo_queue.h
@@ -0,0 +1,72 @@
+// Copyright (c) 2013 Amanieu d'Antras
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal