Skip to content

Commit

Permalink
Improve fork adapter implementation
Browse files Browse the repository at this point in the history
- Fix implementation of status to only check a single SSH host
- Fix Singularity container overriding
- Improve doc strings
- Permit overriding SINGULARITY_BINDPATH
- Raise error when downstream developer attempts to use unsupported #submit features
  • Loading branch information
Morgan Rodgers committed Sep 27, 2019
1 parent f2b4b55 commit a083526
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 70 deletions.
119 changes: 54 additions & 65 deletions lib/ood_core/job/adapters/fork.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,74 +62,52 @@ def initialize(ssh_hosts:, launcher:)
end

# Submit a job with the attributes defined in the job template instance
# @abstract Subclass is expected to implement {#submit}
# @raise [NotImplementedError] if subclass did not define {#submit}
# @example Submit job template to cluster
# solver_id = job_adapter.submit(solver_script)
# #=> "1234.server"
# @example Submit job that depends on previous job
# post_id = job_adapter.submit(
# post_script,
# afterok: solver_id
# )
# #=> "1235.server"
# @param script [Script] script object that describes the
# script and attributes for the submitted job
# @param after [#to_s, Array<#to_s>] this job may be scheduled for execution
# at any point after dependent jobs have started execution
# @param afterok [#to_s, Array<#to_s>] this job may be scheduled for
# execution only after dependent jobs have terminated with no errors
# @param afternotok [#to_s, Array<#to_s>] this job may be scheduled for
# execution only after dependent jobs have terminated with errors
# @param afterany [#to_s, Array<#to_s>] this job may be scheduled for
# execution after dependent jobs have terminated
# @return [String] the job id returned after successfully submitting a job
# @param script [Script] script object that describes the script and
# attributes for the submitted job
# @param after [#to_s, Array<#to_s>] Not supported (no scheduler is available); setting this raises JobAdapterError
# @param afterok [#to_s, Array<#to_s>] Not supported (no scheduler is available); setting this raises JobAdapterError
# @param afternotok [#to_s, Array<#to_s>] Not supported (no scheduler is available); setting this raises JobAdapterError
# @param afterany [#to_s, Array<#to_s>] Not supported (no scheduler is available); setting this raises JobAdapterError
# @raise [JobAdapterError] if something goes wrong submitting a job
# @return [String] the job id returned after successfully submitting a
# job
# @see Adapter#submit
def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
  # The script is handed straight to the launcher, so there is nothing that
  # could honour job dependencies; reject any that were requested.
  unless [after, afterok, afternotok, afterany].all?(&:empty?)
    raise JobAdapterError, 'Scheduling subsequent jobs is not available.'
  end

  # The launcher's return value is passed back to the caller as the job id.
  @launcher.start_remote_session(script)
rescue Launcher::Error => e
  # Translate launcher failures into the adapter's public error type.
  # The comma matters: `raise JobAdapterError msg` would be parsed as a call
  # to a method named JobAdapterError and fail with NoMethodError.
  raise JobAdapterError, e.message
end

# Retrieve info for all jobs from the resource manager
# @abstract Subclass is expected to implement {#info_all}
# @raise [NotImplementedError] if subclass did not define {#info_all}
# @param attrs [Array<symbol>] defaults to nil (and all attrs are provided)
# This array specifies only attrs you want, in addition to id and status.
# If an array, the Info object that is returned to you is not guaranteed
# to have a value for any attr besides the ones specified and id and status.
#
# For certain adapters this may speed up the response since
# adapters can get by without populating the entire Info object
# @raise [JobAdapterError] if something goes wrong getting job info
# @return [Array<Info>] information describing submitted jobs
# @see Adapter#info_all
def info_all(attrs: nil, host: nil)
  # attrs is accepted to satisfy the adapter interface; it is not consulted.
  # NOTE(review): the return value of host_permitted? is ignored here, so it
  # presumably raises for a disallowed host - confirm against its definition.
  host_permitted?(host) if host

  # With host nil the launcher is asked without a host restriction.
  @launcher.list_remote_sessions(host: host).map { |ls_output| ls_to_info(ls_output) }
rescue Launcher::Error => e
  # The comma is required; without it Ruby tries to call a method named
  # JobAdapterError and raises NoMethodError instead.
  raise JobAdapterError, e.message
end

# Retrieve info for all jobs for a given owner or owners from the
# resource manager
# Note: owner and attrs are present only to complete the interface and are ignored
# @param owner [#to_s, Array<#to_s>] the owner(s) of the jobs
# @param attrs [Array<symbol>] defaults to nil (and all attrs are provided)
# This array specifies only attrs you want, in addition to id and status.
# If an array, the Info object that is returned to you is not guaranteed
# to have a value for any attr besides the ones specified and id and status.
#
# For certain adapters this may speed up the response since
# adapters can get by without populating the entire Info object
# @raise [JobAdapterError] if something goes wrong getting job info
# @return [Array<Info>] information describing submitted jobs
def info_where_owner(owner: nil, attrs: nil)
  # owner and attrs exist only to satisfy the adapter interface; the
  # unfiltered listing is returned as-is.
  every_job = info_all
  every_job
end

# Iterate over each job Info object
# @param attrs [Array<symbol>] defaults to nil (and all attrs are provided)
# This array specifies only attrs you want, in addition to id and status.
# If an array, the Info object that is returned to you is not guaranteed
# to have a value for any attr besides the ones specified and id and status.
#
# For certain adapters this may speed up the response since
# adapters can get by without populating the entire Info object
# @param attrs [Array<symbol>] attrs is present only to complete the interface and is ignored
# @yield [Info] of each job to block
# @return [Enumerator] if no block given
def info_all_each(attrs: nil)
Expand All @@ -141,14 +119,8 @@ def info_all_each(attrs: nil)
end

# Iterate over each job Info object
# @param owner [#to_s, Array<#to_s>] the owner(s) of the jobs
# @param attrs [Array<symbol>] defaults to nil (and all attrs are provided)
# This array specifies only attrs you want, in addition to id and status.
# If an array, the Info object that is returned to you is not guaranteed
# to have a value for any attr besides the ones specified and id and status.
#
# For certain adapters this may speed up the response since
# adapters can get by without populating the entire Info object
# @param owner [#to_s, Array<#to_s>] owner is present only to complete the interface and is ignored
# @param attrs [Array<symbol>] attrs is present only to complete the interface and is ignored
# @yield [Info] of each job to block
# @return [Enumerator] if no block given
def info_where_owner_each(owner, attrs: nil)
Expand All @@ -160,21 +132,22 @@ def info_where_owner_each(owner, attrs: nil)
end

# Whether the adapter supports job arrays
# @return [Boolean] - assumes true; but can be overridden by adapters that
# explicitly do not
# @return [Boolean] - false
def supports_job_arrays?
# Always false: this adapter never reports job-array support.
false
end

# Retrieve job info from the resource manager
# @abstract Subclass is expected to implement {#info}
# @raise [NotImplementedError] if subclass did not define {#info}
# Retrieve job info from the SSH host
# @param id [#to_s] the id of the job
# @raise [JobAdapterError] if something goes wrong getting job info
# @return [Info] information describing submitted job
# @see Adapter#info
def info(id)
  # id is documented as anything responding to #to_s; normalise before parsing.
  id = id.to_s
  # The text after the first '@' in the id is used as the host to query
  # (nil when the id contains no '@').
  _, host = id.split('@')
  job = info_all(host: host).find { |candidate| candidate.id == id }

  # A session that is not listed is reported as completed.
  job || Info.new(id: id, status: :completed)
rescue Launcher::Error => e
  # The comma is required; `raise JobAdapterError msg` is parsed as a method
  # call and fails with NoMethodError.
  raise JobAdapterError, e.message
end

# Retrieve job status from resource manager
Expand All @@ -184,11 +157,11 @@ def info(id)
# @param id [#to_s] the id of the job
# @return [Status] status of job
def status(id)
  # id is documented as anything responding to #to_s; normalise before parsing.
  id = id.to_s
  # Only the host named after the first '@' in the id is queried. The former
  # scan of every host (whose result was discarded) has been removed.
  _, host = id.split('@')
  job = info_all(host: host).find { |candidate| candidate.id == id }

  # A listed session is running; anything not listed is reported as completed.
  job ? :running : :completed
rescue Launcher::Error => e
  # The comma is required; `raise JobAdapterError msg` is parsed as a method
  # call and fails with NoMethodError.
  raise JobAdapterError, e.message
end

# Put the submitted job on hold
Expand All @@ -207,7 +180,7 @@ def hold(id)
# @param id [#to_s] the id of the job
# @return [void]
def release(id)
# Releasing a held job is not implemented; this always raises.
# TODO: consider resuming the session by sending SIGCONT.
raise NotImplementedError, "subclass did not define #release"
end

Expand All @@ -219,6 +192,8 @@ def release(id)
def delete(id)
  # The id is split on '@' into the session name and the host it runs on;
  # to_s lets callers pass any object that responds to #to_s.
  session_name, destination_host = id.to_s.split('@')
  @launcher.stop_remote_session(session_name, destination_host)
rescue Launcher::Error => e
  # The comma is required; `raise JobAdapterError msg` is parsed as a method
  # call and fails with NoMethodError.
  raise JobAdapterError, e.message
end

private
Expand All @@ -228,11 +203,25 @@ def host_permitted?(destination_host)
end

# Convert the returned Hash into an Info object
# TODO: walltime, submit time, allocated nodes, native...these are all definable, there may be others
def ls_to_info(ls_output)
  # :session_created is converted with to_i and treated as the start time in
  # epoch seconds, since it is subtracted from Time.now.to_i below.
  started = ls_output[:session_created].to_i
  elapsed = Time.now.to_i - started

  Info.new(
    accounting_id: nil,
    allocated_nodes: [NodeInfo.new(name: ls_output[:destination_host], procs: 1)],
    cpu_time: elapsed,
    dispatch_time: started,
    id: ls_output[:id],
    job_name: nil, # TODO
    job_owner: Etc.getlogin,
    native: ls_output,
    procs: 1,
    queue_name: "Fork adapter for #{@submit_host}",
    # Only sessions that are currently listed reach this method.
    status: :running,
    # A point in time, not a duration: the session start is used for both
    # submission and dispatch (previously this was set to the elapsed seconds).
    submission_time: started,
    submit_host: @submit_host,
    wallclock_time: elapsed
  )
end
end
Expand Down
22 changes: 18 additions & 4 deletions lib/ood_core/job/adapters/fork/launcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ def initialize(
@max_timeout = max_timeout.to_i
@session_name_label = 'launched-by-ondemand'
@singularity_bin = Pathname.new(singularity_bin)
@singularity_bindpath = singularity_bindpath.to_s
@singularity_image = Pathname.new(singularity_image)
@site_singularity_bindpath = singularity_bindpath.to_s
@default_singularity_image = Pathname.new(singularity_image)
@ssh_hosts = ssh_hosts
@strict_host_checking = strict_host_checking
@submit_host = submit_host
Expand Down Expand Up @@ -130,7 +130,7 @@ def wrapped_script(script, session_name)
'script_timeout' => script_timeout(script),
'session_name' => session_name,
'singularity_bin' => @singularity_bin,
'singularity_image' => @singularity_image,
'singularity_image' => singularity_image(script),
'tmux_bin' => @tmux_bin,
}.each{
|key, value| bnd.local_variable_set(key, value)
Expand All @@ -141,12 +141,26 @@ def wrapped_script(script, session_name)
# Generate the environment export block for this script
def export_env(environment)
  # Work on a copy: a nil environment becomes an empty hash, and the caller's
  # hash is never modified (the deletion below used to remove
  # SINGULARITY_CONTAINER from the script's own environment).
  env = (environment || {}).dup

  # A script-provided SINGULARITY_BINDPATH wins over the site default.
  env['SINGULARITY_BINDPATH'] = singularity_bindpath(env)
  # SINGULARITY_CONTAINER is dropped so that it is not exported.
  env.delete('SINGULARITY_CONTAINER')

  # One `export KEY=value` line per variable, shell-escaped and sorted.
  env.map { |key, value| "export #{key}=#{Shellwords.escape(value)}" }.sort.join("\n")
end

def singularity_image(script)
  # Treat a nil job_environment as empty instead of failing on nil[].
  requested = (script.job_environment || {})['SINGULARITY_CONTAINER']

  # The script may choose its own image; otherwise use the configured default.
  requested || @default_singularity_image
end

def singularity_bindpath(environment)
  # environment may be nil; in that case only the site default applies.
  script_bindpath = environment && environment['SINGULARITY_BINDPATH']

  # A bind path supplied by the script takes precedence over the site default.
  script_bindpath || @site_singularity_bindpath
end

def script_timeout(script)
return [script.wall_time.to_i, @max_timeout].min unless @max_timeout == 0

Expand Down
3 changes: 2 additions & 1 deletion lib/ood_core/job/adapters/fork/script_wrapper.erb.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ chmod +x "$tmux_tmp_file"
# Remove the file
<% if ! debug %>
# Wait 1 second to ensure that tmux session has started before the file is removed
(sleep 1; rm -f "$tmux_tmp_file"; rm -f "$singularity_tmp_file") &
sleep 1
rm -f "$tmux_tmp_file"; rm -f "$singularity_tmp_file"
<% end %>

0 comments on commit a083526

Please sign in to comment.