-
-
Notifications
You must be signed in to change notification settings - Fork 74
/
qsub.jl
120 lines (94 loc) · 3.61 KB
/
qsub.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
export PBSManager, SGEManager, QRSHManager, addprocs_pbs, addprocs_sge, addprocs_qrsh
"""
    PBSManager(np, queue, wd)

Cluster manager that selects the PBS flavour of the `qsub` submission in `launch`.

# Fields
- `np`: number of workers; `launch` submits an array job with tasks `1` to `np`.
- `queue`: extra arguments interpolated at the end of the `qsub` command line
  (the `addprocs_pbs` wrapper passes its `qsub_flags` keyword here).
- `wd`: directory given to `qsub -wd`; `launch` also looks for the job output
  files in this directory.
"""
struct PBSManager <: ClusterManager
    np::Integer
    queue::Any
    wd::Any
end
"""
    SGEManager(np, queue, wd)

Cluster manager that selects the SGE flavour of the `qsub` submission in `launch`
(the branch taken when the manager is neither a `PBSManager` nor a `QRSHManager`).

# Fields
- `np`: number of workers; `launch` submits an array job with tasks `1` to `np`.
- `queue`: extra arguments interpolated at the end of the `qsub` command line
  (the `addprocs_sge` wrapper passes its `qsub_flags` keyword here).
- `wd`: directory given to `qsub -wd`; `launch` also looks for the job output
  files in this directory.
"""
struct SGEManager <: ClusterManager
    np::Integer
    queue::Any
    wd::Any
end
"""
    QRSHManager(np, queue, wd)

Cluster manager whose workers are started by `launch` through one `qrsh`
invocation per worker instead of a `qsub` array job.

# Fields
- `np`: number of `qrsh` processes (workers) that `launch` starts.
- `queue`: extra arguments interpolated right after `qrsh` on the command line
  (the `addprocs_qrsh` wrapper passes its `qsub_flags` keyword here).
- `wd`: directory given to `qrsh -wd`.
"""
struct QRSHManager <: ClusterManager
    np::Integer
    queue::Any
    wd::Any
end
"""
    launch(manager::Union{PBSManager, SGEManager, QRSHManager}, params::Dict,
           instances_arr::Array, c::Condition)

Start `manager.np` Julia workers through the batch system and register them.

For every worker a `WorkerConfig` is pushed onto `instances_arr` and `c` is
notified, so the caller can connect to workers as they appear.

- `QRSHManager`: one `qrsh` process is opened per worker; the process object is
  used as the worker's `io` and is also stored in `userdata[:process]`.
- `PBSManager` / `SGEManager`: a single array job (tasks `1` to `np`) is
  submitted with `qsub`; for each task the function waits until one of the
  candidate output files exists under `manager.wd` and then follows it with
  `tail -f`, using that as the worker's `io`. `userdata` records `:job`,
  `:task` and `:iofile`.

# Arguments
- `manager`: supplies `np`, `queue` (extra scheduler flags) and `wd`.
- `params`: must contain the keys `:dir`, `:exename` and `:exeflags`.
- `instances_arr`: array that receives the created `WorkerConfig` objects.
- `c`: condition notified after each `WorkerConfig` is pushed.

Any exception raised while launching is caught and printed; it is not rethrown.
"""
function launch(manager::Union{PBSManager, SGEManager, QRSHManager},
        params::Dict, instances_arr::Array, c::Condition)
    try
        dir = params[:dir]
        exename = params[:exename]
        exeflags = params[:exeflags]
        wd = manager.wd
        np = manager.np
        queue = manager.queue
        jobname = `julia-$(getpid())`
        # Command line executed for every worker, identical for all schedulers.
        cmd = `cd $dir '&&' $exename $exeflags $(worker_arg())`

        if isa(manager, QRSHManager)
            qrsh_cmd = `qrsh $queue -V -N $jobname -wd $wd -now n "$cmd"`
            # Start every qrsh session before registering any of them.
            procs = [open(qrsh_cmd) for _ in 1:np]
            for (i, proc) in enumerate(procs)
                config = WorkerConfig()
                # `open(::Cmd)` returns a single readable process object (not a
                # `(stream, process)` tuple), so the same object serves as the
                # worker's output stream and as the handle later used by `kill`.
                config.io = proc
                config.userdata = Dict{Symbol, Any}(:task => i,
                                                    :process => proc)
                push!(instances_arr, config)
                notify(c)
            end
        else # PBS & SGE
            submit_cmd = isa(manager, PBSManager) ?
                `qsub -N $jobname -wd $wd -j oe -k o -t 1-$np $queue` :
                `qsub -N $jobname -wd $wd -terse -j y -R y -t 1-$np -V $queue`
            # The job script is the worker command itself, piped to qsub on stdin.
            qsub_cmd = pipeline(`echo $(Base.shell_escape(cmd))`, submit_cmd)
            out = open(qsub_cmd)
            if !success(out)
                error() # qsub already gives a message
            end
            # Keep only the part of the reported job id before the first '.',
            # and drop a trailing "[]" if present.
            id = chomp(split(readline(out), '.')[1])
            if endswith(id, "[]")
                id = id[1:end-2]
            end
            # Candidate names of the per-task output file; whichever exists is used.
            filenames(i) = "$wd/julia-$(getpid()).o$id-$i","$wd/julia-$(getpid())-$i.o$id","$wd/julia-$(getpid()).o$id.$i"
            println("Job $id in queue.")
            for i in 1:np
                # wait for each output stream file to get created
                fnames = filenames(i)
                found = findfirst(isfile, fnames)
                while found === nothing
                    sleep(1.0)
                    found = findfirst(isfile, fnames)
                end
                fname = fnames[found]
                # Hack to get Base to get the host:port, the Julia process has already started.
                tail_cmd = `tail -f $fname`
                config = WorkerConfig()
                config.io = open(detach(tail_cmd))
                config.userdata = Dict{Symbol, Any}(:job=>id, :task=>i, :iofile=>fname)
                push!(instances_arr, config)
                notify(c)
            end
            println("Running.")
        end
    catch e
        # Best effort: report the failure instead of propagating it to the caller.
        println("Error launching workers")
        println(e)
    end
end
"""
    manage(manager::Union{PBSManager, SGEManager, QRSHManager}, id::Int64,
           config::WorkerConfig, op::Symbol)

Management hook for the qsub/qrsh cluster managers. It performs no action for
any `op` and returns `nothing`.
"""
manage(manager::Union{PBSManager, SGEManager, QRSHManager},
       id::Int64, config::WorkerConfig, op::Symbol) = nothing
"""
    kill(manager::Union{PBSManager, SGEManager, QRSHManager}, id::Int64,
         config::WorkerConfig)

Shut down worker `id` and release what `launch` created for it.

The worker is asked to `exit` through a remote call and `config.io` is closed.
For a `QRSHManager` the process stored in `config.userdata[:process]` is then
sent signal 15; for the other managers the output file recorded in
`config.userdata[:iofile]` is deleted if it still exists.
"""
function kill(manager::Union{PBSManager, SGEManager, QRSHManager},
        id::Int64, config::WorkerConfig)
    remotecall(exit, id)
    close(config.io)
    if manager isa QRSHManager
        # qrsh workers have a local process handle; terminate it with signal 15.
        kill(config.userdata[:process], 15)
    else
        # qsub workers were followed through an output file; remove it.
        iofile = config.userdata[:iofile]
        isfile(iofile) && rm(iofile)
    end
    return nothing
end
"""
    addprocs_pbs(np::Integer; qsub_flags=``, wd=ENV["HOME"], kwargs...)

Add `np` workers using a `PBSManager`.

`qsub_flags` and `wd` become the manager's `queue` and `wd` fields; all
remaining keyword arguments are forwarded to `addprocs`.
"""
function addprocs_pbs(np::Integer; qsub_flags=``, wd=ENV["HOME"], kwargs...)
    manager = PBSManager(np, qsub_flags, wd)
    return addprocs(manager; kwargs...)
end
"""
    addprocs_sge(np::Integer; qsub_flags=``, wd=ENV["HOME"], kwargs...)

Add `np` workers using an `SGEManager`.

`qsub_flags` and `wd` become the manager's `queue` and `wd` fields; all
remaining keyword arguments are forwarded to `addprocs`.
"""
function addprocs_sge(np::Integer; qsub_flags=``, wd=ENV["HOME"], kwargs...)
    manager = SGEManager(np, qsub_flags, wd)
    return addprocs(manager; kwargs...)
end
"""
    addprocs_qrsh(np::Integer; qsub_flags=``, wd=ENV["HOME"], kwargs...)

Add `np` workers using a `QRSHManager`.

`qsub_flags` and `wd` become the manager's `queue` and `wd` fields; all
remaining keyword arguments are forwarded to `addprocs`.
"""
function addprocs_qrsh(np::Integer; qsub_flags=``, wd=ENV["HOME"], kwargs...)
    manager = QRSHManager(np, qsub_flags, wd)
    return addprocs(manager; kwargs...)
end