allow any thread, one at a time, to block in the event loop
start adding some synchronization to I/O

this yields basic support for printing to TTYs from multiple threads
JeffBezanson committed Apr 3, 2019
1 parent a24c38c commit 55f0367
Showing 6 changed files with 93 additions and 52 deletions.
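As the commit message notes, the practical effect is that several threads can print to a shared TTY concurrently. A minimal sketch to exercise this (the loop bound and message are illustrative; Julia must be started with more than one thread, e.g. JULIA_NUM_THREADS=4):

using Base.Threads

# Each iteration writes to the shared TTY; the synchronization added in this
# commit lets these writes block in I/O and complete from any thread.
@threads for i in 1:8
    println("iteration ", i, " ran on thread ", threadid())
end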
2 changes: 2 additions & 0 deletions NEWS.md
@@ -23,6 +23,8 @@ Multi-threading changes
* The `Condition` type now has a thread-safe replacement, accessed as `Threads.Condition`.
With that addition, task scheduling primitives such as `ReentrantLock` are now thread-safe ([#30061]).

* It is possible to schedule and switch Tasks during `@threads` loops, and perform limited I/O ([#31438]).

Language changes
----------------
* Empty entries in `JULIA_DEPOT_PATH` are now expanded to default depot entries ([#31009]).
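The release note above depends on the discipline that the rest of this commit applies to `closenotify`: a thread-safe condition is locked around both the predicate check plus `wait` and the state change plus `notify`, so a wakeup cannot be lost between checking and sleeping. A rough user-level sketch of that pattern with `Threads.Condition` (the `done` flag and task structure are made up for illustration, not part of this commit):

using Base.Threads

cond = Threads.Condition()
done = Ref(false)

waiter = @async begin
    lock(cond)
    try
        while !done[]          # re-check the predicate after every wakeup
            wait(cond)
        end
    finally
        unlock(cond)
    end
    println("waiter observed completion")
end

lock(cond)
try
    done[] = true              # change state and notify under the same lock
    notify(cond)
finally
    unlock(cond)
end
wait(waiter)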
5 changes: 5 additions & 0 deletions base/libuv.jl
@@ -48,18 +48,23 @@ disassociate_julia_struct(handle::Ptr{Cvoid}) =
# A dict of all libuv handles that are being waited on somewhere in the system
# and should thus not be garbage collected
const uvhandles = IdDict()
const preserve_handle_lock = Threads.SpinLock()
function preserve_handle(x)
lock(preserve_handle_lock)
v = get(uvhandles, x, 0)::Int
uvhandles[x] = v + 1
unlock(preserve_handle_lock)
nothing
end
function unpreserve_handle(x)
lock(preserve_handle_lock)
v = uvhandles[x]::Int
if v == 1
pop!(uvhandles, x)
else
uvhandles[x] = v - 1
end
unlock(preserve_handle_lock)
nothing
end

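The new `preserve_handle_lock` is a `Threads.SpinLock`, which fits here because the critical sections only touch a dictionary and never yield to the scheduler. The same guarded-refcount idea in isolation, as a sketch with hypothetical `retain!`/`release!` names (not part of this commit):

using Base.Threads

const refcounts = IdDict{Any,Int}()
const refcounts_lock = Threads.SpinLock()   # held only for a few dictionary operations

function retain!(x)
    lock(refcounts_lock)
    try
        refcounts[x] = get(refcounts, x, 0) + 1
    finally
        unlock(refcounts_lock)
    end
    nothing
end

function release!(x)
    lock(refcounts_lock)
    try
        n = refcounts[x]
        n == 1 ? delete!(refcounts, x) : (refcounts[x] = n - 1)
    finally
        unlock(refcounts_lock)
    end
    nothing
end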
68 changes: 45 additions & 23 deletions base/stream.jl
@@ -120,7 +120,7 @@ mutable struct PipeEndpoint <: LibuvStream
buffer::IOBuffer
readnotify::Condition
connectnotify::Condition
closenotify::Condition
closenotify::ThreadSynchronizer
sendbuf::Union{IOBuffer, Nothing}
lock::ReentrantLock
throttle::Int
@@ -130,7 +130,7 @@ mutable struct PipeEndpoint <: LibuvStream
PipeBuffer(),
Condition(),
Condition(),
Condition(),
ThreadSynchronizer(),
nothing,
ReentrantLock(),
DEFAULT_READ_BUFFER_SZ)
@@ -165,7 +165,7 @@ mutable struct TTY <: LibuvStream
status::Int
buffer::IOBuffer
readnotify::Condition
closenotify::Condition
closenotify::ThreadSynchronizer
sendbuf::Union{IOBuffer, Nothing}
lock::ReentrantLock
throttle::Int
@@ -176,7 +176,7 @@ mutable struct TTY <: LibuvStream
status,
PipeBuffer(),
Condition(),
Condition(),
ThreadSynchronizer(),
nothing,
ReentrantLock(),
DEFAULT_READ_BUFFER_SZ)
@@ -380,25 +380,38 @@ function wait_readnb(x::LibuvStream, nb::Int)
end

function wait_close(x::Union{LibuvStream, LibuvServer})
if isopen(x)
stream_wait(x, x.closenotify)
lock(x.closenotify)
try
if isopen(x)
stream_wait(x, x.closenotify)
end
finally
unlock(x.closenotify)
end
nothing
end

function close(stream::Union{LibuvStream, LibuvServer})
if stream.status == StatusInit
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
elseif isopen(stream)
if stream.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
if uv_handle_data(stream) != C_NULL
stream_wait(stream, stream.closenotify)
return nothing
end
lock(stream.closenotify)
try
if isopen(stream)
should_wait = uv_handle_data(stream) != C_NULL
if stream.status != StatusClosing
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
if should_wait
stream_wait(stream, stream.closenotify)
end
end
finally
unlock(stream.closenotify)
end
nothing
return nothing
end

function uvfinalize(uv::Union{LibuvStream, LibuvServer})
@@ -547,7 +560,12 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
if isa(stream, TTY)
stream.status = StatusEOF # libuv called uv_stop_reading already
notify(stream.readnotify)
notify(stream.closenotify)
lock(stream.closenotify)
try
notify(stream.closenotify)
finally
unlock(stream.closenotify)
end
elseif stream.status != StatusClosing
# begin shutdown of the stream
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
@@ -589,10 +607,15 @@ function reseteof(x::TTY)
end

function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
uv.handle = C_NULL
uv.status = StatusClosed
# notify any listeners that exist on this libuv stream type
notify(uv.closenotify)
lock(uv.closenotify)
try
uv.handle = C_NULL
uv.status = StatusClosed
# notify any listeners that exist on this libuv stream type
notify(uv.closenotify)
finally
unlock(uv.closenotify)
end
isdefined(uv, :readnotify) && notify(uv.readnotify)
isdefined(uv, :connectnotify) && notify(uv.connectnotify)
nothing
@@ -842,14 +865,13 @@ end
uv_write(s::LibuvStream, p::Vector{UInt8}) = uv_write(s, pointer(p), UInt(sizeof(p)))

function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
uvw = uv_write_async(s, p, n)
ct = current_task()
uvw = uv_write_async(s, p, n, ct)
preserve_handle(ct)
try
# wait for the last chunk to complete (or error)
# assume that any errors would be sticky,
# (so we don't need to monitor the error status of the intermediate writes)
uv_req_set_data(uvw, ct)
wait()
finally
if uv_req_data(uvw) != C_NULL
@@ -867,11 +889,11 @@ end

# helper function for uv_write that returns the uv_write_t struct for the write
# rather than waiting on it
function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt, reqdata)
check_open(s)
while true
uvw = Libc.malloc(_sizeof_uv_write)
uv_req_set_data(uvw, C_NULL) # in case we get interrupted before arriving at the wait call
uv_req_set_data(uvw, reqdata)
nwrite = min(n, MAX_OS_WRITE) # split up the write into chunks the OS can handle.
# TODO: use writev, when that is added to uv-win
err = ccall(:jl_uv_write,
26 changes: 14 additions & 12 deletions src/jl_uv.c
@@ -57,6 +57,20 @@ void jl_init_uv(void)
JL_MUTEX_INIT(&jl_uv_mutex); // a file-scope initializer can be used instead
}

int jl_uv_n_waiters = 0;

void JL_UV_LOCK(void)
{
if (jl_mutex_trylock(&jl_uv_mutex)) {
}
else {
jl_atomic_fetch_add(&jl_uv_n_waiters, 1);
jl_wake_libuv();
JL_LOCK(&jl_uv_mutex);
jl_atomic_fetch_add(&jl_uv_n_waiters, -1);
}
}

void jl_uv_call_close_callback(jl_value_t *val)
{
jl_value_t *args[2];
@@ -190,18 +204,6 @@ JL_DLLEXPORT int jl_run_once(uv_loop_t *loop)
return 0;
}

JL_DLLEXPORT void jl_run_event_loop(uv_loop_t *loop)
{
jl_ptls_t ptls = jl_get_ptls_states();
if (loop) {
jl_gc_safepoint_(ptls);
JL_UV_LOCK();
loop->stop_flag = 0;
uv_run(loop,UV_RUN_DEFAULT);
JL_UV_UNLOCK();
}
}

JL_DLLEXPORT int jl_process_events(uv_loop_t *loop)
{
jl_ptls_t ptls = jl_get_ptls_states();
3 changes: 2 additions & 1 deletion src/julia_internal.h
@@ -115,7 +115,8 @@ static inline void jl_assume_(int cond)
static uv_loop_t *const unused_uv_loop_arg = (uv_loop_t *)0xBAD10;

extern jl_mutex_t jl_uv_mutex;
#define JL_UV_LOCK() JL_LOCK(&jl_uv_mutex)
extern int jl_uv_n_waiters;
void JL_UV_LOCK(void);
#define JL_UV_UNLOCK() JL_UNLOCK(&jl_uv_mutex)

#ifdef __cplusplus
41 changes: 25 additions & 16 deletions src/partr.c
@@ -226,8 +226,9 @@ JL_DLLEXPORT void jl_wakeup_thread(int16_t tid)
uv_cond_broadcast(&sleep_alarm); // TODO: make this uv_cond_signal / just wake up correct thread
uv_mutex_unlock(&sleep_lock);
}
/* stop the event loop too, if on thread 1 and alerting thread 1 */
if (ptls->tid == 0 && (tid == 0 || tid == -1))
if (_threadedregion && jl_uv_mutex.owner != jl_thread_self())
jl_wake_libuv();
else
uv_stop(jl_global_event_loop());
}

@@ -260,8 +261,8 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
if (task)
return task;

if (ptls->tid == 0) {
if (!_threadedregion) {
if (!_threadedregion) {
if (ptls->tid == 0) {
if (jl_run_once(jl_global_event_loop()) == 0) {
task = get_next_task(getsticky);
if (task)
@@ -274,29 +275,37 @@ JL_DLLEXPORT jl_task_t *jl_task_get_next(jl_value_t *getsticky)
}
}
else {
jl_process_events(jl_global_event_loop());
}
}
else {
int sleepnow = 0;
if (!_threadedregion) {
int sleepnow = 0;
uv_mutex_lock(&sleep_lock);
if (!_threadedregion) {
sleepnow = 1;
}
else {
uv_mutex_unlock(&sleep_lock);
}
if (sleepnow) {
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_cond_wait(&sleep_alarm, &sleep_lock);
uv_mutex_unlock(&sleep_lock);
jl_gc_safe_leave(ptls, gc_state);
}
}
}
else {
if (jl_atomic_load(&jl_uv_n_waiters) == 0 && jl_mutex_trylock(&jl_uv_mutex)) {
task = get_next_task(getsticky);
if (task) {
JL_UV_UNLOCK();
return task;
}
uv_loop_t *loop = jl_global_event_loop();
loop->stop_flag = 0;
uv_run(loop, UV_RUN_ONCE);
JL_UV_UNLOCK();
}
else {
jl_cpu_pause();
}
if (sleepnow) {
int8_t gc_state = jl_gc_safe_enter(ptls);
uv_cond_wait(&sleep_alarm, &sleep_lock);
uv_mutex_unlock(&sleep_lock);
jl_gc_safe_leave(ptls, gc_state);
}
}
}
}
