Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions async/bounded_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ namespace async {

struct bounded_traits {
static constexpr bool NOEXCEPT_CHECK = false; // exception handling flag
static constexpr size_t CachelineSize = 64;
using sequence_type = uint64_t;
static constexpr std::size_t CachelineSize = 64;
static constexpr std::size_t CachelineAlignment = 16; // must not be larger than alignof(std::max_align_t), see issue #1
using sequence_type = std::uint64_t;
};

template <typename T, typename TRAITS = bounded_traits> class bounded_queue {
Expand All @@ -26,9 +27,10 @@ template <typename T, typename TRAITS = bounded_traits> class bounded_queue {
"T must be nothrow destructible");

public:
static constexpr size_t cacheline_size = TRAITS::CachelineSize;
static constexpr std::size_t cacheline_size = TRAITS::CachelineSize;
static constexpr std::size_t cacheline_alignment = TRAITS::CachelineAlignment;
using seq_t = typename TRAITS::sequence_type;
explicit bounded_queue(size_t size)
explicit bounded_queue(std::size_t size)
: fastmodulo((size > 0 && ((size & (size - 1)) == 0))),
bitshift(fastmodulo ? getShiftBitsCount(size) : 0),
elements(new element[size]), mask(fastmodulo ? size - 1 : 0),
Expand All @@ -40,7 +42,7 @@ template <typename T, typename TRAITS = bounded_traits> class bounded_queue {
bounded_queue &operator=(bounded_queue const &) = delete;
bounded_queue &operator=(bounded_queue &&) = delete;
~bounded_queue() { delete[] elements; }
size_t size() { return qsize; }
std::size_t size() { return qsize; }

template <typename... Args, // NON-SAFE
typename = typename std::enable_if<
Expand Down Expand Up @@ -329,12 +331,12 @@ template <typename T, typename TRAITS = bounded_traits> class bounded_queue {
bool const fastmodulo; // true if qsize is power of 2
int const bitshift; // used if fastmodulo is true
element *const elements; // pointer to buffer
size_t const mask; // used if fastmodulo is true
size_t const qsize; // queue size
alignas(cacheline_size) char cacheline_padding1[cacheline_size];
alignas(cacheline_size) std::atomic<seq_t> enqueueIx;
alignas(cacheline_size) char cacheline_padding2[cacheline_size];
alignas(cacheline_size) std::atomic<seq_t> dequeueIx;
alignas(cacheline_size) char cacheline_padding3[cacheline_size];
std::size_t const mask; // used if fastmodulo is true
std::size_t const qsize; // queue size
alignas(cacheline_alignment) char cacheline_padding1[cacheline_size];
alignas(cacheline_alignment) std::atomic<seq_t> enqueueIx;
alignas(cacheline_alignment) char cacheline_padding2[cacheline_size];
alignas(cacheline_alignment) std::atomic<seq_t> dequeueIx;
alignas(cacheline_alignment) char cacheline_padding3[cacheline_size];
};
} // namespace async
94 changes: 48 additions & 46 deletions async/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,36 @@ namespace async {
struct traits // 3-level (L3, L2, L1) depth of nested group design, total
// indexing space is pow(2, 64-Tagbits)
{ // user can change the bits settings by providing your own TRAITS
static constexpr uint64_t Tagbits = 24;
static constexpr uint64_t L3bits = 10;
static constexpr uint64_t L2bits = 10;
static constexpr uint64_t L1bits = 12;
static constexpr uint64_t Basebits = 8;
static constexpr std::uint64_t Tagbits = 24;
static constexpr std::uint64_t L3bits = 10;
static constexpr std::uint64_t L2bits = 10;
static constexpr std::uint64_t L1bits = 12;
static constexpr std::uint64_t Basebits = 8;
static constexpr bool NOEXCEPT_CHECK = false; // exception handling flag
static constexpr size_t CachelineSize = 64;
static constexpr std::size_t CachelineSize = 64;
static constexpr std::size_t CachelineAlignment = 16; // must not be larger than alignof(std::max_align_t), see issue #1
};

template <typename T, typename TRAITS = traits> class queue final {
public:
static bool is_lock_free_v() {
return std::atomic<uint64_t>{}.is_lock_free();
return std::atomic<std::uint64_t>{}.is_lock_free();
}
static constexpr size_t cacheline_size = TRAITS::CachelineSize;
static constexpr uint64_t BaseMask = getBitmask<uint64_t>(TRAITS::Basebits);
static constexpr uint64_t L1Mask = getBitmask<uint64_t>(TRAITS::L1bits)
static constexpr std::size_t cacheline_size = TRAITS::CachelineSize;
static constexpr std::size_t cacheline_alignment = TRAITS::CachelineAlignment;
static constexpr std::uint64_t BaseMask = getBitmask<std::uint64_t>(TRAITS::Basebits);
static constexpr std::uint64_t L1Mask = getBitmask<std::uint64_t>(TRAITS::L1bits)
<< TRAITS::Basebits;
static constexpr uint64_t L2Mask = getBitmask<uint64_t>(TRAITS::L2bits)
static constexpr std::uint64_t L2Mask = getBitmask<std::uint64_t>(TRAITS::L2bits)
<< (TRAITS::Basebits + TRAITS::L1bits);
static constexpr uint64_t L3Mask =
getBitmask<uint64_t>(TRAITS::L3bits)
static constexpr std::uint64_t L3Mask =
getBitmask<std::uint64_t>(TRAITS::L3bits)
<< (TRAITS::Basebits + TRAITS::L1bits + TRAITS::L2bits);
static constexpr uint64_t TagMask =
getBitmask<uint64_t>(TRAITS::Tagbits)
static constexpr std::uint64_t TagMask =
getBitmask<std::uint64_t>(TRAITS::Tagbits)
<< (TRAITS::Basebits + TRAITS::L1bits + TRAITS::L2bits + TRAITS::L3bits);
static constexpr uint64_t TagShift = 64 - TRAITS::Tagbits;
static constexpr uint64_t TagPlus1 = static_cast<uint64_t>(1) << TagShift;
static constexpr std::uint64_t TagShift = 64 - TRAITS::Tagbits;
static constexpr std::uint64_t TagPlus1 = static_cast<std::uint64_t>(1) << TagShift;

public: // assert bits settings meet requirements
static_assert(TRAITS::Tagbits + TRAITS::L3bits + TRAITS::L2bits +
Expand All @@ -59,13 +61,13 @@ template <typename T, typename TRAITS = traits> class queue final {
queue() : nodeCount(3), dequeueIx(2), enqueueIx(2), spawnIx(1), recycleIx(1) {
container.get(index(0)); // allocate initial space
}
queue(size_t size) // pre-allocate size
queue(std::size_t size) // pre-allocate size
: nodeCount(3), dequeueIx(2), enqueueIx(2), spawnIx(1), recycleIx(1) {
container.get(index(0));

if (size > (static_cast<uint64_t>(1) << TRAITS::Basebits)) {
if (size > (static_cast<std::uint64_t>(1) << TRAITS::Basebits)) {
index ix;
for (size_t i = (static_cast<uint64_t>(1) << TRAITS::Basebits); i < size;
for (std::size_t i = (static_cast<std::uint64_t>(1) << TRAITS::Basebits); i < size;
++i) {
auto &node = getNode(ix);
recycle(ix);
Expand Down Expand Up @@ -107,9 +109,9 @@ template <typename T, typename TRAITS = traits> class queue final {
return true;
}

template <typename IT> void bulk_enqueue(IT it, size_t count) {
template <typename IT> void bulk_enqueue(IT it, std::size_t count) {
index firstidx(0), preidx(0), lastidx(0);
for (size_t i = 0; i < count; ++i) {
for (std::size_t i = 0; i < count; ++i) {
lastidx = encapsulate(*it++);
if (firstidx == 0)
firstidx = lastidx;
Expand All @@ -126,9 +128,9 @@ template <typename T, typename TRAITS = traits> class queue final {
}

template <typename IT>
size_t bulk_dequeue(IT &&it, size_t maxcount) // or IT& it to return the
std::size_t bulk_dequeue(IT &&it, std::size_t maxcount) // or IT& it to return the
{
size_t count(0);
std::size_t count(0);
while (maxcount-- && dequeue(*it++)) {
++count;
}
Expand Down Expand Up @@ -181,15 +183,15 @@ template <typename T, typename TRAITS = traits> class queue final {
}
}
}
uint64_t getNodeCount() { return nodeCount; } // get in-use-nodes count
std::uint64_t getNodeCount() { return nodeCount; } // get in-use-nodes count

private: // internal data structures
struct index // simulate tagged pointer
{
index(uint64_t newval) noexcept
index(std::uint64_t newval) noexcept
: value(newval) {} // is_trivially_copyable must be true
index() noexcept : value(0) {}
inline operator uint64_t() const { return value; }
inline operator std::uint64_t() const { return value; }
std::uint64_t getVersion() { return (value & TagMask) >> TagShift; }
inline void increTag() {
value = (value & ~TagMask) | ((value + TagPlus1) & TagMask);
Expand Down Expand Up @@ -269,14 +271,14 @@ template <typename T, typename TRAITS = traits> class queue final {
inline node &get(index const &ix) { return operator[](ix); }
inline node &at(index const &ix) { return operator[](ix); }
inline node &operator[](index const &ix) { return nodes[ix & BaseMask]; }
std::array<node, static_cast<uint64_t>(1) << TRAITS::Basebits> nodes;
std::array<node, static_cast<std::uint64_t>(1) << TRAITS::Basebits> nodes;
};

template <typename SubGroup, uint64_t BitMask> struct nestedcontainer {
static constexpr uint64_t mask = BitMask;
static constexpr uint64_t bits = getSetBitsCount(mask);
static constexpr uint64_t shift = getShiftBitsCount(mask);
std::array<std::atomic<SubGroup *>, static_cast<uint64_t>(1) << bits>
template <typename SubGroup, std::uint64_t BitMask> struct nestedcontainer {
static constexpr std::uint64_t mask = BitMask;
static constexpr std::uint64_t bits = getSetBitsCount(mask);
static constexpr std::uint64_t shift = getShiftBitsCount(mask);
std::array<std::atomic<SubGroup *>, static_cast<std::uint64_t>(1) << bits>
subgroups;
nestedcontainer() {
for (auto &gptr : subgroups) {
Expand Down Expand Up @@ -411,17 +413,17 @@ template <typename T, typename TRAITS = traits> class queue final {
using L1container = nestedcontainer<basecontainer, L1Mask>;
using L2container = nestedcontainer<L1container, L2Mask>;
nestedcontainer<L2container, L3Mask> container;
alignas(cacheline_size) char cacheline_padding1[cacheline_size];
alignas(cacheline_size) std::atomic<uint64_t> nodeCount; // # of allocated nodes, not the #
// of elements stored in the queue
alignas(cacheline_size) char cacheline_padding2[cacheline_size];
alignas(cacheline_size) std::atomic<index> dequeueIx; // dequeue pointer
alignas(cacheline_size) char cacheline_padding3[cacheline_size];
alignas(cacheline_size) std::atomic<index> enqueueIx; // enqueue pointer
alignas(cacheline_size) char cacheline_padding4[cacheline_size];
alignas(cacheline_size) std::atomic<index> spawnIx; // spawn pointer
alignas(cacheline_size) char cacheline_padding5[cacheline_size];
alignas(cacheline_size) std::atomic<index> recycleIx; // recycle pointer
alignas(cacheline_size) char cacheline_padding6[cacheline_size];
alignas(cacheline_alignment) char cacheline_padding1[cacheline_size];
alignas(cacheline_alignment) std::atomic<std::uint64_t> nodeCount; // # of allocated nodes, not the #
// of elements stored in the queue
alignas(cacheline_alignment) char cacheline_padding2[cacheline_size];
alignas(cacheline_alignment) std::atomic<index> dequeueIx; // dequeue pointer
alignas(cacheline_alignment) char cacheline_padding3[cacheline_size];
alignas(cacheline_alignment) std::atomic<index> enqueueIx; // enqueue pointer
alignas(cacheline_alignment) char cacheline_padding4[cacheline_size];
alignas(cacheline_alignment) std::atomic<index> spawnIx; // spawn pointer
alignas(cacheline_alignment) char cacheline_padding5[cacheline_size];
alignas(cacheline_alignment) std::atomic<index> recycleIx; // recycle pointer
alignas(cacheline_alignment) char cacheline_padding6[cacheline_size];
};
} // namespace async
} // namespace async
10 changes: 5 additions & 5 deletions async/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class threadpool final {
public:
static int defaultpoolsize() { return std::thread::hardware_concurrency(); }

threadpool(int poolsize = static_cast<int>(defaultpoolsize()))
threadpool(int poolsize = defaultpoolsize())
: idlecount(0), conflag(false) {
configurepool(poolsize);
}
Expand All @@ -33,7 +33,7 @@ class threadpool final {

~threadpool() { cleanup(); }

inline size_t size() {
inline std::size_t size() {
std::lock_guard<std::mutex> lg(poolmux);
return threads.size();
}
Expand All @@ -43,11 +43,11 @@ class threadpool final {
// can be called to resize the pool at any time after construction and before
// destruction, recommand to be called from main thread or manager thread even
// though it is thread-safe
void configurepool(size_t poolsize) {
void configurepool(std::size_t poolsize) {
std::unique_lock<std::mutex> veclk(poolmux);
auto currentsize = threads.size();
if (currentsize < poolsize) { // expand the pool
for (auto const &v : std::vector<bool>(poolsize - currentsize)) {
for (std::size_t i = currentsize; i < poolsize; i++) {
tpstops.emplace_back(addthread());
}
} else if (currentsize > poolsize) { // shrink the pool
Expand Down Expand Up @@ -189,4 +189,4 @@ class threadpool final {
std::condition_variable qcv;
bool conflag; // continue flag for cv
};
} // namespace async
} // namespace async
18 changes: 9 additions & 9 deletions async/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ static constexpr unsigned int getSetBitsCount(std::uint64_t n) {
return count;
}

static constexpr unsigned int getShiftBitsCount(uint64_t n) {
static constexpr unsigned int getShiftBitsCount(std::uint64_t n) {
// requires c++14
unsigned int count{0};
if (n == 0)
Expand All @@ -48,7 +48,7 @@ static constexpr unsigned int getSetBitsCount(std::uint64_t n) {
return n == 0 ? 0 : 1 + getSetBitsCount(n & (n - 1));
}

static constexpr unsigned int getShiftBitsCount(uint64_t n) {
static constexpr unsigned int getShiftBitsCount(std::uint64_t n) {
return n == 0 ? 0 : ((n & 0x1) == 0 ? 1 + getShiftBitsCount(n >> 1) : 0);
}

Expand Down Expand Up @@ -81,8 +81,8 @@ inline std::string getErrorMsg(std::string const &message, char const *file,
#include <stdlib.h>
#include <windows.h>

static size_t cache_line_size() {
size_t line_size = 0;
static std::size_t cache_line_size() {
std::size_t line_size = 0;
DWORD buffer_size = 0;
DWORD i = 0;
SYSTEM_LOGICAL_PROCESSOR_INFORMATION *buffer = 0;
Expand All @@ -104,15 +104,15 @@ static size_t cache_line_size() {

#elif defined(__linux__)
#include <unistd.h>
static size_t cache_line_size() {
size_t line_size = sysconf(_SC_LEVEL1_DCACHE_LINESIZE);
static std::size_t cache_line_size() {
std::size_t line_size = sysconf(_SC_LEVEL1_DCACHE_LINESIZE);
return line_size;
}
#elif defined(__APPLE__)
#include <sys/sysctl.h>
static size_t cache_line_size() {
size_t line_size = 0;
size_t size_of_linesize = sizeof(line_size);
static std::size_t cache_line_size() {
std::size_t line_size = 0;
std::size_t size_of_linesize = sizeof(line_size);
sysctlbyname("hw.cachelinesize", &line_size, &size_of_linesize, 0, 0);
return line_size;
}
Expand Down