package agent

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"

"github.com/fnproject/fn/api/models"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
const (
Mem1MB = 1024 * 1024
Mem1GB = 1024 * 1024 * 1024
// Assume 2GB RAM on non-linux systems
DefaultNonLinuxMemory = 2048 * Mem1MB
)
var CapacityFull = errors.New("max capacity reached")
type ResourceUtilization struct {
// CPU in use
CpuUsed models.MilliCPUs
// CPU available
CpuAvail models.MilliCPUs
// Memory in use in bytes
MemUsed uint64
// Memory available in bytes
MemAvail uint64
}
// A simple resource (memory, cpu, disk, etc.) tracker for scheduling.
// TODO: disk, network IO for future
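//
// Illustrative usage sketch (hypothetical caller; tracker, ctx, memMB and cpu are
// placeholder names, not part of this package): check IsResourcePossible first, then
// wait on GetResourceToken and release the token with Close:
//
//	if !tracker.IsResourcePossible(memMB, cpu) {
//		return // this machine can never satisfy the request
//	}
//	select {
//	case tok := <-tracker.GetResourceToken(ctx, memMB, cpu, false):
//		defer tok.Close() // releases the reservation when the work is done
//		// ... run the work that asked for memMB MB and cpu MilliCPUs ...
//	case <-ctx.Done():
//		// gave up before capacity became available
//	}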
type ResourceTracker interface {
// WaitAsyncResource returns a channel that will send once when there appear to be sufficient
// resource levels to run an async task; the exact policy is left to the implementer.
WaitAsyncResource(ctx context.Context) chan struct{}
// GetResourceToken returns a channel to wait for a resource token on. If the provided context is canceled,
// the channel will never receive anything. If it is not possible to fulfill this resource, the channel
// will never receive anything (use IsResourcePossible). Otherwise, a token for the provided resource
// parameters is sent exactly once on the returned channel when capacity becomes available. The channel is never closed.
// If isNB is set, the resource check is performed without blocking and a token carrying an error is
// returned when capacity is insufficient.
// Memory is expected to be provided in MB units.
GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isNB bool) <-chan ResourceToken
// IsResourcePossible returns whether it's possible to fulfill the requested resources on this
// machine. It must be called before GetResourceToken or GetResourceToken may hang.
// Memory is expected to be provided in MB units.
IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs) bool
// Retrieve current stats/usage
GetUtilization() ResourceUtilization
}
type resourceTracker struct {
// cond protects access to ram variables below
cond *sync.Cond
// ramTotal is the total usable memory for functions
ramTotal uint64
// ramUsed is ram reserved for running containers including hot/idle
ramUsed uint64
// ramAsyncHWMark is the in-use memory level at which the agent stops dequeuing async jobs
ramAsyncHWMark uint64
// cpuTotal is the total usable cpu for functions
cpuTotal uint64
// cpuUsed is cpu reserved for running containers including hot/idle
cpuUsed uint64
// cpuAsyncHWMark is the in-use cpu level at which the agent stops dequeuing async jobs
cpuAsyncHWMark uint64
}
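// NewResourceTracker builds a ResourceTracker sized from host (and, on linux,
// cgroup) limits, further clamped by cfg.MaxTotalMemory / cfg.MaxTotalCPU when set.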
func NewResourceTracker(cfg *Config) ResourceTracker {
obj := &resourceTracker{
cond: sync.NewCond(new(sync.Mutex)),
}
obj.initializeMemory(cfg)
obj.initializeCPU(cfg)
return obj
}
type ResourceToken interface {
// Close must be called by any thread that receives a token.
io.Closer
Error() error
NeededCapacity() (uint64, models.MilliCPUs)
}
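// resourceToken is the concrete ResourceToken. decrement returns the reserved
// memory/cpu to the pool; once guarantees the release happens at most one time
// even if Close is called repeatedly.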
type resourceToken struct {
once sync.Once
err error
needCpu models.MilliCPUs
needMem uint64
decrement func()
}
func (t *resourceToken) Error() error {
return t.err
}
func (t *resourceToken) NeededCapacity() (uint64, models.MilliCPUs) {
return t.needMem, t.needCpu
}
func (t *resourceToken) Close() error {
t.once.Do(func() {
if t.decrement != nil {
t.decrement()
}
})
return nil
}
func (a *resourceTracker) isResourceAvailableLocked(memory uint64, cpuQuota models.MilliCPUs) bool {
availMem := a.ramTotal - a.ramUsed
availCPU := a.cpuTotal - a.cpuUsed
return availMem >= memory && availCPU >= uint64(cpuQuota)
}
func (a *resourceTracker) GetUtilization() ResourceUtilization {
var util ResourceUtilization
a.cond.L.Lock()
util.CpuUsed = models.MilliCPUs(a.cpuUsed)
util.MemUsed = a.ramUsed
a.cond.L.Unlock()
util.CpuAvail = models.MilliCPUs(a.cpuTotal) - util.CpuUsed
util.MemAvail = a.ramTotal - util.MemUsed
return util
}
// is this request possible to meet at all? If not, fail fast
func (a *resourceTracker) IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs) bool {
memory = memory * Mem1MB
return memory <= a.ramTotal && uint64(cpuQuota) <= a.cpuTotal
}
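// allocResourcesLocked reserves memory (in bytes) and cpuQuota and returns a token
// whose Close releases them and broadcasts to waiters. Callers must hold a.cond.L.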
func (a *resourceTracker) allocResourcesLocked(memory uint64, cpuQuota models.MilliCPUs) ResourceToken {
a.ramUsed += memory
a.cpuUsed += uint64(cpuQuota)
return &resourceToken{decrement: func() {
a.cond.L.Lock()
a.ramUsed -= memory
a.cpuUsed -= uint64(cpuQuota)
a.cond.L.Unlock()
// WARNING: yes, we wake up everyone, even async waiters, when only the sync pool has space, but
// the cost of this spurious wake-up is unlikely to impact performance much. It is simpler
// to use one cond variable for the time being.
a.cond.Broadcast()
}}
}
func (a *resourceTracker) getResourceTokenNB(memory uint64, cpuQuota models.MilliCPUs) ResourceToken {
if !a.IsResourcePossible(memory, cpuQuota) {
return &resourceToken{err: CapacityFull, needCpu: cpuQuota, needMem: memory}
}
memory = memory * Mem1MB
var t ResourceToken
var needMem uint64
var needCpu models.MilliCPUs
a.cond.L.Lock()
availMem := a.ramTotal - a.ramUsed
availCPU := a.cpuTotal - a.cpuUsed
if availMem >= memory && availCPU >= uint64(cpuQuota) {
t = a.allocResourcesLocked(memory, cpuQuota)
} else {
if availMem < memory {
needMem = (memory - availMem) / Mem1MB
}
if availCPU < uint64(cpuQuota) {
needCpu = models.MilliCPUs(uint64(cpuQuota) - availCPU)
}
t = &resourceToken{err: CapacityFull, needCpu: needCpu, needMem: needMem}
}
a.cond.L.Unlock()
return t
}
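// getResourceTokenNBChan wraps the non-blocking check in a channel so the NB path
// has the same shape as the blocking one. Illustrative consumer sketch (hypothetical
// names) showing how an error token reports its shortfall:
//
//	select {
//	case tok := <-tracker.GetResourceToken(ctx, memMB, cpu, true):
//		if tok.Error() == CapacityFull {
//			needMemMB, needCPU := tok.NeededCapacity() // shortfall in MB and MilliCPUs
//			_, _ = needMemMB, needCPU
//		}
//		tok.Close()
//	case <-ctx.Done():
//	}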
func (a *resourceTracker) getResourceTokenNBChan(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs) <-chan ResourceToken {
ctx, span := trace.StartSpan(ctx, "agent_get_resource_token_nbio_chan")
ch := make(chan ResourceToken)
go func() {
defer span.End()
t := a.getResourceTokenNB(memory, cpuQuota)
select {
case ch <- t:
case <-ctx.Done():
// if we can't send b/c nobody is waiting anymore, need to decrement here
t.Close()
}
}()
return ch
}
// The received token should be passed directly to launch (unconditionally); launch
// will close this token (i.e. the receiver should not call Close).
func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs, isNB bool) <-chan ResourceToken {
if isNB {
return a.getResourceTokenNBChan(ctx, memory, cpuQuota)
}
ch := make(chan ResourceToken)
if !a.IsResourcePossible(memory, cpuQuota) {
// return the channel, but never send anything.
return ch
}
c := a.cond
isWaiting := false
memory = memory * Mem1MB
// if we find a resource token, shut down the thread waiting on ctx finish.
// alternatively, if the ctx is done, wake up the cond loop.
ctx, cancel := context.WithCancel(ctx)
go func() {
<-ctx.Done()
c.L.Lock()
if isWaiting {
c.Broadcast()
}
c.L.Unlock()
}()
ctx, span := trace.StartSpan(ctx, "agent_get_resource_token")
go func() {
defer span.End()
defer cancel()
c.L.Lock()
isWaiting = true
for !a.isResourceAvailableLocked(memory, cpuQuota) && ctx.Err() == nil {
c.Wait()
}
isWaiting = false
if ctx.Err() != nil {
c.L.Unlock()
return
}
t := a.allocResourcesLocked(memory, cpuQuota)
c.L.Unlock()
select {
case ch <- t:
case <-ctx.Done():
// if we can't send b/c nobody is waiting anymore, need to decrement here
t.Close()
}
}()
return ch
}
// WaitAsyncResource sends a signal on the returned channel once the RAM and CPU in use
// are below the async high-water marks.
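//
// Illustrative sketch of the async dequeue loop a caller might build on top of this
// (hypothetical; not part of this file):
//
//	select {
//	case <-tracker.WaitAsyncResource(ctx):
//		// usage is below the high-water marks; safe to dequeue another async job
//	case <-ctx.Done():
//	}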
func (a *resourceTracker) WaitAsyncResource(ctx context.Context) chan struct{} {
ch := make(chan struct{}, 1)
isWaiting := false
c := a.cond
// if we find a resource token, shut down the thread waiting on ctx finish.
// alternatively, if the ctx is done, wake up the cond loop.
ctx, cancel := context.WithCancel(ctx)
go func() {
<-ctx.Done()
c.L.Lock()
if isWaiting {
c.Broadcast()
}
c.L.Unlock()
}()
ctx, span := trace.StartSpan(ctx, "agent_wait_async_resource")
go func() {
defer span.End()
defer cancel()
c.L.Lock()
isWaiting = true
for (a.ramUsed >= a.ramAsyncHWMark || a.cpuUsed >= a.cpuAsyncHWMark) && ctx.Err() == nil {
c.Wait()
}
isWaiting = false
c.L.Unlock()
if ctx.Err() == nil {
ch <- struct{}{}
}
}()
return ch
}
func minUint64(a, b uint64) uint64 {
if a <= b {
return a
}
return b
}
func maxUint64(a, b uint64) uint64 {
if a >= b {
return a
}
return b
}
func clampUint64(val, min, max uint64) uint64 {
val = minUint64(val, max)
val = maxUint64(val, min)
return val
}
func (a *resourceTracker) initializeCPU(cfg *Config) {
// On non-linux systems, use all CPU reported by the Go runtime. We ignore
// non-linux container implementations and any CPU limits they may impose.
// (This is also the default if we cannot determine limits from proc or sysfs.)
totalCPU := uint64(runtime.NumCPU() * 1000)
availCPU := totalCPU
if runtime.GOOS == "linux" {
// Why do we prefer /proc/cpuinfo on Linux instead of just using runtime.NumCPU?
// NumCPU is based on sched_getaffinity, whereas we prefer to check the cgroup,
// which is more likely to be the same cgroup used by the container runtime.
numCPU, err := checkProcCPU()
if err != nil {
logrus.WithError(err).Error("Error checking for CPU, falling back to runtime CPU count.")
} else {
totalCPU = 1000 * numCPU
availCPU = totalCPU
}
// Clamp further if cgroups CFS quota/period limits are in place
cgroupCPU := checkCgroupCPU()
if cgroupCPU > 0 {
availCPU = minUint64(availCPU, cgroupCPU)
}
// TODO: check cgroup cpuset to clamp this further. We might be restricted into
// a subset of CPUs. (eg. /sys/fs/cgroup/cpuset/cpuset.effective_cpus)
// TODO: skip CPU headroom for ourselves for now
}
// now based on cfg, further clamp on calculated values
if cfg != nil && cfg.MaxTotalCPU != 0 {
availCPU = minUint64(cfg.MaxTotalCPU, availCPU)
}
logrus.WithFields(logrus.Fields{
"total_cpu": totalCPU,
"avail_cpu": availCPU,
}).Info("available cpu")
a.cpuTotal = availCPU
a.cpuAsyncHWMark = availCPU * 8 / 10
logrus.WithFields(logrus.Fields{
"cpu": a.cpuTotal,
"cpu_async_hw_mark": a.cpuAsyncHWMark,
}).Info("cpu reservations")
if a.cpuTotal == 0 {
logrus.Fatal("Cannot get the proper CPU information to size server")
}
if a.cpuTotal < 1000 {
logrus.Warn("Severely Limited CPU: cpu < 1000m (1 CPU)")
}
}
func (a *resourceTracker) initializeMemory(cfg *Config) {
availMemory := uint64(DefaultNonLinuxMemory)
if runtime.GOOS == "linux" {
// system wide available memory
totalMemory, err := checkProcMem()
if err != nil {
logrus.WithError(err).Fatal("Cannot get the proper memory information to size server.")
}
availMemory = totalMemory
// cgroup limit restriction on memory usage
cGroupLimit, err := checkCgroupMem()
if err != nil {
logrus.WithError(err).Error("Error checking for cgroup memory limits, falling back to available host memory.")
} else {
availMemory = minUint64(cGroupLimit, availMemory)
}
// clamp the available memory by head room (for docker, ourselves, other processes)
headRoom, err := getMemoryHeadRoom(availMemory, cfg)
if err != nil {
logrus.WithError(err).Fatal("Out of memory")
}
availMemory = availMemory - headRoom
logrus.WithFields(logrus.Fields{
"total_memory": totalMemory,
"head_room": headRoom,
"cgroup_limit": cGroupLimit,
}).Info("available memory")
}
// now based on cfg, further clamp on calculated values
if cfg != nil && cfg.MaxTotalMemory != 0 {
availMemory = minUint64(cfg.MaxTotalMemory, availMemory)
}
a.ramTotal = availMemory
a.ramAsyncHWMark = availMemory * 8 / 10
// For non-linux OSes, we expect these (or their defaults) to be configured properly via command-line/env
logrus.WithFields(logrus.Fields{
"avail_memory": a.ramTotal,
"ram_async_hw_mark": a.ramAsyncHWMark,
}).Info("ram reservations")
if a.ramTotal == 0 {
logrus.Fatal("Cannot get the proper memory pool information to size server")
}
if a.ramTotal < 256*Mem1MB {
logrus.Warn("Severely Limited memory: ram < 256MB")
}
}
// getMemoryHeadRoom estimates headroom so that we do not consume the entire RAM if possible
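//
// Worked example: with 16 GB usable, 10% is about 1.6 GB, which already sits inside
// the [256 MB, 5 GB] clamp below, so roughly 1.6 GB (plus any pre-fork pool add-on)
// is held back and the remainder is offered to the function pool.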
func getMemoryHeadRoom(usableMemory uint64, cfg *Config) (uint64, error) {
// reserve 10% of the RAM
headRoom := uint64(usableMemory / 10)
// TODO: improve this pre-fork calculation; we should fetch/query this
// instead of estimating it as below.
// If the pre-fork pool is enabled, add 1 MB per pool item.
if cfg != nil && cfg.PreForkPoolSize != 0 {
headRoom += Mem1MB * cfg.PreForkPoolSize
}
// TODO: improve these calculations.
// clamp this with 256MB min -- 5GB max
maxHeadRoom := uint64(5 * Mem1GB)
minHeadRoom := uint64(256 * Mem1MB)
if minHeadRoom >= usableMemory {
return 0, fmt.Errorf("Not enough memory: %v", usableMemory)
}
headRoom = clampUint64(headRoom, minHeadRoom, maxHeadRoom)
return headRoom, nil
}
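// readString reads a file and returns its contents with surrounding whitespace trimmed.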
func readString(fileName string) (string, error) {
b, err := ioutil.ReadFile(filepath.Clean(fileName))
if err != nil {
return "", err
}
value := string(b)
return strings.TrimSpace(value), nil
}
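// checkCgroupMem reads this container's cgroup (v1) memory limit in bytes from
// /sys/fs/cgroup/memory/memory.limit_in_bytes.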
func checkCgroupMem() (uint64, error) {
value, err := readString("/sys/fs/cgroup/memory/memory.limit_in_bytes")
if err != nil {
return 0, err
}
return strconv.ParseUint(value, 10, 64)
}
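// checkCgroupCPU converts the cgroup (v1) CFS bandwidth settings into MilliCPUs as
// quota*1000/period; for example, quota=50000us with period=100000us yields 500 mCPU
// (half a CPU). A return of 0 means no usable limit was found.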
func checkCgroupCPU() uint64 {
periodStr, err := readString("/sys/fs/cgroup/cpu/cpu.cfs_period_us")
if err != nil {
return 0
}
quotaStr, err := readString("/sys/fs/cgroup/cpu/cpu.cfs_quota_us")
if err != nil {
return 0
}
period, err := strconv.ParseUint(periodStr, 10, 64)
if err != nil {
logrus.Warn("Cannot parse CFS period", err)
return 0
}
quota, err := strconv.ParseInt(quotaStr, 10, 64)
if err != nil {
logrus.Warn("Cannot parse CFS quota", err)
return 0
}
if quota <= 0 || period <= 0 {
return 0
}
return uint64(quota) * 1000 / period
}
var errCantReadMemInfo = errors.New("Didn't find MemAvailable in /proc/meminfo, kernel is probably < 3.14")
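// checkProcMem returns the MemAvailable figure from /proc/meminfo converted to bytes,
// e.g. "MemAvailable:  4048380 kB" becomes 4048380*1024 bytes.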
func checkProcMem() (uint64, error) {
f, err := os.Open("/proc/meminfo")
if err != nil {
return 0, err
}
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
b := scanner.Text()
if !strings.HasPrefix(b, "MemAvailable") {
continue
}
// expect form:
// MemAvailable: 1234567890 kB
tri := strings.Fields(b)
if len(tri) != 3 {
return 0, fmt.Errorf("MemAvailable line has unexpected format: %v", b)
}
c, err := strconv.ParseUint(tri[1], 10, 64)
if err != nil {
return 0, fmt.Errorf("Could not parse MemAvailable: %v", b)
}
switch tri[2] { // convert units to bytes
case "kB":
c *= 1024
case "MB":
c *= 1024 * 1024
default:
return 0, fmt.Errorf("Unexpected units for MemAvailable in /proc/meminfo, need kB or MB, got: %v", tri[2])
}
return c, nil
}
return 0, errCantReadMemInfo
}
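// checkProcCPU counts the "processor : N" entries in /proc/cpuinfo, i.e. the number
// of logical CPUs listed there.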
func checkProcCPU() (uint64, error) {
f, err := os.Open("/proc/cpuinfo")
if err != nil {
return 0, err
}
defer f.Close()
total := uint64(0)
scanner := bufio.NewScanner(f)
for scanner.Scan() {
b := scanner.Text()
// processor : 0
toks := strings.Fields(b)
if len(toks) == 3 && toks[0] == "processor" && toks[1] == ":" {
total += 1
}
}
if total == 0 {
return 0, errors.New("Could not parse cpuinfo")
}
return total, nil
}