-
Notifications
You must be signed in to change notification settings - Fork 178
/
psutil.go
184 lines (161 loc) · 4.81 KB
/
psutil.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
package internal
import (
"context"
"errors"
"fmt"
"time"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/process"
)
// WaitForProcessToExist queries running processes every `queryInterval` duration, blocking until the provided matcher
// function returns true for at least one running process or the context is canceled. If a query returns multiple
// matching processes, all will be returned.
func WaitForProcessToExist(
ctx context.Context,
queryInterval time.Duration,
matcher func(context.Context, *process.Process) (bool, error),
) ([]*process.Process, error) {
ticker := time.NewTicker(queryInterval)
defer ticker.Stop()
for range ticker.C {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
processes, err := process.ProcessesWithContext(ctx)
if err != nil {
return nil, err
}
var matches []*process.Process
for _, p := range processes {
isMatch, err := matcher(ctx, p)
if err != nil {
return nil, err
}
if isMatch {
matches = append(matches, p)
}
}
if len(matches) > 0 {
return matches, nil
}
}
}
panic("unreachable code in WaitForProcesstoExist")
}
// WaitForPidToExit queries running processes every `queryInterval` duration, blocking until the provided pid is found
// to not exist or the context is canceled.
func WaitForPidToExit(ctx context.Context, queryInterval time.Duration, pid int32) error {
ticker := time.NewTicker(queryInterval)
defer ticker.Stop()
for range ticker.C {
select {
case <-ctx.Done():
return ctx.Err()
default:
pidExists, err := process.PidExistsWithContext(ctx, pid)
if err != nil {
return err
}
if !pidExists {
return nil
}
}
}
panic("unreachable code in WaitForPidToExit")
}
// AverageCPUDeltas returns the average CPU cycles consumed per sampleInterval, as calculated
// between the time this function is invoked and the time the provided ctx is cancelled.
func AverageCPUDeltas(ctx context.Context, sampleInterval time.Duration) (*cpu.TimesStat, error) {
sampleCount := 0
sampleCh := sampleCPUTimes(ctx, sampleInterval)
firstSample := <-sampleCh
if firstSample == nil {
return nil, errors.New("sample channel closed before first data point")
}
if firstSample.Err != nil {
return nil, firstSample.Err
}
var lastSample *cpuTimesSample
for lastSample = range sampleCh {
sampleCount++
if lastSample.Err != nil {
return nil, lastSample.Err
}
}
if lastSample == nil {
return nil, errors.New("only got one data point, cannot calculate average")
}
avg := func(first float64, last float64) float64 {
return (last - first) / float64(sampleCount)
}
return &cpu.TimesStat{
User: avg(firstSample.User, lastSample.User),
System: avg(firstSample.System, lastSample.System),
Idle: avg(firstSample.Idle, lastSample.Idle),
Nice: avg(firstSample.Nice, lastSample.Nice),
Iowait: avg(firstSample.Iowait, lastSample.Iowait),
Irq: avg(firstSample.Irq, lastSample.Irq),
Softirq: avg(firstSample.Softirq, lastSample.Softirq),
Steal: avg(firstSample.Steal, lastSample.Steal),
Guest: avg(firstSample.Guest, lastSample.Guest),
GuestNice: avg(firstSample.GuestNice, lastSample.GuestNice),
}, nil
}
type cpuTimesSample struct {
*cpu.TimesStat
Index int
Err error
}
func sampleCPUTimes(ctx context.Context, sampleInterval time.Duration) <-chan *cpuTimesSample {
returnCh := make(chan *cpuTimesSample)
go func() {
defer close(returnCh)
var index int
ticker := time.NewTicker(sampleInterval)
defer ticker.Stop()
for range ticker.C {
select {
case <-ctx.Done():
return
default:
index++
}
sample := &cpuTimesSample{
Index: index,
}
const percpu = false // just give us the aggregate numbers, not each cpu's numbers
curTimesList, err := cpu.Times(percpu)
if err != nil {
sample.Err = err
returnCh <- sample
return
}
// we set percpu to false, so there should be only one stat
if len(curTimesList) != 1 {
sample.Err = fmt.Errorf("unexpected number of cpu times in sample %d", len(curTimesList))
returnCh <- sample
return
}
sample.TimesStat = &curTimesList[0]
select {
case returnCh <- sample:
default:
// just skip the sample if there's no room for it
}
}
}()
return returnCh
}