-
Notifications
You must be signed in to change notification settings - Fork 0
/
gcpbatchtracker.go
296 lines (271 loc) · 10.6 KB
/
gcpbatchtracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
package gcpbatchtracker
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
batch "cloud.google.com/go/batch/apiv1"
"cloud.google.com/go/batch/apiv1/batchpb"
"github.com/dgruber/drmaa2interface"
"github.com/dgruber/drmaa2os/pkg/helper"
"github.com/dgruber/drmaa2os/pkg/jobtracker"
"github.com/patrickmn/go-cache"
"google.golang.org/api/iterator"
)
// GCPBatchTracker implements the JobTracker interface so that it can be
// used as backend in drmaa2os project. It bundles the Google Batch client
// with the project, location, and job session it operates on, plus a cache
// of serialized job info.
type GCPBatchTracker struct {
	// client is the Google Batch API client used for all job requests
	client *batch.Client
	// GCP project ID
	project string
	// GCP location (like "us-central1")
	location string
	// job session name; when non-empty it is compared against the
	// "drmaa2session" label of a job to decide whether the job is visible
	drmaa2session string
	// cache for job info; entries are JSON encoded JobInfo values keyed by
	// the Google Batch job name
	jcache *cache.Cache
}
// NewGCPBatchTracker creates a GCPBatchTracker which manages jobs in Google
// Batch. project is the Google Cloud project ID and location is the Google
// Cloud location (like "us-central1"). drmaa2session is optional: when it is
// set only jobs belonging to that job session are visible, when it is ""
// all jobs are visible.
//
// An error is returned when the Google Batch client cannot be created.
//
// GCPBatchTracker implements the JobTracker interface so that it can be
// used as backend in drmaa2os project and wfl.
func NewGCPBatchTracker(drmaa2session string, project, location string) (*GCPBatchTracker, error) {
	batchClient, err := batch.NewClient(context.Background())
	if err != nil {
		return nil, err
	}
	tracker := &GCPBatchTracker{
		client:        batchClient,
		project:       project,
		location:      location,
		drmaa2session: drmaa2session,
		// job info is cached with a default expiration of 10 seconds and a
		// cleanup interval of one minute
		jcache: cache.New(10*time.Second, 1*time.Minute),
	}
	return tracker, nil
}
// ListJobs returns the IDs (Google Batch job names) of all jobs which are
// visible in the job session of the tracker, or an error.
func (t *GCPBatchTracker) ListJobs() ([]string, error) {
	// the public listing always applies the job session filter
	const restrictToSession = true
	return listJobs(t, restrictToSession)
}
// listJobs iterates over all jobs of the tracker's project and location and
// returns their job names, or the first iteration error. When
// useJobSessionFilter is true and the tracker has a non-empty job session,
// only jobs whose "drmaa2session" label matches that session are returned.
//
// As a side effect the job info of every returned job is stored JSON encoded
// in the tracker's cache. Caching is best effort: a job which cannot be
// converted or encoded is still listed, it is just not cached.
func listJobs(t *GCPBatchTracker, useJobSessionFilter bool) ([]string, error) {
	parent := fmt.Sprintf("projects/%s/locations/%s", t.project, t.location)
	it := t.client.ListJobs(context.Background(), &batchpb.ListJobsRequest{Parent: parent})

	// an empty job session disables filtering so that all jobs are returned
	filterBySession := useJobSessionFilter && t.drmaa2session != ""

	names := []string{}
	for {
		batchJob, err := it.Next()
		if err == iterator.Done {
			return names, nil
		}
		if err != nil {
			return nil, err
		}
		if filterBySession && batchJob.Labels["drmaa2session"] != t.drmaa2session {
			continue
		}
		names = append(names, batchJob.Name)

		// cache job info (best effort)
		info, err := BatchJobToJobInfo(t.project, batchJob)
		if err != nil {
			continue
		}
		if encoded, err := json.Marshal(info); err == nil {
			t.jcache.Set(batchJob.Name, encoded, cache.DefaultExpiration)
		}
	}
}
// ListArrayJobs returns all job IDs which a job array ID (or array job ID)
// represents, or an error. The array job ID is resolved by the drmaa2os
// helper package; Google Batch is not contacted.
func (t *GCPBatchTracker) ListArrayJobs(arrayjobID string) ([]string, error) {
	// TODO implement real job arrays
	jobIDs, err := helper.ArrayJobID2GUIDs(arrayjobID)
	return jobIDs, err
}
// AddJob creates a Google Batch job which is defined by the DRMAA2 job
// template.
// Job names must be unique in Google Batch hence it is automatically created
// by the backend. The CandidateMachines field is used to define the machine
// type (like "n2-standard-2") to be used. Exactly one machine type must be
// specified. The ResourceLimits field is used to define the CPU and runtime
// limits.
//
// Before the job is submitted, missing buckets referenced by the
// StageOutFiles of the job template are created. If that fails, the
// returned error wraps the underlying cause so that callers can inspect it
// with errors.Is / errors.As.
//
// On success the job ID (job name) is returned. An error is returned when
// the job template cannot be converted, the stage out buckets cannot be
// created, or Google Batch rejects the job.
func (t *GCPBatchTracker) AddJob(jt drmaa2interface.JobTemplate) (string, error) {
	req, err := ConvertJobTemplateToJobRequest(t.drmaa2session, t.project, t.location, jt)
	if err != nil {
		return "", err
	}
	// do some init: in case the stage out bucket does not exist, create it
	if err := CreateMissingStageOutBuckets(t.project, jt.StageOutFiles); err != nil {
		// %w (instead of %v) keeps the error chain intact for the caller
		return "", fmt.Errorf("could not create stage out buckets: %w", err)
	}
	job, err := t.client.CreateJob(context.Background(), req)
	if err != nil {
		return "", err
	}
	return job.Name, nil
}
// AddArrayJob makes a mass submission of jobs defined by the same job template.
// Many HPC workload managers support job arrays for submitting tens of
// thousands of similar jobs with one call. The additional parameters define
// how many jobs are submitted by defining a TASK_ID range: begin is the first
// task ID (like 1), end is the last task ID (like 10), and step is a positive
// integer which defines the increment from one task ID to the next task ID
// (like 1). maxParallel is an argument representing an optional functionality
// which instructs the backend to limit the number of tasks of this job array
// running in parallel to maxParallel.
// Note that jobs use the TASK_ID environment variable to identify which
// task they are and determine that way what to do (like which data set is
// accessed).
//
// With Google Batch job arrays can be created by using MinSlots and MaxSlots
// in AddJob(). MaxSlots defines the number of tasks in the job array. MinSlots
// defines the number of tasks which are run in parallel (for MPI).
//
// The current implementation delegates to the drmaa2os helper which submits
// the array job as single jobs; maxParallel is not passed on to it.
func (t *GCPBatchTracker) AddArrayJob(jt drmaa2interface.JobTemplate, begin int, end int, step int, maxParallel int) (string, error) {
	// TODO: translate to Google Batch instead using a simple wrapper
	arrayJobID, err := helper.AddArrayJobAsSingleJobs(jt, t, begin, end, step)
	return arrayJobID, err
}
// JobState returns the DRMAA2 state and substate (free form string) of the
// job. The job is always fetched from Google Batch; a cached job info entry
// for the job is dropped first. If the tracker has a job session and the job
// does not belong to it, the Undetermined state and an error are returned.
func (t *GCPBatchTracker) JobState(jobID string) (drmaa2interface.JobState, string, error) {
	// the cached job info may be outdated once the state is queried
	t.jcache.Delete(jobID)

	request := &batchpb.GetJobRequest{Name: jobID}
	batchJob, err := t.client.GetJob(context.Background(), request)
	if err != nil {
		return drmaa2interface.Undetermined, "", err
	}
	// an empty job session means that all jobs are visible
	if t.drmaa2session != "" && !IsInJobSession(t.drmaa2session, batchJob) {
		return drmaa2interface.Undetermined, "", errors.New("job not found in job session")
	}
	return ConvertJobState(batchJob)
}
// JobInfo returns the job status of a job in form of a JobInfo struct or an
// error.
//
// If a JSON encoded job info for jobID is found in the cache it is decoded
// and returned without contacting Google Batch. A cache entry which cannot
// be decoded is removed and the job is fetched from Google Batch instead.
// If the tracker has a job session and the fetched job does not belong to
// it, an error is returned.
func (t *GCPBatchTracker) JobInfo(jobID string) (drmaa2interface.JobInfo, error) {
	if cached, found := t.jcache.Get(jobID); found {
		if raw, ok := cached.([]byte); ok {
			var jobInfo drmaa2interface.JobInfo
			if err := json.Unmarshal(raw, &jobInfo); err == nil {
				return jobInfo, nil
			}
		}
		// The entry is unusable: drop it and fall through to the backend
		// rather than returning an empty JobInfo with a nil error.
		t.jcache.Delete(jobID)
	}
	job, err := t.client.GetJob(context.Background(),
		&batchpb.GetJobRequest{
			Name: jobID,
		})
	if err != nil {
		return drmaa2interface.JobInfo{}, err
	}
	// The job is already fetched, so check its labels directly instead of
	// requesting it a second time from Google Batch.
	if t.drmaa2session != "" && !IsInJobSession(t.drmaa2session, job) {
		return drmaa2interface.JobInfo{},
			errors.New("job not found in job session")
	}
	return BatchJobToJobInfo(t.project, job)
}
// JobControl sends a request to the backend to either "terminate", "suspend",
// "resume", "hold", or "release" a job. The strings are fixed and are defined
// by the JobControl constants. This could change in the future to be limited
// only to constants representing the actions. When the request is not accepted
// by the system the function must return an error.
//
// This backend only implements terminate, which issues a delete request for
// the job to Google Batch. Suspend, resume, hold, and release return an
// "unsupported operation" error; any other action returns an "undefined job
// operation" error. The cached job info of the job is dropped in all cases.
func (t *GCPBatchTracker) JobControl(jobID string, action string) error {
	// invalidate cache
	t.jcache.Delete(jobID)

	switch action {
	case jobtracker.JobControlSuspend,
		jobtracker.JobControlResume,
		jobtracker.JobControlHold,
		jobtracker.JobControlRelease:
		// open question: can a Google Batch job be put in hold or released
		// from hold?
		return errors.New("unsupported operation")
	case jobtracker.JobControlTerminate:
		// TODO: that reaps the job and should be DeleteJob()
		// any Google Batch equivalent?
		if t.drmaa2session != "" {
			if inSession := IsInDRMAA2Session(t.client, t.drmaa2session, jobID); !inSession {
				return errors.New("job not found in job session")
			}
		}
		request := &batchpb.DeleteJobRequest{
			Name:   jobID,
			Reason: "job terminated by user",
		}
		_, err := t.client.DeleteJob(context.Background(), request)
		return err
	default:
		return fmt.Errorf("undefined job operation")
	}
}
// Wait blocks until the job is in one of the given states, the maximum
// waiting time (specified by timeout) is reached, or another internal
// error occurred (like the job was not found). In case of a timeout an
// error must be returned as well.
//
// If the tracker has a job session and the job does not belong to it, an
// error is returned without waiting. The actual waiting is delegated to the
// drmaa2os helper package.
func (t *GCPBatchTracker) Wait(jobID string, timeout time.Duration, state ...drmaa2interface.JobState) error {
	if t.drmaa2session != "" {
		if inSession := IsInDRMAA2Session(t.client, t.drmaa2session, jobID); !inSession {
			return errors.New("job not found in job session")
		}
	}
	// drop the cached job info before waiting
	t.jcache.Delete(jobID)
	return helper.WaitForState(t, jobID, timeout, state...)
}
// DeleteJob removes a job from a potential internal database. According to
// the JobTracker contract it does not stop a job, a job must be in an end
// state (terminated, failed) in order to call DeleteJob, and an error must be
// returned otherwise. If the backend does not support cleaning up resources
// for a finished job nil should be returned.
//
// This implementation does not require the job to be in an end state: it
// drops the cached job info and sends a delete request for the job to Google
// Batch. If the tracker has a job session and the job does not belong to it,
// an error is returned and nothing is deleted.
func (t *GCPBatchTracker) DeleteJob(jobID string) error {
	// here it does not need to be in an end state
	session := t.drmaa2session
	if session != "" && !IsInDRMAA2Session(t.client, session, jobID) {
		return fmt.Errorf("job not found in job session %s", session)
	}
	// invalidate cache
	t.jcache.Delete(jobID)

	request := &batchpb.DeleteJobRequest{
		Name:   jobID,
		Reason: "job deleted by user",
	}
	_, err := t.client.DeleteJob(context.Background(), request)
	return err
}
// ListJobCategories returns a list of job categories which can be used in the
// JobCategory field of the job template. The list is informational. An example
// is returning a list of supported container images. AddJob() and AddArrayJob()
// process a JobTemplate and hence also the JobCategory field.
//
// JobCategories supported by Google Batch are all container images which can be
// used by the service. If "$script$" is used as JobCategory then the RemoteCommand
// field of the JobTemplate is used as script. If "$script_path$" is used as
// JobCategory then the RemoteCommand field of the JobTemplate is used as path
// to a script which is executed.
//
// The returned error is always nil.
func (t *GCPBatchTracker) ListJobCategories() ([]string, error) {
	categories := []string{
		JobCategoryScriptPath,
		JobCategoryScript,
		// placeholder standing for any container image name
		"<container_image_name>",
	}
	return categories, nil
}
// IsInDRMAA2Session reports whether the Google Batch job with the given job
// ID belongs to the given job session. The job is fetched through client and
// its "drmaa2session" label is compared with session. If the job cannot be
// fetched, false is returned.
func IsInDRMAA2Session(client *batch.Client, session string, jobID string) bool {
	// job ID might be long or short; it is passed on unchanged
	request := &batchpb.GetJobRequest{Name: jobID}
	batchJob, err := client.GetJob(context.Background(), request)
	if err != nil {
		// a failed lookup is treated as "not in the job session"
		return false
	}
	return IsInJobSession(session, batchJob)
}
// IsInJobSession reports whether the "drmaa2session" label of the given
// Google Batch job equals session. A job without that label is only
// considered part of the empty ("") session. A nil job is never part of any
// session.
func IsInJobSession(session string, job *batchpb.Job) bool {
	if job == nil {
		// avoid a nil pointer dereference for callers passing no job
		return false
	}
	return job.Labels["drmaa2session"] == session
}