Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

butex_wake_all support nosignal flag #1751

Merged
merged 1 commit into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 31 additions & 13 deletions src/bthread/butex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
}

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group_nosignal;

// Returns 0 when no need to unschedule or successfully unscheduled,
// -1 otherwise.
Expand Down Expand Up @@ -256,12 +257,29 @@ void butex_destroy(void* butex) {
butil::return_object(b);
}

inline TaskGroup* get_task_group(TaskControl* c) {
TaskGroup* g = tls_task_group;
return g ? g : c->choose_one_group();
inline TaskGroup* get_task_group(TaskControl* c, bool nosignal = false) {
TaskGroup* g;
if (nosignal) {
g = tls_task_group_nosignal;
if (NULL == g) {
g = c->choose_one_group();
tls_task_group_nosignal = g;
}
} else {
g = tls_task_group ? tls_task_group : c->choose_one_group();
}
return g;
}

inline void run_in_local_task_group(TaskGroup* g, bthread_t tid, bool nosignal) {
if (!nosignal) {
TaskGroup::exchange(&g, tid);
} else {
g->ready_to_run(tid, nosignal);
}
}

int butex_wake(void* arg) {
int butex_wake(void* arg, bool nosignal) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
ButexWaiter* front = NULL;
{
Expand All @@ -279,16 +297,16 @@ int butex_wake(void* arg) {
}
ButexBthreadWaiter* bbw = static_cast<ButexBthreadWaiter*>(front);
unsleep_if_necessary(bbw, get_global_timer_thread());
TaskGroup* g = tls_task_group;
if (g) {
TaskGroup::exchange(&g, bbw->tid);
TaskGroup* g = get_task_group(bbw->control, nosignal);
if (g == tls_task_group) {
run_in_local_task_group(g, bbw->tid, nosignal);
} else {
bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
g->ready_to_run_remote(bbw->tid, nosignal);
}
return 1;
}

int butex_wake_all(void* arg) {
int butex_wake_all(void* arg, bool nosignal) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

ButexWaiterList bthread_waiters;
Expand Down Expand Up @@ -324,7 +342,7 @@ int butex_wake_all(void* arg) {
next->RemoveFromList();
unsleep_if_necessary(next, get_global_timer_thread());
++nwakeup;
TaskGroup* g = get_task_group(next->control);
TaskGroup* g = get_task_group(next->control, nosignal);
const int saved_nwakeup = nwakeup;
while (!bthread_waiters.empty()) {
// pop reversely
Expand All @@ -335,13 +353,13 @@ int butex_wake_all(void* arg) {
g->ready_to_run_general(w->tid, true);
++nwakeup;
}
if (saved_nwakeup != nwakeup) {
if (!nosignal && saved_nwakeup != nwakeup) {
g->flush_nosignal_tasks_general();
}
if (g == tls_task_group) {
TaskGroup::exchange(&g, next->tid);
run_in_local_task_group(g, next->tid, nosignal);
} else {
g->ready_to_run_remote(next->tid);
g->ready_to_run_remote(next->tid, nosignal);
}
return nwakeup;
}
Expand Down
4 changes: 2 additions & 2 deletions src/bthread/butex.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ void butex_destroy(void* butex);

// Wake up at most 1 thread waiting on |butex|.
// Returns # of threads woken up.
int butex_wake(void* butex);
int butex_wake(void* butex, bool nosignal = false);

// Wake up all threads waiting on |butex|.
// Returns # of threads woken up.
int butex_wake_all(void* butex);
int butex_wake_all(void* butex, bool nosignal = false);

// Wake up all threads waiting on |butex| except a bthread whose identifier
// is |excluded_bthread|. This function does not yield.
Expand Down
4 changes: 2 additions & 2 deletions src/bthread/countdown_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ CountdownEvent::~CountdownEvent() {
butex_destroy(_butex);
}

void CountdownEvent::signal(int sig) {
void CountdownEvent::signal(int sig, bool flush) {
// Have to save _butex, *this is probably defreferenced by the wait thread
// which sees fetch_sub
void* const saved_butex = _butex;
Expand All @@ -50,7 +50,7 @@ void CountdownEvent::signal(int sig) {
return;
}
LOG_IF(ERROR, prev < sig) << "Counter is over decreased";
butex_wake_all(saved_butex);
butex_wake_all(saved_butex, flush);
}

int CountdownEvent::wait() {
Expand Down
3 changes: 2 additions & 1 deletion src/bthread/countdown_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class CountdownEvent {
void reset(int v = 1);

// Decrease the counter by |sig|
void signal(int sig = 1);
// when flush is true, after signal we need to call bthread_flush
void signal(int sig = 1, bool flush = false);

// Block current thread until the counter reaches 0.
// Returns 0 on success, error code otherwise.
Expand Down