Avoid the need for CMPXCHG16B by using a tagged pointer instead
Amanieu committed Feb 7, 2015
1 parent 2e1af00 commit 9e0e4d5
Showing 3 changed files with 122 additions and 142 deletions.
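
In short: continuation_vector previously kept its state (a pointer plus an is_locked and an is_vector flag) in a 16-byte struct, which is only lock-free on x86_64 when the compiler emits the CMPXCHG16B instruction, and that instruction is missing on some early AMD64 processors. This commit packs the two flags into the low bits of the pointer instead, so the whole state fits in a single machine word and an ordinary std::atomic compare-exchange suffices. A minimal sketch of the tagged-pointer idea, independent of the library (all names below are illustrative, not Async++ API):

    // Sketch only: store two flag bits in the low bits of a pointer whose
    // alignment guarantees those bits are always zero.
    #include <atomic>
    #include <cassert>
    #include <cstdint>

    struct alignas(4) node { int payload; }; // alignment >= 4 keeps bits 0-1 free

    constexpr std::uintptr_t flag_mask = 3;

    std::uintptr_t pack(node* p, std::uintptr_t flags)
    {
        assert((reinterpret_cast<std::uintptr_t>(p) & flag_mask) == 0);
        assert((flags & ~flag_mask) == 0);
        return reinterpret_cast<std::uintptr_t>(p) | flags;
    }
    node* unpack_ptr(std::uintptr_t word) { return reinterpret_cast<node*>(word & ~flag_mask); }
    std::uintptr_t unpack_flags(std::uintptr_t word) { return word & flag_mask; }

    // The packed value is pointer-sized, so a plain word-wide CAS is enough.
    std::atomic<std::uintptr_t> state{0};

Because the packed word is pointer-sized, the new internal_data no longer needs a double-width compare-and-swap on platforms where the tagged form is selected, which is why the commit can delete both the inline-assembly CMPXCHG16B wrapper and the USE_AMD64_CMPXCHG16B CMake option.
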
15 changes: 2 additions & 13 deletions CMakeLists.txt
@@ -23,7 +23,6 @@ project(Async++ C CXX)

option(BUILD_SHARED_LIBS "Build Async++ as a shared library" ON)
option(USE_CXX_EXCEPTIONS "Enable C++ exception support" ON)
option(USE_AMD64_CMPXCHG16B "Use the CMPXCHG16B instruction on x86_64" ON)
if (APPLE)
option(BUILD_FRAMEWORK "Build a Mac OS X framework instead of a library" OFF)
if (BUILD_FRAMEWORK AND NOT BUILD_SHARED_LIBS)
@@ -50,7 +49,7 @@ set(ASYNCXX_INCLUDE
${PROJECT_SOURCE_DIR}/include/async++/when_all_any.h
)
set(ASYNCXX_SRC
${PROJECT_SOURCE_DIR}/src/common.h
${PROJECT_SOURCE_DIR}/src/internal.h
${PROJECT_SOURCE_DIR}/src/fifo_queue.h
${PROJECT_SOURCE_DIR}/src/scheduler.cpp
${PROJECT_SOURCE_DIR}/src/singleton.h
@@ -74,7 +73,7 @@ if (APPLE)
endif()
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
target_link_libraries(Async++ PUBLIC ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(Async++ PUBLIC Threads::Threads)

# Set up preprocessor definitions
target_include_directories(Async++ PRIVATE ${PROJECT_SOURCE_DIR}/include)
@@ -107,16 +106,6 @@ if (NOT USE_CXX_EXCEPTIONS)
endif()
endif()

# Allow disabling the CMPXCHG16B instruction which is not available on some
# early AMD64 processors. This will result in 128-bit atomics being implemented
# using the default implementation in std::atomic, which uses a lock.
if (NOT USE_AMD64_CMPXCHG16B)
message(WARNING "The CMPXCHG16B instruction has been disabled. Make sure "
"to define LIBASYNC_NO_CMPXCHG16B in all files that include "
"async++.h because this affects the library ABI.")
target_compile_definitions(Async++ PUBLIC LIBASYNC_NO_CMPXCHG16B)
endif()

# Install the library and produce a CMake export script
install(TARGETS Async++
EXPORT Async++
17 changes: 10 additions & 7 deletions README.md
@@ -21,15 +21,18 @@ int main()
return 42;
});
auto task3 = task2.then([](int value) -> int {
std::cout << "Task 3 executes after task 2, which returned " << value << std::endl;
std::cout << "Task 3 executes after task 2, which returned "
<< value << std::endl;
return value * 3;
});
auto task4 = async::when_all(task1, task3).then([](std::tuple<async::void_, int> results) {
std::cout << "Task 4 executes after tasks 1 and 3. Task 3 returned " << std::get<1>(results) << std::endl;
auto task4 = async::when_all(task1, task3);
auto task5 = task4.then([](std::tuple<async::void_, int> results) {
std::cout << "Task 5 executes after tasks 1 and 3. Task 3 returned "
<< std::get<1>(results) << std::endl;
});

task4.get();
std::cout << "Task 4 has completed" << std::endl;
task5.get();
std::cout << "Task 5 has completed" << std::endl;

async::parallel_invoke([] {
std::cout << "This is executed in parallel..." << std::endl;
@@ -52,8 +55,8 @@ int main()
// Task 1 executes asynchronously
// Task 2 executes in parallel with task 1
// Task 3 executes after task 2, which returned 42
// Task 4 executes after tasks 1 and 3. Task 3 returned 126
// Task 4 has completed
// Task 5 executes after tasks 1 and 3. Task 3 returned 126
// Task 5 has completed
// This is executed in parallel...
// with this
// 01234
232 changes: 110 additions & 122 deletions include/async++/continuation_vector.h
@@ -25,6 +25,66 @@
namespace async {
namespace detail {

// Compress the flags in the low bits of the pointer if the structures are
// suitably aligned. Fall back to a separate flags variable otherwise.
template<std::uintptr_t Mask, bool Enable = false>
class compressed_ptr {
void* ptr;
std::uintptr_t flags;

public:
compressed_ptr() = default;
compressed_ptr(void* ptr, std::uintptr_t flags)
: ptr(ptr), flags(flags) {}

template<typename T>
T* get_ptr() const
{
return static_cast<T*>(ptr);
}
std::uintptr_t get_flags() const
{
return flags;
}

void set_ptr(void* p)
{
ptr = p;
}
void set_flags(std::uintptr_t f)
{
flags = f;
}
};
template<std::uintptr_t Mask>
class compressed_ptr<Mask, true> {
std::uintptr_t data;

public:
compressed_ptr() = default;
compressed_ptr(void* ptr, std::uintptr_t flags)
: data(reinterpret_cast<std::uintptr_t>(ptr) | flags) {}

template<typename T>
T* get_ptr() const
{
return reinterpret_cast<T*>(data & ~Mask);
}
std::uintptr_t get_flags() const
{
return data & Mask;
}

void set_ptr(void* p)
{
data = reinterpret_cast<std::uintptr_t>(p) | (data & Mask);
}
void set_flags(std::uintptr_t f)
{
data = (data & ~Mask) | f;
}
};

// Thread-safe vector of task_ptr which is optimized for the common case of
// only having a single continuation.
class continuation_vector {
@@ -34,117 +94,46 @@ class continuation_vector {
std::mutex lock;
};

// Internal data of the vector
struct internal_data {
// If this is true then no more changes are allowed
bool is_locked;
// Flags to describe the state of the vector
enum flags {
// If set, no more changes are allowed to internal_data
is_locked = 1,

// Indicates which element of the union is currently active
bool is_vector;

union {
// Fast path: This represents zero (nullptr) or one elements
task_base* inline_ptr;

// Slow path: This is used for two or more elements
vector_data* vector_ptr;
};
// If set, the pointer is a vector_data* instead of a task_base*. If
// there are 0 or 1 elements in the vector, the task_base* form is used.
is_vector = 2
};
static const std::uintptr_t flags_mask = 3;

// On x86_64, some compilers will not use CMPXCHG16B because some early AMD
// processors do not implement that instruction. Since these are quite rare,
// we force the use of CMPXCHG16B unless the user has explicitly asked
// otherwise using LIBASYNC_NO_CMPXCHG16B.
#if !defined(LIBASYNC_NO_CMPXCHG16B) && \
((defined(__GNUC__) && defined(__x86_64__) && !defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_16)) || \
(defined(_MSC_VER) && defined(_M_AMD64)))
class atomic_data_type {
// Internal storage for the atomic data
# ifdef __GNUC__
__uint128_t storage;
# else
# pragma intrinsic(_InterlockedCompareExchange128)
std::int64_t storage[2];
# endif
static_assert(sizeof(internal_data) == 16, "Wrong size for internal_data");

public:
// These functions use memcpy to convert between internal_data and
// integer types to avoid any potential issues with strict aliasing.
// Tests have shown that compilers are smart enough to optimize the
// memcpy calls away and produce optimal code.
atomic_data_type(internal_data data)
{
std::memcpy(&storage, &data, sizeof(internal_data));
}
internal_data load(std::memory_order)
{
# ifdef __GNUC__
std::int64_t value[2];
__asm__ __volatile__ (
"movq %%rbx, %%rax\n\t"
"movq %%rcx, %%rdx\n\t"
"lock; cmpxchg16b %[storage]"
: "=&a" (value[0]), "=&d" (value[1])
: [storage] "m" (storage)
: "cc", "memory", "rbx", "rcx"
);
# else
std::int64_t value[2] = {};
_InterlockedCompareExchange128(storage, value[0], value[1], value);
# endif
internal_data result;
std::memcpy(&result, value, sizeof(internal_data));
return result;
}
bool compare_exchange_weak(internal_data& expected, internal_data desired, std::memory_order, std::memory_order)
{
std::int64_t desired_value[2];
std::memcpy(desired_value, &desired, sizeof(internal_data));
std::int64_t expected_value[2];
std::memcpy(expected_value, &expected, sizeof(internal_data));
bool success;
# ifdef __GNUC__
__asm__ __volatile__ (
"lock; cmpxchg16b %[storage]\n\t"
"sete %[success]"
: "+a,a" (expected_value[0]), "+d,d" (expected_value[1]), [storage] "+m,m" (storage), [success] "=q,m" (success)
: "b,b" (desired_value[0]), "c,c" (desired_value[1])
: "cc", "memory"
);
# else
success = _InterlockedCompareExchange128(storage, desired_value[0], desired_value[1], expected_value) != 0;
# endif
std::memcpy(&expected, expected_value, sizeof(internal_data));
return success;
}
};
#else
typedef std::atomic<internal_data> atomic_data_type;
#endif
// Embed the two bits in the data if they are suitably aligned. We only
// check the alignment of vector_data here because task_base isn't defined
// yet. Since we align task_base to LIBASYNC_CACHELINE_SIZE, just use that.
typedef compressed_ptr<flags_mask, (LIBASYNC_CACHELINE_SIZE & flags_mask) == 0 &&
(std::alignment_of<vector_data>::value & flags_mask) == 0> internal_data;

// All changes to the internal data are atomic
atomic_data_type atomic_data;
std::atomic<internal_data> atomic_data;

public:
// Start unlocked with zero elements in the fast path
continuation_vector()
: atomic_data(internal_data{false, false, {}}) {}
: atomic_data(internal_data(nullptr, 0)) {}

// Free any left over data
~continuation_vector()
{
// Converting task_ptr instead of using remove_ref because task_base
// Converting to task_ptr instead of using remove_ref because task_base
// isn't defined yet at this point.
internal_data data = atomic_data.load(std::memory_order_relaxed);
if (!data.is_vector) {
// If the data is locked then the inline pointer is already gone
if (!data.is_locked)
task_ptr tmp(data.inline_ptr);
} else {
for (task_base* i: data.vector_ptr->vector)
if (data.get_flags() & flags::is_vector) {
// No need to lock the mutex, we are the only thread at this point
for (task_base* i: data.get_ptr<vector_data>()->vector)
task_ptr tmp(i);
delete data.vector_ptr;
delete data.get_ptr<vector_data>();
} else {
// If the data is locked then the inline pointer is already gone
if (!(data.get_flags() & flags::is_locked))
task_ptr tmp(data.get_ptr<task_base>());
}
}

@@ -159,37 +148,36 @@ class continuation_vector {
// Compare-exchange loop on atomic_data
internal_data data = atomic_data.load(std::memory_order_relaxed);
internal_data new_data;
new_data.is_locked = false;
do {
// Return immediately if the vector is locked
if (data.is_locked)
if (data.get_flags() & flags::is_locked)
return false;

if (!data.is_vector) {
if (!data.inline_ptr) {
// Going from 0 to 1 elements
new_data.inline_ptr = t.get();
new_data.is_vector = false;
} else {
// Going from 1 to 2 elements, allocate a vector_data
if (!vector)
vector.reset(new vector_data{{data.inline_ptr, t.get()}, {}});
new_data.vector_ptr = vector.get();
new_data.is_vector = true;
}
} else {
if (data.get_flags() & flags::is_vector) {
// Larger vectors use a mutex, so grab the lock
std::atomic_thread_fence(std::memory_order_acquire);
std::lock_guard<std::mutex> locked(data.vector_ptr->lock);
std::lock_guard<std::mutex> locked(data.get_ptr<vector_data>()->lock);

// We need to check again if the vector has been locked here
// to avoid a race condition with flush_and_lock
if (atomic_data.load(std::memory_order_relaxed).is_locked)
if (atomic_data.load(std::memory_order_relaxed).get_flags() & flags::is_locked)
return false;

// Add the element to the vector and return
data.vector_ptr->vector.push_back(t.release());
data.get_ptr<vector_data>()->vector.push_back(t.release());
return true;
} else {
if (data.get_ptr<task_base>()) {
// Going from 1 to 2 elements, allocate a vector_data
if (!vector)
vector.reset(new vector_data{{data.get_ptr<task_base>(), t.get()}, {}});
new_data.set_ptr(vector.get());
new_data.set_flags(flags::is_vector);
} else {
// Going from 0 to 1 elements
new_data.set_ptr(t.get());
new_data.set_flags(0);
}
}
} while (!atomic_data.compare_exchange_weak(data, new_data, std::memory_order_release, std::memory_order_relaxed));

@@ -209,24 +197,24 @@ class continuation_vector {
internal_data new_data;
do {
new_data = data;
new_data.is_locked = true;
new_data.set_flags(data.get_flags() | flags::is_locked);
} while (!atomic_data.compare_exchange_weak(data, new_data, std::memory_order_acquire, std::memory_order_relaxed));

if (!data.is_vector) {
// If there is an inline element, just pass it on
if (data.inline_ptr)
func(task_ptr(data.inline_ptr));
} else {
if (data.get_flags() & flags::is_vector) {
// If we are using vector_data, lock it and flush all elements
std::lock_guard<std::mutex> locked(data.vector_ptr->lock);
for (auto i: data.vector_ptr->vector)
std::lock_guard<std::mutex> locked(data.get_ptr<vector_data>()->lock);
for (auto i: data.get_ptr<vector_data>()->vector)
func(task_ptr(i));

// Clear the vector to save memory. Note that we don't actually free
// the vector_data here because other threads may still be using it.
// This isn't a very significant cost since multiple continuations
// are relatively rare.
data.vector_ptr->vector.clear();
data.get_ptr<vector_data>()->vector.clear();
} else {
// If there is an inline element, just pass it on
if (data.get_ptr<task_base>())
func(task_ptr(data.get_ptr<task_base>()));
}
}
};
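
For reference, the single atomic word now encodes four states: a null pointer with no flags (no continuations yet), a non-null task_base* with no flags (exactly one continuation), is_vector set (the pointer is a vector_data* holding two or more continuations, protected by its mutex), and is_locked set (flush_and_lock has frozen the vector, so add_continuation returns false). A small helper, illustrative only and not part of the library, that classifies a word under these rules:

    // Illustrative only: classify the state encoded by continuation_vector's
    // atomic word, using the same bit values as the diff above
    // (is_locked == 1, is_vector == 2).
    #include <cstdint>

    const char* classify(const void* ptr, std::uintptr_t flags)
    {
        if (flags & 1) return "locked: flush_and_lock has run, add_continuation fails";
        if (flags & 2) return "two or more continuations stored in a vector_data";
        if (!ptr)      return "no continuations yet";
        return "exactly one continuation stored inline as a task_base*";
    }
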
