From 3f23533f0c5436570d9bea61a49d6592c85cdc56 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Fri, 6 Oct 2023 09:32:59 -0400 Subject: [PATCH] add an ability to suspend/resume a thread in a GC-safe way (#51489) This exposes the GC "stop the world" API to the user, for causing a thread to quickly stop executing Julia code. This adds two APIs (that will need to be exported and documented later): ``` julia> @ccall jl_safepoint_suspend_thread(#=tid=#1::Cint, #=magicnumber=#2::Cint)::Cint # roughly tkill(1, SIGSTOP) julia> @ccall jl_safepoint_resume_thread(#=tid=#1::Cint)::Cint # roughly tkill(1, SIGCONT) ``` You can even suspend yourself, if there is another task to resume you 10 seconds later: ``` julia> ccall(:jl_enter_threaded_region, Cvoid, ()) julia> t = @task let; Libc.systemsleep(10); print("\nhello from $(Threads.threadid())\n"); @ccall jl_safepoint_resume_thread(0::Cint)::Cint; end; ccall(:jl_set_task_tid, Cint, (Any, Cint), t, 1); schedule(t); julia> @time @ccall jl_safepoint_suspend_thread(0::Cint, 2::Cint)::Cint hello from 2 10 seconds (6 allocations: 264 bytes) 1 ``` The meaning of the magic number is actually the kind of stop that you want: ``` // n.b. suspended threads may still run in the GC or GC safe regions // but shouldn't be observable, depending on which enum the user picks (only 1 and 2 are typically recommended here) // waitstate = 0 : do not wait for suspend to finish // waitstate = 1 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE) // waitstate = 2 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE) and that GC is not running on that thread // waitstate = 3 : wait for full suspend (gc_state == JL_GC_STATE_WAITING) -- this may never happen if thread is sleeping currently // if another thread comes along and calls jl_safepoint_resume, we also return early // return new suspend count on success, 0 on failure ``` Only magic number 2 is currently meaningful to the user though. The difference between waitstate 1 and 2 is only relevant in C code which is calling this from JL_GC_STATE_SAFE, since otherwise it is a priori known that GC isn't running, else we too would be running the GC. But the distinction of those states might be useful if we have a concurrent collector. Very important warning: if the stopped thread is holding any locks (e.g. for codegen or types) that you then attempt to acquire, your thread will deadlock. This is very likely, unless you are very careful. A future update to this API may try to change the waitstate to give the option to wait for the thread to release internal or known locks. --- src/gc.c | 2 + src/jl_exported_funcs.inc | 2 + src/julia.h | 2 + src/julia_internal.h | 13 ++-- src/julia_threads.h | 29 ++++---- src/rtutils.c | 11 ++-- src/safepoint.c | 135 +++++++++++++++++++++++++++++++++++--- src/signals-mach.c | 97 ++++++++++++++++++--------- src/signals-unix.c | 6 ++ src/threading.c | 6 +- 10 files changed, 231 insertions(+), 72 deletions(-) diff --git a/src/gc.c b/src/gc.c index 1b5247b208429..42a9daa01a747 100644 --- a/src/gc.c +++ b/src/gc.c @@ -3527,6 +3527,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection) if (!jl_safepoint_start_gc()) { // either another thread is running GC, or the GC got disabled just now. jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING); + jl_safepoint_wait_thread_resume(); // block in thread-suspend now if requested, after clearing the gc_state return; } @@ -3580,6 +3581,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection) jl_safepoint_end_gc(); jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING); JL_PROBE_GC_END(); + jl_safepoint_wait_thread_resume(); // block in thread-suspend now if requested, after clearing the gc_state // Only disable finalizers on current thread // Doing this on all threads is racy (it's impossible to check diff --git a/src/jl_exported_funcs.inc b/src/jl_exported_funcs.inc index cbcdf516fb4fd..c503eeb4407b3 100644 --- a/src/jl_exported_funcs.inc +++ b/src/jl_exported_funcs.inc @@ -416,6 +416,8 @@ XX(jl_rethrow_other) \ XX(jl_running_on_valgrind) \ XX(jl_safe_printf) \ + XX(jl_safepoint_suspend_thread) \ + XX(jl_safepoint_resume_thread) \ XX(jl_SC_CLK_TCK) \ XX(jl_set_ARGS) \ XX(jl_set_const) \ diff --git a/src/julia.h b/src/julia.h index a357bdf558360..4a05cd53b50e2 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1054,6 +1054,8 @@ JL_DLLEXPORT void *jl_gc_managed_malloc(size_t sz); JL_DLLEXPORT void *jl_gc_managed_realloc(void *d, size_t sz, size_t oldsz, int isaligned, jl_value_t *owner); JL_DLLEXPORT void jl_gc_safepoint(void); +JL_DLLEXPORT int jl_safepoint_suspend_thread(int tid, int waitstate); +JL_DLLEXPORT int jl_safepoint_resume_thread(int tid) JL_NOTSAFEPOINT; void *mtarraylist_get(small_arraylist_t *_a, size_t idx) JL_NOTSAFEPOINT; size_t mtarraylist_length(small_arraylist_t *_a) JL_NOTSAFEPOINT; diff --git a/src/julia_internal.h b/src/julia_internal.h index 9dff8e75cb2f5..7883844d908f8 100644 --- a/src/julia_internal.h +++ b/src/julia_internal.h @@ -892,7 +892,7 @@ extern char *jl_safepoint_pages; STATIC_INLINE int jl_addr_is_safepoint(uintptr_t addr) { uintptr_t safepoint_addr = (uintptr_t)jl_safepoint_pages; - return addr >= safepoint_addr && addr < safepoint_addr + jl_page_size * 3; + return addr >= safepoint_addr && addr < safepoint_addr + jl_page_size * 4; } extern _Atomic(uint32_t) jl_gc_running; extern _Atomic(uint32_t) jl_gc_disable_counter; @@ -918,7 +918,8 @@ void jl_safepoint_end_gc(void); // Wait for the GC to finish // This function does **NOT** modify the `gc_state` to inform the GC thread // The caller should set it **BEFORE** calling this function. -void jl_safepoint_wait_gc(void); +void jl_safepoint_wait_gc(void) JL_NOTSAFEPOINT; +void jl_safepoint_wait_thread_resume(void) JL_NOTSAFEPOINT; // Set pending sigint and enable the mechanisms to deliver the sigint. void jl_safepoint_enable_sigint(void); @@ -946,8 +947,7 @@ JL_DLLEXPORT void jl_pgcstack_getkey(jl_get_pgcstack_func **f, jl_pgcstack_key_t extern pthread_mutex_t in_signal_lock; #endif -#if !defined(__clang_gcanalyzer__) && !defined(_OS_DARWIN_) -static inline void jl_set_gc_and_wait(void) +static inline void jl_set_gc_and_wait(void) // n.b. not used on _OS_DARWIN_ { jl_task_t *ct = jl_current_task; // reading own gc state doesn't need atomic ops since no one else @@ -956,8 +956,8 @@ static inline void jl_set_gc_and_wait(void) jl_atomic_store_release(&ct->ptls->gc_state, JL_GC_STATE_WAITING); jl_safepoint_wait_gc(); jl_atomic_store_release(&ct->ptls->gc_state, state); + jl_safepoint_wait_thread_resume(); // block in thread-suspend now if requested, after clearing the gc_state } -#endif // Query if a Julia object is if a permalloc region (due to part of a sys- pkg-image) STATIC_INLINE size_t n_linkage_blobs(void) JL_NOTSAFEPOINT @@ -1397,7 +1397,8 @@ extern jl_mutex_t typecache_lock; extern JL_DLLEXPORT jl_mutex_t jl_codegen_lock; #if defined(__APPLE__) -void jl_mach_gc_end(void); +void jl_mach_gc_end(void) JL_NOTSAFEPOINT; +void jl_safepoint_resume_thread_mach(jl_ptls_t ptls2, int16_t tid2) JL_NOTSAFEPOINT; #endif // -- smallintset.c -- // diff --git a/src/julia_threads.h b/src/julia_threads.h index 7510eae308d27..b8276682ee359 100644 --- a/src/julia_threads.h +++ b/src/julia_threads.h @@ -211,7 +211,7 @@ typedef struct _jl_tls_states_t { int16_t tid; int8_t threadpoolid; uint64_t rngseed; - volatile size_t *safepoint; + _Atomic(volatile size_t *) safepoint; // may be changed to the suspend page by any thread _Atomic(int8_t) sleep_check_state; // read/write from foreign threads // Whether it is safe to execute GC at the same time. #define JL_GC_STATE_WAITING 1 @@ -225,9 +225,9 @@ typedef struct _jl_tls_states_t { // statements is prohibited from certain // callbacks (such as generated functions) // as it may make compilation undecidable - int8_t in_pure_callback; - int8_t in_finalizer; - int8_t disable_gc; + int16_t in_pure_callback; + int16_t in_finalizer; + int16_t disable_gc; // Counter to disable finalizer **on the current thread** int finalizers_inhibited; jl_thread_heap_t heap; // this is very large, and the offset is baked into codegen @@ -264,6 +264,7 @@ typedef struct _jl_tls_states_t { void *signal_stack; #endif jl_thread_t system_id; + _Atomic(int16_t) suspend_count; arraylist_t finalizers; jl_gc_page_stack_t page_metadata_allocd; jl_gc_page_stack_t page_metadata_buffered; @@ -333,17 +334,17 @@ void jl_sigint_safepoint(jl_ptls_t tls); // This triggers a SegFault when we are in GC // Assign it to a variable to make sure the compiler emit the load // and to avoid Clang warning for -Wunused-volatile-lvalue -#define jl_gc_safepoint_(ptls) do { \ - jl_signal_fence(); \ - size_t safepoint_load = *ptls->safepoint; \ - jl_signal_fence(); \ - (void)safepoint_load; \ +#define jl_gc_safepoint_(ptls) do { \ + jl_signal_fence(); \ + size_t safepoint_load = jl_atomic_load_relaxed(&ptls->safepoint)[0]; \ + jl_signal_fence(); \ + (void)safepoint_load; \ } while (0) -#define jl_sigint_safepoint(ptls) do { \ - jl_signal_fence(); \ - size_t safepoint_load = ptls->safepoint[-1]; \ - jl_signal_fence(); \ - (void)safepoint_load; \ +#define jl_sigint_safepoint(ptls) do { \ + jl_signal_fence(); \ + size_t safepoint_load = jl_atomic_load_relaxed(&ptls->safepoint)[-1]; \ + jl_signal_fence(); \ + (void)safepoint_load; \ } while (0) #endif STATIC_INLINE int8_t jl_gc_state_set(jl_ptls_t ptls, int8_t state, diff --git a/src/rtutils.c b/src/rtutils.c index 35ab89d856783..afe8d24678a61 100644 --- a/src/rtutils.c +++ b/src/rtutils.c @@ -275,15 +275,12 @@ JL_DLLEXPORT void jl_eh_restore_state(jl_handler_t *eh) } ct->world_age = eh->world_age; ct->ptls->defer_signal = eh->defer_signal; - if (old_gc_state != eh->gc_state) { + if (old_gc_state != eh->gc_state) jl_atomic_store_release(&ct->ptls->gc_state, eh->gc_state); - if (old_gc_state) { - jl_gc_safepoint_(ct->ptls); - } - } - if (old_defer_signal && !eh->defer_signal) { + if (!eh->gc_state) + jl_gc_safepoint_(ct->ptls); + if (old_defer_signal && !eh->defer_signal) jl_sigint_safepoint(ct->ptls); - } if (jl_atomic_load_relaxed(&jl_gc_have_pending_finalizers) && unlocks && eh->locks_len == 0) { jl_gc_run_pending_finalizers(ct); diff --git a/src/safepoint.c b/src/safepoint.c index c6f9a42059d1a..5a845496f36c6 100644 --- a/src/safepoint.c +++ b/src/safepoint.c @@ -30,7 +30,8 @@ char *jl_safepoint_pages = NULL; // so that both safepoint load and pending signal load falls in this page. // The initialization of the `safepoint` pointer is done `ti_initthread` // in `threading.c`. -uint8_t jl_safepoint_enable_cnt[3] = {0, 0, 0}; +// The fourth page is the count of suspended threads +uint16_t jl_safepoint_enable_cnt[4] = {0, 0, 0, 0}; // This lock should be acquired before enabling/disabling the safepoint // or accessing one of the following variables: @@ -48,12 +49,12 @@ uv_cond_t safepoint_cond; static void jl_safepoint_enable(int idx) JL_NOTSAFEPOINT { // safepoint_lock should be held - assert(0 <= idx && idx < 3); + assert(0 <= idx && idx <= 3); if (jl_safepoint_enable_cnt[idx]++ != 0) { // We expect this to be enabled at most twice // one for the GC, one for SIGINT. // Update this if this is not the case anymore in the future. - assert(jl_safepoint_enable_cnt[idx] <= 2); + assert(jl_safepoint_enable_cnt[idx] <= (idx == 3 ? INT16_MAX : 2)); return; } // Now that we are requested to mprotect the page and it wasn't already. @@ -62,14 +63,15 @@ static void jl_safepoint_enable(int idx) JL_NOTSAFEPOINT DWORD old_prot; VirtualProtect(pageaddr, jl_page_size, PAGE_NOACCESS, &old_prot); #else - mprotect(pageaddr, jl_page_size, PROT_NONE); + int r = mprotect(pageaddr, jl_page_size, PROT_NONE); + (void)r; //if (r) perror("mprotect"); #endif } static void jl_safepoint_disable(int idx) JL_NOTSAFEPOINT { // safepoint_lock should be held - assert(0 <= idx && idx < 3); + assert(0 <= idx && idx <= 3); if (--jl_safepoint_enable_cnt[idx] != 0) { assert(jl_safepoint_enable_cnt[idx] > 0); return; @@ -81,7 +83,8 @@ static void jl_safepoint_disable(int idx) JL_NOTSAFEPOINT DWORD old_prot; VirtualProtect(pageaddr, jl_page_size, PAGE_READONLY, &old_prot); #else - mprotect(pageaddr, jl_page_size, PROT_READ); + int r = mprotect(pageaddr, jl_page_size, PROT_READ); + (void)r; //if (r) perror("mprotect"); #endif } @@ -92,9 +95,9 @@ void jl_safepoint_init(void) // jl_page_size isn't available yet. size_t pgsz = jl_getpagesize(); #ifdef _OS_WINDOWS_ - char *addr = (char*)VirtualAlloc(NULL, pgsz * 3, MEM_COMMIT, PAGE_READONLY); + char *addr = (char*)VirtualAlloc(NULL, pgsz * 4, MEM_COMMIT, PAGE_READONLY); #else - char *addr = (char*)mmap(0, pgsz * 3, PROT_READ, + char *addr = (char*)mmap(0, pgsz * 4, PROT_READ, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); if (addr == MAP_FAILED) addr = NULL; @@ -104,6 +107,18 @@ void jl_safepoint_init(void) jl_gc_debug_critical_error(); abort(); } +// // If we able to skip past the faulting safepoint instruction conditionally, +// // then we can make this safepoint page unconditional. But otherwise we +// // only enable this page when required, though it gives us less +// // fine-grained control over individual resume. +// char *pageaddr = addr + pgsz * 3; +//#ifdef _OS_WINDOWS_ +// DWORD old_prot; +// VirtualProtect(pageaddr, pgsz, PAGE_NOACCESS, &old_prot); +//#else +// int r = mprotect(pageaddr, pgsz, PROT_NONE); +// (void)r; //if (r) perror("mprotect"); +//#endif // The signal page is for the gc safepoint. // The page before it is the sigint pending flag. jl_safepoint_pages = addr; @@ -113,6 +128,7 @@ int jl_safepoint_start_gc(void) { // The thread should have set this already assert(jl_atomic_load_relaxed(&jl_current_task->ptls->gc_state) == JL_GC_STATE_WAITING); + jl_safepoint_wait_thread_resume(); // make sure we are permitted to run GC now (we might be required to stop instead) uv_mutex_lock(&safepoint_lock); // In case multiple threads enter the GC at the same time, only allow // one of them to actually run the collection. We can't just let the @@ -148,7 +164,7 @@ void jl_safepoint_end_gc(void) jl_safepoint_disable(2); jl_safepoint_disable(1); jl_atomic_store_release(&jl_gc_running, 0); -# ifdef __APPLE__ +# ifdef _OS_DARWIN_ // This wakes up other threads on mac. jl_mach_gc_end(); # endif @@ -156,7 +172,8 @@ void jl_safepoint_end_gc(void) uv_cond_broadcast(&safepoint_cond); } -void jl_safepoint_wait_gc(void) +// this is the core of jl_set_gc_and_wait +void jl_safepoint_wait_gc(void) JL_NOTSAFEPOINT { jl_task_t *ct = jl_current_task; (void)ct; JL_TIMING_SUSPEND_TASK(GC_SAFEPOINT, ct); @@ -175,6 +192,104 @@ void jl_safepoint_wait_gc(void) } } +// equivalent to jl_set_gc_and_wait, but waiting on resume-thread lock instead +void jl_safepoint_wait_thread_resume(void) +{ + jl_task_t *ct = jl_current_task; + // n.b. we do not permit a fast-path here that skips the lock acquire since + // we otherwise have no synchronization point to ensure that this thread + // will observe the change to the safepoint, even though the other thread + // might have already observed our gc_state. + // if (!jl_atomic_load_relaxed(&ct->ptls->suspend_count)) return; + JL_TIMING_SUSPEND_TASK(USER, ct); + int8_t state = jl_atomic_load_relaxed(&ct->ptls->gc_state); + jl_atomic_store_release(&ct->ptls->gc_state, JL_GC_STATE_WAITING); + uv_mutex_lock(&ct->ptls->sleep_lock); + while (jl_atomic_load_relaxed(&ct->ptls->suspend_count)) + uv_cond_wait(&ct->ptls->wake_signal, &ct->ptls->sleep_lock); + // must while still holding the mutex_unlock, so we know other threads in + // jl_safepoint_suspend_thread will observe this thread in the correct GC + // state, and not still stuck in JL_GC_STATE_WAITING + jl_atomic_store_release(&ct->ptls->gc_state, state); + uv_mutex_unlock(&ct->ptls->sleep_lock); +} + +// n.b. suspended threads may still run in the GC or GC safe regions +// but shouldn't be observable, depending on which enum the user picks (only 1 and 2 are typically recommended here) +// waitstate = 0 : do not wait for suspend to finish +// waitstate = 1 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE) +// waitstate = 2 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE) and that GC is not running on that thread +// waitstate = 3 : wait for full suspend (gc_state == JL_GC_STATE_WAITING) -- this may never happen if thread is sleeping currently +// if another thread comes along and calls jl_safepoint_resume, we also return early +// return new suspend count on success, 0 on failure +int jl_safepoint_suspend_thread(int tid, int waitstate) +{ + if (0 > tid || tid >= jl_atomic_load_acquire(&jl_n_threads)) + return 0; + jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; + uv_mutex_lock(&ptls2->sleep_lock); + int16_t suspend_count = jl_atomic_load_relaxed(&ptls2->suspend_count) + 1; + jl_atomic_store_relaxed(&ptls2->suspend_count, suspend_count); + if (suspend_count == 1) { // first to suspend + jl_safepoint_enable(3); + jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size * 3 + sizeof(void*))); + } + uv_mutex_unlock(&ptls2->sleep_lock); + if (waitstate) { + // wait for suspend (or another thread to call resume) + if (waitstate >= 2) { + // We currently cannot distinguish if a thread is helping run GC or + // not, so assume it is running GC and wait for GC to finish first. + // It will be unable to reenter helping with GC because we have + // changed its safepoint page. + jl_set_gc_and_wait(); + } + while (jl_atomic_load_acquire(&ptls2->suspend_count) != 0) { + int8_t state2 = jl_atomic_load_acquire(&ptls2->gc_state); + if (waitstate <= 2 && state2 != 0) + break; + if (waitstate == 3 && state2 == JL_GC_STATE_WAITING) + break; + jl_cpu_pause(); // yield? + } + } + return suspend_count; +} + +// return old suspend count on success, 0 on failure +// n.b. threads often do not resume until after all suspended threads have been resumed! +int jl_safepoint_resume_thread(int tid) JL_NOTSAFEPOINT +{ + if (0 > tid || tid >= jl_atomic_load_acquire(&jl_n_threads)) + return 0; + jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; +# ifdef _OS_DARWIN_ + uv_mutex_lock(&safepoint_lock); +# endif + uv_mutex_lock(&ptls2->sleep_lock); + int16_t suspend_count = jl_atomic_load_relaxed(&ptls2->suspend_count); + if (suspend_count == 1) { // last to unsuspend + if (tid == 0) + jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size)); + else + jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size * 2 + sizeof(void*))); + uv_cond_signal(&ptls2->wake_signal); +#ifdef _OS_DARWIN_ + jl_safepoint_resume_thread_mach(ptls2, tid); +#endif + } + if (suspend_count != 0) { + jl_atomic_store_relaxed(&ptls2->suspend_count, suspend_count - 1); + if (suspend_count == 1) + jl_safepoint_disable(3); + } + uv_mutex_unlock(&ptls2->sleep_lock); +# ifdef _OS_DARWIN_ + uv_mutex_unlock(&safepoint_lock); +# endif + return suspend_count; +} + void jl_safepoint_enable_sigint(void) { uv_mutex_lock(&safepoint_lock); diff --git a/src/signals-mach.c b/src/signals-mach.c index 6ec8f95570f17..ebc54d35a5b46 100644 --- a/src/signals-mach.c +++ b/src/signals-mach.c @@ -45,50 +45,84 @@ static void attach_exception_port(thread_port_t thread, int segv_only); // low 16 bits are the thread id, the next 8 bits are the original gc_state static arraylist_t suspended_threads; extern uv_mutex_t safepoint_lock; -extern uv_cond_t safepoint_cond; -void jl_mach_gc_end(void) + +// see jl_safepoint_wait_thread_resume +void jl_safepoint_resume_thread_mach(jl_ptls_t ptls2, int16_t tid2) { - // Requires the safepoint lock to be held + // must be called with uv_mutex_lock(&safepoint_lock) and uv_mutex_lock(&ptls2->sleep_lock) held (in that order) for (size_t i = 0; i < suspended_threads.len; i++) { uintptr_t item = (uintptr_t)suspended_threads.items[i]; int16_t tid = (int16_t)item; int8_t gc_state = (int8_t)(item >> 8); - jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; + if (tid != tid2) + continue; jl_atomic_store_release(&ptls2->gc_state, gc_state); thread_resume(pthread_mach_thread_np(ptls2->system_id)); + suspended_threads.items[i] = suspended_threads.items[--suspended_threads.len]; + break; + } + // thread hadn't actually reached a jl_mach_gc_wait call where we suspended it +} + +void jl_mach_gc_end(void) +{ + // must be called with uv_mutex_lock(&safepoint_lock) held + size_t j = 0; + for (size_t i = 0; i < suspended_threads.len; i++) { + uintptr_t item = (uintptr_t)suspended_threads.items[i]; + int16_t tid = (int16_t)item; + int8_t gc_state = (int8_t)(item >> 8); + jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid]; + uv_mutex_lock(&ptls2->sleep_lock); + if (jl_atomic_load_relaxed(&ptls2->suspend_count) == 0) { + jl_atomic_store_release(&ptls2->gc_state, gc_state); + thread_resume(pthread_mach_thread_np(ptls2->system_id)); + } + else { + // this is the check for jl_safepoint_wait_thread_resume + suspended_threads.items[j++] = (void*)item; + } + uv_mutex_unlock(&ptls2->sleep_lock); } - suspended_threads.len = 0; + suspended_threads.len = j; } -// Suspend the thread and return `1` if the GC is running. -// Otherwise return `0` -static int jl_mach_gc_wait(jl_ptls_t ptls2, - mach_port_t thread, int16_t tid) +// implement jl_set_gc_and_wait from a different thread +static void jl_mach_gc_wait(jl_ptls_t ptls2, mach_port_t thread, int16_t tid) { + // relaxed, since we don't mind missing one--we will hit another soon (immediately probably) uv_mutex_lock(&safepoint_lock); - if (!jl_atomic_load_relaxed(&jl_gc_running)) { - // relaxed, since gets set to zero only while the safepoint_lock was held - // this means we can tell if GC is done before we got the message or - // the safepoint was enabled for SIGINT. - uv_mutex_unlock(&safepoint_lock); - return 0; + // Since this gets set to zero only while the safepoint_lock was held this + // means we can tell for sure if GC is done before we got the message or + // the safepoint was enabled for SIGINT instead. + int doing_gc = jl_atomic_load_relaxed(&jl_gc_running); + int do_suspend = doing_gc; + int relaxed_suspend_count = !doing_gc && jl_atomic_load_relaxed(&ptls2->suspend_count) != 0; + if (relaxed_suspend_count) { + uv_mutex_lock(&ptls2->sleep_lock); + do_suspend = jl_atomic_load_relaxed(&ptls2->suspend_count) != 0; + // only do_suspend while holding the sleep_lock, otherwise we might miss a resume + } + if (do_suspend) { + // Set the gc state of the thread, suspend and record it + // + // TODO: TSAN will complain that it never saw the faulting task do an + // atomic release (it was in the kernel). And our attempt here does + // nothing, since we are a different thread, and it is not transitive). + // + // This also means we are not making this thread available for GC work. + // Eventually, we should probably release this signal to the original + // thread, (return KERN_FAILURE instead of KERN_SUCCESS) so that it + // triggers a SIGSEGV and gets handled by the usual codepath for unix. + int8_t gc_state = ptls2->gc_state; + jl_atomic_store_release(&ptls2->gc_state, JL_GC_STATE_WAITING); + uintptr_t item = tid | (((uintptr_t)gc_state) << 16); + arraylist_push(&suspended_threads, (void*)item); + thread_suspend(thread); } - // Otherwise, set the gc state of the thread, suspend and record it - // TODO: TSAN will complain that it never saw the faulting task do an - // atomic release (it was in the kernel). And our attempt here does - // nothing, since we are a different thread, and it is not transitive). - // - // This also means we are not making this thread available for GC work. - // Eventually, we should probably release this signal to the original - // thread, (return KERN_FAILURE instead of KERN_SUCCESS) so that it - // triggers a SIGSEGV and gets handled by the usual codepath for unix. - int8_t gc_state = ptls2->gc_state; - jl_atomic_store_release(&ptls2->gc_state, JL_GC_STATE_WAITING); - uintptr_t item = tid | (((uintptr_t)gc_state) << 16); - arraylist_push(&suspended_threads, (void*)item); - thread_suspend(thread); + if (relaxed_suspend_count) + uv_mutex_unlock(&ptls2->sleep_lock); uv_mutex_unlock(&safepoint_lock); - return 1; } static mach_port_t segv_port = 0; @@ -314,8 +348,7 @@ kern_return_t catch_mach_exception_raise( kern_return_t ret = thread_get_state(thread, HOST_EXCEPTION_STATE, (thread_state_t)&exc_state, &exc_count); HANDLE_MACH_ERROR("thread_get_state", ret); if (jl_addr_is_safepoint(fault_addr) && !is_write_fault(exc_state)) { - if (jl_mach_gc_wait(ptls2, thread, tid)) - return KERN_SUCCESS; + jl_mach_gc_wait(ptls2, thread, tid); if (ptls2->tid != 0) return KERN_SUCCESS; if (ptls2->defer_signal) { diff --git a/src/signals-unix.c b/src/signals-unix.c index 0d5ad9b1be7c5..07d6d0bb72cc1 100644 --- a/src/signals-unix.c +++ b/src/signals-unix.c @@ -364,6 +364,12 @@ JL_NO_ASAN static void segv_handler(int sig, siginfo_t *info, void *context) // Do not raise sigint on worker thread if (jl_atomic_load_relaxed(&ct->tid) != 0) return; + // n.b. if the user might have seen that we were in a state where it + // was safe to run GC concurrently, we might briefly enter a state + // where our execution is not consistent with the gc_state of this + // thread. That will quickly be rectified when we rerun the faulting + // instruction and end up right back here, or we start to run the + // exception handler and immediately hit the safepoint there. if (ct->ptls->defer_signal) { jl_safepoint_defer_sigint(); } diff --git a/src/threading.c b/src/threading.c index 2ec9c220fbafb..319a2918fab3f 100644 --- a/src/threading.c +++ b/src/threading.c @@ -367,11 +367,11 @@ jl_ptls_t jl_init_threadtls(int16_t tid) // Conditionally initialize the safepoint address. See comment in // `safepoint.c` if (tid == 0) { - ptls->safepoint = (size_t*)(jl_safepoint_pages + jl_page_size); + jl_atomic_store_relaxed(&ptls->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size)); } else { - ptls->safepoint = (size_t*)(jl_safepoint_pages + jl_page_size * 2 + - sizeof(size_t)); + jl_atomic_store_relaxed(&ptls->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size * 2 + + sizeof(size_t))); } jl_bt_element_t *bt_data = (jl_bt_element_t*) malloc_s(sizeof(jl_bt_element_t) * (JL_MAX_BT_SIZE + 1));