
Commit

* thread.c, vm_core.h: make gvl_acquire/release/init/destruct
  APIs to modularize the GVL implementation.  (A simplified sketch
  of the new API follows below.)
* thread_pthread.c, thread_pthread.h: Two GVL implementations.
  (1) Simple locking GVL, the same as the existing GVL.
  (2) Wake up queued threads.  The wake-up order is simple FIFO.
  (We could use several queues to support exact priorities, but
  that would introduce issues such as priority inversion.)
  This implementation prevents the spin-loop (*1) seen on SMP environments.
  *1: Only one Ruby thread acquires the GVL again and again.
  Bug ruby#2359 [ruby-core:26694]
* thread_win32.c, thread_win32.h: Use a simple lock implemented
  with a Mutex instead of a CRITICAL_SECTION.
  Bug ruby#3890 [ruby-dev:42315]
* vm.c (ruby_vm_destruct): ditto.



git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@29956 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
ko1 committed Nov 27, 2010
1 parent ac0f5a5 commit 450463d
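
The change boils down to four entry points that hide the platform lock from thread.c and vm.c. The following is a minimal, self-contained sketch of implementation (1), the simple-lock GVL, plus the caller-side pattern that BLOCKING_REGION expands to; vm_t, thread_t, gvl_lock and blocking_region are illustrative stand-ins, not the committed rb_vm_t/rb_thread_t code.

    #include <pthread.h>

    /* Illustrative stand-ins for the VM structures; only the GVL matters here. */
    typedef struct { pthread_mutex_t gvl_lock; } vm_t;
    typedef struct { vm_t *vm; } thread_t;

    /* Implementation (1): the GVL is a single mutex behind the new API,
     * so callers never touch the mutex directly. */
    static void gvl_init(vm_t *vm)    { pthread_mutex_init(&vm->gvl_lock, NULL); }
    static void gvl_destroy(vm_t *vm) { pthread_mutex_destroy(&vm->gvl_lock); }
    static void gvl_acquire(vm_t *vm, thread_t *th) { (void)th; pthread_mutex_lock(&vm->gvl_lock); }
    static void gvl_release(vm_t *vm) { pthread_mutex_unlock(&vm->gvl_lock); }

    /* Caller side, in the spirit of thread.c's BLOCKING_REGION: drop the
     * GVL around code that may block, then take it back afterwards. */
    static void blocking_region(thread_t *th, void (*blocking_func)(void *), void *arg)
    {
        gvl_release(th->vm);
        blocking_func(arg);            /* e.g. read(2), select(2), ... */
        gvl_acquire(th->vm, th);
    }

Because the mutex is hidden behind these calls, thread_pthread.c can swap in the FIFO wake-up variant (implementation (2)) without changing any caller; a condensed sketch of that hand-off follows the diff below.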
Showing 8 changed files with 328 additions and 82 deletions.
20 changes: 20 additions & 0 deletions ChangeLog
@@ -1,3 +1,23 @@
Sun Nov 28 12:23:57 2010 Koichi Sasada <ko1@atdot.net>

* thread.c, vm_core.h: make gvl_acquire/release/init/destruct
APIs to modularize GVL implementation.

* thread_pthread.c, thread_pthread.h: Two GVL implementations.
(1) Simple locking GVL which is same as existing GVL.
(2) Wake-up queued threads. The wake-up order is simple FIFO.
(We can make several queues to support exact priorities, however
this causes some issues such as priority inversion and so on.)
This impl. prevents spin-loop (*1) caused on SMP environments.
*1: Only one Ruby thread acquires GVL again and again.
Bug #2359 [ruby-core:26694]

* thread_win32.c, thread_win32.h: Using simple lock
not by CRITICAL_SECTION but by Mutex.
Bug #3890 [ruby-dev:42315]

* vm.c (ruby_vm_destruct): ditto.

Sun Nov 28 04:40:00 2010 Luis Lavena <luislavena@gmail.com>

* io.c (io_fwrite): use rb_w32_write_console under Windows.
35 changes: 21 additions & 14 deletions thread.c
@@ -103,10 +103,10 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
#define GVL_UNLOCK_BEGIN() do { \
rb_thread_t *_th_stored = GET_THREAD(); \
RB_GC_SAVE_MACHINE_CONTEXT(_th_stored); \
native_mutex_unlock(&_th_stored->vm->global_vm_lock)
gvl_release(_th_stored->vm);

#define GVL_UNLOCK_END() \
native_mutex_lock(&_th_stored->vm->global_vm_lock); \
gvl_acquire(_th_stored->vm, _th_stored); \
rb_thread_set_current(_th_stored); \
} while(0)

@@ -125,7 +125,7 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
(th)->status = THREAD_STOPPED; \
thread_debug("enter blocking region (%p)\n", (void *)(th)); \
RB_GC_SAVE_MACHINE_CONTEXT(th); \
native_mutex_unlock(&(th)->vm->global_vm_lock); \
gvl_release((th)->vm); \
} while (0)

#define BLOCKING_REGION(exec, ubf, ubfarg) do { \
@@ -246,6 +246,13 @@ rb_thread_debug(
}
#endif

void
rb_vm_gvl_destroy(rb_vm_t *vm)
{
gvl_release(vm);
gvl_destroy(vm);
}

void
rb_thread_lock_unlock(rb_thread_lock_t *lock)
{
@@ -426,7 +433,7 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_s
#endif
thread_debug("thread start: %p\n", (void *)th);

native_mutex_lock(&th->vm->global_vm_lock);
gvl_acquire(th->vm, th);
{
thread_debug("thread start (get lock): %p\n", (void *)th);
rb_thread_set_current(th);
@@ -519,7 +526,7 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_s
}
else {
thread_cleanup_func(th);
native_mutex_unlock(&th->vm->global_vm_lock);
gvl_release(th->vm);
}

return 0;
@@ -1002,11 +1009,11 @@ rb_thread_schedule_rec(int sched_depth)
thread_debug("rb_thread_schedule/switch start\n");

RB_GC_SAVE_MACHINE_CONTEXT(th);
native_mutex_unlock(&th->vm->global_vm_lock);
gvl_release(th->vm);
{
native_thread_yield();
}
native_mutex_lock(&th->vm->global_vm_lock);
gvl_acquire(th->vm, th);

rb_thread_set_current(th);
thread_debug("rb_thread_schedule/switch done\n");
@@ -1028,7 +1035,7 @@ rb_thread_schedule(void)
static inline void
blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
{
native_mutex_lock(&th->vm->global_vm_lock);
gvl_acquire(th->vm, th);
rb_thread_set_current(th);
thread_debug("leave blocking region (%p)\n", (void *)th);
remove_signal_thread_list(th);
@@ -2753,7 +2760,7 @@ rb_thread_atfork_internal(int (*atfork)(st_data_t, st_data_t, st_data_t))
VALUE thval = th->self;
vm->main_thread = th;

native_mutex_reinitialize_atfork(&th->vm->global_vm_lock);
gvl_atfork(th->vm);
st_foreach(vm->living_threads, atfork, (st_data_t)th);
st_clear(vm->living_threads);
st_insert(vm->living_threads, thval, (st_data_t)th->thread_id);
@@ -4297,6 +4304,7 @@ Init_Thread(void)
#define rb_intern(str) rb_intern_const(str)

VALUE cThGroup;
rb_thread_t *th = GET_THREAD();

rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1);
rb_define_singleton_method(rb_cThread, "start", thread_start, -2);
@@ -4349,7 +4357,6 @@ Init_Thread(void)
rb_define_method(cThGroup, "add", thgroup_add, 1);

{
rb_thread_t *th = GET_THREAD();
th->thgroup = th->vm->thgroup_default = rb_obj_alloc(cThGroup);
rb_define_const(cThGroup, "Default", th->thgroup);
}
@@ -4376,10 +4383,9 @@ Init_Thread(void)
/* main thread setting */
{
/* acquire global vm lock */
rb_thread_lock_t *lp = &GET_THREAD()->vm->global_vm_lock;
native_mutex_initialize(lp);
native_mutex_lock(lp);
native_mutex_initialize(&GET_THREAD()->interrupt_lock);
gvl_init(th->vm);
gvl_acquire(th->vm, th);
native_mutex_initialize(&th->interrupt_lock);
}
}

@@ -4502,3 +4508,4 @@ rb_reset_coverages(void)
GET_VM()->coverages = Qfalse;
rb_remove_event_hook(update_coverage);
}

166 changes: 161 additions & 5 deletions thread_pthread.c
@@ -29,10 +29,158 @@ static void native_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
static void native_cond_initialize(pthread_cond_t *cond);
static void native_cond_destroy(pthread_cond_t *cond);

static void native_atfork(void (*prepare)(void), void (*parent)(void), void (*child)(void));

#define native_mutex_reinitialize_atfork(lock) (\
native_mutex_unlock(lock), \
native_mutex_initialize(lock), \
native_mutex_lock(lock))

#define GVL_SIMPLE_LOCK 0
#define GVL_DEBUG 0

static void
gvl_show_waiting_threads(rb_vm_t *vm)
{
rb_thread_t *th = vm->gvl.waiting_threads;
int i = 0;
while (th) {
fprintf(stderr, "waiting (%d): %p\n", i++, th);
th = th->native_thread_data.gvl_next;
}
}

static void
gvl_waiting_push(rb_vm_t *vm, rb_thread_t *th)
{
th->native_thread_data.gvl_next = 0;

if (vm->gvl.waiting_threads) {
vm->gvl.waiting_last_thread->native_thread_data.gvl_next = th;
vm->gvl.waiting_last_thread = th;
}
else {
vm->gvl.waiting_threads = th;
vm->gvl.waiting_last_thread = th;
}
th = vm->gvl.waiting_threads;
vm->gvl.waiting++;
}

static void
gvl_waiting_shift(rb_vm_t *vm, rb_thread_t *th)
{
vm->gvl.waiting_threads = vm->gvl.waiting_threads->native_thread_data.gvl_next;
vm->gvl.waiting--;
}

static void
gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
{
#if GVL_SIMPLE_LOCK
native_mutex_lock(&vm->gvl.lock);
#else
native_mutex_lock(&vm->gvl.lock);
if (vm->gvl.waiting > 0 || vm->gvl.acquired != 0) {
if (GVL_DEBUG) fprintf(stderr, "gvl acquire (%p): sleep\n", th);
gvl_waiting_push(vm, th);
if (GVL_DEBUG) gvl_show_waiting_threads(vm);

while (vm->gvl.acquired != 0 || vm->gvl.waiting_threads != th) {
native_cond_wait(&th->native_thread_data.gvl_cond, &vm->gvl.lock);
}
gvl_waiting_shift(vm, th);
}
else {
/* do nothing */
}
vm->gvl.acquired = 1;
native_mutex_unlock(&vm->gvl.lock);
#endif
if (GVL_DEBUG) gvl_show_waiting_threads(vm);
if (GVL_DEBUG) fprintf(stderr, "gvl acquire (%p): acquire\n", th);
}

static void
gvl_release(rb_vm_t *vm)
{
#if GVL_SIMPLE_LOCK
native_mutex_unlock(&vm->gvl.lock);
#else
native_mutex_lock(&vm->gvl.lock);
if (vm->gvl.waiting > 0) {
rb_thread_t *th = vm->gvl.waiting_threads;
if (GVL_DEBUG) fprintf(stderr, "gvl release (%p): wakeup: %p\n", GET_THREAD(), th);
native_cond_signal(&th->native_thread_data.gvl_cond);
}
else {
if (GVL_DEBUG) fprintf(stderr, "gvl release (%p): wakeup: %p\n", GET_THREAD(), 0);
/* do nothing */
}
vm->gvl.acquired = 0;
native_mutex_unlock(&vm->gvl.lock);
#endif
}

static void
gvl_atfork(rb_vm_t *vm)
{
#if GVL_SIMPLE_LOCK
native_mutex_reinitialize_atfork(&vm->gvl.lock);
#else
/* do nothing */
#endif
}

static void gvl_init(rb_vm_t *vm);

static void
gvl_atfork_child(void)
{
gvl_init(GET_VM());
}

static void
gvl_init(rb_vm_t *vm)
{
int r;
if (GVL_DEBUG) fprintf(stderr, "gvl init\n");
native_mutex_initialize(&vm->gvl.lock);
native_atfork(0, 0, gvl_atfork_child);

vm->gvl.waiting_threads = 0;
vm->gvl.waiting_last_thread = 0;
vm->gvl.waiting = 0;
vm->gvl.acquired = 0;
}

static void
gvl_destroy(rb_vm_t *vm)
{
if (GVL_DEBUG) fprintf(stderr, "gvl destroy\n");
native_mutex_destroy(&vm->gvl.lock);
}

static void
mutex_debug(const char *msg, pthread_mutex_t *lock)
{
if (0) {
int r;
static pthread_mutex_t dbglock = PTHREAD_MUTEX_INITIALIZER;

if ((r = pthread_mutex_lock(&dbglock)) != 0) {exit(1);}
fprintf(stdout, "%s: %p\n", msg, lock);
if ((r = pthread_mutex_unlock(&dbglock)) != 0) {exit(1);}
}
}

#define NATIVE_MUTEX_LOCK_DEBUG 1

static void
native_mutex_lock(pthread_mutex_t *lock)
{
int r;
mutex_debug("lock", lock);
if ((r = pthread_mutex_lock(lock)) != 0) {
rb_bug_errno("pthread_mutex_lock", r);
}
@@ -42,6 +190,7 @@ static void
native_mutex_unlock(pthread_mutex_t *lock)
{
int r;
mutex_debug("unlock", lock);
if ((r = pthread_mutex_unlock(lock)) != 0) {
rb_bug_errno("pthread_mutex_unlock", r);
}
@@ -51,6 +200,7 @@ static inline int
native_mutex_trylock(pthread_mutex_t *lock)
{
int r;
mutex_debug("trylock", lock);
if ((r = pthread_mutex_trylock(lock)) != 0) {
if (r == EBUSY) {
return EBUSY;
@@ -66,20 +216,17 @@ static void
native_mutex_initialize(pthread_mutex_t *lock)
{
int r = pthread_mutex_init(lock, 0);
mutex_debug("init", lock);
if (r != 0) {
rb_bug_errno("pthread_mutex_init", r);
}
}

#define native_mutex_reinitialize_atfork(lock) (\
native_mutex_unlock(lock), \
native_mutex_initialize(lock), \
native_mutex_lock(lock))

static void
native_mutex_destroy(pthread_mutex_t *lock)
{
int r = pthread_mutex_destroy(lock);
mutex_debug("destroy", lock);
if (r != 0) {
rb_bug_errno("pthread_mutex_destroy", r);
}
@@ -127,6 +274,14 @@ native_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, struct times
return pthread_cond_timedwait(cond, mutex, ts);
}

static void
native_atfork(void (*prepare)(void), void (*parent)(void), void (*child)(void))
{
int r = pthread_atfork(prepare, parent, child);
if (r != 0) {
rb_bug_errno("native_atfork", r);
}
}

#define native_cleanup_push pthread_cleanup_push
#define native_cleanup_pop pthread_cleanup_pop
@@ -171,6 +326,7 @@ Init_native_thread(void)
pthread_key_create(&ruby_native_thread_key, NULL);
th->thread_id = pthread_self();
native_cond_initialize(&th->native_thread_data.sleep_cond);
native_cond_initialize(&th->native_thread_data.gvl_cond);
ruby_thread_set_native(th);
native_mutex_initialize(&signal_thread_list_lock);
posix_signal(SIGVTALRM, null_func);
12 changes: 12 additions & 0 deletions thread_pthread.h
@@ -22,6 +22,18 @@ typedef pthread_cond_t rb_thread_cond_t;
typedef struct native_thread_data_struct {
void *signal_thread_list;
pthread_cond_t sleep_cond;
pthread_cond_t gvl_cond;
struct rb_thread_struct *gvl_next;
} native_thread_data_t;

#include <semaphore.h>

typedef struct rb_global_vm_lock_struct {
pthread_mutex_t lock;
struct rb_thread_struct * volatile waiting_threads;
struct rb_thread_struct *waiting_last_thread;
int waiting;
int volatile acquired;
} rb_global_vm_lock_t;

#endif /* RUBY_THREAD_PTHREAD_H */
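
For reference, the FIFO hand-off added above in thread_pthread.c/thread_pthread.h can be condensed into the following self-contained sketch: each waiter sleeps on its own condition variable (the role of native_thread_data.gvl_cond) and the releaser wakes only the oldest waiter, so one thread cannot keep re-acquiring the GVL and starving the rest. The names gvl_t, waiter and demo_thread are illustrative, and the error checking of the real code is omitted.

    #include <pthread.h>
    #include <stdio.h>

    typedef struct waiter {
        pthread_cond_t cond;        /* private condvar, like gvl_cond */
        struct waiter *next;        /* like native_thread_data.gvl_next */
    } waiter;

    typedef struct {
        pthread_mutex_t lock;       /* protects the queue, like gvl.lock */
        waiter *head, *tail;        /* FIFO of sleeping threads */
        int acquired;               /* 1 while some thread holds the GVL */
    } gvl_t;

    static void gvl_acquire(gvl_t *gvl, waiter *w)
    {
        pthread_mutex_lock(&gvl->lock);
        if (gvl->acquired || gvl->head) {
            /* join the tail, then sleep until we are the head and the
             * current holder has released: strict FIFO wake-up order */
            w->next = NULL;
            if (gvl->tail) gvl->tail->next = w; else gvl->head = w;
            gvl->tail = w;
            while (gvl->acquired || gvl->head != w)
                pthread_cond_wait(&w->cond, &gvl->lock);
            gvl->head = w->next;
            if (!gvl->head) gvl->tail = NULL;
        }
        gvl->acquired = 1;
        pthread_mutex_unlock(&gvl->lock);
    }

    static void gvl_release(gvl_t *gvl)
    {
        pthread_mutex_lock(&gvl->lock);
        gvl->acquired = 0;
        if (gvl->head)                  /* wake only the oldest waiter */
            pthread_cond_signal(&gvl->head->cond);
        pthread_mutex_unlock(&gvl->lock);
    }

    static gvl_t G = { PTHREAD_MUTEX_INITIALIZER, NULL, NULL, 0 };

    static void *demo_thread(void *arg)
    {
        waiter self;
        pthread_cond_init(&self.cond, NULL);
        for (int i = 0; i < 3; i++) {
            gvl_acquire(&G, &self);
            printf("thread %d holds the GVL (slice %d)\n", *(int *)arg, i);
            gvl_release(&G);
        }
        pthread_cond_destroy(&self.cond);
        return NULL;
    }

    int main(void)
    {
        pthread_t t[3];
        int id[3] = {0, 1, 2};
        for (int i = 0; i < 3; i++)
            pthread_create(&t[i], NULL, demo_thread, &id[i]);
        for (int i = 0; i < 3; i++)
            pthread_join(t[i], NULL);
        return 0;
    }

Unlike the simple lock, this queue guarantees that on release the GVL goes to the thread that has waited longest, which is what stops a single thread from re-acquiring it again and again on SMP machines (Bug #2359).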
