-
Notifications
You must be signed in to change notification settings - Fork 63
/
build_client.go
339 lines (311 loc) · 11.7 KB
/
build_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
package builder
import (
"context"
"log"
"time"
remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
"github.com/buildbarn/bb-remote-execution/pkg/filesystem"
"github.com/buildbarn/bb-remote-execution/pkg/proto/remoteworker"
"github.com/buildbarn/bb-storage/pkg/clock"
"github.com/buildbarn/bb-storage/pkg/digest"
"github.com/buildbarn/bb-storage/pkg/otel"
"github.com/buildbarn/bb-storage/pkg/program"
"github.com/buildbarn/bb-storage/pkg/random"
"github.com/buildbarn/bb-storage/pkg/util"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
)
// BuildClient is a client for the Remote Worker protocol. It can send
// synchronization requests to a scheduler, informing it of the current
// state of the worker, while also obtaining requests for executing
// build actions.
type BuildClient struct {
	// Constant fields, set once by NewBuildClient().
	scheduler           remoteworker.OperationQueueClient
	buildExecutor       BuildExecutor
	filePool            filesystem.FilePool
	clock               clock.Clock
	instanceNamePrefix  digest.InstanceName
	instanceNamePatcher digest.InstanceNamePatcher

	// Mutable fields that are always set.
	//
	// request is the body of the next SynchronizeRequest that will
	// be sent to the scheduler; its CurrentState is kept in sync
	// with what the worker is actually doing.
	request remoteworker.SynchronizeRequest
	// schedulerMayThinkExecutingUntil is non-nil while the
	// scheduler may still assume this worker is executing an
	// action. It is used to determine whether it is safe to
	// terminate the worker gracefully.
	schedulerMayThinkExecutingUntil *time.Time
	// nextSynchronizationAt is the time at which the next
	// Synchronize() call should be made, as instructed by the
	// scheduler during the previous call.
	nextSynchronizationAt time.Time

	// Mutable fields that are only set when executing an action.
	//
	// executionCancellation cancels the context of the running
	// action. executionUpdates delivers state updates produced
	// during execution, and is closed once execution completes.
	executionCancellation func()
	executionUpdates      <-chan *remoteworker.CurrentState_Executing
}
// NewBuildClient creates a new BuildClient instance that is set to the
// initial state (i.e., being idle).
func NewBuildClient(scheduler remoteworker.OperationQueueClient, buildExecutor BuildExecutor, filePool filesystem.FilePool, clock clock.Clock, workerID map[string]string, instanceNamePrefix digest.InstanceName, platform *remoteexecution.Platform, sizeClass uint32) *BuildClient {
	// The initial request reports the worker as idle, so that the
	// first synchronization may immediately pick up work.
	initialRequest := remoteworker.SynchronizeRequest{
		WorkerId:           workerID,
		InstanceNamePrefix: instanceNamePrefix.String(),
		Platform:           platform,
		SizeClass:          sizeClass,
		CurrentState: &remoteworker.CurrentState{
			WorkerState: &remoteworker.CurrentState_Idle{
				Idle: &emptypb.Empty{},
			},
		},
	}
	return &BuildClient{
		scheduler:             scheduler,
		buildExecutor:         buildExecutor,
		filePool:              filePool,
		clock:                 clock,
		instanceNamePrefix:    instanceNamePrefix,
		instanceNamePatcher:   digest.NewInstanceNamePatcher(digest.EmptyInstanceName, instanceNamePrefix),
		request:               initialRequest,
		nextSynchronizationAt: clock.Now(),
	}
}
// startExecution starts executing a build action that was handed out by
// the scheduler, tearing down any action that is currently running. On
// success, the reported worker state is switched to "executing,
// started".
func (bc *BuildClient) startExecution(executionRequest *remoteworker.DesiredState_Executing) error {
	// Validate the request before discarding any running action.
	suffix, err := digest.NewInstanceName(executionRequest.InstanceNameSuffix)
	if err != nil {
		return util.StatusWrapf(err, "Invalid instance name suffix %#v", executionRequest.InstanceNameSuffix)
	}
	digestFunction, err := bc.instanceNamePatcher.PatchInstanceName(suffix).
		GetDigestFunction(executionRequest.DigestFunction, 0)
	if err != nil {
		return err
	}

	// Any action that is still running must be cancelled and
	// drained before its channel and cancellation are replaced.
	bc.stopExecution()

	// Spawn the execution of the build action. The context carries
	// the W3C trace context provided by the scheduler.
	execCtx, cancel := context.WithCancel(
		otel.NewContextWithW3CTraceContext(
			context.Background(),
			executionRequest.W3CTraceContext))
	bc.executionCancellation = cancel
	updateChannel := make(chan *remoteworker.CurrentState_Executing, 10)
	bc.executionUpdates = updateChannel
	go func() {
		response := bc.buildExecutor.Execute(
			execCtx,
			bc.filePool,
			nil,
			digestFunction,
			executionRequest,
			updateChannel)
		// Emit a final "completed" update before closing, so the
		// consumer observes the execute response.
		updateChannel <- &remoteworker.CurrentState_Executing{
			ActionDigest: executionRequest.ActionDigest,
			ExecutionState: &remoteworker.CurrentState_Executing_Completed{
				Completed: response,
			},
		}
		close(updateChannel)
	}()

	// Change state to indicate the build has started.
	bc.request.CurrentState.WorkerState = &remoteworker.CurrentState_Executing_{
		Executing: &remoteworker.CurrentState_Executing{
			ActionDigest: executionRequest.ActionDigest,
			ExecutionState: &remoteworker.CurrentState_Executing_Started{
				Started: &emptypb.Empty{},
			},
		},
	}
	return nil
}
// stopExecution cancels the currently running build action, if any, and
// waits for it to terminate. All of its results are discarded, and the
// reported worker state is reset to idle.
func (bc *BuildClient) stopExecution() {
	if bc.executionCancellation != nil {
		bc.executionCancellation()
		// Drain the updates channel until the executor closes
		// it, discarding every pending result.
		for range bc.executionUpdates {
		}
		bc.executionCancellation = nil
		bc.executionUpdates = nil
	}
	bc.request.CurrentState.WorkerState = &remoteworker.CurrentState_Idle{
		Idle: &emptypb.Empty{},
	}
}
// applyExecutionUpdate folds a single value received from the execution
// updates channel into the worker state. A nil update corresponds to a
// closed channel, meaning execution has finished.
func (bc *BuildClient) applyExecutionUpdate(update *remoteworker.CurrentState_Executing) {
	if update == nil {
		// Execution has finished. Clean up resources.
		bc.executionCancellation()
		bc.executionCancellation = nil
		bc.executionUpdates = nil
		return
	}
	// New update received; report it during the next
	// synchronization.
	bc.request.CurrentState.WorkerState = &remoteworker.CurrentState_Executing_{
		Executing: update,
	}
}
// consumeExecutionUpdatesNonBlocking applies all execution updates that
// can be received without blocking, so that only the most recent state
// is reported to the scheduler.
func (bc *BuildClient) consumeExecutionUpdatesNonBlocking() {
drain:
	for {
		select {
		case u := <-bc.executionUpdates:
			bc.applyExecutionUpdate(u)
		default:
			// No more updates left.
			break drain
		}
	}
}
// touchSchedulerMayThinkExecuting updates state on whether the
// scheduler may think the worker is currently executing an action. This
// is used to determine whether it is safe to terminate the worker
// gracefully.
func (bc *BuildClient) touchSchedulerMayThinkExecuting() {
	// Assume that if we've missed the desired synchronization time
	// provided by the scheduler by more than a minute, the
	// scheduler has purged our state.
	deadline := bc.nextSynchronizationAt.Add(time.Minute)
	bc.schedulerMayThinkExecutingUntil = &deadline
}
// Run performs a single iteration of the Remote Worker client, by
// performing one synchronization against the scheduler. The boolean
// return value indicates whether the worker is currently in a state in
// which it may safely terminate.
func (bc *BuildClient) Run(ctx context.Context) (bool, error) {
	// Allow the worker to terminate if the scheduler doesn't think
	// we're executing any action, or if we haven't been able to
	// successfully synchronize for a prolonged amount of time.
	if ctx.Err() != nil && (bc.schedulerMayThinkExecutingUntil == nil || bc.clock.Now().After(*bc.schedulerMayThinkExecutingUntil)) {
		return true, nil
	}

	// If the scheduler isn't assuming we're executing any action
	// right now, perform some readiness checks. This ensures we
	// don't dequeue actions from the scheduler while unhealthy.
	if bc.schedulerMayThinkExecutingUntil == nil {
		if err := bc.buildExecutor.CheckReadiness(ctx); err != nil {
			return true, util.StatusWrap(err, "Worker failed readiness check")
		}
	}

	// When executing an action, see if there are any updates on the
	// execution state. Block until either an update arrives or the
	// scheduler-provided synchronization deadline expires.
	if bc.executionCancellation != nil {
		timer, timerChannel := bc.clock.NewTimer(bc.nextSynchronizationAt.Sub(bc.clock.Now()))
		select {
		case <-timerChannel:
			// No meaningful updates. Send the last update
			// once again.
		case update := <-bc.executionUpdates:
			// One or more execution updates available. Send
			// a new update with the latest state,
			// regardless of the next synchronization time
			// returned by the scheduler during the previous
			// synchronize call.
			timer.Stop()
			bc.applyExecutionUpdate(update)
			bc.consumeExecutionUpdatesNonBlocking()
			// Pull the synchronization time into the past,
			// so that the Synchronize() call below happens
			// immediately.
			if now := bc.clock.Now(); bc.nextSynchronizationAt.After(now) {
				bc.nextSynchronizationAt = now
			}
		}
	}

	// Determine whether we should perform call to Synchronize with
	// prefer_being_idle set to false (potentially blocking) or true
	// (non-blocking).
	currentStateIsExecuting := false
	switch workerState := bc.request.CurrentState.WorkerState.(type) {
	case *remoteworker.CurrentState_Idle:
		// Even though we are idle, the scheduler may think we
		// are executing. This means we were not able to perform
		// readiness checks. Forcefully switch to idle, so that
		// we can still do this before picking up more work.
		bc.request.PreferBeingIdle = bc.schedulerMayThinkExecutingUntil != nil
	case *remoteworker.CurrentState_Executing_:
		if updateCompleted, ok := workerState.Executing.ExecutionState.(*remoteworker.CurrentState_Executing_Completed); ok {
			// In case execution failed with a serious
			// error, request that the worker gets a brief
			// amount of idle time, so that we can do some
			// health checks prior to picking up more work.
			bc.request.PreferBeingIdle = status.ErrorProto(updateCompleted.Completed.Status) != nil
		} else {
			currentStateIsExecuting = true
			bc.request.PreferBeingIdle = false
		}
	default:
		panic("Unknown worker state")
	}

	// If we need to shut down, we should never be performing
	// blocking Synchronize() calls. We should still let the calls
	// go through, so that we can either finish the current action,
	// or properly ensure the scheduler thinks we're in the idle
	// state.
	if ctx.Err() != nil {
		bc.request.PreferBeingIdle = true
		ctx = context.Background()
	}

	// Inform scheduler of current worker state, potentially
	// requesting new work. If this fails, we might have lost an
	// execute request sent by the scheduler, so assume the
	// scheduler thinks we may be executing.
	response, err := bc.scheduler.Synchronize(ctx, &bc.request)
	if bc.schedulerMayThinkExecutingUntil == nil {
		bc.touchSchedulerMayThinkExecuting()
	}
	if err != nil {
		return false, util.StatusWrap(err, "Failed to synchronize with scheduler")
	}

	// Determine when we should contact the scheduler again in case
	// of no activity.
	nextSynchronizationAt := response.NextSynchronizationAt
	if err := nextSynchronizationAt.CheckValid(); err != nil {
		return false, util.StatusWrap(err, "Scheduler response contained invalid synchronization timestamp")
	}
	bc.nextSynchronizationAt = nextSynchronizationAt.AsTime()

	// Apply desired state changes provided by the scheduler.
	if desiredState := response.DesiredState; desiredState != nil {
		switch workerState := desiredState.WorkerState.(type) {
		case *remoteworker.DesiredState_Executing_:
			// Scheduler is requesting us to execute the
			// next action, maybe forcing us to to stop
			// execution of the current build action.
			if err := bc.startExecution(workerState.Executing); err != nil {
				return false, err
			}
			bc.touchSchedulerMayThinkExecuting()
			return false, nil
		case *remoteworker.DesiredState_Idle:
			// Scheduler is forcing us to go back to idle.
			bc.stopExecution()
			bc.schedulerMayThinkExecutingUntil = nil
			return true, nil
		default:
			return false, status.Error(codes.Internal, "Scheduler provided an unknown desired state")
		}
	}

	// Scheduler has instructed to continue as is.
	if currentStateIsExecuting {
		bc.touchSchedulerMayThinkExecuting()
		return false, nil
	}
	bc.schedulerMayThinkExecutingUntil = nil
	return true, nil
}
// LaunchWorkerThread launches a single routine that uses a build client
// to repeatedly synchronizes against the scheduler, requesting a task
// to execute.
func LaunchWorkerThread(group program.Group, buildClient *BuildClient, workerName string) {
	group.Go(func(ctx context.Context, siblingsGroup, dependenciesGroup program.Group) error {
		generator := random.NewFastSingleThreadedGenerator()
		for {
			terminationStartedBeforeRun := ctx.Err() != nil
			mayTerminate, err := buildClient.Run(ctx)
			if mayTerminate && ctx.Err() != nil {
				log.Printf("Worker %s: terminating", workerName)
				return nil
			}
			if err == nil {
				continue
			}
			log.Printf("Worker %s: %s", workerName, err)

			// In case of errors, sleep a random amount of
			// time. Allow the sleep to be skipped once when
			// termination is initiated, so that it happens
			// quickly.
			d := random.Duration(generator, 5*time.Second)
			if terminationStartedBeforeRun {
				// Termination had already begun before this
				// iteration, so sleep unconditionally to
				// avoid spinning.
				time.Sleep(d)
				continue
			}
			t := time.NewTimer(d)
			select {
			case <-t.C:
			case <-ctx.Done():
				t.Stop()
			}
		}
	})
}