Skip to content

Commit

Permalink
butex_wake_all support nosignal flag, use bthread_flush signal batch
Browse files Browse the repository at this point in the history
  • Loading branch information
yanglimingcn committed May 13, 2022
1 parent 7536d0b commit 972e867
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 39 deletions.
68 changes: 38 additions & 30 deletions src/bthread/butex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ int wait_pthread(ButexPthreadWaiter& pw, timespec* ptimeout) {
}

extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group_nosignal;

// Returns 0 when no need to unschedule or successfully unscheduled,
// -1 otherwise.
Expand Down Expand Up @@ -256,12 +257,29 @@ void butex_destroy(void* butex) {
butil::return_object(b);
}

inline TaskGroup* get_task_group(TaskControl* c) {
TaskGroup* g = tls_task_group;
return g ? g : c->choose_one_group();
inline TaskGroup* get_task_group(TaskControl* c, bool nosignal = false) {
TaskGroup* g;
if (nosignal) {
g = tls_task_group_nosignal;
if (NULL == g) {
g = c->choose_one_group();
tls_task_group_nosignal = g;
}
} else {
g = c->choose_one_group();
}
return g;
}

inline void run_in_this_task_group(TaskGroup* g, bthread_t tid, bool nosched, bool nosignal) {
if (!nosched) {
TaskGroup::exchange(&g, bbw->tid, nosignal);
} else {
g->ready_to_run(bbw->tid, nosignal);
}
}

int butex_wake(void* arg, bool nosched) {
int butex_wake(void* arg, bool nosched, bool nosignal) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);
ButexWaiter* front = NULL;
{
Expand All @@ -281,18 +299,15 @@ int butex_wake(void* arg, bool nosched) {
unsleep_if_necessary(bbw, get_global_timer_thread());
TaskGroup* g = tls_task_group;
if (g) {
if (!nosched) {
TaskGroup::exchange(&g, bbw->tid);
} else {
g->ready_to_run_general(bbw->tid, false);
}
run_in_this_task_group(g, bbw->tid, nosched, nosignal);
} else {
bbw->control->choose_one_group()->ready_to_run_remote(bbw->tid);
g = get_task_group(bbw->control, nosignal);
g->ready_to_run_remote(bbw->tid, nosignal);
}
return 1;
}

int butex_wake_all(void* arg, bool nosched) {
int butex_wake_all(void* arg, bool nosched, bool nosignal) {
Butex* b = container_of(static_cast<butil::atomic<int>*>(arg), Butex, value);

ButexWaiterList bthread_waiters;
Expand All @@ -313,8 +328,7 @@ int butex_wake_all(void* arg, bool nosched) {

int nwakeup = 0;
while (!pthread_waiters.empty()) {
ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(
pthread_waiters.head()->value());
ButexPthreadWaiter* bw = static_cast<ButexPthreadWaiter*>(pthread_waiters.head()->value());
bw->RemoveFromList();
wakeup_pthread(bw);
++nwakeup;
Expand All @@ -323,33 +337,27 @@ int butex_wake_all(void* arg, bool nosched) {
return nwakeup;
}
// We will exchange with first waiter in the end.
ButexBthreadWaiter* next =
static_cast<ButexBthreadWaiter*>(bthread_waiters.head()->value());
if (!nosched) {
next->RemoveFromList();
unsleep_if_necessary(next, get_global_timer_thread());
++nwakeup;
}
TaskGroup* g = get_task_group(next->control);
ButexBthreadWaiter* next = static_cast<ButexBthreadWaiter*>(bthread_waiters.head()->value());
next->RemoveFromList();
unsleep_if_necessary(next, get_global_timer_thread());
++nwakeup;
TaskGroup* g = get_task_group(next->control, nosignal);
const int saved_nwakeup = nwakeup;
while (!bthread_waiters.empty()) {
// pop reversely
ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(
bthread_waiters.tail()->value());
ButexBthreadWaiter* w = static_cast<ButexBthreadWaiter*>(bthread_waiters.tail()->value());
w->RemoveFromList();
unsleep_if_necessary(w, get_global_timer_thread());
g->ready_to_run_general(w->tid, true);
++nwakeup;
}
if (saved_nwakeup != nwakeup) {
if (!nosignal && saved_nwakeup != nwakeup) {
g->flush_nosignal_tasks_general();
}
if (!nosched) {
if (g == tls_task_group) {
TaskGroup::exchange(&g, next->tid);
} else {
g->ready_to_run_remote(next->tid);
}
if (g == tls_task_group) {
run_in_this_task_group(g, next->tid, nosched, nosignal);
} else {
g->ready_to_run_remote(next->tid, nosignal);
}
return nwakeup;
}
Expand Down
4 changes: 2 additions & 2 deletions src/bthread/butex.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ void butex_destroy(void* butex);

// Wake up at most 1 thread waiting on |butex|.
// Returns # of threads woken up.
int butex_wake(void* butex, bool nosched = false);
int butex_wake(void* butex, bool nosched = false, bool nosignal = false);

// Wake up all threads waiting on |butex|.
// Returns # of threads woken up.
int butex_wake_all(void* butex, bool nosched = false);
int butex_wake_all(void* butex, bool nosched = false, bool nosignal = false);

// Wake up all threads waiting on |butex| except a bthread whose identifier
// is |excluded_bthread|. This function does not yield.
Expand Down
4 changes: 2 additions & 2 deletions src/bthread/countdown_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ CountdownEvent::~CountdownEvent() {
butex_destroy(_butex);
}

void CountdownEvent::signal(int sig) {
void CountdownEvent::signal(int sig, bool nosched, bool nosignal) {
// Have to save _butex, *this is probably defreferenced by the wait thread
// which sees fetch_sub
void* const saved_butex = _butex;
Expand All @@ -50,7 +50,7 @@ void CountdownEvent::signal(int sig) {
return;
}
LOG_IF(ERROR, prev < sig) << "Counter is over decreased";
butex_wake_all(saved_butex, true);
butex_wake_all(saved_butex, nosched, nosignal);
}

int CountdownEvent::wait() {
Expand Down
2 changes: 1 addition & 1 deletion src/bthread/countdown_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class CountdownEvent {
void reset(int v = 1);

// Decrease the counter by |sig|
void signal(int sig = 1);
void signal(int sig = 1, bool nosched = false, bool nosignal = false);

// Block current thread until the counter reaches 0.
// Returns 0 on success, error code otherwise.
Expand Down
2 changes: 1 addition & 1 deletion src/bthread/task_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class TaskGroup {
// then being popped by sched(pg), which is not necessary.
static void sched_to(TaskGroup** pg, TaskMeta* next_meta);
static void sched_to(TaskGroup** pg, bthread_t next_tid);
static void exchange(TaskGroup** pg, bthread_t next_tid);
static void exchange(TaskGroup** pg, bthread_t next_tid, bool nosignal = false);

// The callback will be run in the beginning of next-run bthread.
// Can't be called by current bthread directly because it often needs
Expand Down
6 changes: 3 additions & 3 deletions src/bthread/task_group_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ inline TaskMeta* TaskGroup::address_meta(bthread_t tid) {
return address_resource(get_slot(tid));
}

inline void TaskGroup::exchange(TaskGroup** pg, bthread_t next_tid) {
inline void TaskGroup::exchange(TaskGroup** pg, bthread_t next_tid, bool nosignal) {
TaskGroup* g = *pg;
if (g->is_current_pthread_task()) {
return g->ready_to_run(next_tid);
return g->ready_to_run(next_tid, nosignal);
}
ReadyToRunArgs args = { g->current_tid(), false };
ReadyToRunArgs args = {g->current_tid(), nosignal};
g->set_remained((g->current_task()->about_to_quit
? ready_to_run_in_worker_ignoresignal
: ready_to_run_in_worker),
Expand Down

0 comments on commit 972e867

Please sign in to comment.