Skip to content

Commit

Permalink
butex_wake* support nosignal flag, use bthread_flush signal batch (#1751
Browse files Browse the repository at this point in the history
)
  • Loading branch information
yanglimingcn committed Jun 6, 2022
1 parent 958a4c3 commit 341ad4d
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 18 deletions.
44 changes: 31 additions & 13 deletions src/bthread/butex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
}

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group_nosignal;

// Returns 0 when no need to unschedule or successfully unscheduled,
// -1 otherwise.
Expand Down Expand Up @@ -256,12 +257,29 @@ void butex_destroy(void* butex) {
butil::return_object(b);
}

inline TaskGroup* get_task_group(TaskControl* c) {
TaskGroup* g = tls_task_group;
return g ? g : c->choose_one_group();
inline TaskGroup* get_task_group(TaskControl* c, bool nosignal = false) {
TaskGroup* g;
if (nosignal) {
g = tls_task_group_nosignal;
if (NULL == g) {
g = c->choose_one_group();
tls_task_group_nosignal = g;
}
} else {
g = tls_task_group ? tls_task_group : c->choose_one_group();
}
return g;
}

inline void run_in_local_task_group(TaskGroup* g, bthread_t tid, bool nosignal) {
if (!nosignal) {
TaskGroup::exchange(&g, tid);
} else {
g->ready_to_run(tid, nosignal);
}
}

int butex_wake(void* arg) {
int butex_wake(void* arg, bool nosignal) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
ButexWaiter* front = NULL;
{
Expand All @@ -279,16 +297,16 @@ int butex_wake(void* arg) {
}
ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
unsleep_if_necessary(bbw, get_global_timer_thread());
TaskGroup* g = tls_task_group;
if (g) {
TaskGroup::exchange(&g, bbw->tid);
TaskGroup* g = get_task_group(bbw->control, nosignal);
if (g == tls_task_group) {
run_in_local_task_group(g, bbw->tid, nosignal);
} else {
bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
g->ready_to_run_remote(bbw->tid, nosignal);
}
return 1;
}

int butex_wake_all(void* arg) {
int butex_wake_all(void* arg, bool nosignal) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

ButexWaiterList bthread_waiters;
Expand Down Expand Up @@ -324,7 +342,7 @@ int butex_wake_all(void* arg) {
next->RemoveFromList();
unsleep_if_necessary(next, get_global_timer_thread());
++nwakeup;
TaskGroup* g = get_task_group(next->control);
TaskGroup* g = get_task_group(next->control, nosignal);
const int saved_nwakeup = nwakeup;
while (!bthread_waiters.empty()) {
// pop reversely
Expand All @@ -335,13 +353,13 @@ int butex_wake_all(void* arg) {
g->ready_to_run_general(w->tid, true);
++nwakeup;
}
if (saved_nwakeup != nwakeup) {
if (!nosignal && saved_nwakeup != nwakeup) {
g->flush_nosignal_tasks_general();
}
if (g == tls_task_group) {
TaskGroup::exchange(&g, next->tid);
run_in_local_task_group(g, next->tid, nosignal);
} else {
g->ready_to_run_remote(next->tid);
g->ready_to_run_remote(next->tid, nosignal);
}
return nwakeup;
}
Expand Down
4 changes: 2 additions & 2 deletions src/bthread/butex.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ void butex_destroy(void* butex);

// Wake up at most 1 thread waiting on |butex|.
// Returns # of threads woken up.
int butex_wake(void* butex);
int butex_wake(void* butex, bool nosignal = false);

// Wake up all threads waiting on |butex|.
// Returns # of threads woken up.
int butex_wake_all(void* butex);
int butex_wake_all(void* butex, bool nosignal = false);

// Wake up all threads waiting on |butex| except a bthread whose identifier
// is |excluded_bthread|. This function does not yield.
Expand Down
4 changes: 2 additions & 2 deletions src/bthread/countdown_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ CountdownEvent::~CountdownEvent() {
butex_destroy(_butex);
}

void CountdownEvent::signal(int sig) {
void CountdownEvent::signal(int sig, bool flush) {
// Have to save _butex, *this is probably defreferenced by the wait thread
// which sees fetch_sub
void* const saved_butex = _butex;
Expand All @@ -50,7 +50,7 @@ void CountdownEvent::signal(int sig) {
return;
}
LOG_IF(ERROR, prev < sig) << "Counter is over decreased";
butex_wake_all(saved_butex);
butex_wake_all(saved_butex, flush);
}

int CountdownEvent::wait() {
Expand Down
3 changes: 2 additions & 1 deletion src/bthread/countdown_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class CountdownEvent {
void reset(int v = 1);

// Decrease the counter by |sig|
void signal(int sig = 1);
// when flush is true, after signal we need to call bthread_flush
void signal(int sig = 1, bool flush = false);

// Block current thread until the counter reaches 0.
// Returns 0 on success, error code otherwise.
Expand Down

0 comments on commit 341ad4d

Please sign in to comment.