/
executor.go
298 lines (257 loc) · 10 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
package wasm
import (
"context"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"github.com/rs/zerolog/log"
"github.com/tetratelabs/wazero"
"go.uber.org/atomic"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/system"
"github.com/bacalhau-project/bacalhau/pkg/lib/math"
"github.com/bacalhau-project/bacalhau/pkg/bidstrategy"
"github.com/bacalhau-project/bacalhau/pkg/executor"
wasmmodels "github.com/bacalhau-project/bacalhau/pkg/executor/wasm/models"
wasmlogs "github.com/bacalhau-project/bacalhau/pkg/logger/wasm"
"github.com/bacalhau-project/bacalhau/pkg/storage"
"github.com/bacalhau-project/bacalhau/pkg/storage/util"
"github.com/bacalhau-project/bacalhau/pkg/util/filefs"
"github.com/bacalhau-project/bacalhau/pkg/util/generic"
"github.com/bacalhau-project/bacalhau/pkg/util/mountfs"
"github.com/bacalhau-project/bacalhau/pkg/util/touchfs"
)
// Executor runs WASM jobs in-process on top of the wazero runtime. It keeps a
// handler per started execution so that the execution can later be waited on,
// cancelled, or have its logs streamed.
type Executor struct {
	// handlers associates each execution ID with the executionHandler that
	// drives that execution.
	handlers generic.SyncMap[string, *executionHandler]
}
// NewExecutor constructs a WASM Executor. The zero-value handler registry is
// used as-is, so construction cannot fail and the returned error is always nil.
func NewExecutor() (*Executor, error) {
	exec := new(Executor)
	return exec, nil
}
// IsInstalled always reports true with a nil error: the WASM executor is
// implemented in Go (via wazero) and has no external dependency to probe.
func (*Executor) IsInstalled(_ context.Context) (bool, error) {
	const installed = true
	return installed, nil
}
// ShouldBid always returns a positive bid response and a nil error: the WASM
// executor places no additional requirements on the jobs it is offered.
func (*Executor) ShouldBid(ctx context.Context, request bidstrategy.BidStrategyRequest) (bidstrategy.BidStrategyResponse, error) {
	const reason = "not place additional requirements on WASM jobs"
	response := bidstrategy.NewBidResponse(true, reason)
	return response, nil
}
// ShouldBidBasedOnUsage returns the same positive bid response as ShouldBid.
// The reported resource usage is not consulted by the WASM executor.
func (e *Executor) ShouldBidBasedOnUsage(
	ctx context.Context,
	request bidstrategy.BidStrategyRequest,
	usage models.Resources,
) (bidstrategy.BidStrategyResponse, error) {
	// usage is intentionally ignored; the decision is identical to ShouldBid's.
	return e.ShouldBid(ctx, request)
}
// Wazero is compliant with the WebAssembly Core Specification 1.0 and 2.0.
//
// WebAssembly linear memory is sized in pages of 65536 (2^16) bytes. With
// 32-bit addressing (WebAssembly 1.0), a linear memory can hold at most 65536
// pages, i.e. 2^32 bytes (4 GiB) in total.
const (
	// WasmArch is the address width, in bits, of WebAssembly linear memory.
	WasmArch = 32
	// WasmPageSize is the size in bytes of one WebAssembly memory page (64 KiB).
	WasmPageSize = 65536
	// WasmMaxPagesLimit is the largest number of pages a linear memory may
	// have: the 2^WasmArch addressable bytes divided by the page size.
	WasmMaxPagesLimit = (1 << WasmArch) / WasmPageSize
)
// Start launches the execution described by request and returns without
// waiting for it to finish; use Wait (or Run) to collect its result.
//
// Before launching, Start:
//   - rejects an execution ID that already has a registered handler, returning
//     an error wrapping executor.ErrAlreadyStarted if that handler is still
//     active, or executor.ErrAlreadyComplete otherwise;
//   - if request.Resources.Memory is set, limits the wazero runtime's memory to
//     that many bytes rounded up to whole WASM pages, and fails if this exceeds
//     WasmMaxPagesLimit pages (4 GiB);
//   - decodes the WASM engine parameters, builds the module's filesystem from
//     the request's inputs and outputs, and creates the execution's log manager.
//
// It then registers a new executionHandler under request.ExecutionID and runs
// it in its own goroutine.
//
// NOTE(review): the Get/Put pair below is not atomic, so two concurrent Starts
// with the same execution ID are not reliably rejected — confirm callers
// serialise Start per execution.
// NOTE(review): the handler goroutine receives Start's ctx (carrying the
// tracing span, which ends when Start returns). The runtime is configured with
// WithCloseOnContextDone(true), so cancelling that ctx presumably stops the
// module — confirm callers pass a context that outlives Start.
func (e *Executor) Start(ctx context.Context, request *executor.RunCommandRequest) error {
	ctx, span := system.NewSpan(ctx, system.GetTracer(), "pkg/executor/wasm.Executor.Start")
	defer span.End()

	if handler, found := e.handlers.Get(request.ExecutionID); found {
		if handler.active() {
			return fmt.Errorf("starting execution (%s): %w", request.ExecutionID, executor.ErrAlreadyStarted)
		}
		return fmt.Errorf("starting execution (%s): %w", request.ExecutionID, executor.ErrAlreadyComplete)
	}

	// Apply memory limits to the runtime. wazero limits memory in whole WASM
	// pages (64 KiB), so a limit that is not a multiple of the page size is
	// rounded up to the next page.
	engineConfig := wazero.NewRuntimeConfig().WithCloseOnContextDone(true)
	if request.Resources.Memory > 0 {
		requestedPages := request.Resources.Memory/WasmPageSize + math.Min(request.Resources.Memory%WasmPageSize, 1)
		if requestedPages > WasmMaxPagesLimit {
			err := fmt.Errorf("requested memory exceeds the wasm limit - %d > 4GB", request.Resources.Memory)
			log.Err(err).Msgf("requested memory exceeds maximum limit: %d > %d", requestedPages, WasmMaxPagesLimit)
			return err
		}
		// requestedPages is at most WasmMaxPagesLimit here, so it fits in a uint32.
		engineConfig = engineConfig.WithMemoryLimitPages(uint32(requestedPages))
	}

	engineParams, err := wasmmodels.DecodeArguments(request.EngineParams)
	if err != nil {
		return fmt.Errorf("decoding wasm arguments: %w", err)
	}

	rootFs, err := e.makeFsFromStorage(ctx, request.ResultsDir, request.Inputs, request.Outputs)
	if err != nil {
		return fmt.Errorf("preparing filesystem for execution (%s): %w", request.ExecutionID, err)
	}

	// Create the log manager for this execution; it is handed to the handler
	// (presumably to supply the writers for the module's output).
	wasmLogs, err := wasmlogs.NewLogManager(ctx, request.ExecutionID)
	if err != nil {
		return fmt.Errorf("creating log manager for execution (%s): %w", request.ExecutionID, err)
	}

	handler := &executionHandler{
		runtime:     wazero.NewRuntimeWithConfig(ctx, engineConfig),
		arguments:   engineParams,
		fs:          rootFs,
		inputs:      request.Inputs,
		executionID: request.ExecutionID,
		resultsDir:  request.ResultsDir,
		limits:      request.OutputLimits,
		logger: log.With().
			Str("execution", request.ExecutionID).
			Str("job", request.JobID).
			Str("entrypoint", engineParams.EntryPoint).
			Logger(),
		logManager: wasmLogs,
		activeCh:   make(chan bool),
		waitCh:     make(chan bool),
		running:    atomic.NewBool(false),
	}

	// Register the handler before starting it so Wait/Cancel/GetLogStream can
	// find the execution as soon as Start returns.
	e.handlers.Put(request.ExecutionID, handler)
	go handler.run(ctx)
	return nil
}
// Wait returns a result channel and an error channel for the execution
// identified by executionID. Both channels have a buffer of one.
//
// If no execution with that ID is registered, an error wrapping
// executor.ErrNotFound is placed on the error channel straight away and
// neither channel is closed. Otherwise doWait runs in a new goroutine and
// delivers either the execution's result or an error (including ctx.Err() if
// ctx is done first). Callers should receive from both channels, e.g. in a
// select, to synchronise with an execution launched by Start.
func (e *Executor) Wait(ctx context.Context, executionID string) (<-chan *models.RunCommandResult, <-chan error) {
	resultCh := make(chan *models.RunCommandResult, 1)
	failCh := make(chan error, 1)

	if handler, ok := e.handlers.Get(executionID); ok {
		go e.doWait(ctx, resultCh, failCh, handler)
		return resultCh, failCh
	}

	failCh <- fmt.Errorf("waiting on execution (%s): %w", executionID, executor.ErrNotFound)
	return resultCh, failCh
}
// doWait blocks until the handler's wait channel fires or ctx is done, then
// reports the outcome:
//   - on ctx done, ctx.Err() is sent on errCh;
//   - on completion, handle.result is sent on out, or an error is sent on errCh
//     if the result is nil (which indicates a flaw in the executor logic).
//
// Exactly one value is sent in total. Afterwards BOTH channels are closed, so a
// receiver selecting on both should use comma-ok receives to tell a closed
// channel's zero value apart from a real result or error.
func (e *Executor) doWait(ctx context.Context, out chan *models.RunCommandResult, errCh chan error, handle *executionHandler) {
	log.Info().Str("executionID", handle.executionID).Msg("waiting on execution")
	defer func() {
		// Same order as before: errCh is closed first, then out.
		close(errCh)
		close(out)
	}()

	select {
	case <-handle.waitCh:
		log.Info().Str("executionID", handle.executionID).Msg("received results from execution")
		if handle.result == nil {
			errCh <- fmt.Errorf("execution result is nil")
			return
		}
		out <- handle.result
	case <-ctx.Done():
		// Relay the cancellation to the waiter.
		errCh <- ctx.Err()
	}
}
// Cancel asks the handler of the execution identified by executionID to stop
// it, returning whatever the handler's kill reports. If no such execution is
// registered, it returns an error wrapping executor.ErrNotFound.
func (e *Executor) Cancel(ctx context.Context, executionID string) error {
	if handler, ok := e.handlers.Get(executionID); ok {
		return handler.kill(ctx)
	}
	return fmt.Errorf("canceling execution (%s): %w", executionID, executor.ErrNotFound)
}
// GetLogStream returns a stream of the log output of the execution identified
// by request.ExecutionID, as produced by that execution's handler. The
// request's history and follow options control whether logs written before the
// call are included and whether the stream stays open for new logs.
// NOTE(review): these options are presumably the WithHistory/Follow fields of
// executor.LogStreamRequest — confirm against that type.
//
// It returns an error wrapping executor.ErrNotFound if no execution with that
// ID has been started on this executor.
func (e *Executor) GetLogStream(ctx context.Context, request executor.LogStreamRequest) (io.ReadCloser, error) {
	handler, found := e.handlers.Get(request.ExecutionID)
	if !found {
		return nil, fmt.Errorf("getting outputs for execution (%s): %w", request.ExecutionID, executor.ErrNotFound)
	}
	return handler.outputStream(ctx, request)
}
// Run starts the execution described by request and blocks until it finishes,
// returning its result. It is a convenience wrapper around Start followed by
// Wait.
//
// It returns the error from Start if launching fails, the error delivered by
// Wait (e.g. a nil result or a cancellation), or ctx.Err() if ctx is done
// first.
func (e *Executor) Run(
	ctx context.Context,
	request *executor.RunCommandRequest,
) (*models.RunCommandResult, error) {
	if err := e.Start(ctx, request); err != nil {
		return nil, err
	}

	resCh, errCh := e.Wait(ctx, request.ExecutionID)

	// doWait sends exactly one value and then closes BOTH channels. A receive
	// from a closed, empty channel is always ready and yields the zero value, so
	// a plain select could pick the closed channel and return (nil, nil). Use
	// comma-ok receives and drop a channel from the select (nil channels never
	// become ready) once it is seen closed.
	for resCh != nil || errCh != nil {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case out, ok := <-resCh:
			if !ok {
				resCh = nil
				continue
			}
			return out, nil
		case err, ok := <-errCh:
			if !ok {
				errCh = nil
				continue
			}
			return nil, err
		}
	}
	return nil, fmt.Errorf("waiting on execution (%s): channels closed without a result", request.ExecutionID)
}
// makeFsFromStorage builds the virtual filesystem (an fs.FS) that is exposed
// to the WASM module. The strategy is to:
//
//   - mount each prepared input volume at its InputSource.Target: as a
//     directory (os.DirFS) when the source is a directory, otherwise as a
//     single file (filefs);
//   - for each output, create a directory named after the output's Name inside
//     jobResultsDir and mount it, wrapped in touchfs, at that Name.
//
// Each output Name must resolve to a single directory directly inside
// jobResultsDir. Names such as "..", "." or "../x" are rejected, because
// output names come from the job spec and would otherwise let a job create
// (and then write into) directories outside the results directory.
//
// Errors from stat-ing an input, creating an output directory or mounting are
// returned wrapped with context. Directories created before a failure are not
// removed.
func (e *Executor) makeFsFromStorage(
	ctx context.Context,
	jobResultsDir string,
	volumes []storage.PreparedStorage,
	outputs []*models.ResultPath) (fs.FS, error) {
	rootFs := mountfs.New()

	for _, v := range volumes {
		log.Ctx(ctx).Debug().
			Str("input", v.InputSource.Target).
			Str("source", v.Volume.Source).
			Msg("Using input")

		stat, err := os.Stat(v.Volume.Source)
		if err != nil {
			return nil, fmt.Errorf("inspecting input source %q: %w", v.Volume.Source, err)
		}

		var inputFs fs.FS
		if stat.IsDir() {
			inputFs = os.DirFS(v.Volume.Source)
		} else {
			inputFs = filefs.New(v.Volume.Source)
		}

		if err = rootFs.Mount(v.InputSource.Target, inputFs); err != nil {
			return nil, fmt.Errorf("mounting input %q: %w", v.InputSource.Target, err)
		}
	}

	resultsDir := filepath.Clean(jobResultsDir)
	for _, output := range outputs {
		if output.Name == "" {
			return nil, fmt.Errorf("output volume has no name: %+v", output)
		}
		if output.Path == "" {
			return nil, fmt.Errorf("output volume has no path: %+v", output)
		}

		srcd := filepath.Join(jobResultsDir, output.Name)
		// filepath.Join cleans its result, so srcd sits directly inside
		// resultsDir exactly when the cleaned name is a single path element.
		// This rejects traversal such as "..", "." or "../x", as well as nested
		// names like "a/b".
		if filepath.Dir(srcd) != resultsDir {
			return nil, fmt.Errorf("output volume name must be a single directory inside the results directory: %+v", output)
		}

		log.Ctx(ctx).Debug().
			Str("output", output.Name).
			Str("dir", srcd).
			Msg("Collecting output")

		if err := os.Mkdir(srcd, util.OS_ALL_R|util.OS_ALL_X|util.OS_USER_W); err != nil {
			return nil, fmt.Errorf("creating output directory %q: %w", srcd, err)
		}
		if err := rootFs.Mount(output.Name, touchfs.New(srcd)); err != nil {
			return nil, fmt.Errorf("mounting output %q: %w", output.Name, err)
		}
	}

	return rootFs, nil
}
// *Executor must satisfy executor.Executor; this blank assignment makes the
// compiler enforce it.
var _ executor.Executor = (*Executor)(nil)