forked from hashicorp/nomad
/
stats.go
182 lines (156 loc) · 4.78 KB
/
stats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package docker
import (
"context"
"fmt"
"io"
"sync"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/nomad/client/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/docker/util"
nstructs "github.com/hashicorp/nomad/nomad/structs"
)
const (
// statsCollectorBackoffBaseline is the baseline time for exponential
// backoff while calling the docker stats api. It is the delay applied
// after the first failed call; every further consecutive failure
// multiplies the delay by four.
statsCollectorBackoffBaseline = 5 * time.Second
// statsCollectorBackoffLimit is the limit of the exponential backoff for
// calling the docker stats api. A computed delay above this value is
// clamped to it.
statsCollectorBackoffLimit = 2 * time.Minute
)
// usageSender guards the sending side of a TaskResourceUsage chan with a
// mutex so that send and close may be called concurrently. Sends never block:
// an event is dropped when the chan cannot accept it.
type usageSender struct {
	// mu serializes access to closed and destCh.
	mu sync.Mutex
	// closed is set once destCh has been closed; later sends are dropped.
	closed bool
	// destCh is the send-only end of the usage chan handed to the receiver.
	destCh chan<- *structs.TaskResourceUsage
}
// newStatsChanPipe creates a usage chan with a buffer of one element and
// returns both ends of it: a usageSender wrapping the sending side, which
// supports concurrent sending and closing, and the receive-only side.
func newStatsChanPipe() (*usageSender, <-chan *structs.TaskResourceUsage) {
	usageCh := make(chan *cstructs.TaskResourceUsage, 1)
	sender := &usageSender{destCh: usageCh}
	return sender, usageCh
}
// send delivers tru to the receiver without blocking. The value is silently
// dropped when the sender has been closed or the chan has no room for it.
func (u *usageSender) send(tru *cstructs.TaskResourceUsage) {
	u.mu.Lock()
	defer u.mu.Unlock()

	if !u.closed {
		select {
		case u.destCh <- tru:
			// delivered
		default:
			// receiver has not caught up; this interval is skipped
		}
	}
}
// close closes the underlying chan exactly once. Calling close again is a
// no-op, and any send after close is dropped.
func (u *usageSender) close() {
	u.mu.Lock()
	defer u.mu.Unlock()

	if !u.closed {
		// both steps happen under the lock, so no send can observe a closed
		// chan while the flag is still unset
		close(u.destCh)
		u.closed = true
	}
}
// Stats launches a background collector that pulls stats from the docker
// daemon and returns the chan on which the resulting resource usage is
// delivered. If the handle's doneCh is already closed no collector is started
// and a non-recoverable "container stopped" error is returned instead.
func (h *taskHandle) Stats(ctx context.Context, interval time.Duration) (<-chan *cstructs.TaskResourceUsage, error) {
	// non-blocking check whether the container has already stopped
	select {
	case <-h.doneCh:
		stopped := fmt.Errorf("container stopped")
		return nil, nstructs.NewRecoverableError(stopped, false)
	default:
	}

	sender, usageCh := newStatsChanPipe()
	go h.collectStats(ctx, sender, interval)

	return usageCh, nil
}
// collectStats starts collecting resource usage stats of a docker container
// and forwards them to destCh. It keeps calling the docker stats API until ctx
// is canceled, doneCh is closed, or the API call returns without an error (or
// with io.ErrClosedPipe). Any other error causes a retry after an exponential
// backoff that starts at statsCollectorBackoffBaseline, quadruples on every
// consecutive failure and is capped at statsCollectorBackoffLimit. destCh is
// closed when collectStats returns.
func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, interval time.Duration) {
	defer destCh.close()

	// backoff and retry used if the docker stats API returns an error
	var backoff time.Duration
	var retry int

	// loops until doneCh is closed
	for {
		if backoff > 0 {
			select {
			case <-time.After(backoff):
			case <-ctx.Done():
				return
			case <-h.doneCh:
				return
			}
		}

		// make a channel for docker stats structs and start a collector to
		// receive stats from docker and emit nomad stats
		// statsCh will always be closed by docker client.
		statsCh := make(chan *docker.Stats)
		go dockerStatsCollector(destCh, statsCh, interval)

		statsOpts := docker.StatsOptions{
			ID:      h.containerID,
			Context: ctx,
			Done:    h.doneCh,
			Stats:   statsCh,
			Stream:  true,
		}

		// Stats blocks until an error has occurred, or doneCh has been closed
		if err := h.client.Stats(statsOpts); err != nil && err != io.ErrClosedPipe {
			// An error occurred during stats collection, retry with backoff
			h.logger.Debug("error collecting stats from container", "error", err)

			// Calculate the new backoff as baseline * 4^retry, capped at the
			// limit. The value is grown by repeated multiplication that stops
			// as soon as the limit is reached, so it can never overflow; a
			// plain `1 << (2*retry)` wraps to zero or a negative duration
			// after enough consecutive failures, which would disable the
			// backoff entirely and retry in a tight loop.
			backoff = statsCollectorBackoffBaseline
			for i := 0; i < retry && backoff < statsCollectorBackoffLimit; i++ {
				backoff *= 4
			}
			if backoff > statsCollectorBackoffLimit {
				backoff = statsCollectorBackoffLimit
			}

			// Increment retry counter
			retry++
			continue
		}

		// Stats finished either because context was canceled, doneCh was closed
		// or the container stopped. Stop stats collections.
		return
	}
}
// dockerStatsCollector receives docker stats from statsCh, converts each one
// to a TaskResourceUsage and forwards the most recent value to destCh: once
// right after the first stats arrive and then every interval. It returns when
// statsCh is closed.
func dockerStatsCollector(destCh *usageSender, statsCh <-chan *docker.Stats, interval time.Duration) {
	var resourceUsage *cstructs.TaskResourceUsage

	// hasSentInitialStats is used so as to emit the first stats received from
	// the docker daemon
	var hasSentInitialStats bool

	// timer is used to send nomad status at the specified interval
	timer := time.NewTimer(interval)
	// release the timer when the collector exits instead of leaving it
	// pending until it would have fired
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			// it is possible for the timer to go off before the first stats
			// has been emitted from docker; the timer is not re-armed here
			// because the first stats received will reset it to fire
			// immediately
			if resourceUsage == nil {
				continue
			}

			// sending to destCh could block, drop this interval if it does
			destCh.send(resourceUsage)

			timer.Reset(interval)

		case s, ok := <-statsCh:
			// if statsCh is closed stop collection
			if !ok {
				return
			}
			// s should always be set, but check and skip just in case
			if s != nil {
				resourceUsage = util.DockerStatsToTaskResourceUsage(s)
				// send stats next iteration if this is the first time received
				// from docker
				if !hasSentInitialStats {
					timer.Reset(0)
					hasSentInitialStats = true
				}
			}
		}
	}
}