-
Notifications
You must be signed in to change notification settings - Fork 13
/
service.go
237 lines (201 loc) · 6.91 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
// Copyright 2019, Keychain Foundation Ltd.
// This file is part of the dipperin-core library.
//
// The dipperin-core library is free software: you can redistribute
// it and/or modify it under the terms of the GNU Lesser General Public License
// as published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// The dipperin-core library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package util
import (
"errors"
"github.com/dipperin/dipperin-core/third-party/log"
"sync/atomic"
"fmt"
)
var (
// ErrAlreadyStarted is returned when somebody tries to start an already
// running service.
ErrAlreadyStarted = errors.New("already started")
// ErrAlreadyStopped is returned when somebody tries to stop an already
// stopped service (without resetting it).
ErrAlreadyStopped = errors.New("already stopped")
// ErrNotStarted is returned when somebody tries to stop a not running
// service.
ErrNotStarted = errors.New("not started")
)
// Service defines a service that can be started, stopped, and reset.
type Service interface {
// Start the service.
// If it's already started or stopped, will return an error.
// If OnStart() returns an error, it's returned by Start()
Start() error
OnStart() error
// Stop the service.
// If it's already stopped, will return an error.
// OnStop must never error.
Stop()
OnStop()
// Reset the service.
// Panics by default - must be overwritten to enable reset.
Reset() error
OnReset() error
// Return true if the service is running
IsRunning() bool
// Quit returns a channel, which is closed once service is stopped.
Quit() <-chan struct{}
// String representation of the service
String() string
// SetLogger sets a logger.
SetLogger(logger log.Logger)
}
/*
Classical-inheritance-style service declarations. Services can be started, then
stopped, then optionally restarted.
Users can override the OnStart/OnStop methods. In the absence of errors, these
methods are guaranteed to be called at most once. If OnStart returns an error,
service won't be marked as started, so the user can call Start again.
A call to Reset will panic, unless OnReset is overwritten, allowing
OnStart/OnStop to be called again.
The caller must ensure that Start and Stop are not called concurrently.
It is ok to call Stop without calling Start first.
Typical usage:
type FooService struct {
BaseService
// private fields
}
func NewFooService() *FooService {
fs := &FooService{
// init
}
fs.BaseService = *NewBaseService(log, "FooService", fs)
return fs
}
func (fs *FooService) OnStart() error {
fs.BaseService.OnStart() // Always call the overridden method.
// initialize private fields
// start subroutines, etc.
}
func (fs *FooService) OnStop() error {
fs.BaseService.OnStop() // Always call the overridden method.
// close/destroy private fields
// stop subroutines, etc.
}
*/
type BaseService struct {
Logger log.Logger
name string
started uint32 // atomic
stopped uint32 // atomic
quit chan struct{}
// The "subclass" of BaseService
impl Service
}
// NewBaseService creates a new BaseService.
func NewBaseService(logger log.Logger, name string, impl Service) *BaseService {
if logger == nil {
logger = log.New()
}
return &BaseService{
Logger: logger,
name: name,
quit: make(chan struct{}),
impl: impl,
}
}
// SetLogger implements Service by setting a logger.
func (bs *BaseService) SetLogger(l log.Logger) {
bs.Logger = l
}
// Start implements Service by calling OnStart (if defined). An error will be
// returned if the service is already running or stopped. Not to start the
// stopped service, you need to call Reset.
func (bs *BaseService) Start() error {
if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
if atomic.LoadUint32(&bs.stopped) == 1 {
bs.Logger.Error(fmt.Sprintf("Not starting %v -- already stopped", bs.name), "impl", bs.impl)
// revert flag
atomic.StoreUint32(&bs.started, 0)
return ErrAlreadyStopped
}
bs.Logger.Info(fmt.Sprintf("Starting %v", bs.name), "impl", bs.impl)
err := bs.impl.OnStart()
if err != nil {
// revert flag
atomic.StoreUint32(&bs.started, 0)
return err
}
return nil
}
bs.Logger.Info(fmt.Sprintf("Not starting %v -- already started", bs.name), "impl", bs.impl)
return ErrAlreadyStarted
}
// OnStart implements Service by doing nothing.
// NOTE: Do not put anything in here,
// that way users don't need to call BaseService.OnStart()
func (bs *BaseService) OnStart() error { return nil }
// Stop implements Service by calling OnStop (if defined) and closing quit
// channel. An error will be returned if the service is already stopped.
func (bs *BaseService) Stop() {
if atomic.CompareAndSwapUint32(&bs.stopped, 0, 1) {
if atomic.LoadUint32(&bs.started) == 0 {
bs.Logger.Debug(fmt.Sprintf("Not stopping %v -- have not been started yet", bs.name), "impl", bs.impl, "err", ErrNotStarted)
// revert flag
atomic.StoreUint32(&bs.stopped, 0)
// ErrNotStarted
return
}
bs.Logger.Info(fmt.Sprintf("Stopping %v", bs.name), "impl", bs.impl)
bs.impl.OnStop()
close(bs.quit)
// nil
return
}
bs.Logger.Debug(fmt.Sprintf("Stopping %v (ignoring: already stopped)", bs.name), "impl", bs.impl)
// ErrAlreadyStopped
return
}
// OnStop implements Service by doing nothing.
// NOTE: Do not put anything in here,
// that way users don't need to call BaseService.OnStop()
func (bs *BaseService) OnStop() {}
// Reset implements Service by calling OnReset callback (if defined). An error
// will be returned if the service is running.
func (bs *BaseService) Reset() error {
if !atomic.CompareAndSwapUint32(&bs.stopped, 1, 0) {
bs.Logger.Debug(fmt.Sprintf("Can't reset %v. Not stopped", bs.name), "impl", bs.impl)
return fmt.Errorf("can't reset running %s", bs.name)
}
// whether or not we've started, we can reset
atomic.CompareAndSwapUint32(&bs.started, 1, 0)
bs.quit = make(chan struct{})
return bs.impl.OnReset()
}
// OnReset implements Service by panicking.
func (bs *BaseService) OnReset() error {
panic("The service cannot be reset")
}
// IsRunning implements Service by returning true or false depending on the
// service's state.
func (bs *BaseService) IsRunning() bool {
return atomic.LoadUint32(&bs.started) == 1 && atomic.LoadUint32(&bs.stopped) == 0
}
// Wait blocks until the service is stopped.
func (bs *BaseService) Wait() {
<-bs.quit
}
// String implements Servce by returning a string representation of the service.
func (bs *BaseService) String() string {
return bs.name
}
// Quit Implements Service by returning a quit channel.
func (bs *BaseService) Quit() <-chan struct{} {
return bs.quit
}