-
Notifications
You must be signed in to change notification settings - Fork 2
/
service.go
186 lines (161 loc) · 4.76 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package graceful
import (
"context"
"crypto/tls"
"fmt"
"net/http"
"os"
"sync"
"time"
"github.com/facebookgo/grace/gracenet"
"gopkg.in/errgo.v1"
"github.com/Scalingo/go-utils/logger"
)
type service struct {
httpServer *http.Server
graceful *gracenet.Net
wg *sync.WaitGroup
stopped chan error
// waitDuration is the duration which is waited for all connections to stop
// in order to graceful shutdown the server. If some connections are still up
// after this timer they'll be cut aggressively.
waitDuration time.Duration
// reloadWaitDuration is the duration the old process is waiting for
// connection to close when a graceful restart has been ordered. The new
// process is already woking as expecting.
reloadWaitDuration time.Duration
// pidFile tracks the pid of the last child among the chain of graceful restart
// Required for daemon manager to track the service
pidFile string
}
type Option func(*service)
func NewService(opts ...Option) *service {
s := &service{
graceful: &gracenet.Net{},
wg: &sync.WaitGroup{},
stopped: make(chan error),
waitDuration: time.Minute,
reloadWaitDuration: 30 * time.Minute,
}
for _, opt := range opts {
opt(s)
}
return s
}
func WithWaitDuration(d time.Duration) Option {
return Option(func(s *service) {
s.waitDuration = d
})
}
func WithReloadWaitDuration(d time.Duration) Option {
return Option(func(s *service) {
s.reloadWaitDuration = d
})
}
func WithPIDFile(path string) Option {
return Option(func(s *service) {
s.pidFile = path
})
}
func (s *service) ListenAndServeTLS(ctx context.Context, proto string, addr string, handler http.Handler, tlsConfig *tls.Config) error {
httpServer := &http.Server{
Addr: addr,
Handler: handler,
TLSConfig: tlsConfig,
}
return s.listenAndServe(ctx, proto, addr, httpServer)
}
func (s *service) ListenAndServe(ctx context.Context, proto string, addr string, handler http.Handler) error {
httpServer := &http.Server{
Addr: addr,
Handler: handler,
}
return s.listenAndServe(ctx, proto, addr, httpServer)
}
func (s *service) listenAndServe(ctx context.Context, proto string, addr string, server *http.Server) error {
if s.pidFile != "" {
pid := os.Getpid()
err := os.WriteFile(s.pidFile, []byte(fmt.Sprintf("%d\n", pid)), 0600)
if err != nil {
return errgo.Notef(err, "fail to write PID file")
}
}
ld, err := s.graceful.Listen(proto, addr)
if err != nil {
return errgo.Notef(err, "fail to get listener")
}
if server.TLSConfig != nil {
ld = tls.NewListener(ld, server.TLSConfig)
}
s.httpServer = server
go s.setupSignals(ctx)
err = s.httpServer.Serve(ld)
if err == http.ErrServerClosed {
return s.waitStopped()
}
if err != nil {
return errgo.Notef(err, "fail to serve http service")
}
// Normally the server should be always gracefully stopped and entering the
// above condition when server is closed If by any mean the serve stops
// without error, we're stopping the server ourself here. This code is a
// security to free resource but should be unreachable
ctx, cancel := context.WithTimeout(ctx, s.waitDuration)
defer cancel()
err = s.shutdown(ctx)
if err != nil {
return errgo.Notef(err, "fail to shutdown server")
}
return s.waitStopped()
}
// IncConnCount has to be used when connections are hijacked because in
// this case http.Server doesn't track these connection anymore, but you
// may not want to cut them abrutely.
func (s *service) IncConnCount(ctx context.Context) {
log := logger.Get(ctx)
log.Debug("inc conn count")
s.wg.Add(1)
}
// DecConnCount is the same as IncConnCount, but you need to call it when
// the hijacked connection is stopped
func (s *service) DecConnCount(ctx context.Context) {
log := logger.Get(ctx)
log.Debug("dec conn count")
s.wg.Done()
}
// shutdown stops the HTTP listener and then wait for any active hijacked
// connection to stop http.Server#Shutdown is graceful but the documentation
// specifies hijacked connections and websockets have to be handled by the
// developer.
func (s *service) shutdown(ctx context.Context) error {
log := logger.Get(ctx)
log.Info("shutting down http server")
err := s.httpServer.Shutdown(ctx)
if err != nil {
return errgo.Notef(err, "fail to shutdown http server")
}
log.Info("http server is stopped")
log.Info("wait hijacked connections")
err = s.waitHijackedConnections(ctx)
if err != nil {
return errgo.Notef(err, "fail to wait hijacked connections")
}
log.Info("no more connection running")
return nil
}
func (s *service) waitStopped() error {
return <-s.stopped
}
func (s *service) waitHijackedConnections(ctx context.Context) error {
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
return nil
case <-ctx.Done():
return ctx.Err()
}
}