Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make SLUM worker startup more robust and provide more feedback #200

Merged
merged 4 commits into from
Apr 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
98 changes: 62 additions & 36 deletions src/slurm.jl
Original file line number Diff line number Diff line change
Expand Up @@ -51,71 +51,97 @@ function launch(manager::SlurmManager, params::Dict, instances_arr::Array,
mkdir(job_file_loc)
end

# Check for given output file name
jobname = "julia-$(getpid())"
has_output_name = ("-o" in srunargs) | ("--output" in srunargs)
if has_output_name
loc = findfirst(x-> x == "-o" || x == "--output", srunargs)
job_output_name = srunargs[loc+1]
job_output_template = joinpath(job_file_loc, job_output_name)
srunargs[loc+1] = job_output_template
else
job_output_name = "$(jobname)-$(trunc(Int, Base.time() * 10))"
make_job_output_path(task_num) = joinpath(job_file_loc, "$(job_output_name)-$(task_num).out")
job_output_template = make_job_output_path("%4t")
append!(srunargs, "-o", job_output_template)
end
# Check for given output file name
jobname = "julia-$(getpid())"
has_output_name = ("-o" in srunargs) | ("--output" in srunargs)
if has_output_name
loc = findfirst(x-> x == "-o" || x == "--output", srunargs)
job_output_name = srunargs[loc+1]
job_output_template = joinpath(job_file_loc, job_output_name)
srunargs[loc+1] = job_output_template
else
job_output_name = "$(jobname)-$(trunc(Int, Base.time() * 10))"
make_job_output_path(task_num) = joinpath(job_file_loc, "$(job_output_name)-$(task_num).out")
job_output_template = make_job_output_path("%4t")
push!(srunargs, "-o", job_output_template)
end

np = manager.np
srun_cmd = `srun -J $jobname -n $np -D $exehome $(srunargs) $exename $exeflags $(worker_arg())`

@info "Starting SLURM job $jobname: $srun_cmd"
srun_proc = open(srun_cmd)

slurm_spec_regex = r"([\w]+):([\d]+)#(\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3})"
could_not_connect_regex = r"could not connect"
exiting_regex = r"exiting."
retry_delays = manager.retry_delays

t_start = time()
t_waited = round(Int, time() - t_start)
for i = 0:np - 1
println("connecting to worker $(i + 1) out of $np")
slurm_spec_match = nothing
if has_output_name
fn = job_output_template
else
fn = make_job_output_path(lpad(i, 4, "0"))
end
t0 = time()
for retry_delay in retry_delays
slurm_spec_match::Union{RegexMatch,Nothing} = nothing
worker_errors = String[]
if has_output_name
fn = job_output_template
else
fn = make_job_output_path(lpad(i, 4, "0"))
end
for retry_delay in push!(collect(retry_delays), 0)
t_waited = round(Int, time() - t_start)

# Wait for output log to be created and populated, then parse
if isfile(fn) && filesize(fn) > 0
slurm_spec_match = open(fn) do f
# Due to error and warning messages, the specification
# may not appear on the file's first line
for line in eachline(f)
re_match = match(slurm_spec_regex, line)
if re_match !== nothing
return re_match # only returns from do-block

if isfile(fn)
if filesize(fn) > 0
open(fn) do f
# Due to error and warning messages, the specification
# may not appear on the file's first line
for line in eachline(f)
re_match = match(slurm_spec_regex, line)
if !isnothing(re_match)
slurm_spec_match = re_match
end
for expr in [could_not_connect_regex, exiting_regex]
if !isnothing(match(expr, line))
slurm_spec_match = nothing
push!(worker_errors, line)
end
end
end
end
end
if slurm_spec_match !== nothing
break # break if specification found
if !isempty(worker_errors) || !isnothing(slurm_spec_match)
break # break if error or specification found
else
@info "Worker $i (after $t_waited s): Output file found, but no connection details yet"
end
else
@info "Worker $i (after $t_waited s): No output file \"$fn\" yet"
end

# Sleep for some time to limit ressource usage while waiting for the job to start
sleep(retry_delay)
end

if slurm_spec_match === nothing
throw(SlurmException("Timeout while trying to connect to worker"))
if !isempty(worker_errors)
throw(SlurmException("Worker $i failed after $t_waited s: $(join(worker_errors, " "))"))
elseif isnothing(slurm_spec_match)
throw(SlurmException("Timeout after $t_waited s while waiting for worker $i to get ready."))
end

config = WorkerConfig()
config.port = parse(Int, slurm_spec_match[2])
config.host = strip(slurm_spec_match[3])
@info "Worker $i ready after $t_waited s on host $(config.host), port $(config.port)"
# Keep a reference to the proc, so it's properly closed once
# the last worker exits.
config.userdata = srun_proc
push!(instances_arr, config)
notify(c)
end
catch e
println("Error launching Slurm job:")
@error "Error launching Slurm job"
rethrow(e)
end
end
Expand Down