Skip to content

Commit

Permalink
Finished tests, and added missing script functionality
Browse files Browse the repository at this point in the history
Added:
    - arguments
    - emails on started and terminated
    - tests
  • Loading branch information
Morgan Rodgers committed Oct 1, 2019
1 parent e84181c commit 05b5fe0
Show file tree
Hide file tree
Showing 6 changed files with 549 additions and 20 deletions.
28 changes: 18 additions & 10 deletions lib/ood_core/job/adapters/fork.rb
Expand Up @@ -74,12 +74,12 @@ def initialize(ssh_hosts:, launcher:)
# @see Adapter#submit
def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
unless (after.empty? && afterok.empty? && afternotok.empty? && afterany.empty?)
raise JobAdapterError 'Scheduling subsequent jobs is not available.'
raise JobAdapterError, 'Scheduling subsequent jobs is not available.'
end

@launcher.start_remote_session(script)
rescue Launcher::Error => e
raise JobAdapterError e.message
raise JobAdapterError, e.message
end

# Retrieve info for all jobs from the resource manager
Expand All @@ -93,12 +93,13 @@ def info_all(attrs: nil, host: nil)
|ls_output| ls_to_info(ls_output)
}
rescue Launcher::Error => e
raise JobAdapterError e.message
raise JobAdapterError, e.message
end

# Retrieve info for all jobs for a given owner or owners from the
# resource manager
# Note: owner and attrs are present only to complete the interface and are ignored
# Note: since this API is used in production no errors or warnings are thrown / issued
# @param owner [#to_s, Array<#to_s>] the owner(s) of the jobs
# @raise [JobAdapterError] if something goes wrong getting job info
# @return [Array<Info>] information describing submitted jobs
Expand Down Expand Up @@ -143,11 +144,11 @@ def supports_job_arrays?
# @return [Info] information describing submitted job
# @see Adapter#info
def info(id)
_, host = *(id.split('@'))
_, host = parse_job_id(id)
job = info_all(host: host).select{|info| info.id == id}.first
(job) ? job : Info.new(id: id, status: :completed)
rescue Launcher::Error => e
raise JobAdapterError e.message
raise JobAdapterError, e.message
end

# Retrieve job status from resource manager
Expand All @@ -157,11 +158,12 @@ def info(id)
# @param id [#to_s] the id of the job
# @return [Status] status of job
def status(id)
_, host = *(id.split('@'))
_, host = parse_job_id(id)
job = info_all(host: host).select{|info| info.id == id}.first
(job) ? :running : :completed

Status.new(state: (job) ? :running : :completed)
rescue Launcher::Error => e
raise JobAdapterError e.message
raise JobAdapterError, e.message
end

# Put the submitted job on hold
Expand Down Expand Up @@ -190,10 +192,10 @@ def release(id)
# @param id [#to_s] the id of the job
# @return [void]
def delete(id)
session_name, destination_host = *id.split('@')
session_name, destination_host = parse_job_id(id)
@launcher.stop_remote_session(session_name, destination_host)
rescue Launcher::Error => e
raise JobAdapterError e.message
raise JobAdapterError, e.message
end

private
Expand All @@ -202,6 +204,12 @@ def host_permitted?(destination_host)
raise JobAdapterError, "Requested destination host (#{destination_host}) not permitted" unless @ssh_hosts.include?(destination_host)
end

def parse_job_id(id)
raise JobAdapterError, "#{id} is not a valid Fork adapter id because it is missing the '@'." unless id.include?('@')

return id.split('@')
end

# Convert the returned Hash into an Info object
def ls_to_info(ls_output)
started = ls_output[:session_created].to_i
Expand Down
28 changes: 27 additions & 1 deletion lib/ood_core/job/adapters/fork/launcher.rb
Expand Up @@ -4,6 +4,7 @@
require 'securerandom'
require 'shellwords'
require 'time'

# Object used for simplified communication SSH hosts
#
# @api private
Expand Down Expand Up @@ -125,11 +126,14 @@ def ssh_cmd(destination_host)
# Wraps a user-provided script into a Tmux invocation
def wrapped_script(script, session_name)
ERB.new(
File.read(Pathname.new(__dir__).join('script_wrapper.erb.sh'))
File.read(Pathname.new(__dir__).join('templates/script_wrapper.erb.sh'))
).result(binding.tap {|bnd|
{
'arguments' => script_arguments(script),
'cd_to_workdir' => (script.workdir) ? "cd #{script.workdir}" : '',
'debug' => debug,
'email_on_terminated' => script_email_on_event(script, 'terminated'),
'email_on_start' => script_email_on_event(script, 'started'),
'environment' => export_env(script.job_environment),
'error_path' => (script.error_path) ? script.error_path.to_s : '/dev/null',
'job_name' => script.job_name.to_s,
Expand Down Expand Up @@ -177,6 +181,28 @@ def script_timeout(script)
wall_time
end

def script_arguments(script)
return '' unless script.args

Shellwords.join(script.args)
end

def script_email_on_event(script, event)
return false unless script.email && script.send("email_on_#{event}")

ERB.new(
File.read(Pathname.new(__dir__).join('templates/email.erb.sh'))
).result(binding.tap {|bnd|
{
'email_recipients' => script.email.map{|addr| Shellwords.escape(addr)}.join(', '),
'job_name' => (script.job_name) ? script.job_name : 'Fork_Adapter_Job',
'job_status' => event
}.each{
|key, value| bnd.local_variable_set(key, value)
}
})
end

def unique_session_name
"#{session_name_label}-#{SecureRandom.uuid}"
end
Expand Down
9 changes: 9 additions & 0 deletions lib/ood_core/job/adapters/fork/templates/email.erb.sh
@@ -0,0 +1,9 @@
if command -v mail; then
cat << EMAIL_CONTENT | mail -s "Job <%= job_name %> has <%= job_status %>" <%= email_recipients %>
Greetings,
Your job <%= job_name %> has <%= job_status %>.
- The OnDemand Fork Adapter
EMAIL_CONTENT
fi
Expand Up @@ -13,16 +13,32 @@ tmux_tmp_file=$(mktemp)
# Create an executable to run in a tmux session
cat << TMUX_LAUNCHER > "$tmux_tmp_file"
#!/bin/bash
<% if email_on_terminated %>
exit_script() {
<%# DO NOT INDENT email_on_terminated may have HEREDOCS %>
<%= email_on_terminated %>
trap - SIGINT SIGTERM # clear the trap
kill -- -$$ # Sends SIGTERM to child/sub processes
}
trap exit_script SIGINT SIGTERM
<% end %>
<%= cd_to_workdir %>
<%= environment %>
<%= environment %>
<%= email_on_start %>
# Redirect stdout and stderr to separate files for all commands run within the curly braces
# https://unix.stackexchange.com/a/6431/204548
# Swap sterr and stdout after stdout has been redirected
# https://unix.stackexchange.com/a/61932/204548
OUTPUT_PATH=<%= output_path %>
ERROR_PATH=<%= error_path %>
({
timeout <%= script_timeout %>s <%= singularity_bin %> exec --pid <%= singularity_image %> /bin/bash --login $singularity_tmp_file
} | tee "<%= output_path %>") 3>&1 1>&2 2>&3 | tee "<%= error_path %>"
timeout <%= script_timeout %>s <%= singularity_bin %> exec --pid <%= singularity_image %> /bin/bash --login $singularity_tmp_file <%= arguments %>
} | tee "\$OUTPUT_PATH") 3>&1 1>&2 2>&3 | tee "\$ERROR_PATH"
<%= email_on_terminated %>
# Exit the tmux session when we are complete
exit 0
Expand Down

0 comments on commit 05b5fe0

Please sign in to comment.