-
Notifications
You must be signed in to change notification settings - Fork 1
/
runmanager.go
127 lines (107 loc) · 2.45 KB
/
runmanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package runmanager
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
)
// Runner bundles the two lifecycle hooks used to control a service: one that
// runs it and one that shuts it down.
type Runner struct {
	// Run executes the service. A non-nil result reports a failure.
	Run func() error
	// Shutdown stops the service. A non-nil result reports a failed stop.
	Shutdown func() error
}
// RunManager holds the shared state used to run services and to coordinate
// their startup and shutdown.
type RunManager struct {
	// Context is watched by every registered Runner; once it is done the
	// Runner's Shutdown hook is invoked.
	Context context.Context
	// WaitingGroupShutdown tracks Runners whose Shutdown has not completed.
	WaitingGroupShutdown *sync.WaitGroup
	// ErrChan receives non-nil errors returned by Runner.Run and
	// Runner.Shutdown.
	ErrChan chan<- error
	// Cancel requests shutdown. RunServices sets it to the cancel function
	// of Context.
	Cancel func()
}
// Run starts r under the manager's supervision.
//
// r.Run is executed in its own goroutine. A second goroutine blocks until
// m.Context is done and then calls r.Shutdown. A non-nil error from either
// call is sent to m.ErrChan. m.WaitingGroupShutdown is incremented before the
// goroutines are started and released after the Shutdown result was handled.
func (m *RunManager) Run(r *Runner) {
	// forward pushes err onto the error channel unless it is nil.
	forward := func(err error) {
		if err != nil {
			m.ErrChan <- err
		}
	}

	m.WaitingGroupShutdown.Add(1)

	// Service goroutine.
	go func() {
		forward(r.Run())
	}()

	// Shutdown goroutine: waits for the cancellation signal first.
	go func() {
		<-m.Context.Done()
		forward(r.Shutdown())
		m.WaitingGroupShutdown.Done()
	}()
}
// RunInterruptHandler installs the interrupt handler on top of this
// RunManager by delegating to the package-level RunInterruptHandler.
func (m *RunManager) RunInterruptHandler() {
	RunInterruptHandler(m)
}
// RunInterruptHandler runs an interrupt handler on top of the RunManager.
//
// It registers a Runner on m that listens for SIGTERM and os.Interrupt and
// calls m.Cancel when one of them arrives. The listener goroutine also returns
// as soon as m.Context is done, so it does not outlive the shutdown.
func RunInterruptHandler(m *RunManager) {
	// Buffered so a signal delivered while the listener is busy is not lost.
	c := make(chan os.Signal, 1)
	m.Run(&Runner{
		Run: func() error {
			signal.Notify(c, syscall.SIGTERM, os.Interrupt)
			defer signal.Stop(c)
			for {
				select {
				case <-m.Context.Done():
					// signal.Stop never closes c, so ranging over c alone
					// would block this goroutine forever after shutdown.
					// Returning here also undoes a Notify that raced with
					// an early cancellation (via the deferred Stop).
					return nil
				case sig := <-c:
					switch sig {
					case syscall.SIGTERM, os.Interrupt:
						fmt.Println("Canceling due to interrupt")
						m.Cancel()
					default:
						log.Println("Unknown signal:", sig)
					}
				}
			}
		},
		Shutdown: func() error {
			// Stop delivery right away; calling Stop twice is harmless.
			signal.Stop(c)
			return nil
		},
	})
}
// Service is a runnable service. RunServices calls each Service once with the
// shared RunManager; presumably the function registers its Runner(s) on the
// manager via m.Run (confirm against the concrete services).
type Service func(m *RunManager)
// RunServices starts up all given services and blocks until they are shut
// down.
//
// Every service is called with one shared RunManager built around a
// cancellable background context and a buffered error channel. The function
// then waits for either the first reported error (which is printed and
// triggers cancellation) or for the context to be cancelled by other means,
// waits for all registered shutdowns to finish, and finally prints any errors
// that are still queued.
func RunServices(services []Service) {
	const errBufferSize = 100

	ctx, cancel := context.WithCancel(context.Background())
	errs := make(chan error, errBufferSize)

	// Tracks the shutdown of every Runner registered on the manager.
	var shutdownWG sync.WaitGroup

	manager := &RunManager{
		Context:              ctx,
		WaitingGroupShutdown: &shutdownWG,
		ErrChan:              errs,
		Cancel:               cancel,
	}

	for i := range services {
		services[i](manager)
	}

	select {
	case <-ctx.Done():
		// Shutting down without an error as the reason,
		// e.g. because of an interrupt.
	case err := <-errs:
		fmt.Println("Shutdown-cause=error:", err)
		cancel()
	}

	// Wait for all Shutdown hooks to finish.
	shutdownWG.Wait()

	// Print whatever is still queued, without blocking on an empty channel.
	for {
		select {
		case err := <-errs:
			fmt.Println("error:", err)
		default:
			return
		}
	}
}