Skip to content

Commit

Permalink
Rewrite fork adapter to use Singularity in PID namespace mode
Browse files Browse the repository at this point in the history
- Adds Singularity to ensure that daemons are killed
- Improves timeout
- Adds SSH option to improve user experience on round robin systems
  • Loading branch information
Morgan Rodgers committed Sep 24, 2019
1 parent 03d3efb commit f2b4b55
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 66 deletions.
50 changes: 34 additions & 16 deletions lib/ood_core/job/adapters/fork.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,40 @@ class Factory

# Build the Fork adapter from a configuration
# @param config [#to_h] the configuration for job adapter
# @option config [Object] :hosts (nil) The list of permissible hosts
# @option config [Object] :debug (false) Use the adapter in a debug mode
# @option config [Object] :max_timeout (nil) The longest 'wall_clock' permissible
# @option config [Object] :singularity_bin ('/usr/bin/singularity') The path to the Singularity executable
# @option config [Object] :singularity_bindpath ('/etc,/media,/mnt,/opt,/srv,/usr,/var,/users') A comma delimited list of paths to bind between the host and the guest
# @option config [Object] :singularity_image The path to the Singularity image to use
# @option config [Object] :ssh_hosts (nil) The list of permissible hosts, defaults to :submit_host
# @option config [Object] :strict_host_checking (true) Set to false to disable strict host checking and updating the known_hosts file
# @option config [Object] :submit_host The SSH target to connect to, may be the head of a round-robin
# @option config [Object] :tmux_bin ('/usr/bin/tmux') The path to the Tmux executable
def self.build_fork(config)
c = config.to_h.symbolize_keys
debug = c.fetch(:debug, false)
max_timeout = c.fetch(:max_timeout, nil)
tmux_bin = c.fetch(:tmux_bin, '/usr/bin/tmux')
singularity_bin = c.fetch(:singularity_bin, '/usr/bin/singularity')
singularity_bindpath = c.fetch(:singularity_bindpath, '/etc,/media,/mnt,/opt,/srv,/usr,/var,/users')
singularity_image = c[:singularity_image]
ssh_hosts = c.fetch(:ssh_hosts, [c[:submit_host]])
strict_host_checking = c.fetch(:strict_host_checking, true)
submit_host = c[:submit_host]
ssh_hosts = c.fetch(:ssh_hosts, [submit_host])
debug = c.fetch(:debug, false)
tmux_bin = c.fetch(:tmux_bin, '/usr/bin/tmux')

Adapters::Fork.new(
ssh_hosts: ssh_hosts,
forker: Adapters::Fork::Forker.new(
launcher: Adapters::Fork::Launcher.new(
debug: debug,
max_timeout: max_timeout,
singularity_bin: singularity_bin,
singularity_bindpath: singularity_bindpath, # '/etc,/media,/mnt,/opt,/srv,/usr,/var,/users',
singularity_image: singularity_image,
ssh_hosts: ssh_hosts,
strict_host_checking: strict_host_checking,
submit_host: submit_host,
tmux_bin: tmux_bin,
),
)
)
end
end
Expand All @@ -38,10 +54,10 @@ module Adapters
class Fork < Adapter
using Refinements::ArrayExtensions

require "ood_core/job/adapters/fork/forker"
require "ood_core/job/adapters/fork/launcher"

def initialize(ssh_hosts:, forker:)
@forker = forker
def initialize(ssh_hosts:, launcher:)
@launcher = launcher
@ssh_hosts = Set.new(ssh_hosts)
end

Expand Down Expand Up @@ -69,7 +85,7 @@ def initialize(ssh_hosts:, forker:)
# execution after dependent jobs have terminated
# @return [String] the job id returned after successfully submitting a job
def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
@forker.start_remote_tmux_session(script)
@launcher.start_remote_session(script)
end

# Retrieve info for all jobs from the resource manager
Expand All @@ -84,7 +100,9 @@ def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
# adapters can get by without populating the entire Info object
# @return [Array<Info>] information describing submitted jobs
def info_all(attrs: nil, host: nil)
@forker.list_remote_tmux_sessions(host: host).map{
host_permitted?(host) if host

@launcher.list_remote_sessions(host: host).map{
|ls_output| ls_to_info(ls_output)
}
end
Expand Down Expand Up @@ -200,7 +218,7 @@ def release(id)
# @return [void]
def delete(id)
session_name, destination_host = *id.split('@')
@forker.stop_remote_tmux_session(session_name: session_name, hostname: destination_host)
@launcher.stop_remote_session(session_name, destination_host)
end

private
Expand All @@ -212,10 +230,10 @@ def host_permitted?(destination_host)
# Convert the returned Hash into an Info object
# TODO: walltime, submit time, allocated nodes, native...these are all definable, there may be others
#
# Only the :id key of the listing is read; every listed session is reported
# with status :running.
#
# @param ls_output [Hash] one listed session, must provide :id
# @return [Info] the job information built from the listing
def ls_to_info(ls_output)
  Info.new(
    id: ls_output[:id],
    status: :running,
  )
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,72 @@
# Object used for simplified communication with SSH hosts
#
# @api private
class OodCore::Job::Adapters::Fork::Forker
class OodCore::Job::Adapters::Fork::Launcher
# The root exception class that all Fork adapter-specific exceptions inherit
# from
class Error < StandardError; end

UNIT_SEPARATOR = "\x1F"

# @param debug Whether the adapter should be used in debug mode
# @param max_timeout [#to_i] A period after which the job should be killed or nil
# @param singularity_bin Path to the Singularity executable
# @param singularity_bindpath A comma delimited string of host paths to bindmount into the guest; sets SINGULARITY_BINDPATH environment variable
# @param singularity_image [#to_s] Path to the Singularity image
# @param ssh_hosts List of hosts to check when scanning for running jobs
# @param strict_host_checking Allow SSH to perform strict host checking
# @param submit_host The SSH-able host
# @param tmux_bin [#to_s] Path to the tmux executable
# @param timeout [#to_i] A period after which the job should be killed or nil
def initialize(tmux_bin:, max_timeout: nil, ssh_hosts:, submit_host:, debug: false, **_)
def initialize(
debug: false,
max_timeout: nil,
singularity_bin:,
singularity_bindpath: '/etc,/media,/mnt,/opt,/srv,/usr,/var,/users',
singularity_image:,
ssh_hosts:,
strict_host_checking: false,
submit_host:,
tmux_bin:,
**_
)
@debug = !! debug
@max_timeout = max_timeout.to_i
@session_name_label = 'launched-by-ondemand'
@singularity_bin = Pathname.new(singularity_bin)
@singularity_bindpath = singularity_bindpath.to_s
@singularity_image = Pathname.new(singularity_image)
@ssh_hosts = ssh_hosts
@strict_host_checking = strict_host_checking
@submit_host = submit_host
@tmux_bin = Pathname.new(tmux_bin)
@tmux_bin = tmux_bin
@username = Etc.getlogin
end

# @param hostname [#to_s] The hostname to submit the work to
# @param script [OodCore::Job::Script] The script object defining the work
def start_remote_tmux_session(script)
def start_remote_session(script)
cmd = ssh_cmd(@submit_host)
session_name = unique_session_name

session_name = unique_session_name
output = call(*cmd, stdin: wrapped_script(script, session_name))
hostname = output.split("\n").first
hostname, pid = output.strip.split(UNIT_SEPARATOR)

"#{session_name}@#{hostname}"
end

def stop_remote_tmux_session(hostname:, session_name:)
def stop_remote_session(session_name, hostname)
cmd = ssh_cmd(hostname) + [@tmux_bin, 'kill-session', '-t', session_name]
call(*cmd)
rescue Error => e
# The Tmux server not running is not an error
raise e unless e.message.include?('failed to connect to server')
raise e unless (
# The tmux server not running is not an error
e.message.include?('failed to connect to server') ||
# The session not being found is not an error
e.message.include?("session not found: #{@session_name_label}")
)
end

def list_remote_tmux_sessions(host: nil)
def list_remote_sessions(host: nil)
host_list = (host) ? [host] : @ssh_hosts

host_list.map {
Expand All @@ -67,8 +93,25 @@ def call(cmd, *args, env: {}, stdin: "")
# The SSH invocation to send a command
# -t Force pseudo-terminal allocation (required to allow tmux to run)
# -o BatchMode=yes (set mode to be non-interactive)
# if ! strict_host_checking
# -o UserKnownHostsFile=/dev/null (do not update the user's known hosts file)
# -o StrictHostKeyChecking=no (do not check the user's known hosts file)
#
# @param destination_host [#to_s] the host to connect to
# @return [Array<String>] the ssh argument vector, ending with the user@host target
def ssh_cmd(destination_host)
  # Only added when strict host checking has been switched off
  relaxed_host_key_options = [
    '-o', 'UserKnownHostsFile=/dev/null',
    '-o', 'StrictHostKeyChecking=no'
  ]

  [
    'ssh', '-t',
    '-o', 'BatchMode=yes',
    *(@strict_host_checking ? [] : relaxed_host_key_options),
    "#{@username}@#{destination_host}"
  ]
end

# Wraps a user-provided script into a Tmux invocation
Expand All @@ -81,44 +124,37 @@ def wrapped_script(script, session_name)
'debug' => @debug,
'environment' => export_env(script.job_environment),
'error_path' => (script.error_path) ? script.error_path.to_s : '/dev/null',
'job_name' => script.job_name.to_s,
'output_path' => (script.output_path) ? script.output_path.to_s : '/dev/null',
'script_content' => script.content,
'script_timeout' => script_timeout(script),
'session_name' => session_name,
'timeout_cmd' => timeout_killer_cmd(script.wall_time),
'singularity_bin' => @singularity_bin,
'singularity_image' => @singularity_image,
'tmux_bin' => @tmux_bin,
}.each{
|key, value| bnd.local_variable_set(key, value)
}
})
end

# Nuke the current process after @timeout seconds
def timeout_killer_cmd(script_timeout)
if @max_timeout == 0
''
else
# TODO: Handle requested timeout that's longer than system configured timeout by raising Error
timeout = (@max_timeout < script_timeout.to_i ) ? @max_timeout : script_timeout.to_i
current_pid = Shellwords.escape('$$')
<<~HEREDOC
{
sleep #{timeout}
kill -9 #{current_pid}
} &
HEREDOC
end
# Generate the environment export block for this script
#
# Emits one `export KEY=value` line per variable with the value shell-escaped,
# sorted and joined with newlines. SINGULARITY_BINDPATH is set from
# @singularity_bindpath and replaces any value supplied under that key.
#
# @param environment [Hash, nil] the variables requested by the script, or nil for none
# @return [String] newline-delimited export statements
def export_env(environment)
  variables = environment || {}
  # merge returns a new Hash, so the caller's environment is never modified
  variables = variables.merge('SINGULARITY_BINDPATH' => @singularity_bindpath) if @singularity_bindpath

  variables.map { |key, value| "export #{key}=#{Shellwords.escape(value)}" }.sort.join("\n")
end

def unique_session_name
"#{@session_name_label}-#{SecureRandom.uuid}"
# Work out the duration, in seconds, to enforce on this script
#
# A result of 0 means "no limit". The requested wall time is capped by
# @max_timeout whenever a maximum is configured (non-zero).
#
# @param script [#wall_time] the script; a nil or negative wall_time counts as "none requested"
# @return [Integer] the number of seconds to allow, or 0 for no limit
def script_timeout(script)
  requested = [script.wall_time.to_i, 0].max

  # No maximum configured: honour the request as-is
  return requested if @max_timeout == 0
  # Nothing requested: use the maximum instead of returning 0, which would
  # mean "no limit" and bypass the configured maximum
  return @max_timeout if requested == 0

  [requested, @max_timeout].min
end

# Generate the environment export block for this script
def export_env(environment)
# TODO: Need to confirm that quotes are handled properly for value
(environment ? environment : {}).map{
|key, value| "export #{key}=#{Shellwords.escape(value)}"
}.join("\n")
def unique_session_name
"#{@session_name_label}-#{SecureRandom.uuid}"
end

# List all Tmux sessions on destination_host started by this adapter
Expand All @@ -127,10 +163,10 @@ def list_remote_tmux_session(destination_host)
# Note that the tmux variable substitution looks like Ruby string sub,
# these must either be single quoted strings or Ruby-string escaped as well
format_str = Shellwords.escape(
['#{session_name}', '#{session_created}'].join(UNIT_SEPARATOR)
['#{session_name}', '#{session_created}', '#{pane_pid}'].join(UNIT_SEPARATOR)
)
keys = [:session_name, :session_created]
cmd = ssh_cmd(destination_host) + ['tmux', 'list-sessions', '-F', format_str]
keys = [:session_name, :session_created, :session_pid]
cmd = ssh_cmd(destination_host) + ['tmux', 'list-panes', '-F', format_str]

call(*cmd).split(
"\n"
Expand All @@ -143,7 +179,7 @@ def list_remote_tmux_session(destination_host)
|session_hash| session_hash[:session_name].start_with?(@session_name_label)
}
rescue Error => e
# The Tmux server not running is not an error
# The tmux server not running is not an error
raise e unless e.message.include?('failed to connect to server')
[]
end
Expand Down
30 changes: 20 additions & 10 deletions lib/ood_core/job/adapters/fork/script_wrapper.erb.sh
Original file line number Diff line number Diff line change
@@ -1,35 +1,45 @@
#!/bin/bash
hostname

# Put the script into a temp file on localhost
<% if debug %>
tmp_file=$(mktemp -p "$HOME")
singularity_tmp_file=$(mktemp -p "$HOME" --suffix '_sing')
tmux_tmp_file=$(mktemp -p "$HOME" --suffix "_tmux")
<% else %>
tmp_file=$(mktemp)
singularity_tmp_file=$(mktemp)
tmux_tmp_file=$(mktemp)
<% end %>
cat << HEREDOC > "$tmp_file"

# Create an executable to run in a tmux session
cat << TMUX_LAUNCHER > "$tmux_tmp_file"
#!/bin/bash
<%= cd_to_workdir %>
<%= environment %>
<%= timeout_cmd %>
<%= environment %>
# Redirect stdout and stderr to separate files for all commands run within the curly braces
# https://unix.stackexchange.com/a/6431/204548
# Swap stderr and stdout after stdout has been redirected
# https://unix.stackexchange.com/a/61932/204548
({
<%= script_content %>
timeout <%= script_timeout %>s <%= singularity_bin %> exec --pid <%= singularity_image %> /bin/bash --login $singularity_tmp_file
} | tee "<%= output_path %>") 3>&1 1>&2 2>&3 | tee "<%= error_path %>"
# Exit the tmux session when we are complete
exit 0
HEREDOC
TMUX_LAUNCHER

# Create an executable for Singularity to run
cat << SINGULARITY_LAUNCHER > "$singularity_tmp_file"
<%= script_content %>
SINGULARITY_LAUNCHER

# Run the script inside a tmux session
chmod +x "$tmp_file"
<%= tmux_bin %> new-session -d -s "<%= session_name %>" "$tmp_file"
chmod +x "$singularity_tmp_file"
chmod +x "$tmux_tmp_file"
<%= tmux_bin %> new-session -d -s "<%= session_name %>" "$tmux_tmp_file"

# Remove the file
<% if ! debug %>
# Wait 1 second to ensure that tmux session has started before the file is removed
(sleep 1; rm -f "$tmp_file") &
(sleep 1; rm -f "$tmux_tmp_file"; rm -f "$singularity_tmp_file") &
<% end %>

0 comments on commit f2b4b55

Please sign in to comment.