/
exec.go
312 lines (287 loc) · 11.5 KB
/
exec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
package client
import (
"context"
"errors"
"io"
"sort"
"time"
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
log "github.com/golang/glog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
// Redundant imports are required for the google3 mirror. Aliases should not be changed.
regrpc "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
gerrors "github.com/pkg/errors"
oppb "google.golang.org/genproto/googleapis/longrunning"
dpb "google.golang.org/protobuf/types/known/durationpb"
)
// containerImagePropertyName is the Platform property name under which buildCommand records
// Action.DockerImage in the Command proto sent to the server.
const containerImagePropertyName = "container-image"
// Action encodes the full details of an action to be sent to the remote execution service for
// execution. It corresponds roughly, but not exactly, to the Action proto used by the Remote
// Execution API: when prepared, some fields go into the Command proto (see buildCommand) and
// others into the Action proto (see PrepAction).
type Action struct {
	// Args are the command-line arguments to start the process. The first argument is the process
	// name, and the rest are its arguments.
	Args []string
	// EnvVars are the variables to add to the process's environment. They are sent to the server
	// sorted by variable name.
	EnvVars map[string]string
	// InputRoot and InputFiles contain the details of the input tree, in remote execution format.
	// They should normally be constructed through the PackageTree function. InputRoot becomes the
	// Action's input root digest; InputFiles are the blobs uploaded to the CAS before execution.
	InputRoot  digest.Digest
	InputFiles map[digest.Digest][]byte
	// OutputFiles is a list of output files requested (full paths). A sorted copy is sent to the
	// server; this slice itself is not modified.
	OutputFiles []string
	// OutputDirs is a list of output directories requested (full paths). A sorted copy is sent to
	// the server; this slice itself is not modified.
	OutputDirs []string
	// DockerImage is a docker:// URL to the docker image in which execution will take place. It is
	// sent as the "container-image" platform property.
	DockerImage string
	// Timeout is the maximum execution time for the action. Note that it's not an overall timeout on
	// the process, since there may be additional time for transferring files, waiting for a worker to
	// become available, or other overhead.
	//
	// If 0, the server's default timeout is used (the proto field is left unset).
	Timeout time.Duration
	// DoNotCache, if true, indicates that the result of this action should never be cached. It
	// implies SkipCache.
	DoNotCache bool
	// SkipCache, if true, indicates that this action should be executed even if there is a copy of
	// its result in the action cache that could be used instead.
	SkipCache bool
}
// ExecuteAction performs all of the steps necessary to execute an action, including checking the
// cache if applicable, uploading necessary protos and inputs to the CAS, queueing the action, and
// waiting for the result.
//
// ExecuteAction may block for a long time while the action is in progress. Currently, two-phase
// queue-wait is not supported; the token necessary to query the job is not provided to users.
//
// This method MAY return a non-nil ActionResult along with a non-nil error if the action failed.
// The ActionResult may include, for example, the stdout/stderr digest from the attempt.
//
// ExecuteAction is a convenience method which wraps both PrepAction and ExecuteAndWait, along with
// other steps such as uploading extra inputs and parsing Operation protos.
func (c *Client) ExecuteAction(ctx context.Context, ac *Action) (*repb.ActionResult, error) {
	log.V(1).Infof("Executing action: %v", ac.Args)
	// Construct the action we're trying to run. PrepAction may also consult the action cache.
	acDg, res, err := c.PrepAction(ctx, ac)
	if err != nil {
		return nil, err
	}
	// A cache hit short-circuits execution entirely.
	if res != nil {
		return res, nil
	}
	// Upload any remaining inputs.
	if err := c.WriteBlobs(ctx, ac.InputFiles); err != nil {
		return nil, gerrors.WithMessage(err, "uploading input files to the CAS")
	}
	log.V(1).Info("Executing job")
	// Action.DoNotCache is documented to imply SkipCache, so either flag asks the server to skip
	// its own cache lookup.
	res, err = c.executeJob(ctx, ac.SkipCache || ac.DoNotCache, acDg)
	if err != nil {
		// res may be non-nil here (the result of a failed execution); return it with the error.
		return res, gerrors.WithMessage(err, "executing an action")
	}
	return res, nil
}
// CheckActionCache looks up acDg in the remote action cache. It returns the cached ActionResult on
// a hit, (nil, nil) when the server answers NotFound (a cache miss), and a wrapped error for any
// other failure.
func (c *Client) CheckActionCache(ctx context.Context, acDg *repb.Digest) (*repb.ActionResult, error) {
	req := &repb.GetActionResultRequest{
		InstanceName: c.InstanceName,
		ActionDigest: acDg,
	}
	res, err := c.GetActionResult(ctx, req)
	st, _ := status.FromError(err)
	code := st.Code()
	if code == codes.NotFound {
		// A cache miss is reported as the absence of a result, not as an error.
		return nil, nil
	}
	if code != codes.OK {
		return nil, gerrors.WithMessage(err, "checking the action cache")
	}
	return res, nil
}
// executeJob asks the server to execute the action identified by acDg and waits for it to finish.
// skipCache is forwarded as ExecuteRequest.SkipCacheLookup. When the execution completes with a
// non-OK status, the (possibly partial) ActionResult is returned together with the error.
func (c *Client) executeJob(ctx context.Context, skipCache bool, acDg *repb.Digest) (*repb.ActionResult, error) {
	op, err := c.ExecuteAndWait(ctx, &repb.ExecuteRequest{
		InstanceName:    c.InstanceName,
		SkipCacheLookup: skipCache,
		ActionDigest:    acDg,
	})
	if err != nil {
		return nil, gerrors.WithMessage(err, "execution error")
	}
	if opErr, isErr := op.Result.(*oppb.Operation_Error); isErr {
		// The operation itself failed; there is no ExecuteResponse to inspect.
		return nil, StatusDetailedError(status.FromProto(opErr.Error))
	}
	opResp, isResp := op.Result.(*oppb.Operation_Response)
	if !isResp {
		return nil, errors.New("unexpected operation result type")
	}
	execResp := new(repb.ExecuteResponse)
	if err := opResp.Response.UnmarshalTo(execResp); err != nil {
		return nil, gerrors.WithMessage(err, "extracting ExecuteResponse from execution operation")
	}
	st := status.FromProto(execResp.Status)
	if st.Code() == codes.OK {
		return execResp.Result, nil
	}
	// Keep the result alongside the error: it may still describe the failed attempt.
	return execResp.Result, gerrors.WithMessage(StatusDetailedError(st), "job failed with error")
}
// PrepAction constructs the Command and Action protos, checks the action cache if appropriate, and
// uploads the action if the cache was not checked or if there was no cache hit. If successful,
// PrepAction returns the digest of the Action and a (possibly nil) pointer to an ActionResult
// representing the result of the cache check, if any.
//
// The action cache is consulted only when neither ac.DoNotCache nor ac.SkipCache is set. On a
// cache hit the Action blob is not uploaded.
func (c *Client) PrepAction(ctx context.Context, ac *Action) (*repb.Digest, *repb.ActionResult, error) {
	comDg, err := c.WriteProto(ctx, buildCommand(ac))
	if err != nil {
		return nil, nil, gerrors.WithMessage(err, "storing Command proto")
	}
	reAc := &repb.Action{
		CommandDigest:   comDg.ToProto(),
		InputRootDigest: ac.InputRoot.ToProto(),
		DoNotCache:      ac.DoNotCache,
	}
	// Only set timeout if it's non-zero, because Timeout needs to be nil for the server to use a
	// default.
	if ac.Timeout != 0 {
		reAc.Timeout = dpb.New(ac.Timeout)
	}
	acBlob, err := proto.Marshal(reAc)
	if err != nil {
		return nil, nil, gerrors.WithMessage(err, "marshalling Action proto")
	}
	acDg := digest.NewFromBlob(acBlob).ToProto()
	// Check the cache only if the caller allows it. DoNotCache implies SkipCache, so either flag
	// disables the lookup. (Using || here would check the cache whenever only one flag was set.)
	if !ac.DoNotCache && !ac.SkipCache {
		log.V(1).Info("Checking cache")
		res, err := c.CheckActionCache(ctx, acDg)
		if err != nil {
			return nil, nil, err
		}
		if res != nil {
			return acDg, res, nil
		}
	}
	// No cache hit, or we didn't check. Upload the action instead.
	if _, err := c.WriteBlob(ctx, acBlob); err != nil {
		return nil, nil, gerrors.WithMessage(err, "uploading action to the CAS")
	}
	return acDg, nil, nil
}
// buildCommand converts ac into the Command proto sent to the server. Output paths and environment
// variables are emitted in sorted order so that equivalent Actions produce identical Commands.
// The caller's slices are copied before sorting, so ac itself is never modified.
func buildCommand(ac *Action) *repb.Command {
	outFiles := make([]string, len(ac.OutputFiles))
	copy(outFiles, ac.OutputFiles)
	sort.Strings(outFiles)

	outDirs := make([]string, len(ac.OutputDirs))
	copy(outDirs, ac.OutputDirs)
	sort.Strings(outDirs)

	var env []*repb.Command_EnvironmentVariable
	for key, value := range ac.EnvVars {
		env = append(env, &repb.Command_EnvironmentVariable{Name: key, Value: value})
	}
	// Map iteration order is random; sort by name for a deterministic Command.
	sort.Slice(env, func(a, b int) bool { return env[a].Name < env[b].Name })

	return &repb.Command{
		Arguments:            ac.Args,
		EnvironmentVariables: env,
		OutputFiles:          outFiles,
		OutputDirectories:    outDirs,
		Platform: &repb.Platform{
			Properties: []*repb.Platform_Property{{Name: containerImagePropertyName, Value: ac.DockerImage}},
		},
	}
}
// ExecuteAndWait calls Execute on the underlying client and WaitExecution if necessary. It returns
// the completed operation or an error.
//
// It is ExecuteAndWaitProgress without a progress callback, and shares its retry behavior.
//
// Retry behavior:
//   - An attempt resumes with WaitExecution on the last operation name while the last received
//     operation is not done.
//   - Otherwise (nothing received yet, or the last operation is done) it calls Execute again.
//   - A non-OK status on the final operation is handed to the retrier as an error so it can decide
//     whether to retry. If the retrier then gives up on such a status, the operation is returned
//     with a nil error.
//   - DeadlineExceeded statuses are never handed to the retrier.
func (c *Client) ExecuteAndWait(ctx context.Context, req *repb.ExecuteRequest) (op *oppb.Operation, err error) {
	// No per-message progress reporting is needed here.
	op, err = c.ExecuteAndWaitProgress(ctx, req, nil)
	return op, err
}
// ExecuteAndWaitProgress calls Execute on the underlying client and WaitExecution if necessary. It returns
// the completed operation or an error.
// The supplied callback function, if non-nil, is called for each message received whose metadata
// unmarshals into an ExecuteOperationMetadata; metadata that fails to unmarshal is skipped.
//
// Retries go through c.Retrier, and each attempt is wrapped in c.CallWithTimeout:
//   - An attempt resumes with WaitExecution on the last operation name if the last received
//     operation was not done.
//   - Otherwise it starts over with Execute.
//   - If the final operation carries a non-OK status, that status is handed to the retrier as an
//     error so the retrier can decide whether to retry.
//   - If the latest attempt ended with such a status, the "fake" error is discarded here and the
//     operation is returned with a nil error; callers inspect the operation's status.
//   - DeadlineExceeded statuses are never handed to the retrier.
func (c *Client) ExecuteAndWaitProgress(ctx context.Context, req *repb.ExecuteRequest, progress func(metadata *repb.ExecuteOperationMetadata)) (op *oppb.Operation, err error) {
	wait := false    // Should we retry by calling WaitExecution instead of Execute?
	opError := false // Is the latest attempt propagating an Operation status as an error for the retrier's benefit?
	lastOp := &oppb.Operation{}
	closure := func(ctx context.Context) (e error) {
		// Reset per attempt. Only the latest attempt's outcome may mark the retrier's final error as
		// a "fake" operation-status error. Otherwise a genuine RPC error on a later attempt would be
		// masked by a status seen on an earlier attempt, and a stale operation returned instead.
		opError = false
		var res regrpc.Execution_ExecuteClient
		if wait {
			res, e = c.WaitExecution(ctx, &repb.WaitExecutionRequest{Name: lastOp.Name})
		} else {
			res, e = c.Execute(ctx, req)
		}
		if e != nil {
			return e
		}
		for {
			op, e := res.Recv()
			if e == io.EOF {
				break
			}
			if e != nil {
				return e
			}
			wait = !op.Done
			lastOp = op
			if progress != nil {
				metadata := &repb.ExecuteOperationMetadata{}
				if err := op.Metadata.UnmarshalTo(metadata); err == nil {
					progress(metadata)
				}
			}
		}
		st := OperationStatus(lastOp)
		if st != nil {
			opError = true
			if st.Code() == codes.DeadlineExceeded {
				return nil
			}
			return st.Err()
		}
		return nil
	}
	err = c.Retrier.Do(ctx, func() error { return c.CallWithTimeout(ctx, "Execute", closure) })
	if err != nil && !opError {
		if st, ok := status.FromError(err); ok {
			err = StatusDetailedError(st)
		}
		return nil, err
	}
	// In the off chance that the server closes the stream immediately without returning any Operation
	// values and without returning an error, then lastOp will never be modified. Alternatively
	// the server could return an empty operation explicitly prior to closing the stream. Either
	// case is a server error.
	if proto.Equal(lastOp, &oppb.Operation{}) {
		return nil, errors.New("unexpected server behaviour: an empty Operation was returned, or no operation was returned")
	}
	return lastOp, nil
}
// OperationStatus returns the non-OK status carried by op's ExecuteResponse. It returns nil if op
// has no response, the response does not decode as an ExecuteResponse, or the status is OK or
// unset.
func OperationStatus(op *oppb.Operation) *status.Status {
	resp, ok := op.Result.(*oppb.Operation_Response)
	if !ok || resp == nil {
		return nil
	}
	execResp := new(repb.ExecuteResponse)
	if err := resp.Response.UnmarshalTo(execResp); err != nil {
		return nil
	}
	// A nil Status proto converts to an OK status, which is reported as "no error".
	st := status.FromProto(execResp.Status)
	if st.Code() == codes.OK {
		return nil
	}
	return st
}