This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 53
/
spns_watcher.go
133 lines (120 loc) · 4.65 KB
/
spns_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package containerwatcher
import (
"context"
"os"
"time"
"github.com/lyft/flytestdlib/logger"
"github.com/mitchellh/go-ps"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/sets"
)
// PIDs with special meaning inside a Kubernetes pod's shared process namespace.
const (
	// k8sAllowedParentPid is the parent PID a process must report to be treated
	// as belonging to another container in the shared namespace.
	k8sAllowedParentPid = 0
	// k8sPauseContainerPid is the PID of the pod's pause container, which is
	// always excluded from the process list.
	k8sPauseContainerPid = 1
)
// FilterProcessList returns the processes from procs that satisfy both:
//   - the parent PID equals allowedParentPid, and
//   - the PID is not contained in filterPids (used to drop the /pause container
//     and the current process).
//
// The parent-PID check relies on the original author's observation that every
// process in the shared namespace reports a parent PID of 0.
// Input order is preserved. When nothing matches, the returned slice is nil.
// The error result is always nil; it exists so callers can treat filtering as
// fallible.
func FilterProcessList(procs []ps.Process, filterPids sets.Int, allowedParentPid int) ([]ps.Process, error) {
	var kept []ps.Process
	for _, candidate := range procs {
		// Skip anything not parented by allowedParentPid, or explicitly excluded.
		if candidate.PPid() != allowedParentPid || filterPids.Has(candidate.Pid()) {
			continue
		}
		kept = append(kept, candidate)
	}
	return kept, nil
}
// SharedNamespaceProcessLister lists the processes visible in a shared process
// namespace, dropping any whose PID is in pidsToFilter (see FilterProcessList).
type SharedNamespaceProcessLister struct {
	// PID for the current process.
	// NOTE(review): not read by any code visible in this file; confirm whether it is still needed.
	currentProcessPid int
	// PIDs excluded from every listing. NewSharedProcessNSWatcher populates it
	// with the current PID and the pause container's PID.
	pidsToFilter sets.Int
}
// AnyProcessRunning reports whether ListRunningProcesses currently returns at
// least one process. If listing fails, it returns false together with that error.
func (s *SharedNamespaceProcessLister) AnyProcessRunning(ctx context.Context) (bool, error) {
	running, listErr := s.ListRunningProcesses(ctx)
	if listErr != nil {
		return false, listErr
	}
	anyRunning := len(running) != 0
	return anyRunning, nil
}
// ListRunningProcesses takes a snapshot of all processes via ps.Processes and
// returns only those accepted by FilterProcessList. The filter uses this
// lister's pidsToFilter and k8sAllowedParentPid as the required parent PID.
// Errors from listing or filtering are wrapped with context.
func (s *SharedNamespaceProcessLister) ListRunningProcesses(ctx context.Context) ([]ps.Process, error) {
	all, listErr := ps.Processes()
	if listErr != nil {
		return nil, errors.Wrap(listErr, "Failed to list processes")
	}

	visible, filterErr := FilterProcessList(all, s.pidsToFilter, k8sAllowedParentPid)
	if filterErr != nil {
		return nil, errors.Wrap(filterErr, "failed to filter processes")
	}
	return visible, nil
}
// sharedProcessNSWatcher watches for other containers' processes by polling the
// pod's shared process namespace.
//
// Process namespace sharing
// (https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/)
// is the best option for this. It is only available as beta as of Kubernetes
// 1.16, so this watcher ships as a beta feature. It is nevertheless the most
// efficient way to monitor the pod.
type sharedProcessNSWatcher struct {
	// Interval between successive polls of the process list. It is passed to
	// time.NewTicker, so it must be positive.
	pollInterval time.Duration
	// Number of polls that must find no other process before WaitToExit
	// finalizes the container's exit.
	cyclesToWait int
	// Source of the filtered process list inspected on every poll.
	s SharedNamespaceProcessLister
}
// wait polls k.s.AnyProcessRunning every k.pollInterval and passes the result
// to f.
//
// It returns nil once f has returned true on cyclesToWait consecutive polls.
// A poll on which f returns false resets the count, so only an uninterrupted
// run of satisfied polls finalizes the wait. A cyclesToWait of 0 or 1 both
// finish on the first satisfied poll.
//
// It returns ErrTimeout as soon as ctx is done, whether by deadline or by
// cancellation. Any error from AnyProcessRunning is returned unchanged.
// k.pollInterval must be positive, because time.NewTicker panics otherwise.
func (k sharedProcessNSWatcher) wait(ctx context.Context, cyclesToWait int, f func(ctx context.Context, otherProcessRunning bool) bool) error {
	t := time.NewTicker(k.pollInterval)
	defer t.Stop()
	cyclesOfMissingProcesses := 0
	for {
		select {
		case <-ctx.Done():
			logger.Infof(ctx, "Context canceled")
			return ErrTimeout
		case <-t.C:
			logger.Infof(ctx, "Checking processes to see if any process were started...")
			yes, err := k.s.AnyProcessRunning(ctx)
			if err != nil {
				return err
			}
			if f(ctx, yes) {
				cyclesOfMissingProcesses++
				if cyclesOfMissingProcesses >= cyclesToWait {
					logger.Infof(ctx, "Exiting wait loop")
					return nil
				}
			} else {
				// The condition must hold on successive polls; a miss starts the count over.
				cyclesOfMissingProcesses = 0
			}
			logger.Infof(ctx, "process not yet started")
		}
	}
}
// WaitToStart blocks until a poll of the shared namespace finds at least one
// process (as listed by k.s).
//
// It returns the error from the underlying wait, e.g. ErrTimeout when ctx is
// done. The "detected process start" message is logged only on success, so a
// timeout is not misreported as a start.
func (k sharedProcessNSWatcher) WaitToStart(ctx context.Context) error {
	logger.Infof(ctx, "SNPS Watcher waiting for other processes to start")
	err := k.wait(ctx, 1, func(ctx context.Context, otherProcessRunning bool) bool {
		return otherProcessRunning
	})
	if err != nil {
		return err
	}
	logger.Infof(ctx, "SNPS Watcher detected process start")
	return nil
}
// WaitToExit blocks until k.cyclesToWait polls have found no other process
// running in the shared namespace (as listed by k.s).
//
// It returns the error from the underlying wait, e.g. ErrTimeout when ctx is
// done. The "detected process exit" message is logged only on success, so a
// timeout or listing failure is not misreported as an exit.
func (k sharedProcessNSWatcher) WaitToExit(ctx context.Context) error {
	logger.Infof(ctx, "SNPS Watcher waiting for other process to exit")
	err := k.wait(ctx, k.cyclesToWait, func(ctx context.Context, otherProcessRunning bool) bool {
		return !otherProcessRunning
	})
	if err != nil {
		return err
	}
	logger.Infof(ctx, "SNPS Watcher detected process exit")
	return nil
}
// NewSharedProcessNSWatcher returns a Watcher that polls the pod's shared
// process namespace.
//
//   - pollInterval: time to wait between successive process checks. It must be
//     positive. The watcher drives its polling with time.NewTicker, which panics
//     on a non-positive duration, so such values are rejected here with an error.
//   - waitNumIntervalsBeforeFinalize: number of polls that must find no other
//     process before WaitToExit treats the process as complete. 0 and 1 both
//     finalize on the first poll that finds no process.
//
// The current process (os.Getpid()) and the pause container
// (k8sPauseContainerPid) are always excluded from the process list.
func NewSharedProcessNSWatcher(ctx context.Context, pollInterval time.Duration, waitNumIntervalsBeforeFinalize int) (Watcher, error) {
	if pollInterval <= 0 {
		// Fail fast here instead of panicking later inside the ticker.
		return nil, errors.New("poll interval must be positive, got " + pollInterval.String())
	}
	logger.Infof(ctx, "SNPS created with poll interval %s", pollInterval.String())
	currentPid := os.Getpid()
	return sharedProcessNSWatcher{
		pollInterval: pollInterval,
		cyclesToWait: waitNumIntervalsBeforeFinalize,
		s: SharedNamespaceProcessLister{
			currentProcessPid: currentPid,
			pidsToFilter:      sets.NewInt(currentPid, k8sPauseContainerPid),
		},
	}, nil
}