-
Notifications
You must be signed in to change notification settings - Fork 112
/
grace.go
413 lines (368 loc) · 11.1 KB
/
grace.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
package grace
import (
"fmt"
"net"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"syscall"
"time"
"github.com/pkg/errors"
"github.com/rs/zerolog"
)
// Watcher watches a process for a graceful restart
// preserving open network sockets to avoid packet loss.
type Watcher struct {
log zerolog.Logger
graceful bool
ppid int
lns map[string]net.Listener
ss map[string]Server
pidFile string
childPIDs []int
gracefulShutdownTimeout int
}
// Option represent an option.
type Option func(w *Watcher)
// WithLogger adds a logger to the Watcher.
func WithLogger(l zerolog.Logger) Option {
return func(w *Watcher) {
w.log = l
}
}
// WithPIDFile specifies the pid file to use.
func WithPIDFile(fn string) Option {
return func(w *Watcher) {
w.pidFile = fn
}
}
func WithGracefuleShutdownTimeout(seconds int) Option {
return func(w *Watcher) {
w.gracefulShutdownTimeout = seconds
}
}
// NewWatcher creates a Watcher.
func NewWatcher(opts ...Option) *Watcher {
w := &Watcher{
log: zerolog.Nop(),
graceful: os.Getenv("GRACEFUL") == "true",
ppid: os.Getppid(),
ss: map[string]Server{},
}
for _, opt := range opts {
opt(w)
}
return w
}
// Exit exits the current process cleaning up
// existing pid files.
func (w *Watcher) Exit(errc int) {
err := w.clean()
if err != nil {
w.log.Warn().Err(err).Msg("error removing pid file")
} else {
w.log.Info().Msgf("pid file %q got removed", w.pidFile)
}
os.Exit(errc)
}
func (w *Watcher) clean() error {
// only remove PID file if the PID has been written by us
filePID, err := w.readPID()
if err != nil {
return err
}
if filePID != os.Getpid() {
// the pidfile may have been changed by a forked child
// TODO(labkode): is there a way to list children pids for current process?
return fmt.Errorf("pid:%d in pidfile is different from pid:%d, it can be a leftover from a hard shutdown or that a reload was triggered", filePID, os.Getpid())
}
return os.Remove(w.pidFile)
}
func (w *Watcher) readPID() (int, error) {
piddata, err := os.ReadFile(w.pidFile)
if err != nil {
return 0, err
}
// Convert the file contents to an integer.
pid, err := strconv.Atoi(string(piddata))
if err != nil {
return 0, err
}
return pid, nil
}
// GetProcessFromFile reads the pidfile and returns the running process or error if the process or file
// are not available.
func GetProcessFromFile(pfile string) (*os.Process, error) {
data, err := os.ReadFile(pfile)
if err != nil {
return nil, err
}
pid, err := strconv.Atoi(string(data))
if err != nil {
return nil, err
}
process, err := os.FindProcess(pid)
if err != nil {
return nil, err
}
return process, nil
}
// WritePID writes the pid to the configured pid file.
func (w *Watcher) WritePID() error {
// Read in the pid file as a slice of bytes.
if piddata, err := os.ReadFile(w.pidFile); err == nil {
// Convert the file contents to an integer.
if pid, err := strconv.Atoi(string(piddata)); err == nil {
// Look for the pid in the process list.
if process, err := os.FindProcess(pid); err == nil {
// Send the process a signal zero kill.
if err := process.Signal(syscall.Signal(0)); err == nil {
if !w.graceful {
return fmt.Errorf("pid already running: %d", pid)
}
if pid != w.ppid { // overwrite only if parent pid is pidfile
// We only get an error if the pid isn't running, or it's not ours.
return fmt.Errorf("pid %d is not this process parent", pid)
}
} else {
w.log.Warn().Err(err).Msg("error sending zero kill signal to current process")
}
} else {
w.log.Warn().Msgf("pid:%d not found", pid)
}
} else {
w.log.Warn().Msg("error casting contents of pidfile to pid(int)")
}
} // else {
// w.log.Info().Msg("error reading pidfile")
//}
// If we get here, then the pidfile didn't exist or we are are in graceful reload and thus we overwrite
// or the pid in it doesn't belong to the user running this app.
err := os.WriteFile(w.pidFile, []byte(fmt.Sprintf("%d", os.Getpid())), 0664)
if err != nil {
return err
}
w.log.Info().Msgf("pidfile saved at: %s", w.pidFile)
return nil
}
func newListener(network, addr string) (net.Listener, error) {
return net.Listen(network, addr)
}
// GetListeners return grpc listener first and http listener second.
func (w *Watcher) GetListeners(servers map[string]Server) (map[string]net.Listener, error) {
w.ss = servers
lns := map[string]net.Listener{}
if w.graceful {
w.log.Info().Msg("graceful restart, inheriting parent ln fds for grpc and http")
count := 3
for k, s := range servers {
network, addr := s.Network(), s.Address()
fd := os.NewFile(uintptr(count), "") // 3 because ExtraFile passed to new process
count++
ln, err := net.FileListener(fd)
if err != nil {
w.log.Error().Err(err).Msg("error creating net.Listener from fd")
// create new fd
ln, err := newListener(network, addr)
if err != nil {
return nil, err
}
lns[k] = ln
} else {
lns[k] = ln
}
}
// kill parent
// TODO(labkode): maybe race condition here?
// What do we do if we cannot kill the parent but we have valid fds?
// Do we abort running the forked child? Probably yes, as if the parent cannot be
// killed that means we run two version of the code indefinitely.
w.log.Info().Msgf("killing parent pid gracefully with SIGQUIT: %d", w.ppid)
p, err := os.FindProcess(w.ppid)
if err != nil {
w.log.Error().Err(err).Msgf("error finding parent process with ppid:%d", w.ppid)
err = errors.Wrap(err, "error finding parent process")
return nil, err
}
err = p.Kill()
if err != nil {
w.log.Error().Err(err).Msgf("error killing parent process with ppid:%d", w.ppid)
err = errors.Wrap(err, "error killing parent process")
return nil, err
}
w.lns = lns
return lns, nil
}
// create two listeners for grpc and http
for k, s := range servers {
network, addr := s.Network(), s.Address()
ln, err := newListener(network, addr)
if err != nil {
return nil, err
}
lns[k] = ln
}
w.lns = lns
return lns, nil
}
// Server is the interface that servers like HTTP or gRPC
// servers need to implement.
type Server interface {
Stop() error
GracefulStop() error
Network() string
Address() string
}
// TrapSignals captures the OS signal.
func (w *Watcher) TrapSignals() {
signalCh := make(chan os.Signal, 1024)
signal.Notify(signalCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT)
for {
s := <-signalCh
w.log.Info().Msgf("%v signal received", s)
switch s {
case syscall.SIGHUP:
w.log.Info().Msg("preparing for a hot-reload, forking child process...")
// Fork a child process.
listeners := w.lns
p, err := forkChild(listeners)
if err != nil {
w.log.Error().Err(err).Msgf("unable to fork child process")
} else {
w.log.Info().Msgf("child forked with new pid %d", p.Pid)
w.childPIDs = append(w.childPIDs, p.Pid)
}
case syscall.SIGQUIT:
gracefulShutdown(w)
case syscall.SIGINT, syscall.SIGTERM:
if w.gracefulShutdownTimeout == 0 {
hardShutdown(w)
}
gracefulShutdown(w)
}
}
}
// TODO: Ideally this would call exit() but properly return an error. The
// exit() is problematic (i.e. racey) especiaily when orchestrating multiple
// reva services from some external runtime (like in the "ocis server" case
func gracefulShutdown(w *Watcher) {
w.log.Info().Int("Timeout", w.gracefulShutdownTimeout).Msg("preparing for a graceful shutdown with deadline")
go func() {
count := w.gracefulShutdownTimeout
ticker := time.NewTicker(time.Second)
for ; true; <-ticker.C {
w.log.Info().Msgf("shutting down in %d seconds", count-1)
count--
if count <= 0 {
w.log.Info().Msg("deadline reached before draining active conns, hard stopping ...")
for _, s := range w.ss {
err := s.Stop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
}
w.log.Info().Msgf("fd to %s:%s abruptly closed", s.Network(), s.Address())
}
w.Exit(1)
}
}
}()
for _, s := range w.ss {
w.log.Info().Msgf("fd to %s:%s gracefully closed ", s.Network(), s.Address())
err := s.GracefulStop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
w.log.Info().Msg("exit with error code 1")
w.Exit(1)
}
}
w.log.Info().Msg("exit with error code 0")
w.Exit(0)
}
// TODO: Ideally this would call exit() but properly return an error. The
// exit() is problematic (i.e. racey) especiaily when orchestrating multiple
// reva services from some external runtime (like in the "ocis server" case
func hardShutdown(w *Watcher) {
w.log.Info().Msg("preparing for hard shutdown, aborting all conns")
for _, s := range w.ss {
w.log.Info().Msgf("fd to %s:%s abruptly closed", s.Network(), s.Address())
err := s.Stop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
}
}
w.Exit(0)
}
func getListenerFile(ln net.Listener) (*os.File, error) {
switch t := ln.(type) {
case *net.TCPListener:
return t.File()
case *net.UnixListener:
return t.File()
}
return nil, fmt.Errorf("unsupported listener: %T", ln)
}
func forkChild(lns map[string]net.Listener) (*os.Process, error) {
// Get the file descriptor for the listener and marshal the metadata to pass
// to the child in the environment.
fds := map[string]*os.File{}
for k, ln := range lns {
fd, err := getListenerFile(ln)
if err != nil {
return nil, err
}
fds[k] = fd
}
// Pass stdin, stdout, and stderr along with the listener file to the child
files := []*os.File{
os.Stdin,
os.Stdout,
os.Stderr,
}
// Get current environment and add in the listener to it.
environment := append(os.Environ(), "GRACEFUL=true")
var counter = 3
for k, fd := range fds {
k = strings.ToUpper(k)
environment = append(environment, k+"FD="+fmt.Sprintf("%d", counter))
files = append(files, fd)
counter++
}
// Get current process name and directory.
execName, err := os.Executable()
if err != nil {
return nil, err
}
execDir := filepath.Dir(execName)
// Spawn child process.
p, err := os.StartProcess(execName, os.Args, &os.ProcAttr{
Dir: execDir,
Env: environment,
Files: files,
Sys: &syscall.SysProcAttr{},
})
// TODO(labkode): if the process dies (because config changed and is wrong
// we need to return an error
if err != nil {
return nil, err
}
return p, nil
}