From de2d66d3b8b87c30782bb19035c82f0d4047fa11 Mon Sep 17 00:00:00 2001 From: Amit Murthy Date: Thu, 30 Jul 2015 13:21:22 +0530 Subject: [PATCH] RemoteValue can hold any type of AbstractChannel --- base/multi.jl | 111 ++++++++++++++++----------------------------- base/precompile.jl | 7 --- 2 files changed, 39 insertions(+), 79 deletions(-) diff --git a/base/multi.jl b/base/multi.jl index 1b179948717002..2b5c8cbb6084a1 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -451,7 +451,7 @@ function deregister_worker(pg, pid) # throw exception to tasks waiting for this pid for (id,rv) in tonotify - notify_error(rv.full, ProcessExitedException()) + notify_error(rv.c, ProcessExitedException()) delete!(pg.refs, id) end end @@ -476,20 +476,26 @@ type RemoteRef finalizer(r, send_del_client) r end +end - REQ_ID::Int = 0 - function RemoteRef(pid::Integer) - rr = RemoteRef(pid, myid(), REQ_ID) - REQ_ID += 1 - rr - end +let REF_ID::Int = 1 + global next_ref_id + next_ref_id() = (id = REF_ID; REF_ID += 1; id) + + global next_rrid_tuple + next_rrid_tuple() = (myid(),next_ref_id()) +end - RemoteRef(w::LocalProcess) = RemoteRef(w.id) - RemoteRef(w::Worker) = RemoteRef(w.id) - RemoteRef() = RemoteRef(myid()) +RemoteRef(w::LocalProcess) = RemoteRef(w.id) +RemoteRef(w::Worker) = RemoteRef(w.id) +RemoteRef(pid::Integer=myid()) = RemoteRef(pid, myid(), next_ref_id()) - global next_id - next_id() = (id=(myid(),REQ_ID); REQ_ID+=1; id) +function RemoteRef(f::Function, pid::Integer=myid()) + remotecall_fetch(pid, f-> begin + rr = RemoteRef() + lookup_ref(rr2id(rr), f) + rr + end, f) end hash(r::RemoteRef, h::UInt) = hash(r.whence, hash(r.id, h)) @@ -497,12 +503,12 @@ hash(r::RemoteRef, h::UInt) = hash(r.whence, hash(r.id, h)) rr2id(r::RemoteRef) = (r.whence, r.id) -lookup_ref(id) = lookup_ref(PGRP, id) -function lookup_ref(pg, id) +lookup_ref(id, f=def_rv_channel) = lookup_ref(PGRP, id, f) +function lookup_ref(pg, id, f) rv = get(pg.refs, id, false) if rv === false # first we've heard of this ref - rv = RemoteValue() + rv = RemoteValue(f) pg.refs[id] = rv push!(rv.clientset, id[1]) end @@ -512,9 +518,9 @@ end function isready(rr::RemoteRef) rid = rr2id(rr) if rr.where == myid() - lookup_ref(rid).done + isready(lookup_ref(rid).c) else - remotecall_fetch(rr.where, id->lookup_ref(id).done, rid) + remotecall_fetch(rr.where, id->isready(lookup_ref(rid).c), rid) end end @@ -607,38 +613,16 @@ function deserialize(s::SerializationState, t::Type{RemoteRef}) end # data stored by the owner of a RemoteRef +def_rv_channel() = Channel(1) type RemoteValue - done::Bool - result - full::Condition # waiting for a value - empty::Condition # waiting for value to be removed + c::AbstractChannel clientset::IntSet waitingfor::Int # processor we need to hear from to fill this, or 0 - RemoteValue() = new(false, nothing, Condition(), Condition(), IntSet(), 0) -end - -function work_result(rv::RemoteValue) - v = rv.result - if isa(v,WeakRef) - v = v.value - end - v -end - -function wait_full(rv::RemoteValue) - while !rv.done - wait(rv.full) - end - return work_result(rv) + RemoteValue(f::Function) = new(f(), IntSet(), 0) end -function wait_empty(rv::RemoteValue) - while rv.done - wait(rv.empty) - end - return nothing -end +wait(rv::RemoteValue) = wait(rv.c) ## core messages: do, call, fetch, wait, ref, put! ## @@ -659,7 +643,7 @@ function run_work_thunk(rv::RemoteValue, thunk) end function schedule_call(rid, thunk) - rv = RemoteValue() + rv = RemoteValue(def_rv_channel) (PGRP::ProcessGroup).refs[rid] = rv push!(rv.clientset, rid[1]) schedule(@task(run_work_thunk(rv,thunk))) @@ -724,11 +708,11 @@ end function remotecall_fetch(w::Worker, f, args...) # can be weak, because the program will have no way to refer to the Ref # itself, it only gets the result. - oid = next_id() + oid = next_rrid_tuple() rv = lookup_ref(oid) rv.waitingfor = w.id send_msg(w, CallMsg{:call_fetch}(f, args, oid)) - v = wait_full(rv) + v = take!(rv) delete!(PGRP.refs, oid) v end @@ -740,12 +724,12 @@ remotecall_fetch(id::Integer, f, args...) = remotecall_wait(w::LocalProcess, f, args...) = wait(remotecall(w,f,args...)) function remotecall_wait(w::Worker, f, args...) - prid = next_id() + prid = next_rrid_tuple() rv = lookup_ref(prid) rv.waitingfor = w.id rr = RemoteRef(w) send_msg(w, CallWaitMsg(f, args, rr2id(rr), prid)) - wait_full(rv) + wait(rv) delete!(PGRP.refs, prid) rr end @@ -779,33 +763,20 @@ function call_on_owner(f, rr::RemoteRef, args...) end end -wait_ref(rid) = (wait_full(lookup_ref(rid)); nothing) +wait_ref(rid) = (wait(lookup_ref(rid).c); nothing) wait(r::RemoteRef) = (call_on_owner(wait_ref, r); r) -fetch_ref(rid) = wait_full(lookup_ref(rid)) +fetch_ref(rid) = fetch(lookup_ref(rid).c) fetch(r::RemoteRef) = call_on_owner(fetch_ref, r) fetch(x::ANY) = x # storing a value to a RemoteRef -function put!(rv::RemoteValue, val::ANY) - wait_empty(rv) - rv.result = val - rv.done = true - notify_full(rv) - rv -end +put!(rv::RemoteValue, val::ANY) = put!(rv.c, val) put_ref(rid, v) = put!(lookup_ref(rid), v) put!(rr::RemoteRef, val::ANY) = (call_on_owner(put_ref, rr, val); rr) -function take!(rv::RemoteValue) - wait_full(rv) - val = rv.result - rv.done = false - rv.result = nothing - notify_empty(rv) - val -end +take!(rv::RemoteValue) = take!(rv.c) take_ref(rid) = take!(lookup_ref(rid)) take!(rr::RemoteRef) = call_on_owner(take_ref, rr) @@ -836,10 +807,6 @@ function deliver_result(sock::IO, msg, oid, value) end end -# notify waiters that a certain job has finished or RemoteRef has been emptied -notify_full( rv::RemoteValue) = notify(rv.full, work_result(rv)) -notify_empty(rv::RemoteValue) = notify(rv.empty) - ## message event handlers ## process_messages(r_stream::TCPSocket, w_stream::TCPSocket) = @schedule process_tcp_streams(r_stream, w_stream) @@ -914,7 +881,7 @@ end function handle_msg(msg::CallWaitMsg, r_stream, w_stream) @schedule begin rv = schedule_call(msg.response_oid, ()->msg.f(msg.args...)) - deliver_result(w_stream, :call_wait, msg.notify_oid, wait_full(rv)) + deliver_result(w_stream, :call_wait, msg.notify_oid, wait(rv)) end end @@ -1222,7 +1189,7 @@ function create_worker(manager, wconfig) finalizer(w, (w)->if myid() == 1 manage(w.manager, w.id, w.config, :finalize) end) # set when the new worker has finshed connections with all other workers - ntfy_oid = next_id() + ntfy_oid = next_rrid_tuple() rr_ntfy_join = lookup_ref(ntfy_oid) rr_ntfy_join.waitingfor = myid() @@ -1274,7 +1241,7 @@ function create_worker(manager, wconfig) send_msg_now(w, JoinPGRPMsg(w.id, all_locs, isa(w.manager, LocalManager), ntfy_oid, PGRP.topology)) @schedule manage(w.manager, w.id, w.config, :register) - wait_full(rr_ntfy_join) + wait(rr_ntfy_join) delete!(PGRP.refs, ntfy_oid) w.id diff --git a/base/precompile.jl b/base/precompile.jl index 59e72ffbb68a99..bc86ab48d5a534 100644 --- a/base/precompile.jl +++ b/base/precompile.jl @@ -301,8 +301,6 @@ precompile(Base.normpath, (ASCIIString,)) precompile(Base.normpath, (UTF8String, UTF8String)) precompile(Base.normpath, (UTF8String,)) precompile(Base.notify, (Condition, Any)) -precompile(Base.notify_empty, (Base.RemoteValue,)) -precompile(Base.notify_full, (Base.RemoteValue,)) precompile(Base.open, (ASCIIString, ASCIIString)) precompile(Base.parse_input_line, (ASCIIString,)) precompile(Base.parse, (Type{Int}, ASCIIString, Int)) @@ -426,9 +424,6 @@ precompile(Base.uvfinalize, (Base.TTY,)) precompile(Base.vcat, (Base.LineEdit.Prompt,)) precompile(Base.wait, ()) precompile(Base.wait, (RemoteRef,)) -precompile(Base.wait_empty, (Base.RemoteValue,)) -precompile(Base.wait_full, (Base.RemoteValue,)) -precompile(Base.work_result, (Base.RemoteValue,)) precompile(Base.write, (Base.Terminals.TTYTerminal, ASCIIString)) precompile(Base.write, (Base.Terminals.TerminalBuffer, ASCIIString)) precompile(Base.write, (IOBuffer, Vector{UInt8})) @@ -446,7 +441,6 @@ precompile(Base.Sort.sort!, (Array{Any,1},)) precompile(Base.Sort.sort!, (Array{VersionNumber, 1}, Int, Int, Base.Sort.InsertionSortAlg, Base.Order.ForwardOrdering)) precompile(Base.info, (ASCIIString,)) precompile(Base.isempty, (Array{Void, 1},)) -precompile(Base.setindex!, (Dict{Any, Any}, Base.RemoteValue, (Int, Int))) precompile(Base.setindex!, (Dict{ByteString, VersionNumber}, VersionNumber, ASCIIString)) precompile(Base.spawn, (Cmd, (Base.TTY, Base.TTY, Base.TTY), Bool, Bool)) precompile(Base.spawn, (Cmd,)) @@ -470,7 +464,6 @@ precompile(Base.LineEdit.init_state, (Base.Terminals.TTYTerminal, Base.LineEdit. precompile(Base.setindex!, (Base.Dict{Any, Any}, Base.LineEdit.PrefixSearchState, Base.LineEdit.PrefixHistoryPrompt{Base.REPL.REPLHistoryProvider})) precompile(Base.take_ref, (Tuple{Int64, Int64},)) precompile(Base.get, (Base.Dict{Any, Any}, Tuple{Int64, Int64}, Bool)) -precompile(Base.setindex!, (Base.Dict{Any, Any}, Base.RemoteValue, Tuple{Int64, Int64})) precompile(Base.LineEdit.refresh_multi_line, (Array{Any, 1}, Base.Terminals.TerminalBuffer, Base.Terminals.TTYTerminal, Base.IOBuffer, Base.LineEdit.InputAreaState, Base.LineEdit.PromptState)) precompile(Base.schedule, (Array{Any, 1}, Task, Void)) precompile(Base.LineEdit.match_input, (Function, Base.LineEdit.MIState, Base.Terminals.TTYTerminal, Array{Char, 1}, Base.Dict{Char, Any}))