package compute

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/bacalhau-project/bacalhau/pkg/compute/capacity"
	"github.com/bacalhau-project/bacalhau/pkg/compute/store"
	"github.com/bacalhau-project/bacalhau/pkg/lib/collections"
	"github.com/bacalhau-project/bacalhau/pkg/logger"
	"github.com/bacalhau-project/bacalhau/pkg/system"

	"github.com/rs/zerolog/log"
)

type bufferTask struct {
	localExecutionState store.LocalExecutionState
	enqueuedAt          time.Time
}

func newBufferTask(execution store.LocalExecutionState) *bufferTask {
	return &bufferTask{
		localExecutionState: execution,
		enqueuedAt:          time.Now(),
	}
}

type ExecutorBufferParams struct {
	ID                         string
	DelegateExecutor           Executor
	Callback                   Callback
	RunningCapacityTracker     capacity.Tracker
	EnqueuedCapacityTracker    capacity.Tracker
	DefaultJobExecutionTimeout time.Duration
}

// ExecutorBuffer is a backend.Executor implementation that buffers executions locally until enough capacity is
// available to run them. The buffer delegates the actual execution to an inner backend.Executor.
// The buffer is implemented as a FIFO queue: executions run in the order in which they were enqueued. However, an
// execution with high resource requirements may be skipped if a newer execution with lower requirements can run
// immediately. This improves utilization of compute nodes, though it can starve large executions and should be
// re-evaluated in the future.
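//
// A minimal construction sketch (illustration only: ctx, delegate, callback and
// the tracker instances are assumptions, not part of this file):
//
//	buffer := NewExecutorBuffer(ExecutorBufferParams{
//		ID:                         "node-1",         // hypothetical node ID
//		DelegateExecutor:           delegate,         // the Executor that actually runs jobs
//		Callback:                   callback,         // receives completion/failure events
//		RunningCapacityTracker:     runningTracker,   // capacity.Tracker for running work
//		EnqueuedCapacityTracker:    enqueuedTracker,  // capacity.Tracker for queued work
//		DefaultJobExecutionTimeout: 10 * time.Minute, // fallback when a task sets no timeout
//	})
//	err := buffer.Run(ctx, localExecutionState) // enqueues, then runs once capacity allows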
type ExecutorBuffer struct {
	ID                         string
	runningCapacity            capacity.Tracker
	enqueuedCapacity           capacity.Tracker
	delegateService            Executor
	callback                   Callback
	running                    map[string]*bufferTask
	queuedTasks                *collections.HashedPriorityQueue[string, *bufferTask]
	defaultJobExecutionTimeout time.Duration
	mu                         sync.Mutex
}

func NewExecutorBuffer(params ExecutorBufferParams) *ExecutorBuffer {
	indexer := func(b *bufferTask) string {
		return b.localExecutionState.Execution.ID
	}
	r := &ExecutorBuffer{
		ID:                         params.ID,
		runningCapacity:            params.RunningCapacityTracker,
		enqueuedCapacity:           params.EnqueuedCapacityTracker,
		delegateService:            params.DelegateExecutor,
		callback:                   params.Callback,
		running:                    make(map[string]*bufferTask),
		defaultJobExecutionTimeout: params.DefaultJobExecutionTimeout,
		queuedTasks:                collections.NewHashedPriorityQueue[string, *bufferTask](indexer),
	}
	return r
}

// Run enqueues the execution and tries to run it if there is enough capacity.
func (s *ExecutorBuffer) Run(ctx context.Context, localExecutionState store.LocalExecutionState) error {
	var err error
	s.mu.Lock()
	defer s.mu.Unlock()

	execution := localExecutionState.Execution
	defer func() {
		if err != nil {
			s.callback.OnComputeFailure(ctx, ComputeError{
				ExecutionMetadata: NewExecutionMetadata(execution),
				RoutingMetadata: RoutingMetadata{
					SourcePeerID: s.ID,
					TargetPeerID: localExecutionState.RequesterNodeID,
				},
				Err: err.Error(),
			})
		}
	}()

	// There is no point in enqueuing a job that requires more than the total capacity of the node. Such jobs
	// should not have reached this backend in the first place: the frontend should have rejected them when
	// asked to bid.
	if !s.runningCapacity.IsWithinLimits(ctx, *execution.TotalAllocatedResources()) {
		err = fmt.Errorf("not enough capacity to run job")
		return err
	}
	if s.queuedTasks.Contains(execution.ID) {
		err = fmt.Errorf("execution %s already enqueued", execution.ID)
		return err
	}
	if _, ok := s.running[execution.ID]; ok {
		err = fmt.Errorf("execution %s already running", execution.ID)
		return err
	}
	if added := s.enqueuedCapacity.AddIfHasCapacity(ctx, *execution.TotalAllocatedResources()); added == nil {
		err = fmt.Errorf("not enough capacity to enqueue job")
		return err
	} else {
		// Update the execution to include all the resources that have actually been allocated. Effectively this
		// is picking the GPU(s) that the job will use. Note that this is not persisted here, as it is based on
		// current usage information which would change under a restart, so it will only persist if the job starts.
		execution.AllocateResources(execution.Job.Task().Name, *added)
	}

	s.queuedTasks.Enqueue(newBufferTask(localExecutionState), execution.Job.Priority)
	s.deque()
	return err
}
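
// To illustrate Run's admission flow (hypothetical numbers, illustration only):
// on a node with 2 GPUs in total, an execution requesting 1 GPU passes the
// IsWithinLimits check, reserves 1 GPU of enqueued capacity via
// AddIfHasCapacity, and AllocateResources records which concrete GPU was
// picked. An execution requesting 3 GPUs fails IsWithinLimits outright and is
// reported back through OnComputeFailure.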

// doRun triggers the execution by the delegate backend.Executor and frees up the capacity when the execution is done.
func (s *ExecutorBuffer) doRun(ctx context.Context, task *bufferTask) {
	job := task.localExecutionState.Execution.Job
	ctx = system.AddJobIDToBaggage(ctx, job.ID)
	ctx = system.AddNodeIDToBaggage(ctx, s.ID)
	ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/compute.ExecutorBuffer.Run")
	defer span.End()

	var timeout time.Duration
	if !job.IsLongRunning() {
		timeout = job.Task().Timeouts.GetExecutionTimeout()
		if timeout == 0 {
			timeout = s.defaultJobExecutionTimeout
		}
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, timeout)
		defer cancel()
	}

	ch := make(chan error)
	go func() {
		ch <- s.delegateService.Run(ctx, task.localExecutionState)
	}()

	select {
	case <-ctx.Done():
		log.Ctx(ctx).Info().Str("ID", task.localExecutionState.Execution.ID).Dur("Timeout", timeout).Msg("Execution timed out")
		s.callback.OnCancelComplete(ctx, CancelResult{
			ExecutionMetadata: NewExecutionMetadata(task.localExecutionState.Execution),
			RoutingMetadata: RoutingMetadata{
				SourcePeerID: s.ID,
				TargetPeerID: task.localExecutionState.RequesterNodeID,
			},
		})
	case <-ch:
		// No need to check for run errors here: they are already handled by the delegate backend.Executor and
		// reported to the callback.
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	s.runningCapacity.Remove(ctx, *task.localExecutionState.Execution.TotalAllocatedResources())
	delete(s.running, task.localExecutionState.Execution.ID)
	s.deque()
}
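
// To illustrate doRun's timeout handling (a walk-through of the code above,
// not new behavior): a non-long-running task whose
// Timeouts.GetExecutionTimeout() returns zero runs under
// defaultJobExecutionTimeout. If the delegate has not returned by then, the
// ctx.Done() branch of the select fires, OnCancelComplete is reported to the
// callback, and the running capacity is released so deque() can start the
// next task.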

// deque tries to run the next execution in the queue if there is enough capacity.
// It is called every time a job is finished or enqueued, where a lock is already held.
// TODO: We loop through the queue every time a job runs or finishes, which is not very efficient.
func (s *ExecutorBuffer) deque() {
	ctx := context.Background()

	// There are at most `max` possible matches in the queue, so try at most that many times.
	max := s.queuedTasks.Len()
	for i := 0; i < max; i++ {
		qitem := s.queuedTasks.DequeueWhere(func(task *bufferTask) bool {
			// If we don't have enough resources to run this task, then we will skip it.
			queued := task.localExecutionState.Execution.TotalAllocatedResources()
			added := s.runningCapacity.AddIfHasCapacity(ctx, *queued)
			if added == nil {
				return false
			}

			// Update the execution to include all the resources that have actually been allocated.
			task.localExecutionState.Execution.AllocateResources(
				task.localExecutionState.Execution.Job.Task().Name,
				*added,
			)

			// Release the enqueued capacity now that the resources are claimed as running, so that they are
			// not counted twice.
			s.enqueuedCapacity.Remove(ctx, *queued)
			return true
		})

		if qitem == nil {
			// We didn't find anything in the queue that matches our resource availability, so break out of this
			// loop as there is nothing else to find.
			break
		}

		task := qitem.Value

		// Move the execution to the running list and remove it from the list of enqueued IDs before we actually
		// run the task.
		execID := task.localExecutionState.Execution.ID
		s.running[execID] = task

		go s.doRun(logger.ContextWithNodeIDLogger(context.Background(), s.ID), task)
	}
}
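
// To illustrate deque's skip behavior (hypothetical numbers, illustration
// only): with 8 CPUs free in runningCapacity and a queue of A (16 CPUs,
// enqueued first) followed by B (4 CPUs, enqueued second), DequeueWhere
// rejects A because AddIfHasCapacity returns nil, then dequeues and starts B.
// A keeps waiting, which is the utilization-versus-starvation trade-off noted
// in the ExecutorBuffer doc comment.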

func (s *ExecutorBuffer) Cancel(_ context.Context, localExecutionState store.LocalExecutionState) error {
	// TODO: Enqueue cancel tasks
	execution := localExecutionState.Execution
	go func() {
		ctx := logger.ContextWithNodeIDLogger(context.Background(), s.ID)
		ctx = system.AddJobIDToBaggage(ctx, execution.Job.ID)
		ctx = system.AddNodeIDToBaggage(ctx, s.ID)
		ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/compute.ExecutorBuffer.Cancel")
		defer span.End()

		err := s.delegateService.Cancel(ctx, localExecutionState)
		if err == nil {
			s.mu.Lock()
			defer s.mu.Unlock()
			delete(s.running, execution.ID)
		}
	}()
	return nil
}

// RunningExecutions returns the list of currently running executions.
func (s *ExecutorBuffer) RunningExecutions() []store.LocalExecutionState {
	return s.mapValues(s.running)
}

// EnqueuedExecutionsCount returns the number of currently enqueued executions.
func (s *ExecutorBuffer) EnqueuedExecutionsCount() int {
	return s.queuedTasks.Len()
}

func (s *ExecutorBuffer) mapValues(m map[string]*bufferTask) []store.LocalExecutionState {
	s.mu.Lock()
	defer s.mu.Unlock()

	executions := make([]store.LocalExecutionState, 0, len(m))
	for _, v := range m {
		executions = append(executions, v.localExecutionState)
	}
	return executions
}

// Compile-time interface check.
var _ Executor = (*ExecutorBuffer)(nil)