From a083526dc5f28d06d75235933dc6991b291df0f2 Mon Sep 17 00:00:00 2001 From: Morgan Rodgers Date: Fri, 27 Sep 2019 11:30:32 -0400 Subject: [PATCH] Improve fork adapter implementation - Fix implementation of status to only check a single SSH host - Fix Singularity container overriding - Improve doc strings - Permit overriding SINGULARITY_BINDPATH - Raise error when downstream developer attempts to use unsupported #submit features --- lib/ood_core/job/adapters/fork.rb | 119 ++++++++---------- lib/ood_core/job/adapters/fork/launcher.rb | 22 +++- .../job/adapters/fork/script_wrapper.erb.sh | 3 +- 3 files changed, 74 insertions(+), 70 deletions(-) diff --git a/lib/ood_core/job/adapters/fork.rb b/lib/ood_core/job/adapters/fork.rb index 6335f0615..fc3434392 100644 --- a/lib/ood_core/job/adapters/fork.rb +++ b/lib/ood_core/job/adapters/fork.rb @@ -62,74 +62,52 @@ def initialize(ssh_hosts:, launcher:) end # Submit a job with the attributes defined in the job template instance - # @abstract Subclass is expected to implement {#submit} - # @raise [NotImplementedError] if subclass did not define {#submit} - # @example Submit job template to cluster - # solver_id = job_adapter.submit(solver_script) - # #=> "1234.server" - # @example Submit job that depends on previous job - # post_id = job_adapter.submit( - # post_script, - # afterok: solver_id - # ) - # #=> "1235.server" - # @param script [Script] script object that describes the - # script and attributes for the submitted job - # @param after [#to_s, Array<#to_s>] this job may be scheduled for execution - # at any point after dependent jobs have started execution - # @param afterok [#to_s, Array<#to_s>] this job may be scheduled for - # execution only after dependent jobs have terminated with no errors - # @param afternotok [#to_s, Array<#to_s>] this job may be scheduled for - # execution only after dependent jobs have terminated with errors - # @param afterany [#to_s, Array<#to_s>] this job may be scheduled for - # execution after dependent jobs have terminated - # @return [String] the job id returned after successfully submitting a job + # @param script [Script] script object that describes the script and + # attributes for the submitted job + # @param after [#to_s, Array<#to_s>] No scheduling is available is used; setting raises JobAdapterError + # @param afterok [#to_s, Array<#to_s>] No scheduling is available is used; setting raises JobAdapterError + # @param afternotok [#to_s, Array<#to_s>] No scheduling is available is used; setting raises JobAdapterError + # @param afterany [#to_s, Array<#to_s>] No scheduling is available is used; setting raises JobAdapterError + # @raise [JobAdapterError] if something goes wrong submitting a job + # @return [String] the job id returned after successfully submitting a + # job + # @see Adapter#submit def submit(script, after: [], afterok: [], afternotok: [], afterany: []) + unless (after.empty? && afterok.empty? && afternotok.empty? && afterany.empty?) + raise JobAdapterError 'Scheduling subsequent jobs is not available.' + end + @launcher.start_remote_session(script) + rescue Launcher::Error => e + raise JobAdapterError e.message end # Retrieve info for all jobs from the resource manager - # @abstract Subclass is expected to implement {#info_all} - # @raise [NotImplementedError] if subclass did not define {#info_all} - # @param attrs [Array] defaults to nil (and all attrs are provided) - # This array specifies only attrs you want, in addition to id and status. - # If an array, the Info object that is returned to you is not guarenteed - # to have a value for any attr besides the ones specified and id and status. - # - # For certain adapters this may speed up the response since - # adapters can get by without populating the entire Info object + # @raise [JobAdapterError] if something goes wrong getting job info # @return [Array] information describing submitted jobs + # @see Adapter#info_all def info_all(attrs: nil, host: nil) host_permitted?(host) if host @launcher.list_remote_sessions(host: host).map{ |ls_output| ls_to_info(ls_output) } + rescue Launcher::Error => e + raise JobAdapterError e.message end # Retrieve info for all jobs for a given owner or owners from the # resource manager + # Note: owner and attrs are present only to complete the interface and are ignored # @param owner [#to_s, Array<#to_s>] the owner(s) of the jobs - # @param attrs [Array] defaults to nil (and all attrs are provided) - # This array specifies only attrs you want, in addition to id and status. - # If an array, the Info object that is returned to you is not guarenteed - # to have a value for any attr besides the ones specified and id and status. - # - # For certain adapters this may speed up the response since - # adapters can get by without populating the entire Info object + # @raise [JobAdapterError] if something goes wrong getting job info # @return [Array] information describing submitted jobs def info_where_owner(owner: nil, attrs: nil) info_all end # Iterate over each job Info object - # @param attrs [Array] defaults to nil (and all attrs are provided) - # This array specifies only attrs you want, in addition to id and status. - # If an array, the Info object that is returned to you is not guarenteed - # to have a value for any attr besides the ones specified and id and status. - # - # For certain adapters this may speed up the response since - # adapters can get by without populating the entire Info object + # @param attrs [Array] attrs is present only to complete the interface and is ignored # @yield [Info] of each job to block # @return [Enumerator] if no block given def info_all_each(attrs: nil) @@ -141,14 +119,8 @@ def info_all_each(attrs: nil) end # Iterate over each job Info object - # @param owner [#to_s, Array<#to_s>] the owner(s) of the jobs - # @param attrs [Array] defaults to nil (and all attrs are provided) - # This array specifies only attrs you want, in addition to id and status. - # If an array, the Info object that is returned to you is not guarenteed - # to have a value for any attr besides the ones specified and id and status. - # - # For certain adapters this may speed up the response since - # adapters can get by without populating the entire Info object + # @param owner [#to_s, Array<#to_s>] owner is present only to complete the interface and is ignored + # @param attrs [Array] attrs is present only to complete the interface and is ignored # @yield [Info] of each job to block # @return [Enumerator] if no block given def info_where_owner_each(owner, attrs: nil) @@ -160,21 +132,22 @@ def info_where_owner_each(owner, attrs: nil) end # Whether the adapter supports job arrays - # @return [Boolean] - assumes true; but can be overridden by adapters that - # explicitly do not + # @return [Boolean] - false def supports_job_arrays? false end - # Retrieve job info from the resource manager - # @abstract Subclass is expected to implement {#info} - # @raise [NotImplementedError] if subclass did not define {#info} + # Retrieve job info from the SSH host # @param id [#to_s] the id of the job + # @raise [JobAdapterError] if something goes wrong getting job info # @return [Info] information describing submitted job + # @see Adapter#info def info(id) _, host = *(id.split('@')) job = info_all(host: host).select{|info| info.id == id}.first (job) ? job : Info.new(id: id, status: :completed) + rescue Launcher::Error => e + raise JobAdapterError e.message end # Retrieve job status from resource manager @@ -184,11 +157,11 @@ def info(id) # @param id [#to_s] the id of the job # @return [Status] status of job def status(id) - if info_all.select{|info| info.id == id}.count == 1 - :running - else - :completed - end + _, host = *(id.split('@')) + job = info_all(host: host).select{|info| info.id == id}.first + (job) ? :running : :completed + rescue Launcher::Error => e + raise JobAdapterError e.message end # Put the submitted job on hold @@ -207,7 +180,7 @@ def hold(id) # @param id [#to_s] the id of the job # @return [void] def release(id) - # Consider ssh HOST fg job ID? + # Consider sending SIGCONT raise NotImplementedError, "subclass did not define #release" end @@ -219,6 +192,8 @@ def release(id) def delete(id) session_name, destination_host = *id.split('@') @launcher.stop_remote_session(session_name, destination_host) + rescue Launcher::Error => e + raise JobAdapterError e.message end private @@ -228,11 +203,25 @@ def host_permitted?(destination_host) end # Convert the returned Hash into an Info object - # TODO: walltime, submit time, allocated nodes, native...these are all definable, there may be others def ls_to_info(ls_output) + started = ls_output[:session_created].to_i + now = Time.now.to_i + ellapsed = now - started Info.new( + accounting_id: nil, + allocated_nodes: [NodeInfo.new(name: ls_output[:destination_host], procs: 1)], + cpu_time: ellapsed, + dispatch_time: started, id: ls_output[:id], + job_name: nil, # TODO + job_owner: Etc.getlogin, + native: ls_output, + procs: 1, + queue_name: "Fork adapter for #{@submit_host}", status: :running, + submission_time: ellapsed, + submit_host: @submit_host, + wallclock_time: ellapsed ) end end diff --git a/lib/ood_core/job/adapters/fork/launcher.rb b/lib/ood_core/job/adapters/fork/launcher.rb index 028ca38ca..eb53a84b0 100644 --- a/lib/ood_core/job/adapters/fork/launcher.rb +++ b/lib/ood_core/job/adapters/fork/launcher.rb @@ -39,8 +39,8 @@ def initialize( @max_timeout = max_timeout.to_i @session_name_label = 'launched-by-ondemand' @singularity_bin = Pathname.new(singularity_bin) - @singularity_bindpath = singularity_bindpath.to_s - @singularity_image = Pathname.new(singularity_image) + @site_singularity_bindpath = singularity_bindpath.to_s + @default_singularity_image = Pathname.new(singularity_image) @ssh_hosts = ssh_hosts @strict_host_checking = strict_host_checking @submit_host = submit_host @@ -130,7 +130,7 @@ def wrapped_script(script, session_name) 'script_timeout' => script_timeout(script), 'session_name' => session_name, 'singularity_bin' => @singularity_bin, - 'singularity_image' => @singularity_image, + 'singularity_image' => singularity_image(script), 'tmux_bin' => @tmux_bin, }.each{ |key, value| bnd.local_variable_set(key, value) @@ -141,12 +141,26 @@ def wrapped_script(script, session_name) # Generate the environment export block for this script def export_env(environment) (environment ? environment : {}).tap{ - |hsh| hsh['SINGULARITY_BINDPATH'] = @singularity_bindpath if @singularity_bindpath + |hsh| + hsh['SINGULARITY_BINDPATH'] = singularity_bindpath(environment) + hsh.delete('SINGULARITY_CONTAINER') }.map{ |key, value| "export #{key}=#{Shellwords.escape(value)}" }.sort.join("\n") end + def singularity_image(script) + image = script.job_environment['SINGULARITY_CONTAINER'] + (image) ? image : @default_singularity_image + end + + def singularity_bindpath(environment) + script_bindpath = environment['SINGULARITY_BINDPATH'] + return script_bindpath if script_bindpath + + @site_singularity_bindpath + end + def script_timeout(script) return [script.wall_time.to_i, @max_timeout].min unless @max_timeout == 0 diff --git a/lib/ood_core/job/adapters/fork/script_wrapper.erb.sh b/lib/ood_core/job/adapters/fork/script_wrapper.erb.sh index bf80243fd..f9459ba59 100644 --- a/lib/ood_core/job/adapters/fork/script_wrapper.erb.sh +++ b/lib/ood_core/job/adapters/fork/script_wrapper.erb.sh @@ -41,5 +41,6 @@ chmod +x "$tmux_tmp_file" # Remove the file <% if ! debug %> # Wait 1 second to ensure that tmux session has started before the file is removed -(sleep 1; rm -f "$tmux_tmp_file"; rm -f "$singularity_tmp_file") & +sleep 1 +rm -f "$tmux_tmp_file"; rm -f "$singularity_tmp_file" <% end %>