From d1d37f790a79f05591d5347a45036ce43d37cc07 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Wed, 3 Feb 2021 22:52:00 -0500 Subject: [PATCH] Add a monitor to some detached Tasks --- doc/src/manual/asynchronous-programming.md | 10 +++++--- doc/src/manual/distributed-computing.md | 2 +- doc/src/manual/networking-and-streams.md | 12 +++++----- doc/src/manual/running-external-programs.md | 2 ++ stdlib/Distributed/src/cluster.jl | 12 ++++++---- stdlib/Distributed/src/macros.jl | 3 ++- stdlib/Distributed/src/managers.jl | 19 +++++---------- stdlib/Distributed/src/process_messages.jl | 26 ++++++++++----------- stdlib/Distributed/src/remotecall.jl | 6 ++--- stdlib/REPL/src/LineEdit.jl | 6 ++--- stdlib/REPL/src/REPL.jl | 2 ++ stdlib/REPL/test/repl.jl | 5 ++-- 12 files changed, 53 insertions(+), 52 deletions(-) diff --git a/doc/src/manual/asynchronous-programming.md b/doc/src/manual/asynchronous-programming.md index 1791d4b0e40f7..c0181e775bd05 100644 --- a/doc/src/manual/asynchronous-programming.md +++ b/doc/src/manual/asynchronous-programming.md @@ -186,7 +186,7 @@ A channel can be visualized as a pipe, i.e., it has a write end and a read end : # we can schedule `n` instances of `foo` to be active concurrently. for _ in 1:n - @async foo() + errormonitor(@async foo()) end ``` * Channels are created via the `Channel{T}(sz)` constructor. The channel will only hold objects @@ -263,10 +263,10 @@ julia> function make_jobs(n) julia> n = 12; -julia> @async make_jobs(n); # feed the jobs channel with "n" jobs +julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs julia> for i in 1:4 # start 4 tasks to process requests in parallel - @async do_work() + errormonitor(@async do_work()) end julia> @elapsed while n > 0 # print out results @@ -289,6 +289,10 @@ julia> @elapsed while n > 0 # print out results 0.029772311 ``` +Instead of `errormonitor(t)`, a more robust solution may be use use `bind(results, t)`, as that will +not only log any unexpected failures, but also force the associated resources to close and propagate +the exception everywhere. + ## More task operations Task operations are built on a low-level primitive called [`yieldto`](@ref). diff --git a/doc/src/manual/distributed-computing.md b/doc/src/manual/distributed-computing.md index e5b6e78cae981..6e3231a76ed1a 100644 --- a/doc/src/manual/distributed-computing.md +++ b/doc/src/manual/distributed-computing.md @@ -580,7 +580,7 @@ julia> function make_jobs(n) julia> n = 12; -julia> @async make_jobs(n); # feed the jobs channel with "n" jobs +julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs julia> for p in workers() # start tasks on the workers to process requests in parallel remote_do(do_work, p, jobs, results) diff --git a/doc/src/manual/networking-and-streams.md b/doc/src/manual/networking-and-streams.md index 163716c583804..0bdef1b338925 100644 --- a/doc/src/manual/networking-and-streams.md +++ b/doc/src/manual/networking-and-streams.md @@ -193,13 +193,13 @@ Let's first create a simple server: ```julia-repl julia> using Sockets -julia> @async begin +julia> errormonitor(@async begin server = listen(2000) while true sock = accept(server) println("Hello World\n") end - end + end) Task (runnable) @0x00007fd31dc11ae0 ``` @@ -265,7 +265,7 @@ printed the message and waited for the next client. Reading and writing works in To see this, consider the following simple echo server: ```julia-repl -julia> @async begin +julia> errormonitor(@async begin server = listen(2001) while true sock = accept(server) @@ -273,15 +273,15 @@ julia> @async begin write(sock, readline(sock, keep=true)) end end - end + end) Task (runnable) @0x00007fd31dc12e60 julia> clientside = connect(2001) TCPSocket(RawFD(28) open, 0 bytes waiting) -julia> @async while isopen(clientside) +julia> errormonitor(@async while isopen(clientside) write(stdout, readline(clientside, keep=true)) - end + end) Task (runnable) @0x00007fd31dc11870 julia> println(clientside,"Hello World from the Echo Server") diff --git a/doc/src/manual/running-external-programs.md b/doc/src/manual/running-external-programs.md index a00050ffb5149..8fb59b8f7152a 100644 --- a/doc/src/manual/running-external-programs.md +++ b/doc/src/manual/running-external-programs.md @@ -327,6 +327,8 @@ wait(writer) fetch(reader) ``` +(commonly also, reader is not a separate task, since we immediately `fetch` it anyways). + ### Complex Example The combination of a high-level programming language, a first-class command abstraction, and automatic diff --git a/stdlib/Distributed/src/cluster.jl b/stdlib/Distributed/src/cluster.jl index 7329e1b91d37b..482e7da44390d 100644 --- a/stdlib/Distributed/src/cluster.jl +++ b/stdlib/Distributed/src/cluster.jl @@ -160,11 +160,12 @@ function check_worker_state(w::Worker) else w.ct_time = time() if myid() > w.id - @async exec_conn_func(w) + t = @async exec_conn_func(w) else # route request via node 1 - @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid()) + t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid()) end + errormonitor(t) wait_for_conn(w) end end @@ -242,10 +243,10 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std else sock = listen(interface, LPROC.bind_port) end - @async while isopen(sock) + errormonitor(@async while isopen(sock) client = accept(sock) process_messages(client, client, true) - end + end) print(out, "julia_worker:") # print header print(out, "$(string(LPROC.bind_port))#") # print port print(out, LPROC.bind_addr) @@ -274,7 +275,7 @@ end function redirect_worker_output(ident, stream) - @async while !eof(stream) + t = @async while !eof(stream) line = readline(stream) if startswith(line, " From worker ") # stdout's of "additional" workers started from an initial worker on a host are not available @@ -284,6 +285,7 @@ function redirect_worker_output(ident, stream) println(" From worker $(ident):\t$line") end end + errormonitor(t) end struct LaunchWorkerError <: Exception diff --git a/stdlib/Distributed/src/macros.jl b/stdlib/Distributed/src/macros.jl index b53890017d4de..6603d627c3409 100644 --- a/stdlib/Distributed/src/macros.jl +++ b/stdlib/Distributed/src/macros.jl @@ -279,9 +279,10 @@ function preduce(reducer, f, R) end function pfor(f, R) - @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers()) + t = @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers()) @spawnat :any f(R, first(c), last(c)) end + errormonitor(t) end function make_preduce_body(var, body) diff --git a/stdlib/Distributed/src/managers.jl b/stdlib/Distributed/src/managers.jl index 3519259190fbc..ce99d85801e17 100644 --- a/stdlib/Distributed/src/managers.jl +++ b/stdlib/Distributed/src/managers.jl @@ -158,22 +158,15 @@ default_addprocs_params(::SSHManager) = function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy::Condition) # Launch one worker on each unique host in parallel. Additional workers are launched later. # Wait for all launches to complete. - launch_tasks = Vector{Any}(undef, length(manager.machines)) - - for (i, (machine, cnt)) in enumerate(manager.machines) + @sync for (i, (machine, cnt)) in enumerate(manager.machines) let machine=machine, cnt=cnt - launch_tasks[i] = @async try - launch_on_machine(manager, machine, cnt, params, launched, launch_ntfy) - catch e - print(stderr, "exception launching on machine $(machine) : $(e)\n") - end + @async try + launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy) + catch e + print(stderr, "exception launching on machine $(machine) : $(e)\n") + end end end - - for t in launch_tasks - wait(t::Task) - end - notify(launch_ntfy) end diff --git a/stdlib/Distributed/src/process_messages.jl b/stdlib/Distributed/src/process_messages.jl index 3216a4e1c73c6..8d5dac5af571e 100644 --- a/stdlib/Distributed/src/process_messages.jl +++ b/stdlib/Distributed/src/process_messages.jl @@ -78,7 +78,7 @@ function schedule_call(rid, thunk) rv = RemoteValue(def_rv_channel()) (PGRP::ProcessGroup).refs[rid] = rv push!(rv.clientset, rid.whence) - @async run_work_thunk(rv, thunk) + errormonitor(@async run_work_thunk(rv, thunk)) return rv end end @@ -111,7 +111,7 @@ end ## message event handlers ## function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true) - @async process_tcp_streams(r_stream, w_stream, incoming) + errormonitor(@async process_tcp_streams(r_stream, w_stream, incoming)) end function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool) @@ -141,7 +141,7 @@ Julia version number to perform the authentication handshake. See also [`cluster_cookie`](@ref). """ function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true) - @async message_handler_loop(r_stream, w_stream, incoming) + errormonitor(@async message_handler_loop(r_stream, w_stream, incoming)) end function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool) @@ -274,7 +274,7 @@ function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version) schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...)) end function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version) - @async begin + errormonitor(@async begin v = run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), false) if isa(v, SyncTake) try @@ -285,18 +285,20 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi else deliver_result(w_stream, :call_fetch, header.notify_oid, v) end - end + nothing + end) end function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version) - @async begin + errormonitor(@async begin rv = schedule_call(header.response_oid, ()->msg.f(msg.args...; msg.kwargs...)) deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c)) - end + nothing + end) end function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version) - @async run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true) + errormonitor(@async run_work_thunk(()->msg.f(msg.args...; msg.kwargs...), true)) end function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version) @@ -330,8 +332,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) lazy = msg.lazy PGRP.lazy = lazy - wait_tasks = Task[] - for (connect_at, rpid) in msg.other_workers + @sync for (connect_at, rpid) in msg.other_workers wconfig = WorkerConfig() wconfig.connect_at = connect_at @@ -340,14 +341,11 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) # The constructor registers the object with a global registry. Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig)) else - t = @async connect_to_peer(cluster_manager, rpid, wconfig) - push!(wait_tasks, t) + @async connect_to_peer(cluster_manager, rpid, wconfig) end end end - for wt in wait_tasks; Base.wait(wt); end - send_connection_hdr(controller, false) send_msg_now(controller, MsgHeader(RRID(0,0), header.notify_oid), JoinCompleteMsg(Sys.CPU_THREADS, getpid())) end diff --git a/stdlib/Distributed/src/remotecall.jl b/stdlib/Distributed/src/remotecall.jl index f4845221a611a..9b2127d4f499a 100644 --- a/stdlib/Distributed/src/remotecall.jl +++ b/stdlib/Distributed/src/remotecall.jl @@ -192,7 +192,7 @@ or to use a local [`Channel`](@ref) as a proxy: ```julia p = 1 f = Future(p) -@async put!(f, remotecall_fetch(long_computation, p)) +errormonitor(@async put!(f, remotecall_fetch(long_computation, p))) isready(f) # will not block ``` """ @@ -249,10 +249,10 @@ end const any_gc_flag = Condition() function start_gc_msgs_task() - @async while true + errormonitor(@async while true wait(any_gc_flag) flush_gc_msgs() - end + end) end function send_del_client(rr) diff --git a/stdlib/REPL/src/LineEdit.jl b/stdlib/REPL/src/LineEdit.jl index 718cd02e6eb98..a8a1f5ef03912 100644 --- a/stdlib/REPL/src/LineEdit.jl +++ b/stdlib/REPL/src/LineEdit.jl @@ -182,7 +182,7 @@ function beep(s::PromptState, duration::Real=options(s).beep_duration, isinteractive() || return # some tests fail on some platforms s.beeping = min(s.beeping + duration, maxduration) let colors = Base.copymutable(colors) - @async begin + errormonitor(@async begin trylock(s.refresh_lock) || return try orig_prefix = s.p.prompt_prefix @@ -198,12 +198,10 @@ function beep(s::PromptState, duration::Real=options(s).beep_duration, s.p.prompt_prefix = orig_prefix refresh_multi_line(s, beeping=true) s.beeping = 0.0 - catch e - Base.showerror(stdout, e, catch_backtrace()) finally unlock(s.refresh_lock) end - end + end) end nothing end diff --git a/stdlib/REPL/src/REPL.jl b/stdlib/REPL/src/REPL.jl index 68f157322facc..d17482ec453cb 100644 --- a/stdlib/REPL/src/REPL.jl +++ b/stdlib/REPL/src/REPL.jl @@ -313,10 +313,12 @@ function run_repl(repl::AbstractREPL, @nospecialize(consumer = x -> nothing); ba end if backend_on_current_task t = @async run_frontend(repl, backend_ref) + errormonitor(t) Base._wait2(t, cleanup) start_repl_backend(backend, consumer) else t = @async start_repl_backend(backend, consumer) + errormonitor(t) Base._wait2(t, cleanup) run_frontend(repl, backend_ref) end diff --git a/stdlib/REPL/test/repl.jl b/stdlib/REPL/test/repl.jl index 577dfda1a811b..40c8617ce655d 100644 --- a/stdlib/REPL/test/repl.jl +++ b/stdlib/REPL/test/repl.jl @@ -453,7 +453,8 @@ for prompt = ["TestĪ ", () -> randstring(rand(1:10))] # In the future if we want we can add a test that the right object # gets displayed by intercepting the display repl.specialdisplay = REPL.REPLDisplay(repl) - @async write(devnull, stdout_read) # redirect stdout to devnull so we drain the output pipe + + errormonitor(@async write(devnull, stdout_read)) # redirect stdout to devnull so we drain the output pipe repl.interface = REPL.setup_interface(repl) repl_mode = repl.interface.modes[1] @@ -1252,7 +1253,7 @@ end # AST transformations (softscope, Revise, OhMyREPL, etc.) @testset "AST Transformation" begin backend = REPL.REPLBackend() - @async REPL.start_repl_backend(backend) + errormonitor(@async REPL.start_repl_backend(backend)) put!(backend.repl_channel, (:(1+1), false)) reply = take!(backend.response_channel) @test reply == Pair{Any, Bool}(2, false)