Skip to content

Commit

Permalink
add an ability to suspend/resume a thread in a GC-safe way (#51489)
Browse files Browse the repository at this point in the history
This exposes the GC "stop the world" API to the user, for causing a
thread to quickly stop executing Julia code. This adds two APIs (that
will need to be exported and documented later):
```
julia> @CCall jl_safepoint_suspend_thread(#=tid=#1::Cint, #=magicnumber=#2::Cint)::Cint # roughly tkill(1, SIGSTOP)

julia> @CCall jl_safepoint_resume_thread(#=tid=#1::Cint)::Cint # roughly tkill(1, SIGCONT)
```

You can even suspend yourself, if there is another task to resume you 10
seconds later:
```
julia> ccall(:jl_enter_threaded_region, Cvoid, ())

julia> t = @task let; Libc.systemsleep(10); print("\nhello from $(Threads.threadid())\n"); @CCall jl_safepoint_resume_thread(0::Cint)::Cint; end; ccall(:jl_set_task_tid, Cint, (Any, Cint), t, 1); schedule(t);

julia> @time @CCall jl_safepoint_suspend_thread(0::Cint, 2::Cint)::Cint

hello from 2
  10 seconds (6 allocations: 264 bytes)
1
```

The meaning of the magic number is actually the kind of stop that you
want:
```
// n.b. suspended threads may still run in the GC or GC safe regions
// but shouldn't be observable, depending on which enum the user picks (only 1 and 2 are typically recommended here)
// waitstate = 0 : do not wait for suspend to finish
// waitstate = 1 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE)
// waitstate = 2 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE) and that GC is not running on that thread
// waitstate = 3 : wait for full suspend (gc_state == JL_GC_STATE_WAITING) -- this may never happen if thread is sleeping currently
// if another thread comes along and calls jl_safepoint_resume, we also return early
// return new suspend count on success, 0 on failure
```
Only magic number 2 is currently meaningful to the user though. The
difference between waitstate 1 and 2 is only relevant in C code which is
calling this from JL_GC_STATE_SAFE, since otherwise it is a priori known
that GC isn't running, else we too would be running the GC. But the
distinction of those states might be useful if we have a concurrent
collector.

Very important warning: if the stopped thread is holding any locks
(e.g. for codegen or types) that you then attempt to acquire, your
thread will deadlock. This is very likely, unless you are very careful.
A future update to this API may try to change the waitstate to give the
option to wait for the thread to release internal or known locks.
  • Loading branch information
vtjnash committed Oct 6, 2023
1 parent 0ab032a commit 3f23533
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 72 deletions.
2 changes: 2 additions & 0 deletions src/gc.c
Expand Up @@ -3527,6 +3527,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)
if (!jl_safepoint_start_gc()) {
// either another thread is running GC, or the GC got disabled just now.
jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING);
jl_safepoint_wait_thread_resume(); // block in thread-suspend now if requested, after clearing the gc_state
return;
}

Expand Down Expand Up @@ -3580,6 +3581,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)
jl_safepoint_end_gc();
jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING);
JL_PROBE_GC_END();
jl_safepoint_wait_thread_resume(); // block in thread-suspend now if requested, after clearing the gc_state

// Only disable finalizers on current thread
// Doing this on all threads is racy (it's impossible to check
Expand Down
2 changes: 2 additions & 0 deletions src/jl_exported_funcs.inc
Expand Up @@ -416,6 +416,8 @@
XX(jl_rethrow_other) \
XX(jl_running_on_valgrind) \
XX(jl_safe_printf) \
XX(jl_safepoint_suspend_thread) \
XX(jl_safepoint_resume_thread) \
XX(jl_SC_CLK_TCK) \
XX(jl_set_ARGS) \
XX(jl_set_const) \
Expand Down
2 changes: 2 additions & 0 deletions src/julia.h
Expand Up @@ -1054,6 +1054,8 @@ JL_DLLEXPORT void *jl_gc_managed_malloc(size_t sz);
JL_DLLEXPORT void *jl_gc_managed_realloc(void *d, size_t sz, size_t oldsz,
int isaligned, jl_value_t *owner);
JL_DLLEXPORT void jl_gc_safepoint(void);
JL_DLLEXPORT int jl_safepoint_suspend_thread(int tid, int waitstate);
JL_DLLEXPORT int jl_safepoint_resume_thread(int tid) JL_NOTSAFEPOINT;

void *mtarraylist_get(small_arraylist_t *_a, size_t idx) JL_NOTSAFEPOINT;
size_t mtarraylist_length(small_arraylist_t *_a) JL_NOTSAFEPOINT;
Expand Down
13 changes: 7 additions & 6 deletions src/julia_internal.h
Expand Up @@ -892,7 +892,7 @@ extern char *jl_safepoint_pages;
STATIC_INLINE int jl_addr_is_safepoint(uintptr_t addr)
{
uintptr_t safepoint_addr = (uintptr_t)jl_safepoint_pages;
return addr >= safepoint_addr && addr < safepoint_addr + jl_page_size * 3;
return addr >= safepoint_addr && addr < safepoint_addr + jl_page_size * 4;
}
extern _Atomic(uint32_t) jl_gc_running;
extern _Atomic(uint32_t) jl_gc_disable_counter;
Expand All @@ -918,7 +918,8 @@ void jl_safepoint_end_gc(void);
// Wait for the GC to finish
// This function does **NOT** modify the `gc_state` to inform the GC thread
// The caller should set it **BEFORE** calling this function.
void jl_safepoint_wait_gc(void);
void jl_safepoint_wait_gc(void) JL_NOTSAFEPOINT;
void jl_safepoint_wait_thread_resume(void) JL_NOTSAFEPOINT;

// Set pending sigint and enable the mechanisms to deliver the sigint.
void jl_safepoint_enable_sigint(void);
Expand Down Expand Up @@ -946,8 +947,7 @@ JL_DLLEXPORT void jl_pgcstack_getkey(jl_get_pgcstack_func **f, jl_pgcstack_key_t
extern pthread_mutex_t in_signal_lock;
#endif

#if !defined(__clang_gcanalyzer__) && !defined(_OS_DARWIN_)
static inline void jl_set_gc_and_wait(void)
static inline void jl_set_gc_and_wait(void) // n.b. not used on _OS_DARWIN_
{
jl_task_t *ct = jl_current_task;
// reading own gc state doesn't need atomic ops since no one else
Expand All @@ -956,8 +956,8 @@ static inline void jl_set_gc_and_wait(void)
jl_atomic_store_release(&ct->ptls->gc_state, JL_GC_STATE_WAITING);
jl_safepoint_wait_gc();
jl_atomic_store_release(&ct->ptls->gc_state, state);
jl_safepoint_wait_thread_resume(); // block in thread-suspend now if requested, after clearing the gc_state
}
#endif

// Query if a Julia object is if a permalloc region (due to part of a sys- pkg-image)
STATIC_INLINE size_t n_linkage_blobs(void) JL_NOTSAFEPOINT
Expand Down Expand Up @@ -1397,7 +1397,8 @@ extern jl_mutex_t typecache_lock;
extern JL_DLLEXPORT jl_mutex_t jl_codegen_lock;

#if defined(__APPLE__)
void jl_mach_gc_end(void);
void jl_mach_gc_end(void) JL_NOTSAFEPOINT;
void jl_safepoint_resume_thread_mach(jl_ptls_t ptls2, int16_t tid2) JL_NOTSAFEPOINT;
#endif

// -- smallintset.c -- //
Expand Down
29 changes: 15 additions & 14 deletions src/julia_threads.h
Expand Up @@ -211,7 +211,7 @@ typedef struct _jl_tls_states_t {
int16_t tid;
int8_t threadpoolid;
uint64_t rngseed;
volatile size_t *safepoint;
_Atomic(volatile size_t *) safepoint; // may be changed to the suspend page by any thread
_Atomic(int8_t) sleep_check_state; // read/write from foreign threads
// Whether it is safe to execute GC at the same time.
#define JL_GC_STATE_WAITING 1
Expand All @@ -225,9 +225,9 @@ typedef struct _jl_tls_states_t {
// statements is prohibited from certain
// callbacks (such as generated functions)
// as it may make compilation undecidable
int8_t in_pure_callback;
int8_t in_finalizer;
int8_t disable_gc;
int16_t in_pure_callback;
int16_t in_finalizer;
int16_t disable_gc;
// Counter to disable finalizer **on the current thread**
int finalizers_inhibited;
jl_thread_heap_t heap; // this is very large, and the offset is baked into codegen
Expand Down Expand Up @@ -264,6 +264,7 @@ typedef struct _jl_tls_states_t {
void *signal_stack;
#endif
jl_thread_t system_id;
_Atomic(int16_t) suspend_count;
arraylist_t finalizers;
jl_gc_page_stack_t page_metadata_allocd;
jl_gc_page_stack_t page_metadata_buffered;
Expand Down Expand Up @@ -333,17 +334,17 @@ void jl_sigint_safepoint(jl_ptls_t tls);
// This triggers a SegFault when we are in GC
// Assign it to a variable to make sure the compiler emit the load
// and to avoid Clang warning for -Wunused-volatile-lvalue
#define jl_gc_safepoint_(ptls) do { \
jl_signal_fence(); \
size_t safepoint_load = *ptls->safepoint; \
jl_signal_fence(); \
(void)safepoint_load; \
#define jl_gc_safepoint_(ptls) do { \
jl_signal_fence(); \
size_t safepoint_load = jl_atomic_load_relaxed(&ptls->safepoint)[0]; \
jl_signal_fence(); \
(void)safepoint_load; \
} while (0)
#define jl_sigint_safepoint(ptls) do { \
jl_signal_fence(); \
size_t safepoint_load = ptls->safepoint[-1]; \
jl_signal_fence(); \
(void)safepoint_load; \
#define jl_sigint_safepoint(ptls) do { \
jl_signal_fence(); \
size_t safepoint_load = jl_atomic_load_relaxed(&ptls->safepoint)[-1]; \
jl_signal_fence(); \
(void)safepoint_load; \
} while (0)
#endif
STATIC_INLINE int8_t jl_gc_state_set(jl_ptls_t ptls, int8_t state,
Expand Down
11 changes: 4 additions & 7 deletions src/rtutils.c
Expand Up @@ -275,15 +275,12 @@ JL_DLLEXPORT void jl_eh_restore_state(jl_handler_t *eh)
}
ct->world_age = eh->world_age;
ct->ptls->defer_signal = eh->defer_signal;
if (old_gc_state != eh->gc_state) {
if (old_gc_state != eh->gc_state)
jl_atomic_store_release(&ct->ptls->gc_state, eh->gc_state);
if (old_gc_state) {
jl_gc_safepoint_(ct->ptls);
}
}
if (old_defer_signal && !eh->defer_signal) {
if (!eh->gc_state)
jl_gc_safepoint_(ct->ptls);
if (old_defer_signal && !eh->defer_signal)
jl_sigint_safepoint(ct->ptls);
}
if (jl_atomic_load_relaxed(&jl_gc_have_pending_finalizers) &&
unlocks && eh->locks_len == 0) {
jl_gc_run_pending_finalizers(ct);
Expand Down
135 changes: 125 additions & 10 deletions src/safepoint.c
Expand Up @@ -30,7 +30,8 @@ char *jl_safepoint_pages = NULL;
// so that both safepoint load and pending signal load falls in this page.
// The initialization of the `safepoint` pointer is done `ti_initthread`
// in `threading.c`.
uint8_t jl_safepoint_enable_cnt[3] = {0, 0, 0};
// The fourth page is the count of suspended threads
uint16_t jl_safepoint_enable_cnt[4] = {0, 0, 0, 0};

// This lock should be acquired before enabling/disabling the safepoint
// or accessing one of the following variables:
Expand All @@ -48,12 +49,12 @@ uv_cond_t safepoint_cond;
static void jl_safepoint_enable(int idx) JL_NOTSAFEPOINT
{
// safepoint_lock should be held
assert(0 <= idx && idx < 3);
assert(0 <= idx && idx <= 3);
if (jl_safepoint_enable_cnt[idx]++ != 0) {
// We expect this to be enabled at most twice
// one for the GC, one for SIGINT.
// Update this if this is not the case anymore in the future.
assert(jl_safepoint_enable_cnt[idx] <= 2);
assert(jl_safepoint_enable_cnt[idx] <= (idx == 3 ? INT16_MAX : 2));
return;
}
// Now that we are requested to mprotect the page and it wasn't already.
Expand All @@ -62,14 +63,15 @@ static void jl_safepoint_enable(int idx) JL_NOTSAFEPOINT
DWORD old_prot;
VirtualProtect(pageaddr, jl_page_size, PAGE_NOACCESS, &old_prot);
#else
mprotect(pageaddr, jl_page_size, PROT_NONE);
int r = mprotect(pageaddr, jl_page_size, PROT_NONE);
(void)r; //if (r) perror("mprotect");
#endif
}

static void jl_safepoint_disable(int idx) JL_NOTSAFEPOINT
{
// safepoint_lock should be held
assert(0 <= idx && idx < 3);
assert(0 <= idx && idx <= 3);
if (--jl_safepoint_enable_cnt[idx] != 0) {
assert(jl_safepoint_enable_cnt[idx] > 0);
return;
Expand All @@ -81,7 +83,8 @@ static void jl_safepoint_disable(int idx) JL_NOTSAFEPOINT
DWORD old_prot;
VirtualProtect(pageaddr, jl_page_size, PAGE_READONLY, &old_prot);
#else
mprotect(pageaddr, jl_page_size, PROT_READ);
int r = mprotect(pageaddr, jl_page_size, PROT_READ);
(void)r; //if (r) perror("mprotect");
#endif
}

Expand All @@ -92,9 +95,9 @@ void jl_safepoint_init(void)
// jl_page_size isn't available yet.
size_t pgsz = jl_getpagesize();
#ifdef _OS_WINDOWS_
char *addr = (char*)VirtualAlloc(NULL, pgsz * 3, MEM_COMMIT, PAGE_READONLY);
char *addr = (char*)VirtualAlloc(NULL, pgsz * 4, MEM_COMMIT, PAGE_READONLY);
#else
char *addr = (char*)mmap(0, pgsz * 3, PROT_READ,
char *addr = (char*)mmap(0, pgsz * 4, PROT_READ,
MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (addr == MAP_FAILED)
addr = NULL;
Expand All @@ -104,6 +107,18 @@ void jl_safepoint_init(void)
jl_gc_debug_critical_error();
abort();
}
// // If we able to skip past the faulting safepoint instruction conditionally,
// // then we can make this safepoint page unconditional. But otherwise we
// // only enable this page when required, though it gives us less
// // fine-grained control over individual resume.
// char *pageaddr = addr + pgsz * 3;
//#ifdef _OS_WINDOWS_
// DWORD old_prot;
// VirtualProtect(pageaddr, pgsz, PAGE_NOACCESS, &old_prot);
//#else
// int r = mprotect(pageaddr, pgsz, PROT_NONE);
// (void)r; //if (r) perror("mprotect");
//#endif
// The signal page is for the gc safepoint.
// The page before it is the sigint pending flag.
jl_safepoint_pages = addr;
Expand All @@ -113,6 +128,7 @@ int jl_safepoint_start_gc(void)
{
// The thread should have set this already
assert(jl_atomic_load_relaxed(&jl_current_task->ptls->gc_state) == JL_GC_STATE_WAITING);
jl_safepoint_wait_thread_resume(); // make sure we are permitted to run GC now (we might be required to stop instead)
uv_mutex_lock(&safepoint_lock);
// In case multiple threads enter the GC at the same time, only allow
// one of them to actually run the collection. We can't just let the
Expand Down Expand Up @@ -148,15 +164,16 @@ void jl_safepoint_end_gc(void)
jl_safepoint_disable(2);
jl_safepoint_disable(1);
jl_atomic_store_release(&jl_gc_running, 0);
# ifdef __APPLE__
# ifdef _OS_DARWIN_
// This wakes up other threads on mac.
jl_mach_gc_end();
# endif
uv_mutex_unlock(&safepoint_lock);
uv_cond_broadcast(&safepoint_cond);
}

void jl_safepoint_wait_gc(void)
// this is the core of jl_set_gc_and_wait
void jl_safepoint_wait_gc(void) JL_NOTSAFEPOINT
{
jl_task_t *ct = jl_current_task; (void)ct;
JL_TIMING_SUSPEND_TASK(GC_SAFEPOINT, ct);
Expand All @@ -175,6 +192,104 @@ void jl_safepoint_wait_gc(void)
}
}

// equivalent to jl_set_gc_and_wait, but waiting on resume-thread lock instead
void jl_safepoint_wait_thread_resume(void)
{
jl_task_t *ct = jl_current_task;
// n.b. we do not permit a fast-path here that skips the lock acquire since
// we otherwise have no synchronization point to ensure that this thread
// will observe the change to the safepoint, even though the other thread
// might have already observed our gc_state.
// if (!jl_atomic_load_relaxed(&ct->ptls->suspend_count)) return;
JL_TIMING_SUSPEND_TASK(USER, ct);
int8_t state = jl_atomic_load_relaxed(&ct->ptls->gc_state);
jl_atomic_store_release(&ct->ptls->gc_state, JL_GC_STATE_WAITING);
uv_mutex_lock(&ct->ptls->sleep_lock);
while (jl_atomic_load_relaxed(&ct->ptls->suspend_count))
uv_cond_wait(&ct->ptls->wake_signal, &ct->ptls->sleep_lock);
// must while still holding the mutex_unlock, so we know other threads in
// jl_safepoint_suspend_thread will observe this thread in the correct GC
// state, and not still stuck in JL_GC_STATE_WAITING
jl_atomic_store_release(&ct->ptls->gc_state, state);
uv_mutex_unlock(&ct->ptls->sleep_lock);
}

// n.b. suspended threads may still run in the GC or GC safe regions
// but shouldn't be observable, depending on which enum the user picks (only 1 and 2 are typically recommended here)
// waitstate = 0 : do not wait for suspend to finish
// waitstate = 1 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE)
// waitstate = 2 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE) and that GC is not running on that thread
// waitstate = 3 : wait for full suspend (gc_state == JL_GC_STATE_WAITING) -- this may never happen if thread is sleeping currently
// if another thread comes along and calls jl_safepoint_resume, we also return early
// return new suspend count on success, 0 on failure
int jl_safepoint_suspend_thread(int tid, int waitstate)
{
if (0 > tid || tid >= jl_atomic_load_acquire(&jl_n_threads))
return 0;
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
uv_mutex_lock(&ptls2->sleep_lock);
int16_t suspend_count = jl_atomic_load_relaxed(&ptls2->suspend_count) + 1;
jl_atomic_store_relaxed(&ptls2->suspend_count, suspend_count);
if (suspend_count == 1) { // first to suspend
jl_safepoint_enable(3);
jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size * 3 + sizeof(void*)));
}
uv_mutex_unlock(&ptls2->sleep_lock);
if (waitstate) {
// wait for suspend (or another thread to call resume)
if (waitstate >= 2) {
// We currently cannot distinguish if a thread is helping run GC or
// not, so assume it is running GC and wait for GC to finish first.
// It will be unable to reenter helping with GC because we have
// changed its safepoint page.
jl_set_gc_and_wait();
}
while (jl_atomic_load_acquire(&ptls2->suspend_count) != 0) {
int8_t state2 = jl_atomic_load_acquire(&ptls2->gc_state);
if (waitstate <= 2 && state2 != 0)
break;
if (waitstate == 3 && state2 == JL_GC_STATE_WAITING)
break;
jl_cpu_pause(); // yield?
}
}
return suspend_count;
}

// return old suspend count on success, 0 on failure
// n.b. threads often do not resume until after all suspended threads have been resumed!
int jl_safepoint_resume_thread(int tid) JL_NOTSAFEPOINT
{
if (0 > tid || tid >= jl_atomic_load_acquire(&jl_n_threads))
return 0;
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
# ifdef _OS_DARWIN_
uv_mutex_lock(&safepoint_lock);
# endif
uv_mutex_lock(&ptls2->sleep_lock);
int16_t suspend_count = jl_atomic_load_relaxed(&ptls2->suspend_count);
if (suspend_count == 1) { // last to unsuspend
if (tid == 0)
jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size));
else
jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size * 2 + sizeof(void*)));
uv_cond_signal(&ptls2->wake_signal);
#ifdef _OS_DARWIN_
jl_safepoint_resume_thread_mach(ptls2, tid);
#endif
}
if (suspend_count != 0) {
jl_atomic_store_relaxed(&ptls2->suspend_count, suspend_count - 1);
if (suspend_count == 1)
jl_safepoint_disable(3);
}
uv_mutex_unlock(&ptls2->sleep_lock);
# ifdef _OS_DARWIN_
uv_mutex_unlock(&safepoint_lock);
# endif
return suspend_count;
}

void jl_safepoint_enable_sigint(void)
{
uv_mutex_lock(&safepoint_lock);
Expand Down

0 comments on commit 3f23533

Please sign in to comment.