/
pool.go
258 lines (230 loc) · 7.69 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
// Copyright 2017 GRAIL, Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.
// Package pool implements resource pools for reflow. Reflow manages
// resources in units of "allocs" -- an alloc is a resource allocation
// that exists on a single machine, and to which is attached a shared
// repository with the results of all execs within that Alloc. Allocs
// are leased -- they must be kept alive to guarantee continuity; they
// are collected as a unit.
package pool
import (
"container/heap"
"context"
"fmt"
"math/rand"
"os/user"
"github.com/grailbio/reflow"
"github.com/grailbio/reflow/errors"
"github.com/grailbio/reflow/log"
"golang.org/x/sync/errgroup"
)
// Offer represents an offer of resources, from which an Alloc can be created.
// Offers are handed out by a Pool (see Pool.Offer and Pool.Offers) and are
// converted into an Alloc by calling Accept.
type Offer interface {
// ID returns the ID of the offer. It is an opaque string.
ID() string
// Pool returns the pool from which this Offer is extended.
Pool() Pool
// Available returns the amount of total available resources
// that can be accepted.
Available() reflow.Resources
// Accept accepts this Offer with the given Alloc metadata. The
// metadata includes how many resources are requested. Accept may
// return ErrOfferExpired if another client accepted the offer
// first.
//
// NOTE(review): allocate in this file moves on to the next offer only
// when Accept fails with an error of kind errors.NotExist; confirm that
// ErrOfferExpired is reported with that kind.
Accept(ctx context.Context, meta AllocMeta) (Alloc, error)
}
// OfferJSON is the JSON structure used to describe offers. It mirrors
// the data exposed by Offer.ID and Offer.Available. The fields carry no
// struct tags, so encoding/json would use the Go field names as keys.
type OfferJSON struct {
// The ID of the offer.
ID string
// The amount of available resources the offer represents.
Available reflow.Resources
}
// Pool is a resource pool which manages a set of allocs.
//
// A Pool implementation may additionally expose a
// `Pools() []Pool` method; the Allocs function in this file detects
// that method and queries each of the returned pools individually.
type Pool interface {
// ID returns the ID of the pool. It is an opaque string.
ID() string
// Alloc returns the Alloc named by an ID.
Alloc(ctx context.Context, id string) (Alloc, error)
// Allocs enumerates the available Allocs in this Pool.
Allocs(ctx context.Context) ([]Alloc, error)
// Offer returns the Offer identified by an id.
Offer(ctx context.Context, id string) (Offer, error)
// Offers returns the set of current Offers from this Pool.
// TODO(marius): it would be good to have a scanning/long-poll
// version of this so that clients do not have to do their own polling.
Offers(ctx context.Context) ([]Offer, error)
}
// Sentinel errors used by the allocation retry loop.
var (
// errUnavailable is returned by allocate when none of the candidate
// offers could be accepted. Allocate treats it as retryable.
errUnavailable = errors.New("no allocs available in pool")
// errTooManyTries is reported by Allocate once every retry has
// ended in errUnavailable.
errTooManyTries = errors.New("too many tries")
)
// Allocate attempts to place an Alloc on a pool with the given
// resource requirements.
//
// It makes up to maxRetries placement attempts. An attempt that ends
// in errUnavailable is retried immediately; any other failure aborts
// the loop. Every error returned from Allocate is wrapped with the
// errors.Unavailable kind.
func Allocate(ctx context.Context, pool Pool, req reflow.Requirements, labels Labels) (Alloc, error) {
	const maxRetries = 6
	for attempt := 0; attempt < maxRetries; attempt++ {
		alloc, err := allocate(ctx, pool, req, labels)
		switch err {
		case nil:
			return alloc, nil
		case errUnavailable:
			// No offer could be claimed on this pass; try again
			// against a fresh set of offers.
			continue
		default:
			return nil, errors.E(errors.Unavailable, err)
		}
	}
	return nil, errors.E(errors.Unavailable, errTooManyTries)
}
// allocate makes a single attempt to place an Alloc on the pool. It
// fetches the pool's current offers, narrows them down to the best few
// candidates for req (see pickN), and tries to accept them in order.
//
// It returns errUnavailable when no candidate could be accepted:
// either there were none, the computed request was empty, or every
// Accept failed with an error of kind errors.NotExist. Any other error
// from pool.Offers or Offer.Accept is returned unchanged.
func allocate(ctx context.Context, pool Pool, req reflow.Requirements, labels Labels) (Alloc, error) {
	offers, err := pool.Offers(ctx)
	if err != nil {
		return nil, err
	}
	// We shuffle the list of offers so that if we have a large number of equal offers,
	// different callers of `allocate` do not end up competing for the same set of offers.
	// Offers are ordered by "best match" before being chosen from anyway (see `pickN`),
	// so this shuffling doesn't impact performance if offers are largely diverse.
	rand.Shuffle(len(offers), func(i, j int) { offers[i], offers[j] = offers[j], offers[i] })

	// maxOffersToConsider is the maximum number of offers to consider. This restricts the size
	// of the heap built by `pickN` so that the complexity is `P log N` instead of `P log P`,
	// where `P = len(offers)` and `N = maxOffersToConsider`.
	const maxOffersToConsider = 10
	ordered := pickN(offers, maxOffersToConsider, req.Min, req.Max())

	// The owner does not depend on the offer being tried, so resolve it
	// once up front instead of on every iteration. A failed lookup is
	// not fatal; the alloc is simply attributed to "<unknown>".
	// TODO(marius): include more flow metadata here.
	// (expr, parameters, etc.)
	owner := "<unknown>"
	if u, uerr := user.Current(); uerr == nil {
		owner = fmt.Sprintf("%s <%s>", u.Name, u.Username)
	}

	for _, pick := range ordered {
		// pick the smallest of max and what's available. If taking that
		// would leave no memory, disk, or CPU in the offer, we grab the
		// whole alloc so that we don't unnecessarily leave resources on
		// the table; they can become useful later in execution, and it
		// leaves the rest of the offer unusable anyway. We do the same
		// if it's a wide request.
		avail := pick.Available()
		var want reflow.Resources
		want.Min(req.Max(), avail)
		var tmp reflow.Resources
		tmp.Sub(avail, want)
		if tmp["cpu"] <= 0 || tmp["mem"] <= 0 || tmp["disk"] <= 0 {
			want.Set(avail)
		}
		// Let's not ask for nothing
		if want.Equal(nil) {
			continue
		}
		meta := AllocMeta{Want: want, Labels: labels, Owner: owner}
		alloc, err := pick.Accept(ctx, meta)
		if err == nil {
			return alloc, nil
		}
		// An errors.NotExist failure means this offer is gone; fall
		// through to the next candidate. Anything else is fatal.
		if !errors.Is(errors.NotExist, err) {
			return nil, err
		}
	}
	return nil, errUnavailable
}
// Allocs fetches all of the allocs from the provided pool. Failures
// (e.g., due to a context timeout) are logged to log but otherwise
// ignored; the returned slice contains whatever allocs were fetched
// successfully.
//
// If pool exposes a `Pools() []Pool` method, each of those pools is
// queried in its own goroutine and the results are concatenated in the
// order the pools were returned. Otherwise pool itself is queried.
func Allocs(ctx context.Context, pool Pool, log *log.Logger) []Alloc {
	type multiPool interface {
		Pools() []Pool
	}
	mp, isMulti := pool.(multiPool)
	if !isMulti {
		found, err := pool.Allocs(ctx)
		if err != nil {
			log.Errorf("allocs %v: %v", pool, err)
		}
		return found
	}

	subpools := mp.Pools()
	// One result slot per sub-pool, so goroutines never write to the
	// same element.
	perPool := make([][]Alloc, len(subpools))
	g, ctx := errgroup.WithContext(ctx)
	for idx, sub := range subpools {
		idx, sub := idx, sub
		g.Go(func() error {
			found, err := sub.Allocs(ctx)
			perPool[idx] = found
			if err != nil {
				log.Errorf("allocs %v: %v", sub, err)
			}
			// Errors are only logged; returning nil keeps the group's
			// context alive for the remaining sub-pools.
			return nil
		})
	}
	// The goroutines above always return nil, so there is no error to inspect.
	_ = g.Wait()

	var all []Alloc
	for _, found := range perPool {
		all = append(all, found...)
	}
	return all
}
// pickN returns up to n offers in decreasing order of "best match" defined as follows:
// - all offers >= max appear first, in increasing order of distance from max.
// - offers less than max appear next, again in increasing order of distance from max.
// - offers less than min are omitted.
//
// It keeps a heap (offerq) that pops the worst retained offer first.
// Whenever the heap grows past n, that worst offer is discarded.
func pickN(offers []Offer, n int, min, max reflow.Resources) []Offer {
	queue := &offerq{max: max}
	for _, candidate := range offers {
		if candidate.Available().Available(min) {
			heap.Push(queue, candidate)
			// prune the heap if larger than n.
			if queue.Len() > n {
				heap.Pop(queue)
			}
		}
	}
	// The heap pops worst-first, so fill the result from the back to
	// end up with the best match at index 0.
	best := make([]Offer, queue.Len())
	for slot := len(best) - 1; slot >= 0; slot-- {
		best[slot] = heap.Pop(queue).(Offer)
	}
	return best
}
// offerq implements a priority queue of offers, ordered in the following manner:
// - offers less than max appear first, in decreasing order of distance from max.
// - offers >= max appear next, again in decreasing order of distance from max.
//
// In other words, the element popped first is the worst match for max.
type offerq struct {
// max is the resource target every queued offer is compared against in Less.
max reflow.Resources
// offers is the heap's backing storage, maintained through container/heap.
offers []Offer
}
// Len reports how many offers are currently queued. It implements
// sort.Interface/heap.Interface.
func (q offerq) Len() int {
	return len(q.offers)
}
// Less implements sort.Interface/heap.Interface. It reports whether
// offer i is a worse match for q.max than offer j:
// - an offer that cannot satisfy q.max sorts before one that can;
// - otherwise the offer with the larger scaled distance from q.max
//   sorts first.
func (q offerq) Less(i, j int) bool {
	left, right := q.offers[i].Available(), q.offers[j].Available()
	leftFits, rightFits := left.Available(q.max), right.Available(q.max)
	leftDist, rightDist := left.ScaledDistance(q.max), right.ScaledDistance(q.max)
	if leftFits != rightFits {
		// Exactly one of the two satisfies max; the one that does not
		// is the lesser element.
		return !leftFits
	}
	return leftDist > rightDist
}
// Swap exchanges the offers at positions i and j. It implements
// heap.Interface/sort.Interface.
func (q offerq) Swap(i, j int) {
	offers := q.offers
	offers[i], offers[j] = offers[j], offers[i]
}
// Push appends x, which must be an Offer, to the end of the queue's
// backing slice. It implements heap.Interface and is meant to be
// called through heap.Push.
func (q *offerq) Push(x interface{}) {
	q.offers = append(q.offers, x.(Offer))
}
// Pop removes and returns the last element of the queue's backing
// slice. It implements heap.Interface and is meant to be called through
// heap.Pop, which first moves the minimum element to the end.
func (q *offerq) Pop() interface{} {
	last := len(q.offers) - 1
	x := q.offers[last]
	// Clear the vacated slot so the backing array does not keep the
	// popped offer reachable after it has left the queue.
	q.offers[last] = nil
	q.offers = q.offers[:last]
	return x
}