// Copyright 2018 GRAIL, Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.

package exec

import (
"context"
"fmt"
"strings"
"github.com/grailbio/base/log"
"github.com/grailbio/bigslice"
"github.com/grailbio/bigslice/frame"
"github.com/grailbio/bigslice/internal/slicecache"
"github.com/grailbio/bigslice/slicefunc"
"github.com/grailbio/bigslice/sliceio"
)
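
// defaultPartitioner is the partitioner used when a shuffle dependency
// does not specify one: it assigns each row of the frame to the shard
// given by the row's hash modulo the number of shards.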
func defaultPartitioner(_ context.Context, frame frame.Frame, nshard int, shards []int) {
for i := range shards {
shards[i] = int(frame.Hash(i) % uint32(nshard))
}
}

// pipeline returns the sequence of slices that may be pipelined
// starting from slice. Slices that do not have shuffle dependencies
// may be pipelined together: slices[0] depends on slices[1], and so on.
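//
// For example (the slice constructors here are ordinary bigslice
// operations, shown only for illustration):
//
//	reads := bigslice.ReaderFunc(...)    // no dependencies
//	mapped := bigslice.Map(reads, ...)   // pipelined dependency on reads
//	folded := bigslice.Fold(mapped, ...) // shuffle dependency on mapped
//
//	pipeline(mapped) // [mapped, reads]
//	pipeline(folded) // [folded]: the walk stops at the shuffle boundary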
func pipeline(slice bigslice.Slice) (slices []bigslice.Slice) {
for {
// Stop at *Result slices so we can reuse previously computed tasks.
if _, ok := bigslice.Unwrap(slice).(*Result); ok {
return
}
slices = append(slices, slice)
if slice.NumDep() != 1 {
return
}
dep := slice.Dep(0)
if dep.Shuffle {
return
}
if pragma, ok := dep.Slice.(bigslice.Pragma); ok && pragma.Materialize() {
return
}
slice = dep.Slice
}
}

// memoKey is the memo key for memoized slice compilations.
type memoKey struct {
slice bigslice.Slice
// numPartition is the number of partitions in the output of the memoized
// compiled tasks.
numPartition int
}

// partitioner configures the output partitioning of compiled tasks. The
// zero value indicates that the output of the tasks is not for a shuffle
// dependency.
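//
// As a sketch of the semantics defined by the methods below:
//
//	var p partitioner   // zero value: output is materialized, not shuffled
//	p.IsShuffle()       // false
//	p.NumPartition()    // 1
//
//	q := partitioner{numPartition: 4} // output feeds a 4-way shuffle
//	q.IsShuffle()       // true
//	q.Partitioner()     // defaultPartitioner, since none was set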
type partitioner struct {
// numPartition is the number of partitions in the output for a shuffle
// dependency, if >0. If 0, the output is not used by a shuffle.
numPartition int
partitioner bigslice.Partitioner
Combiner slicefunc.Func
CombineKey string
}

// IsShuffle returns whether the task output is used by a shuffle dependency.
func (p partitioner) IsShuffle() bool {
return p.numPartition != 0
}

// Partitioner returns the partitioner to be used to partition the output of
// this task.
func (p partitioner) Partitioner() bigslice.Partitioner {
if p.partitioner == nil {
return defaultPartitioner
}
return p.partitioner
}

// NumPartition returns the number of partitions that the task output should
// have. If this is not a shuffle dependency, returns 1.
func (p partitioner) NumPartition() int {
if p.numPartition == 0 {
return 1
}
return p.numPartition
}

// compile compiles the provided slice into a set of task graphs, each
// representing the computation for one shard of the slice. The slice is
// produced by the provided invocation. compile coalesces slice operations
// that can be pipelined into single tasks, creating wide dependencies
// only at shuffle boundaries. The names minted for tasks are unique to
// the session, and the order in which the namer is invoked is
// deterministic, so repeated compilations name tasks consistently.
//
// TODO(marius): we don't currently reuse tasks across compilations,
// even though this could sometimes safely be done (when the number
// of partitions and the kind of partitioner matches at shuffle
// boundaries). We should at least support this use case to avoid
// redundant computations.
//
// TODO(marius): an alternative model for propagating invocations is
// to provide each actual invocation with a "root" slice from where
// all other slices must be derived. This simplifies the
// implementation but may make the API a little confusing.
func compile(inv execInvocation, slice bigslice.Slice, machineCombiners bool) (tasks []*Task, err error) {
c := compiler{
namer: make(taskNamer),
inv: inv,
machineCombiners: machineCombiners,
memo: make(map[memoKey][]*Task),
}
// Top-level compilation always produces tasks that write single partitions,
// as they are materialized and will not be used as direct shuffle
// dependencies.
tasks, err = c.compile(slice, partitioner{})
return
}

type (
// CompileEnv is the environment for compilation. This environment should
// capture all external state that can affect compilation of an invocation.
// It is shared across compilations of the same invocation (e.g. on worker
// nodes) to guarantee consistent compilation results. This is a
// requirement of bigslice's computation model, as we assume that all nodes
// share the same view of the task graph. It must be gob-encodable for
// transport to workers.
CompileEnv struct {
// Writable is true if this environment is writable. It is only
// exported so that it can be gob-{en,dec}oded.
Writable bool
// Cached indicates whether a task operation's results can be read from
// cache. An "operation" is one of the pipelined elements that a task
// may perform. It is only exported so that it can be gob-{en,dec}oded.
Cached map[taskOp]bool
}
// taskOp is a (task, operation) pair. It is used as the key of
// (CompileEnv).Cached.
taskOp struct {
// N is the task, specified by name.
N TaskName
// OpIdx is the operation, specified by index in the task processing
// pipeline.
OpIdx int
}
)

// makeCompileEnv returns an empty and writable CompileEnv that can be passed
// to compile.
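//
// A typical lifecycle, as a sketch (taskName stands in for a real
// TaskName):
//
//	env := makeCompileEnv()
//	// ... compile, marking cached (task, operation) pairs ...
//	env.MarkCached(taskName, 0)
//	env.Freeze() // read-only from here on; safe to gob-encode for workers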
func makeCompileEnv() CompileEnv {
return CompileEnv{
Writable: true,
Cached: make(map[taskOp]bool),
}
}

// MarkCached marks the (task, operation) given by (n, opIdx) as cached.
func (e CompileEnv) MarkCached(n TaskName, opIdx int) {
if !e.Writable {
panic("env not writable")
}
e.Cached[taskOp{n, opIdx}] = true
}

// IsCached returns whether the (task, operation) given by (n, opIdx) is
// cached.
func (e CompileEnv) IsCached(n TaskName, opIdx int) bool {
return e.Cached[taskOp{n, opIdx}]
}

// Freeze freezes the state, marking e no longer writable.
func (e *CompileEnv) Freeze() {
e.Writable = false
}

// IsWritable returns whether this environment is writable.
func (e CompileEnv) IsWritable() bool {
return e.Writable
}

type compiler struct {
namer taskNamer
inv execInvocation
machineCombiners bool
memo map[memoKey][]*Task
}

// compile compiles the provided slice into a set of task graphs, memoizing the
// compilation so that tasks can be reused within the invocation.
func (c *compiler) compile(slice bigslice.Slice, part partitioner) (tasks []*Task, err error) {
// We never reuse combiner tasks, as we currently don't have a way of
// identifying equivalent combiner functions. Ditto with custom
// partitioners.
if part.Combiner.IsNil() && part.partitioner == nil {
// TODO(jcharumilind): Repartition already-computed data instead of
// forcing recomputation of the slice if we get a different
// numPartition.
key := memoKey{slice: slice, numPartition: part.numPartition}
if memoTasks, ok := c.memo[key]; ok {
// We're compiling the same slice with the same number of partitions
// (and no combiner or custom partitioner), so we can safely reuse the
// tasks.
return memoTasks, nil
}
defer func() {
if err != nil {
return
}
c.memo[key] = tasks
}()
}
// Beyond this point, any tasks used for shuffles are new and need to have
// task groups set up for phasic evaluation.
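// A shuffle TaskDep names only its head task; the rest of the task set
// is reachable through the head's Group (hence depTasks[0] below).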
defer func() {
if part.IsShuffle() {
for _, task := range tasks {
task.Group = tasks
}
}
}()
// Reuse tasks from a previous invocation.
if result, ok := bigslice.Unwrap(slice).(*Result); ok {
for _, task := range result.tasks {
if !task.Combiner.IsNil() {
// TODO(marius): we may consider supporting this, but it should
// be very rare, since it requires the user to explicitly reuse
// an intermediate slice, which is impossible via the current
// API.
return nil, fmt.Errorf("cannot reuse task %s with combine key %s", task, task.CombineKey)
}
}
if !part.IsShuffle() {
tasks = result.tasks
return
}
// We now insert a set of tasks whose only purpose is (re-)shuffling
// the output from the previously completed task.
shuffleOpName := c.namer.New(fmt.Sprintf("%s_shuffle", result.tasks[0].Name.Op))
tasks = make([]*Task, len(result.tasks))
for shard, task := range result.tasks {
tasks[shard] = &Task{
Type: slice,
Invocation: c.inv,
Name: TaskName{
InvIndex: c.inv.Index,
Op: shuffleOpName,
Shard: shard,
NumShard: len(result.tasks),
},
Do: func(readers []sliceio.Reader) sliceio.Reader { return readers[0] },
Deps: []TaskDep{{task, 0, false, ""}},
Pragma: task.Pragma,
Slices: task.Slices,
}
}
return
}
// Pipeline slices and create a task for each underlying shard, pipelining
// the eligible computations.
slices := pipeline(slice)
defer func() {
for _, task := range tasks {
task.Slices = slices
}
}()
var pragmas bigslice.Pragmas
ops := make([]string, 0, len(slices)+1)
ops = append(ops, fmt.Sprintf("inv%d", c.inv.Index))
for i := len(slices) - 1; i >= 0; i-- {
ops = append(ops, slices[i].Name().Op)
if pragma, ok := slices[i].(bigslice.Pragma); ok {
pragmas = append(pragmas, pragma)
}
}
opName := c.namer.New(strings.Join(ops, "_"))
tasks = make([]*Task, slice.NumShard())
for i := range tasks {
tasks[i] = &Task{
Type: slices[0],
Name: TaskName{
InvIndex: c.inv.Index,
Op: opName,
Shard: i,
NumShard: len(tasks),
},
Invocation: c.inv,
Pragma: pragmas,
NumPartition: part.NumPartition(),
Partitioner: part.Partitioner(),
Combiner: part.Combiner,
CombineKey: part.CombineKey,
}
}
// Capture the dependencies for this task set; they are encoded in the last
// slice.
lastSlice := slices[len(slices)-1]
for i := 0; i < lastSlice.NumDep(); i++ {
dep := lastSlice.Dep(i)
if !dep.Shuffle {
depTasks, err := c.compile(dep.Slice, partitioner{})
if err != nil {
return nil, err
}
if len(tasks) != len(depTasks) {
log.Panicf("tasks:%d deptasks:%d", len(tasks), len(depTasks))
}
for shard := range tasks {
tasks[shard].Deps = append(tasks[shard].Deps,
TaskDep{depTasks[shard], 0, dep.Expand, ""})
}
continue
}
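// A non-empty combine key instructs the executor to share a single
// combine buffer among all tasks on a machine that use that key
// (machine combiners); here the key is the pipeline's operation name.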
var combineKey string
if !lastSlice.Combiner().IsNil() && c.machineCombiners {
combineKey = opName
}
depPart := partitioner{
slice.NumShard(), dep.Partitioner,
lastSlice.Combiner(), combineKey,
}
depTasks, err := c.compile(dep.Slice, depPart)
if err != nil {
return nil, err
}
// Each shard reads different partitions from all of the previous slice's shards.
for partition := range tasks {
tasks[partition].Deps = append(tasks[partition].Deps,
TaskDep{depTasks[0], partition, dep.Expand, combineKey})
}
}
// Pipeline execution, folding multiple frame operations
// into a single task by composing their readers.
// Use cache when configured.
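// For pipelined slices [s0, s1, s2], where s0 depends on s1 and s1
// depends on s2, the composed reader is, schematically:
//
//	s0.Reader(shard, [s1.Reader(shard, [s2.Reader(shard, deps)])])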
for opIdx := len(slices) - 1; opIdx >= 0; opIdx-- {
var (
pprofLabel = fmt.Sprintf("%s(%s)", slices[opIdx].Name(), c.inv.Location)
reader = slices[opIdx].Reader
shardCache = slicecache.Empty
)
if cacheable, ok := bigslice.Unwrap(slices[opIdx]).(slicecache.Cacheable); ok {
shardCache = cacheable.Cache()
}
if c.inv.Env.IsWritable() {
for shard, task := range tasks {
if shardCache.IsCached(shard) {
c.inv.Env.MarkCached(task.Name, opIdx)
}
}
}
for shard, task := range tasks {
var (
shard = shard
prev = task.Do
)
if c.inv.Env.IsCached(task.Name, opIdx) {
task.Do = func([]sliceio.Reader) sliceio.Reader {
r := shardCache.CacheReader(shard)
return &sliceio.PprofReader{Reader: r, Label: pprofLabel}
}
// Forget task dependencies for cached shards because we'll read
// from the cache file.
task.Deps = nil
continue
}
if prev == nil {
// First, read the input directly.
task.Do = func(readers []sliceio.Reader) sliceio.Reader {
r := reader(shard, readers)
r = shardCache.WritethroughReader(shard, r)
return &sliceio.PprofReader{Reader: r, Label: pprofLabel}
}
} else {
// Subsequently, read the previous pipelined slice's output.
task.Do = func(readers []sliceio.Reader) sliceio.Reader {
r := reader(shard, []sliceio.Reader{prev(readers)})
r = shardCache.WritethroughReader(shard, r)
return &sliceio.PprofReader{Reader: r, Label: pprofLabel}
}
}
}
}
return
}
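
// taskNamer mints operation names that are unique within a session by
// appending a counter to names it has already seen. For example:
//
//	n := make(taskNamer)
//	n.New("inv1_reader_map") // "inv1_reader_map"
//	n.New("inv1_reader_map") // "inv1_reader_map1"
//	n.New("inv1_reader_map") // "inv1_reader_map2"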
type taskNamer map[string]int

func (n taskNamer) New(name string) string {
c := n[name]
n[name]++
if c == 0 {
return name
}
return fmt.Sprintf("%s%d", name, c)
}