Skip to content

Commit

Permalink
butex_wake_all support nosignal flag, use bthread_flush signal batch
Browse files Browse the repository at this point in the history
  • Loading branch information
yanglimingcn committed Apr 26, 2022
1 parent 7536d0b commit a1f5745
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 16 deletions.
28 changes: 20 additions & 8 deletions src/bthread/butex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
}

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group_nosignal;

// Returns 0 when no need to unschedule or successfully unscheduled,
// -1 otherwise.
Expand Down Expand Up @@ -261,7 +262,7 @@ inline TaskGroup* get_task_group(TaskControl* c) {
return g ? g : c->choose_one_group();
}

int butex_wake(void* arg, bool nosched) {
int butex_wake(void* arg, bool nosched, bool nosignal) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
ButexWaiter* front = NULL;
{
Expand All @@ -282,17 +283,23 @@ int butex_wake(void* arg, bool nosched) {
TaskGroup* g = tls_task_group;
if (g) {
if (!nosched) {
TaskGroup::exchange(&g, bbw->tid);
TaskGroup::exchange(&g, bbw->tid, nosignal);
} else {
g->ready_to_run_general(bbw->tid, false);
g->ready_to_run_general(bbw->tid, nosignal);
}
} else {
bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid, nosignal);
}
if (nosignal) {
if (!tls_task_group) {
tls_task_group_nosignal = g;
}
return 0;
}
return 1;
}

int butex_wake_all(void* arg, bool nosched) {
int butex_wake_all(void* arg, bool nosched, bool nosignal) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

ButexWaiterList bthread_waiters;
Expand Down Expand Up @@ -341,14 +348,19 @@ int butex_wake_all(void* arg, bool nosched) {
g->ready_to_run_general(w->tid, true);
++nwakeup;
}
if (saved_nwakeup != nwakeup) {
if (nosignal) {
if (!tls_task_group) {
tls_task_group_nosignal = g;
}
nwakeup = 0;
} else if (saved_nwakeup != nwakeup) {
g->flush_nosignal_tasks_general();
}
if (!nosched) {
if (g == tls_task_group) {
TaskGroup::exchange(&g, next->tid);
TaskGroup::exchange(&g, next->tid, nosignal);
} else {
g->ready_to_run_remote(next->tid);
g->ready_to_run_remote(next->tid, nosignal);
}
}
return nwakeup;
Expand Down
4 changes: 2 additions & 2 deletions src/bthread/butex.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ void butex_destroy(void* butex);

// Wake up at most 1 thread waiting on |butex|.
// Returns # of threads woken up.
int butex_wake(void* butex, bool nosched = false);
int butex_wake(void* butex, bool nosched = false, bool nosignal = false);

// Wake up all threads waiting on |butex|.
// Returns # of threads woken up.
int butex_wake_all(void* butex, bool nosched = false);
int butex_wake_all(void* butex, bool nosched = false, bool nosignal = false);

// Wake up all threads waiting on |butex| except a bthread whose identifier
// is |excluded_bthread|. This function does not yield.
Expand Down
4 changes: 2 additions & 2 deletions src/bthread/countdown_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ CountdownEvent::~CountdownEvent() {
butex_destroy(_butex);
}

void CountdownEvent::signal(int sig) {
void CountdownEvent::signal(int sig, bool nosignal) {
// Have to save _butex, *this is probably defreferenced by the wait thread
// which sees fetch_sub
void* const saved_butex = _butex;
Expand All @@ -50,7 +50,7 @@ void CountdownEvent::signal(int sig) {
return;
}
LOG_IF(ERROR, prev < sig) << "Counter is over decreased";
butex_wake_all(saved_butex, true);
butex_wake_all(saved_butex, nosignal);
}

int CountdownEvent::wait() {
Expand Down
2 changes: 1 addition & 1 deletion src/bthread/countdown_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class CountdownEvent {
void reset(int v = 1);

// Decrease the counter by |sig|
void signal(int sig = 1);
void signal(int sig = 1, bool nosignal = false);

// Block current thread until the counter reaches 0.
// Returns 0 on success, error code otherwise.
Expand Down
2 changes: 1 addition & 1 deletion src/bthread/task_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class TaskGroup {
// then being popped by sched(pg), which is not necessary.
static void sched_to(TaskGroup** pg, TaskMeta* next_meta);
static void sched_to(TaskGroup** pg, bthread_t next_tid);
static void exchange(TaskGroup** pg, bthread_t next_tid);
static void exchange(TaskGroup** pg, bthread_t next_tid, bool nosignal = false);

// The callback will be run in the beginning of next-run bthread.
// Can't be called by current bthread directly because it often needs
Expand Down
4 changes: 2 additions & 2 deletions src/bthread/task_group_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ inline TaskMeta* TaskGroup::address_meta(bthread_t tid) {
return address_resource(get_slot(tid));
}

inline void TaskGroup::exchange(TaskGroup** pg, bthread_t next_tid) {
inline void TaskGroup::exchange(TaskGroup** pg, bthread_t next_tid, bool nosignal) {
TaskGroup* g = *pg;
if (g->is_current_pthread_task()) {
return g->ready_to_run(next_tid);
return g->ready_to_run(next_tid, nosignal);
}
ReadyToRunArgs args = { g->current_tid(), false };
g->set_remained((g->current_task()->about_to_quit
Expand Down

0 comments on commit a1f5745

Please sign in to comment.