/
genericpilot.go
123 lines (105 loc) · 3.08 KB
/
genericpilot.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package genericpilot
import (
"sync"
"time"
"github.com/golang/glog"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
clientset "github.com/jetstack/navigator/pkg/client/clientset/versioned"
listersv1alpha1 "github.com/jetstack/navigator/pkg/client/listers/navigator/v1alpha1"
"github.com/jetstack/navigator/pkg/pilot/genericpilot/controller"
"github.com/jetstack/navigator/pkg/pilot/genericpilot/leaderelection"
"github.com/jetstack/navigator/pkg/pilot/genericpilot/processmanager"
)
type GenericPilot struct {
Options Options
// TODO: remove use of the kubernetes clientset. Absorb required
// functionality into the navigator api group
kubeClientset kubernetes.Interface
client clientset.Interface
pilotLister listersv1alpha1.PilotLister
recorder record.EventRecorder
controller *controller.Controller
// process is a reference to a process manager for the application this
// Pilot manages
process processmanager.Interface
// shutdown is true when the process has been told to gracefully exit
shutdown bool
// lock is used internally to coordinate updates to fields on the
// GenericPilot structure
lock sync.Mutex
elector leaderelection.Interface
}
func (g *GenericPilot) Run() error {
glog.Infof("Starting generic pilot controller")
// setup healthz handlers
g.serveHealthz()
ctrlStopCh := make(chan struct{})
defer close(ctrlStopCh)
var err error
// block until told to shutdown
select {
case <-g.Options.StopCh:
glog.Infof("Shutdown signal received")
case <-g.waitForProcess():
if err = g.process.Error(); err != nil {
glog.Errorf("Underlying process failed with error: %s", err)
} else {
glog.Errorf("Underlying process unexpectedly exited")
}
case err = <-g.runController(ctrlStopCh):
if err != nil {
glog.Errorf("Control loop failed with error: %s", err)
} else {
glog.Errorf("Control loop unexpectedly exited")
}
case err = <-g.runElector(ctrlStopCh):
if err != nil {
glog.Errorf("Leader elector failed with error: %s", err)
} else {
glog.Errorf("Leader elector unexpectedly exited")
}
}
thisPilot, err := g.controller.ThisPilot()
if err != nil {
return err
}
return g.stop(thisPilot)
}
// waitForProcess will return a chan that will be closed once the underlying
// subprocess exits. This function exists to 'mask' the fact the process may
// not ever exist/be started (as starting the process relies on the Pilot
// resource existing in the API).
func (g *GenericPilot) waitForProcess() <-chan struct{} {
out := make(chan struct{})
go func() {
defer close(out)
for {
if g.process != nil {
break
}
time.Sleep(2)
}
<-g.process.Wait()
}()
return out
}
func (g *GenericPilot) runController(stopCh <-chan struct{}) <-chan error {
out := make(chan error, 1)
go func() {
defer close(out)
out <- g.controller.Run(stopCh)
}()
return out
}
func (g *GenericPilot) runElector(stopCh <-chan struct{}) <-chan error {
out := make(chan error, 1)
go func() {
defer close(out)
out <- g.elector.Run()
}()
return out
}
func (g *GenericPilot) Elector() leaderelection.Interface {
return g.elector
}