/
fork.rb
238 lines (211 loc) · 10 KB
/
fork.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
require "ood_core/refinements/hash_extensions"
require "ood_core/refinements/array_extensions"
require "ood_core/job/adapters/helper"
require "set"
module OodCore
module Job
class Factory
using Refinements::HashExtensions
# Construct a Fork adapter, together with the Launcher it delegates to, from
# a configuration object. Keys are symbolized before any lookup.
# @param config [#to_h] the configuration for job adapter
# @option config [Object] :debug (false) Use the adapter in a debug mode
# @option config [Object] :max_timeout (nil) The longest 'wall_clock' permissible
# @option config [Object] :singularity_bin ('/usr/bin/singularity') The path to the Singularity executable
# @option config [Object] :singularity_bindpath ('/etc,/media,/mnt,/opt,/srv,/usr,/var,/users') A comma delimited list of paths to bind between the host and the guest
# @option config [Object] :singularity_image The path to the Singularity image to use
# @option config [Object] :ssh_hosts (nil) The list of permissible hosts, defaults to :submit_host
# @option config [Object] :strict_host_checking (true) Set to false to disable strict host checking and updating the known_hosts file
# @option config [Object] :submit_host The SSH target to connect to, may be the head of a round-robin
# @option config [Object] :tmux_bin ('/usr/bin/tmux') The path to the Tmux executable
# @return [Adapters::Fork] the adapter wired to a freshly built launcher
def self.build_fork(config)
  opts = config.to_h.symbolize_keys

  # When :ssh_hosts is absent, the submit host is the only permitted host.
  permitted_hosts = opts.fetch(:ssh_hosts, [opts[:submit_host]])

  launcher = Adapters::Fork::Launcher.new(
    debug: opts.fetch(:debug, false),
    max_timeout: opts.fetch(:max_timeout, nil),
    singularity_bin: opts.fetch(:singularity_bin, '/usr/bin/singularity'),
    singularity_bindpath: opts.fetch(:singularity_bindpath, '/etc,/media,/mnt,/opt,/srv,/usr,/var,/users'),
    singularity_image: opts[:singularity_image],
    ssh_hosts: permitted_hosts,
    strict_host_checking: opts.fetch(:strict_host_checking, true),
    submit_host: opts[:submit_host],
    tmux_bin: opts.fetch(:tmux_bin, '/usr/bin/tmux')
  )

  # The adapter and its launcher share the same list of permitted hosts.
  Adapters::Fork.new(ssh_hosts: permitted_hosts, launcher: launcher)
end
end
module Adapters
# An adapter object that describes the communication with a remote host
# for job management.
class Fork < Adapter
using Refinements::ArrayExtensions
require "ood_core/job/adapters/fork/launcher"
# @param ssh_hosts [Enumerable, nil] hosts that requests may target; kept as a Set
# @param launcher [#start_remote_session, #list_remote_sessions, #stop_remote_session]
#   the object every remote session operation is delegated to
def initialize(ssh_hosts:, launcher:)
  @ssh_hosts = Set.new(ssh_hosts)
  @launcher = launcher
end
# Submit a job with the attributes defined in the job template instance
# @param script [Script] script object that describes the script and
#   attributes for the submitted job
# @param after [#to_s, Array<#to_s>] job dependencies are not available in
#   this adapter; setting raises JobAdapterError
# @param afterok [#to_s, Array<#to_s>] job dependencies are not available in
#   this adapter; setting raises JobAdapterError
# @param afternotok [#to_s, Array<#to_s>] job dependencies are not available
#   in this adapter; setting raises JobAdapterError
# @param afterany [#to_s, Array<#to_s>] job dependencies are not available in
#   this adapter; setting raises JobAdapterError
# @raise [JobAdapterError] if a dependency is set or something goes wrong
#   submitting a job
# @return [String] the job id returned after successfully submitting a
#   job
# @see Adapter#submit
def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
  # A dependency counts as "set" unless it is nil or an empty string/collection.
  # Values without #empty? (e.g. an Integer job id) are treated as set, so they
  # produce the documented JobAdapterError rather than a NoMethodError.
  dependency_set = [after, afterok, afternotok, afterany].any? do |dependency|
    !dependency.nil? && !(dependency.respond_to?(:empty?) && dependency.empty?)
  end
  raise JobAdapterError, 'Scheduling subsequent jobs is not available.' if dependency_set

  @launcher.start_remote_session(script)
rescue Launcher::Error => e
  raise JobAdapterError, e.message
end
# List every remote session known to the launcher as job Info objects
# @param attrs [Object] accepted for interface compatibility; not used here
# @param host [Object, nil] when given, it is checked against the permitted
#   hosts and passed on to the launcher
# @raise [JobAdapterError] if the host is not permitted or the launcher fails
# @return [Array<Info>] information describing submitted jobs
# @see Adapter#info_all
def info_all(attrs: nil, host: nil)
  host_permitted?(host) if host

  sessions = @launcher.list_remote_sessions(host: host)
  sessions.map { |session| ls_to_info(session) }
rescue Launcher::Error => error
  raise JobAdapterError, error.message
end
# Retrieve info for all jobs for a given owner or owners from the
# resource manager
# Note: owner and attrs are present only to complete the interface and are ignored
# Note: since this API is used in production no errors or warnings are thrown / issued
# The owner may be passed positionally or as the `owner:` keyword; both forms
# are accepted so that either calling convention works.
# @param _owner [#to_s, Array<#to_s>] the owner(s) of the jobs (positional form, ignored)
# @param owner [#to_s, Array<#to_s>] the owner(s) of the jobs (keyword form, ignored)
# @param attrs [Object] ignored
# @raise [JobAdapterError] if something goes wrong getting job info
# @return [Array<Info>] information describing submitted jobs
def info_where_owner(_owner = nil, owner: nil, attrs: nil)
  info_all
end
# Yield the Info object of every job returned by info_all
# @param attrs [Array<symbol>] forwarded to info_all
# @yield [Info] of each job to block
# @return [Enumerator] if no block given
def info_all_each(attrs: nil)
  if block_given?
    info_all(attrs: attrs).each { |job| yield job }
  else
    to_enum(:info_all_each, attrs: attrs)
  end
end
# Iterate over each job Info object
# @param owner [#to_s, Array<#to_s>] owner is present only to complete the interface and is ignored
# @param attrs [Array<symbol>] attrs is present only to complete the interface and is ignored
# @yield [Info] of each job to block
# @return [Enumerator] if no block given
def info_where_owner_each(owner, attrs: nil)
  return to_enum(:info_where_owner_each, owner, attrs: attrs) unless block_given?

  # info_where_owner declares owner as a keyword argument, so it must be
  # passed by keyword; a positional owner raises ArgumentError.
  info_where_owner(owner: owner, attrs: attrs).each do |job|
    yield job
  end
end
# Job arrays are not offered by this adapter.
# @return [Boolean] always false
def supports_job_arrays?
  false
end
# Retrieve job info from the SSH host
# The id is coerced with #to_s, split into session and host, and looked up
# among the sessions listed for that host. An id that is not listed is
# reported as a completed job.
# @param id [#to_s] the id of the job
# @raise [JobAdapterError] if something goes wrong getting job info
# @return [Info] information describing submitted job
# @see Adapter#info
def info(id)
  # Coerce first so non-String ids can be parsed and compared with Info#id.
  id = id.to_s
  _, host = parse_job_id(id)
  job = info_all(host: host).find { |job_info| job_info.id == id }
  job || Info.new(id: id, status: :completed)
rescue Launcher::Error => e
  raise JobAdapterError, e.message
end
# Retrieve job status from the SSH host
# The state is :running when the id appears among the sessions listed for
# its host and :completed otherwise.
# @param id [#to_s] the id of the job
# @raise [JobAdapterError] if something goes wrong getting job info
# @return [Status] status of job
# @see Adapter#status
def status(id)
  # Coerce first so non-String ids can be parsed and compared with Info#id.
  id = id.to_s
  _, host = parse_job_id(id)
  job = info_all(host: host).find { |job_info| job_info.id == id }
  Status.new(state: job ? :running : :completed)
rescue Launcher::Error => e
  raise JobAdapterError, e.message
end
# Put the submitted job on hold
# Holding is not implemented by this adapter; every call raises.
# @param id [#to_s] the id of the job
# @raise [NotImplementedError] always
# @return [void]
def hold(id)
  # Consider sending SIGSTOP?
  raise NotImplementedError, 'subclass did not define #hold'
end
# Release the job that is on hold
# Releasing is not implemented by this adapter; every call raises.
# @param id [#to_s] the id of the job
# @raise [NotImplementedError] always
# @return [void]
def release(id)
  # Consider sending SIGCONT
  raise NotImplementedError, 'subclass did not define #release'
end
# Delete the submitted job
# The id is split into session name and destination host, the host is checked
# against the permitted hosts, and the launcher is asked to stop the session.
# @param id [#to_s] the id of the job
# @raise [JobAdapterError] if the id is invalid, the host is not permitted,
#   or the launcher fails to stop the session
# @return [void]
def delete(id)
  session_name, destination_host = parse_job_id(id.to_s)
  # info/status only talk to permitted hosts (via info_all); apply the same
  # restriction here so a crafted id cannot target an arbitrary host.
  host_permitted?(destination_host)
  @launcher.stop_remote_session(session_name, destination_host)
rescue Launcher::Error => e
  raise JobAdapterError, e.message
end
private
# Guard that raises unless the host is one of the permitted SSH hosts.
# Despite the predicate-style name it is used for its raise, not its value.
# @param destination_host [Object] the host a request wants to reach
# @raise [JobAdapterError] if the host is not in the permitted set
# @return [nil] when the host is permitted
def host_permitted?(destination_host)
  return if @ssh_hosts.include?(destination_host)

  raise JobAdapterError, "Requested destination host (#{destination_host}) not permitted"
end
# Split a job id of the form "<session name>@<destination host>" into its parts.
# Only the first two '@'-separated parts are returned.
# @param id [#to_s] the id of the job
# @raise [JobAdapterError] if the id has no '@' or either part is empty
# @return [Array<String>] the session name followed by the destination host
def parse_job_id(id)
  id = id.to_s
  raise JobAdapterError, "#{id} is not a valid Fork adapter id because it is missing the '@'." unless id.include?('@')

  session_name, destination_host = id.split('@')
  # "sess@", "@host" and "@" contain an '@' but would yield nil or empty
  # parts; reject them here instead of handing them on to the launcher.
  if session_name.to_s.empty? || destination_host.to_s.empty?
    raise JobAdapterError, "#{id} is not a valid Fork adapter id because the session name or the host is empty."
  end

  [session_name, destination_host]
end
# Convert the Hash describing one listed remote session into an Info object.
# Every listed session is reported as :running with a single processor.
# @param ls_output [Hash] one entry from the launcher's session listing; the
#   keys read here are :session_created, :destination_host and :id
# @return [Info] information describing the session as a job
def ls_to_info(ls_output)
  started = ls_output[:session_created].to_i
  elapsed = Time.now.to_i - started

  # NOTE(review): @submit_host is not assigned anywhere in the visible part of
  # this class, so it is presumably nil here; confirm and wire it through.
  Info.new(
    accounting_id: nil,
    allocated_nodes: [NodeInfo.new(name: ls_output[:destination_host], procs: 1)],
    cpu_time: elapsed,
    dispatch_time: started,
    id: ls_output[:id],
    job_name: nil, # TODO
    job_owner: Etc.getlogin,
    native: ls_output,
    procs: 1,
    queue_name: "Fork adapter for #{@submit_host}",
    status: :running,
    # The session starts as soon as it is submitted, so the submission time is
    # the creation timestamp, not the elapsed duration.
    submission_time: started,
    submit_host: @submit_host,
    wallclock_time: elapsed
  )
end
end
end
end
end