forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
signaller.go
187 lines (167 loc) · 5.56 KB
/
signaller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package shutdown
import (
"context"
"sync"
"time"
)
// longTermWait is the single definition of the duration reported by
// MaximumShutdownWait.
const longTermWait time.Duration = 24 * time.Hour

// MaximumShutdownWait reports the longest period a component should be
// prepared to wait for a child to finish shutting down before it gives up and
// exits.
//
// The value is mostly symbolic: a component that blocks for more than a few
// minutes has already failed in its duty to terminate gracefully. The current
// mechanism (WaitForClose) still requires callers to supply some duration,
// however, so a deliberately huge one is provided. Because it is a magic
// number, it is defined once and exposed through this function so that such
// call sites are easy to identify and refactor in the future.
func MaximumShutdownWait() time.Duration {
	return longTermWait
}
// Signaller is owned by a component that supports graceful shut down. It lets
// code outside the component tell the goroutines the component owns that they
// should begin to close, and lets the component report back once it has
// finished.
//
// There are two tiers of shutdown urgency:
//
//   - At leisure: work that is already in progress may be completed before
//     terminating, but no new work should be taken on.
//   - Now: resources should be cleaned up and the component should terminate
//     as soon as possible, regardless of any tasks still in flight.
//
// A third signal travels in the opposite direction: the component marks itself
// as having closed down, which outside code can observe to find out whether
// termination has finished.
type Signaller struct {
	// One channel per signal; closing the channel broadcasts the signal.
	closeAtLeisureChan, closeNowChan, hasClosedChan chan struct{}

	// One guard per channel so that each is closed at most once.
	closeAtLeisureOnce, closeNowOnce, hasClosedOnce sync.Once
}
// NewSignaller returns a Signaller on which no signal has been made yet: each
// of its three signal channels is freshly allocated, unbuffered and open.
func NewSignaller() *Signaller {
	s := new(Signaller)
	s.closeAtLeisureChan = make(chan struct{})
	s.closeNowChan = make(chan struct{})
	s.hasClosedChan = make(chan struct{})
	return s
}
// CloseAtLeisure tells the owner of this Signaller to terminate at its own
// leisure: tasks already in progress may be completed, but no new work should
// be started. Repeated calls have no further effect.
func (s *Signaller) CloseAtLeisure() {
	signal := func() { close(s.closeAtLeisureChan) }
	// The once-guard ensures the channel is never closed twice.
	s.closeAtLeisureOnce.Do(signal)
}
// CloseNow tells the owner of this Signaller to terminate right away,
// regardless of any tasks still in progress. Repeated calls have no further
// effect.
func (s *Signaller) CloseNow() {
	// An immediate shutdown also counts as an at-leisure one, so that signal
	// is made first.
	s.CloseAtLeisure()

	signal := func() { close(s.closeNowChan) }
	s.closeNowOnce.Do(signal)
}
// ShutdownComplete is called by the component itself to announce that it and
// all of the resources it owns have terminated. Repeated calls have no further
// effect.
func (s *Signaller) ShutdownComplete() {
	announce := func() { close(s.hasClosedChan) }
	// The once-guard ensures the channel is never closed twice.
	s.hasClosedOnce.Do(announce)
}
//------------------------------------------------------------------------------
// ShouldCloseAtLeisure reports whether the signal to shut down, either at
// leisure or immediately, has been made. It never blocks.
func (s *Signaller) ShouldCloseAtLeisure() bool {
	// Non-blocking probe: the receive is only ready once the channel has been
	// closed; otherwise the default branch is taken straight away.
	select {
	case <-s.CloseAtLeisureChan():
		return true
	default:
		return false
	}
}
// CloseAtLeisureChan exposes the channel that is closed once the signal to shut
// down, either at leisure or immediately, has been made.
func (s *Signaller) CloseAtLeisureChan() <-chan struct{} {
	// Hand the channel out as receive-only so callers can wait on it but
	// cannot close it or send on it.
	var signalled <-chan struct{} = s.closeAtLeisureChan
	return signalled
}
// CloseAtLeisureCtx derives a cancellable context from ctx. The derived context
// is cancelled when the provided context is cancelled, when the signal to shut
// down either at leisure or immediately has been made, or when the returned
// CancelFunc is called, whichever happens first.
//
// If the signal has already been made, the derived context is cancelled before
// this method returns and no goroutine is started. Otherwise a goroutine
// watches for the signal and exits as soon as the derived context is done, so
// callers should call the returned CancelFunc once they are finished with the
// context in order to release it promptly.
func (s *Signaller) CloseAtLeisureCtx(ctx context.Context) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(ctx)

	// Fast path: the signal was made before we were called. Cancelling here
	// guarantees the caller never observes a live context after the signal,
	// rather than relying on a goroutine being scheduled in time.
	select {
	case <-s.closeAtLeisureChan:
		cancel()
		return ctx, cancel
	default:
	}

	go func() {
		select {
		case <-ctx.Done():
		case <-s.closeAtLeisureChan:
		}
		// A no-op when ctx is already done; otherwise propagates the signal.
		cancel()
	}()
	return ctx, cancel
}
// ShouldCloseNow reports whether the signal to shut down immediately has been
// made. It never blocks.
func (s *Signaller) ShouldCloseNow() bool {
	// Non-blocking probe: the receive is only ready once the channel has been
	// closed; otherwise the default branch is taken straight away.
	select {
	case <-s.CloseNowChan():
		return true
	default:
		return false
	}
}
// CloseNowChan exposes the channel that is closed once the signal to shut down
// immediately has been made.
func (s *Signaller) CloseNowChan() <-chan struct{} {
	// Hand the channel out as receive-only so callers can wait on it but
	// cannot close it or send on it.
	var signalled <-chan struct{} = s.closeNowChan
	return signalled
}
// CloseNowCtx derives a cancellable context from ctx. The derived context is
// cancelled when the provided context is cancelled, when the signal to shut
// down immediately has been made, or when the returned CancelFunc is called,
// whichever happens first.
//
// If the signal has already been made, the derived context is cancelled before
// this method returns and no goroutine is started. Otherwise a goroutine
// watches for the signal and exits as soon as the derived context is done, so
// callers should call the returned CancelFunc once they are finished with the
// context in order to release it promptly.
func (s *Signaller) CloseNowCtx(ctx context.Context) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(ctx)

	// Fast path: the signal was made before we were called. Cancelling here
	// guarantees the caller never observes a live context after the signal,
	// rather than relying on a goroutine being scheduled in time.
	select {
	case <-s.closeNowChan:
		cancel()
		return ctx, cancel
	default:
	}

	go func() {
		select {
		case <-ctx.Done():
		case <-s.closeNowChan:
		}
		// A no-op when ctx is already done; otherwise propagates the signal.
		cancel()
	}()
	return ctx, cancel
}
// HasClosed reports whether the signal that the component has terminated has
// been made. It never blocks.
func (s *Signaller) HasClosed() bool {
	// Non-blocking probe: the receive is only ready once the channel has been
	// closed; otherwise the default branch is taken straight away.
	select {
	case <-s.HasClosedChan():
		return true
	default:
		return false
	}
}
// HasClosedChan exposes the channel that is closed once the signal that the
// component has terminated has been made.
func (s *Signaller) HasClosedChan() <-chan struct{} {
	// Hand the channel out as receive-only so callers can wait on it but
	// cannot close it or send on it.
	var terminated <-chan struct{} = s.hasClosedChan
	return terminated
}
// HasClosedCtx derives a cancellable context from ctx. The derived context is
// cancelled when the provided context is cancelled, when the signal that the
// component has shut down has been made, or when the returned CancelFunc is
// called, whichever happens first.
//
// If the signal has already been made, the derived context is cancelled before
// this method returns and no goroutine is started. Otherwise a goroutine
// watches for the signal and exits as soon as the derived context is done, so
// callers should call the returned CancelFunc once they are finished with the
// context in order to release it promptly.
func (s *Signaller) HasClosedCtx(ctx context.Context) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(ctx)

	// Fast path: the signal was made before we were called. Cancelling here
	// guarantees the caller never observes a live context after the signal,
	// rather than relying on a goroutine being scheduled in time.
	select {
	case <-s.hasClosedChan:
		cancel()
		return ctx, cancel
	default:
	}

	go func() {
		select {
		case <-ctx.Done():
		case <-s.hasClosedChan:
		}
		// A no-op when ctx is already done; otherwise propagates the signal.
		cancel()
	}()
	return ctx, cancel
}