Skip to content

Commit

Permalink
Revert open(cmd) => Process change
Browse files Browse the repository at this point in the history
for separation into a independent merge commit
  • Loading branch information
vtjnash committed Aug 25, 2015
1 parent c565cd0 commit 3c295ed
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 36 deletions.
8 changes: 4 additions & 4 deletions base/loading.jl
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ function create_expr_cache(input::AbstractString, output::AbstractString)
eval(Main, deserialize(STDIN))
end
"""
io = open(detach(`$(julia_cmd())
io, pobj = open(detach(`$(julia_cmd())
--output-ji $output --output-incremental=yes
--startup-file=no --history-file=no
--eval $code_object`), "w", STDOUT)
Expand All @@ -321,9 +321,9 @@ function create_expr_cache(input::AbstractString, output::AbstractString)
delete!(task_local_storage(), :SOURCE_PATH)
end)
end
close(io.in)
wait(io)
return io
close(io)
wait(pobj)
return pobj
end

compilecache(mod::Symbol) = compilecache(string(mod))
Expand Down
8 changes: 4 additions & 4 deletions base/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ function launch_on_machine(manager::SSHManager, machine, cnt, params, launched,
cmd = `ssh -T -a -x -o ClearAllForwardings=yes -n $sshflags $host $(shell_escape(cmd))` # use ssh to remote launch

# launch
pobj = open(detach(cmd), "r")
io, pobj = open(detach(cmd), "r")
wconfig = WorkerConfig()

wconfig.io = pobj.out
wconfig.io = io
wconfig.host = host
wconfig.tunnel = params[:tunnel]
wconfig.sshflags = sshflags
Expand Down Expand Up @@ -188,11 +188,11 @@ function launch(manager::LocalManager, params::Dict, launched::Array, c::Conditi
exeflags = params[:exeflags]

for i in 1:manager.np
pobj = open(detach(
io, pobj = open(detach(
setenv(`$(julia_cmd(exename)) $exeflags --bind-to $(LPROC.bind_addr) --worker`, dir=dir)), "r")
wconfig = WorkerConfig()
wconfig.process = pobj
wconfig.io = pobj.out
wconfig.io = io
push!(launched, wconfig)
end

Expand Down
5 changes: 3 additions & 2 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,7 @@ end
# The master process uses this to connect to the worker and subsequently
# setup a all-to-all network.
function read_worker_host_port(io::IO)
io.line_buffered = true
while true
conninfo = readline(io)
bind_addr, port = parse_connection_info(conninfo)
Expand Down Expand Up @@ -1269,8 +1270,8 @@ function launch_additional(np::Integer, cmd::Cmd)
addresses = cell(np)

for i in 1:np
pobj = open(detach(cmd), "r")
io_objs[i] = pobj.out
io, pobj = open(detach(cmd), "r")
io_objs[i] = io
end

for (i,io) in enumerate(io_objs)
Expand Down
30 changes: 12 additions & 18 deletions base/process.jl
Original file line number Diff line number Diff line change
Expand Up @@ -465,52 +465,46 @@ eachline(cmd::AbstractCmd) = eachline(cmd, DevNull)

# return a Process object to read-to/write-from the pipeline
function open(cmds::AbstractCmd, mode::AbstractString="r", other::AsyncStream=DevNull)
if mode == "r+" || mode == "w+"
other === DevNull || throw(ArgumentError("no other stream for mode rw+"))
in = Pipe()
out = Pipe()
processes = spawn(cmds, (in,out,STDERR))
close(in.out)
close(out.in)
elseif mode == "r"
if mode == "r"
in = other
out = Pipe()
out = io = Pipe()
processes = spawn(cmds, (in,out,STDERR))
close(out.in)
elseif mode == "w"
in = Pipe()
in = io = Pipe()
out = other
processes = spawn(cmds, (in,out,STDERR))
close(in.out)
else
throw(ArgumentError("mode must be \"r\" or \"w\", not \"$mode\""))
end
return processes
return (io, processes)
end

function open(f::Function, cmds::AbstractCmd, args...)
P = open(cmds, args...)
io, P = open(cmds, args...)
ret = try
f(P)
f(io)
catch
kill(P)
rethrow()
finally
close(P)
close(io)
end
success(P) || pipeline_error(P)
return ret
end

# TODO: deprecate this
function readandwrite(cmds::AbstractCmd)
processes = open(cmds, "r+")
(processes.out, processes.in, processes)
in = Pipe()
out, processes = open(cmds, "r", in)
(out, in, processes)
end

function readbytes(cmd::AbstractCmd, stdin::AsyncStream=DevNull)
procs = open(cmd, "r", stdin)
bytes = readbytes(procs.out)
out, procs = open(cmd, "r", stdin)
bytes = readbytes(out)
!success(procs) && pipeline_error(procs)
return bytes
end
Expand Down
1 change: 0 additions & 1 deletion base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ iswritable(io::AbstractPipe) = iswritable(io.in)
read{T<:AbstractPipe}(io::T, args...) = read(io.out, args...)
write{T<:AbstractPipe}(io::T, args...) = write(io.in, args...)
write{S<:AbstractPipe,T}(io::S, a::Array{T}) = write(io.in, a)
buffer_or_write(io::AbstractPipe, p::Ptr, n::Integer) = buffer_or_write(io.in, p, n)
readuntil{T<:AbstractPipe}(io::T, args...) = readuntil(io.out, args...)
read!{T<:AbstractPipe}(io::T, args...) = read!(io.out, args...)
readbytes(io::AbstractPipe) = readbytes(io.out)
Expand Down
4 changes: 2 additions & 2 deletions examples/clustermanager/simple/UnixDomainCM.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ function launch(manager::UnixDomainCM, params::Dict, launched::Array, c::Conditi
sockname = tempname()
try
cmd = `$(params[:exename]) $(@__FILE__) udwrkr $sockname`
pobj = open(cmd, "r")
io, pobj = open(cmd, "r")

wconfig = WorkerConfig()
wconfig.userdata = Dict(:sockname=>sockname, :io=>pobj.out, :process=>pobj)
wconfig.userdata = Dict(:sockname=>sockname, :io=>io, :process=>pobj)
push!(launched, wconfig)
notify(c)
catch e
Expand Down
5 changes: 2 additions & 3 deletions test/examples.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ include(joinpath(dir, "queens.jl"))
script = joinpath(dir, "clustermanager/simple/test_simple.jl")
cmd = `$(joinpath(JULIA_HOME,Base.julia_exename())) $script`

proc = open(cmd)
errors = readall(proc)
(strm, proc) = open(cmd)
wait(proc)
if !success(proc) && ccall(:jl_running_on_valgrind,Cint,()) == 0
println(errors)
println(readall(strm))
error("UnixDomainCM failed test, cmd : $cmd")
end
end
Expand Down
4 changes: 2 additions & 2 deletions test/spawn.jl
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,10 @@ sleep(1)
import Base.zzzInvalidIdentifier
"""
try
in = open(`$exename -f`, "w")
(in,p) = open(`$exename -f`, "w")
write(in,cmd)
close(in)
wait(in)
wait(p)
catch
error("IOStream redirect failed. Child stderr was \n$(readall(fname))\n")
end
Expand Down

0 comments on commit 3c295ed

Please sign in to comment.