Skip to content

Commit

Permalink
Added done callback to BLThread so it's possible to do notification a…
Browse files Browse the repository at this point in the history
…fter the thread status gets to IDLE, removed BLThread::makeIdle()
  • Loading branch information
kobalicek committed Apr 17, 2019
1 parent 8aaee49 commit 16c7c89
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 53 deletions.
67 changes: 33 additions & 34 deletions src/blend2d/blthreading.cpp
Expand Up @@ -104,16 +104,14 @@ BLResult blThreadEventTimedWait(BLThreadEvent* self, uint64_t microseconds) noex
struct BLThreadEventPosixImpl {
BLConditionVariable cond;
BLMutex mutex;
uint8_t manualReset;
uint8_t signaled;
uint16_t reserved;
uint32_t manualReset;
uint32_t signaled;

BL_INLINE BLThreadEventPosixImpl(bool manualReset, bool signaled) noexcept
: cond(),
mutex(),
manualReset(manualReset),
signaled(signaled),
reserved(0) {}
signaled(signaled) {}
};

BLResult blThreadEventCreate(BLThreadEvent* self, bool manualReset, bool signaled) noexcept {
Expand Down Expand Up @@ -222,8 +220,6 @@ BLResult blThreadEventTimedWait(BLThreadEvent* self, uint64_t microseconds) noex

class BLInternalThread : public BLThread {
public:
typedef void* VoidPtr;

#ifdef _WIN32
intptr_t handle;
#else
Expand All @@ -234,19 +230,21 @@ class BLInternalThread : public BLThread {
volatile uint32_t internalStatus;
volatile uint32_t reserved;

volatile BLThreadWorkFunc workFunc;
volatile VoidPtr workData;
BLThreadFunc workFunc;
BLThreadFunc doneFunc;
void* workData;

BLThreadExitFunc exitFunc;
VoidPtr exitData;
BLThreadFunc exitFunc;
void* exitData;

BL_INLINE BLInternalThread(BLThreadExitFunc exitFunc, void* exitData) noexcept
BL_INLINE BLInternalThread(BLThreadFunc exitFunc, void* exitData) noexcept
: BLThread { &blThreadVirt },
handle {},
event(true, false),
internalStatus(BL_THREAD_STATUS_IDLE),
reserved(0),
workFunc(nullptr),
doneFunc(nullptr),
workData(nullptr),
exitFunc(exitFunc),
exitData(exitData) {}
Expand All @@ -260,7 +258,7 @@ class BLInternalThread : public BLThread {
}
};

static BLInternalThread* blThreadNew(BLThreadExitFunc exitFunc, void* exitData) noexcept {
static BLInternalThread* blThreadNew(BLThreadFunc exitFunc, void* exitData) noexcept {
BLInternalThread* self = static_cast<BLInternalThread*>(malloc(sizeof(BLInternalThread)));
if (BL_UNLIKELY(!self))
return nullptr;
Expand All @@ -282,11 +280,15 @@ static BL_INLINE void blThreadEntryPoint(BLInternalThread* thread) noexcept {
// Wait for some work to do.
thread->event.wait();

BLThreadWorkFunc workFunc = thread->workFunc;
void* workData = thread->workData;
BLThreadFunc workFunc = thread->workFunc;
BLThreadFunc doneFunc = thread->doneFunc;
void* workData = thread->workData;

thread->workFunc = nullptr;
thread->doneFunc = nullptr;
thread->workData = nullptr;

blAtomicThreadFence();
thread->event.reset();

// If the compare-exchange fails and the function was not provided it means that this thread is quitting.
Expand All @@ -299,39 +301,37 @@ static BL_INLINE void blThreadEntryPoint(BLInternalThread* thread) noexcept {

// Again, if the compare-exchange fails it means we are quitting.
value = BL_THREAD_STATUS_RUNNING;
if (!std::atomic_compare_exchange_strong((std::atomic<uint32_t>*)&thread->internalStatus, &value, uint32_t(BL_THREAD_STATUS_IDLE))) {
if (value == BL_THREAD_STATUS_QUITTING)
break;
}
bool res = !std::atomic_compare_exchange_strong((std::atomic<uint32_t>*)&thread->internalStatus, &value, uint32_t(BL_THREAD_STATUS_IDLE));

if (doneFunc)
doneFunc(thread, workData);

if (!res && value == BL_THREAD_STATUS_QUITTING)
break;
}

thread->exitFunc(thread, thread->exitData);
}

static uint32_t BL_CDECL blThreadStatus(const BLThread* self_) noexcept {
const BLInternalThread* self = static_cast<const BLInternalThread*>(self_);
return self->internalStatus;
return blAtomicFetch(&self->internalStatus);
}

static BLResult BL_CDECL blThreadRun(BLThread* self_, BLThreadWorkFunc func, void* data) noexcept {
static BLResult BL_CDECL blThreadRun(BLThread* self_, BLThreadFunc workFunc, BLThreadFunc doneFunc, void* data) noexcept {
BLInternalThread* self = static_cast<BLInternalThread*>(self_);
if (self->internalStatus != BL_THREAD_STATUS_IDLE)
if (self->event.isSignaled())
return blTraceError(BL_ERROR_BUSY);

self->workFunc = func;
self->workFunc = workFunc;
self->doneFunc = doneFunc;
self->workData = data;
blAtomicThreadFence();

self->event.signal();
return BL_SUCCESS;
}

static bool BL_CDECL blThreadMakeIdle(BLThread* self_) noexcept {
const BLInternalThread* self = static_cast<const BLInternalThread*>(self_);

uint32_t expected = BL_THREAD_STATUS_RUNNING;
return std::atomic_compare_exchange_strong((std::atomic<uint32_t>*)&self->internalStatus, &expected, uint32_t(BL_THREAD_STATUS_IDLE));
}

static BLResult BL_CDECL blThreadQuit(BLThread* self_) noexcept {
BLInternalThread* self = static_cast<BLInternalThread*>(self_);

Expand All @@ -352,7 +352,7 @@ static unsigned BL_STDCALL blThreadEntryPointWrapper(void* arg) {
return 0;
}

BLResult BL_CDECL blThreadCreate(BLThread** threadOut, const BLThreadAttributes* attributes, BLThreadExitFunc exitFunc, void* exitData) noexcept {
BLResult BL_CDECL blThreadCreate(BLThread** threadOut, const BLThreadAttributes* attributes, BLThreadFunc exitFunc, void* exitData) noexcept {
BLInternalThread* thread = blThreadNew(exitFunc, exitData);
if (BL_UNLIKELY(!thread))
return blTraceError(BL_ERROR_OUT_OF_MEMORY);
Expand Down Expand Up @@ -399,7 +399,7 @@ static void* blThreadEntryPointWrapper(void* arg) {
return nullptr;
}

BLResult BL_CDECL blThreadCreate(BLThread** threadOut, const BLThreadAttributes* attributes, BLThreadExitFunc exitFunc, void* exitData) noexcept {
BLResult BL_CDECL blThreadCreate(BLThread** threadOut, const BLThreadAttributes* attributes, BLThreadFunc exitFunc, void* exitData) noexcept {
pthread_attr_t ptAttr;
int err = pthread_attr_init(&ptAttr);

Expand All @@ -416,7 +416,7 @@ BLResult BL_CDECL blThreadCreate(BLThread** threadOut, const BLThreadAttributes*
return result;
}

BLResult blThreadCreatePt(BLThread** threadOut, const pthread_attr_t* ptAttr, BLThreadExitFunc exitFunc, void* exitData) noexcept {
BLResult blThreadCreatePt(BLThread** threadOut, const pthread_attr_t* ptAttr, BLThreadFunc exitFunc, void* exitData) noexcept {
BLInternalThread* thread = blThreadNew(exitFunc, exitData);
if (BL_UNLIKELY(!thread))
return blTraceError(BL_ERROR_OUT_OF_MEMORY);
Expand Down Expand Up @@ -460,6 +460,5 @@ void blThreadingRtInit(BLRuntimeContext* rt) noexcept {
blThreadVirt.destroy = blThreadDestroy;
blThreadVirt.status = blThreadStatus;
blThreadVirt.run = blThreadRun;
blThreadVirt.makeIdle = blThreadMakeIdle;
blThreadVirt.quit = blThreadQuit;
}
45 changes: 29 additions & 16 deletions src/blend2d/blthreading_p.h
Expand Up @@ -34,20 +34,38 @@ struct BLThreadAttributes;
// [Typedefs]
// ============================================================================

typedef void (BL_CDECL* BLThreadWorkFunc)(BLThread* thread, void* data) BL_NOEXCEPT;
typedef void (BL_CDECL* BLThreadExitFunc)(BLThread* thread, void* data) BL_NOEXCEPT;
typedef void (BL_CDECL* BLThreadFunc)(BLThread* thread, void* data) BL_NOEXCEPT;

// ============================================================================
// [Constants]
// ============================================================================

enum BLThreadStatus : uint32_t {
BL_THREAD_STATUS_NONE = 0,
BL_THREAD_STATUS_IDLE = 1,
BL_THREAD_STATUS_RUNNING = 2,
BL_THREAD_STATUS_QUITTING = 3
BL_THREAD_STATUS_IDLE = 0,
BL_THREAD_STATUS_RUNNING = 1,
BL_THREAD_STATUS_QUITTING = 2
};

// ============================================================================
// [Atomics]
// ============================================================================

static BL_INLINE void blAtomicThreadFence(std::memory_order order = std::memory_order_acq_rel) noexcept {
std::atomic_thread_fence(order);
}

template<typename T>
static BL_INLINE typename std::remove_volatile<T>::type blAtomicFetch(const T* p, std::memory_order order = std::memory_order_relaxed) noexcept {
typedef typename BLInternal::StdInt<sizeof(T), 0>::Type RawT;
return (typename std::remove_volatile<T>::type)((const std::atomic<RawT>*)p)->load(order);
}

template<typename T>
static BL_INLINE void blAtomicStore(T* p, typename std::remove_volatile<T>::type value, std::memory_order order = std::memory_order_acq_rel) noexcept {
typedef typename BLInternal::StdInt<sizeof(T), 0>::Type RawT;
return ((std::atomic<RawT>*)p)->store((RawT)value, order);
}

// ============================================================================
// [Utilities]
// ============================================================================
Expand Down Expand Up @@ -295,8 +313,7 @@ struct BLThreadAttributes {
struct BLThreadVirt {
BLResult (BL_CDECL* destroy)(BLThread* self) BL_NOEXCEPT;
uint32_t (BL_CDECL* status)(const BLThread* self) BL_NOEXCEPT;
BLResult (BL_CDECL* run)(BLThread* self, BLThreadWorkFunc func, void* data) BL_NOEXCEPT;
bool (BL_CDECL* makeIdle)(BLThread* self) BL_NOEXCEPT;
BLResult (BL_CDECL* run)(BLThread* self, BLThreadFunc workFunc, BLThreadFunc doneFunc, void* data) BL_NOEXCEPT;
BLResult (BL_CDECL* quit)(BLThread* self) BL_NOEXCEPT;
};

Expand All @@ -313,12 +330,8 @@ struct BLThread {
return virt->status(this);
}

BL_INLINE BLResult run(BLThreadWorkFunc func, void* data) noexcept {
return virt->run(this, func, data);
}

BL_INLINE bool makeIdle() noexcept {
return virt->makeIdle(this);
BL_INLINE BLResult run(BLThreadFunc workFunc, BLThreadFunc doneFunc, void* data) noexcept {
return virt->run(this, workFunc, doneFunc, data);
}

BL_INLINE BLResult quit() noexcept {
Expand All @@ -328,10 +341,10 @@ struct BLThread {
// --------------------------------------------------------------------------
};

BL_HIDDEN BLResult BL_CDECL blThreadCreate(BLThread** threadOut, const BLThreadAttributes* attributes, BLThreadExitFunc exitFunc, void* exitData) noexcept;
BL_HIDDEN BLResult BL_CDECL blThreadCreate(BLThread** threadOut, const BLThreadAttributes* attributes, BLThreadFunc exitFunc, void* exitData) noexcept;

#ifndef _WIN32
BL_HIDDEN BLResult blThreadCreatePt(BLThread** threadOut, const pthread_attr_t* ptAttr, BLThreadExitFunc exitFunc, void* exitData) noexcept;
BL_HIDDEN BLResult blThreadCreatePt(BLThread** threadOut, const pthread_attr_t* ptAttr, BLThreadFunc exitFunc, void* exitData) noexcept;
BL_HIDDEN BLResult blThreadSetPtAttributes(pthread_attr_t* ptAttr, const BLThreadAttributes* src) noexcept;
#endif

Expand Down
13 changes: 10 additions & 3 deletions src/blend2d/blthreadpool.cpp
Expand Up @@ -430,6 +430,12 @@ struct ThreadTestData {
static void BL_CDECL test_thread_entry(BLThread* thread, void* data_) noexcept {
ThreadTestData* data = static_cast<ThreadTestData*>(data_);
INFO("[#%u] Thread %p running\n", data->iter, thread);
}

static void BL_CDECL test_thread_done(BLThread* thread, void* data_) noexcept {
ThreadTestData* data = static_cast<ThreadTestData*>(data_);
INFO("[#%u] Thread %p done\n", data->iter, thread);

if (blAtomicFetchSub(&data->counter) == 1)
data->event.signal();
}
Expand All @@ -447,17 +453,18 @@ UNIT(blend2d_thread_pool) {
EXPECT(n == 0);

INFO("Repeatedly acquiring / releasing %u threads with a simple task", kThreadCount);
for (uint32_t i = 0; i < 5; i++) {
for (uint32_t i = 0; i < 10; i++) {
data.iter = i;

INFO("[#%u] Acquiring %u threads from thread-pool", i, kThreadCount);
uint32_t acquiredCount = tp->acquireThreads(threads, kThreadCount);
EXPECT(acquiredCount == kThreadCount);

data.counter = kThreadCount;
blAtomicStore(&data.counter, kThreadCount);
INFO("[#%u] Running %u threads", i, kThreadCount);
for (BLThread* thread : threads) {
thread->run(test_thread_entry, &data);
BLResult result = thread->run(test_thread_entry, test_thread_done, &data);
EXPECT(result == BL_SUCCESS);
}

INFO("[#%u] Waiting and releasing", i);
Expand Down

0 comments on commit 16c7c89

Please sign in to comment.