/
Lifecycle.java
544 lines (499 loc) · 17.3 KB
/
Lifecycle.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common.lifecycle;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.Closeable;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* A manager of object Lifecycles.
*
* This object has methods for registering objects that should be started and stopped. The Lifecycle allows for
* four stages: {@link Stage#INIT}, {@link Stage#NORMAL}, {@link Stage#SERVER}, and {@link Stage#ANNOUNCEMENTS}.
*
* Things added at {@link Stage#INIT} will be started first (in the order that they are added to the Lifecycle instance)
* and then things added at {@link Stage#NORMAL}, then {@link Stage#SERVER}, and finally, {@link Stage#ANNOUNCEMENTS}
* will be started.
*
* The close operation goes in reverse order, starting with the last thing added at {@link Stage#ANNOUNCEMENTS} and
* working backwards.
*
* Conceptually, the stages have the following purposes:
* - {@link Stage#INIT}: Currently, this stage is used exclusively for log4j initialization, since almost everything
* needs logging and it should be the last thing to shut down. Any sort of bootstrapping object that provides
* something that should be initialized before nearly all other Lifecycle objects could also belong here (if it
* doesn't need logging during start or stop).
* - {@link Stage#NORMAL}: This is the default stage. Most objects will probably make the most sense to be registered
* at this level, with the exception of any form of server or service announcements.
* - {@link Stage#SERVER}: This lifecycle stage is intended for all 'server' objects, for example,
* org.apache.druid.server.initialization.jetty.JettyServerModule, but any sort of 'server' that expects most (or
* some specific) Lifecycle objects to be initialized by the time it starts, and still available at the time it stops
* can logically live in this stage.
* - {@link Stage#ANNOUNCEMENTS}: Any object which announces to a cluster this server's location belongs in this stage.
* By being last, we can be sure that all servers are initialized before we advertise the endpoint locations, and
* also can be sure that we un-announce these advertisements before the Stage.SERVER objects stop.
*
* There are two sets of methods to add things to the Lifecycle. One set that will just add instances and enforce that
* start() has not been called yet. The other set will add instances and, if the lifecycle is already started, start
* them.
*/
public class Lifecycle
{
/** Logger used for lifecycle-level progress messages (stage start/stop, shutdown hook). */
private static final Logger log = new Logger(Lifecycle.class);
/**
* Stages that handlers can be registered at. Handlers are kept in a map sorted by this enum, which {@link #start()}
* walks in ascending order and {@link #stop()} walks in descending order, so the declaration order of the constants
* below is significant.
*/
public enum Stage
{
/** First stage to be started and last stage to be stopped. */
INIT,
/** Stage used by the {@code add*} methods that do not take an explicit stage. */
NORMAL,
/** Started after {@link #NORMAL} and stopped before it. */
SERVER,
/** Last stage to be started and first stage to be stopped. */
ANNOUNCEMENTS
}
/**
* Internal state of the Lifecycle, held in the {@code state} field. The only transitions performed by this class
* are NOT_STARTED -> RUNNING (in {@link #start()}) and RUNNING -> STOP (in {@link #stop()}).
*/
private enum State
{
/** Lifecycle's state before {@link #start()} is called. */
NOT_STARTED,
/** Lifecycle's state since {@link #start()} and before {@link #stop()} is called. */
RUNNING,
/** Lifecycle's state since {@link #stop()} is called on a running Lifecycle. */
STOP
}
/** Registered handlers, keyed by stage. The constructor puts an (initially empty) list for every stage. */
private final NavigableMap<Stage, CopyOnWriteArrayList<Handler>> handlers;
/** This lock is used to linearize all calls to Handler.start() and Handler.stop() on the managed handlers. */
private final Lock startStopLock = new ReentrantLock();
/** Current state. It is an AtomicReference because stop() changes it with a CAS before acquiring startStopLock. */
private final AtomicReference<State> state = new AtomicReference<>(State.NOT_STARTED);
/** Stage most recently entered by start(); null until then. Read and written only while holding startStopLock. */
private Stage currStage = null;
/** Flipped to true by ensureShutdownHook() so that the JVM shutdown hook is registered at most once. */
private final AtomicBoolean shutdownHookRegistered = new AtomicBoolean(false);
/** Name of this Lifecycle, included in its log messages. */
private final String name;
/**
* Creates a Lifecycle with the name "anonymous".
*/
public Lifecycle()
{
this("anonymous");
}
public Lifecycle(String name)
{
Preconditions.checkArgument(StringUtils.isNotEmpty(name), "Lifecycle name must not be null or empty");
this.name = name;
handlers = new TreeMap<>();
for (Stage stage : Stage.values()) {
handlers.put(stage, new CopyOnWriteArrayList<>());
}
}
/**
 * Registers a "managed" instance (one whose methods are annotated with {@link LifecycleStart} and
 * {@link LifecycleStop}) at Stage.NORMAL. Registration is only possible before the lifecycle has been started.
 *
 * @param o The object to add to the lifecycle
 *
 * @return the same object that was passed in
 *
 * @throws ISE {@link Lifecycle#addHandler(Handler, Stage)}
 */
public <T> T addManagedInstance(T o)
{
  final Handler annotationHandler = new AnnotationBasedHandler(o);
  addHandler(annotationHandler);
  return o;
}
/**
 * Registers a "managed" instance (one whose methods are annotated with {@link LifecycleStart} and
 * {@link LifecycleStop}) at the given stage. Registration is only possible before the lifecycle has been started.
 *
 * @param o     The object to add to the lifecycle
 * @param stage The stage to add the object at
 *
 * @return the same object that was passed in
 *
 * @throws ISE {@link Lifecycle#addHandler(Handler, Stage)}
 */
public <T> T addManagedInstance(T o, Stage stage)
{
  final Handler annotationHandler = new AnnotationBasedHandler(o);
  addHandler(annotationHandler, stage);
  return o;
}
/**
 * Registers an instance at Stage.NORMAL whose public no-arg {@code start()} and {@code close()} methods are used
 * as its start and stop actions. Both methods are looked up when the instance is added; if either is missing, the
 * lookup failure is rethrown as a RuntimeException. Registration is only possible before the lifecycle has been
 * started.
 *
 * @param o The object to add to the lifecycle
 *
 * @return the same object that was passed in
 *
 * @throws ISE {@link Lifecycle#addHandler(Handler, Stage)}
 */
public <T> T addStartCloseInstance(T o)
{
  final Handler startCloseHandler = new StartCloseHandler(o);
  addHandler(startCloseHandler);
  return o;
}
/**
 * Registers an instance at the given stage whose public no-arg {@code start()} and {@code close()} methods are
 * used as its start and stop actions. Both methods are looked up when the instance is added; if either is missing,
 * the lookup failure is rethrown as a RuntimeException. Registration is only possible before the lifecycle has
 * been started.
 *
 * @param o     The object to add to the lifecycle
 * @param stage The stage to add the object at
 *
 * @return the same object that was passed in
 *
 * @throws ISE {@link Lifecycle#addHandler(Handler, Stage)}
 */
public <T> T addStartCloseInstance(T o, Stage stage)
{
  final Handler startCloseHandler = new StartCloseHandler(o);
  addHandler(startCloseHandler, stage);
  return o;
}
/**
 * Registers a handler at the Stage.NORMAL stage. Registration is only possible before the lifecycle has been
 * started.
 *
 * @param handler The handler to add to the lifecycle
 *
 * @throws ISE {@link Lifecycle#addHandler(Handler, Stage)}
 */
public void addHandler(Handler handler)
{
  final Stage defaultStage = Stage.NORMAL;
  addHandler(handler, defaultStage);
}
/**
 * Registers a handler at the given stage without starting it.
 *
 * The method never blocks: it makes a single attempt to take the start/stop lock and fails if the lock is not
 * immediately available. Note that the same lock is also taken by the other {@code add*} methods, not only by
 * {@link #start()} and {@link #stop()}.
 *
 * @param handler The handler to add to the lifecycle
 * @param stage   The stage to add the handler at
 *
 * @throws ISE if the start/stop lock cannot be acquired immediately, or if the lifecycle is no longer in its
 *             not-started state
 */
public void addHandler(Handler handler, Stage stage)
{
  final boolean lockAcquired = startStopLock.tryLock();
  if (!lockAcquired) {
    throw new ISE("Cannot add a handler in the process of Lifecycle starting or stopping");
  }

  try {
    final State currentState = state.get();
    if (currentState != State.NOT_STARTED) {
      throw new ISE("Cannot add a handler after the Lifecycle has started, it doesn't work that way.");
    }
    final List<Handler> stageHandlers = handlers.get(stage);
    stageHandlers.add(handler);
  }
  finally {
    startStopLock.unlock();
  }
}
/**
 * Registers a "managed" instance (one whose methods are annotated with {@link LifecycleStart} and
 * {@link LifecycleStop}) at Stage.NORMAL, starting it right away when the lifecycle is already running.
 *
 * @param o The object to add to the lifecycle
 *
 * @return the same object that was passed in
 *
 * @throws Exception {@link Lifecycle#addMaybeStartHandler(Handler, Stage)}
 */
public <T> T addMaybeStartManagedInstance(T o) throws Exception
{
  final Handler annotationHandler = new AnnotationBasedHandler(o);
  addMaybeStartHandler(annotationHandler);
  return o;
}
/**
 * Registers a "managed" instance (one whose methods are annotated with {@link LifecycleStart} and
 * {@link LifecycleStop}) at the given stage, starting it right away when the lifecycle is already running.
 *
 * @param o     The object to add to the lifecycle
 * @param stage The stage to add the object at
 *
 * @return the same object that was passed in
 *
 * @throws Exception {@link Lifecycle#addMaybeStartHandler(Handler, Stage)}
 */
public <T> T addMaybeStartManagedInstance(T o, Stage stage) throws Exception
{
  final Handler annotationHandler = new AnnotationBasedHandler(o);
  addMaybeStartHandler(annotationHandler, stage);
  return o;
}
/**
 * Registers an instance at Stage.NORMAL whose public no-arg {@code start()} and {@code close()} methods are used
 * as its start and stop actions, starting it right away when the lifecycle is already running. Both methods are
 * looked up when the instance is added; if either is missing, the lookup failure is rethrown as a
 * RuntimeException.
 *
 * @param o The object to add to the lifecycle
 *
 * @return the same object that was passed in
 *
 * @throws Exception {@link Lifecycle#addMaybeStartHandler(Handler, Stage)}
 */
public <T> T addMaybeStartStartCloseInstance(T o) throws Exception
{
  final Handler startCloseHandler = new StartCloseHandler(o);
  addMaybeStartHandler(startCloseHandler);
  return o;
}
/**
 * Registers an instance at the given stage whose public no-arg {@code start()} and {@code close()} methods are
 * used as its start and stop actions, starting it right away when the lifecycle is already running. Both methods
 * are looked up when the instance is added; if either is missing, the lookup failure is rethrown as a
 * RuntimeException.
 *
 * @param o     The object to add to the lifecycle
 * @param stage The stage to add the object at
 *
 * @return the same object that was passed in
 *
 * @throws Exception {@link Lifecycle#addMaybeStartHandler(Handler, Stage)}
 */
public <T> T addMaybeStartStartCloseInstance(T o, Stage stage) throws Exception
{
  final Handler startCloseHandler = new StartCloseHandler(o);
  addMaybeStartHandler(startCloseHandler, stage);
  return o;
}
/**
 * Registers a Closeable at the {@link Stage#NORMAL} stage. Nothing is invoked on it when the lifecycle starts; it
 * is only closed when the lifecycle stops. Use {@link #addStartCloseInstance(Object)} instead if a "start" method
 * must be called as well.
 *
 * @param o The Closeable to add to the lifecycle
 *
 * @return the same object that was passed in
 *
 * @throws ISE {@link Lifecycle#addHandler(Handler, Stage)}
 */
public <T extends Closeable> T addCloseableInstance(T o)
{
  final Handler closingHandler = new CloseableHandler(o);
  addHandler(closingHandler);
  return o;
}
/**
 * Registers a handler at the Stage.NORMAL stage, starting it right away when the lifecycle is already running.
 *
 * @param handler The handler to add to the lifecycle
 *
 * @throws Exception {@link Lifecycle#addMaybeStartHandler(Handler, Stage)}
 */
public void addMaybeStartHandler(Handler handler) throws Exception
{
  final Stage defaultStage = Stage.NORMAL;
  addMaybeStartHandler(handler, defaultStage);
}
/**
 * Adds a handler to the Lifecycle and starts it if the lifecycle has already been started.
 *
 * If the lifecycle is running and has already reached (or passed) the given stage, the handler is started
 * immediately, before being registered. If the lifecycle is running but has not reached that stage yet, the
 * handler is only registered; {@link #start()} starts it when it gets to that stage. If the lifecycle has not been
 * started, the handler is only registered.
 *
 * @param handler The handler to add to the lifecycle
 * @param stage   The stage to add the handler at
 *
 * @throws ISE       if the lifecycle is stopping or has been stopped
 * @throws Exception an exception thrown from handler.start(). If an exception is thrown, the handler is *not* added
 */
public void addMaybeStartHandler(Handler handler, Stage stage) throws Exception
{
  if (!startStopLock.tryLock()) {
    // (*) This check is why the state should be changed before startStopLock.lock() in stop(). This check allows to
    // spot wrong use of Lifecycle instead of entering deadlock, like https://github.com/apache/incubator-druid/issues/3579.
    if (state.get().equals(State.STOP)) {
      throw new ISE("Cannot add a handler in the process of Lifecycle stopping");
    }
    startStopLock.lock();
  }
  try {
    // Read the state exactly once. stop() flips RUNNING -> STOP *before* it acquires startStopLock, so the state can
    // change while this thread holds the lock. Checking a single snapshot guarantees that exactly one of the
    // branches below applies; with two separate reads, a flip in between would skip both the "stopped" rejection
    // and the start() call, registering a handler that is never started but is later stopped by stop().
    final State observedState = state.get();
    if (observedState == State.STOP) {
      throw new ISE("Cannot add a handler after the Lifecycle has stopped");
    }
    // Start right away only if start() has already reached this handler's stage; handlers of later stages are
    // started by start() itself when it gets there.
    if (observedState == State.RUNNING && stage.compareTo(currStage) <= 0) {
      handler.start();
    }
    // Registered only after a successful start(), so a handler that failed to start is not stopped later.
    handlers.get(stage).add(handler);
  }
  finally {
    startStopLock.unlock();
  }
}
/**
* Starts every registered handler, stage by stage, while holding the start/stop lock.
*
* Stages are visited in the sorted order of the handlers map (INIT first, ANNOUNCEMENTS last) and, within a stage,
* handlers are started in the order in which they were added.
*
* If a handler's start() throws, the exception propagates to the caller immediately: the remaining handlers are not
* started and nothing in this method resets the state, so it stays RUNNING.
*
* @throws ISE       if the Lifecycle is not in the NOT_STARTED state
* @throws Exception any exception thrown by a handler's start()
*/
public void start() throws Exception
{
startStopLock.lock();
try {
if (!state.get().equals(State.NOT_STARTED)) {
throw new ISE("Already started");
}
if (!state.compareAndSet(State.NOT_STARTED, State.RUNNING)) {
throw new ISE("stop() is called concurrently with start()");
}
for (Map.Entry<Stage, ? extends List<Handler>> e : handlers.entrySet()) {
// Record the stage being started: addMaybeStartHandler() compares against it to decide whether a handler that
// is added while the Lifecycle is running has to be started immediately.
currStage = e.getKey();
log.info("Starting lifecycle [%s] stage [%s]", name, currStage.name());
for (Handler handler : e.getValue()) {
handler.start();
}
}
log.info("Successfully started lifecycle [%s]", name);
}
finally {
startStopLock.unlock();
}
}
/**
* Stops every registered handler, in the reverse of the start order.
*
* Only a RUNNING Lifecycle is stopped. If the state is anything else (the Lifecycle was never started, or stop() was
* already called), this method logs a message and returns without touching the handlers. Otherwise stages are
* visited from ANNOUNCEMENTS back to INIT and, within a stage, handlers are stopped in the reverse of the order in
* which they were added.
*
* An exception thrown by one handler's stop() does not prevent the remaining handlers from being stopped. The first
* such exception is rethrown, wrapped in a RuntimeException, after all handlers have been visited; any later ones
* are attached to it as suppressed exceptions.
*/
public void stop()
{
// This CAS outside of a block guarded by startStopLock is the only reason why state is AtomicReference rather than
// a simple variable. State change before startStopLock.lock() is needed for the new state visibility during the
// check in addMaybeStartHandler() marked by (*).
if (!state.compareAndSet(State.RUNNING, State.STOP)) {
// Reached both when stop() was already called and when the Lifecycle was never started.
log.info("Lifecycle [%s] already stopped and stop was called. Silently skipping", name);
return;
}
startStopLock.lock();
try {
// First exception thrown by a handler, if any; later exceptions are added to it as suppressed.
Exception thrown = null;
for (Stage s : handlers.navigableKeySet().descendingSet()) {
log.info("Stopping lifecycle [%s] stage [%s]", name, s.name());
for (Handler handler : Lists.reverse(handlers.get(s))) {
try {
handler.stop();
}
catch (Exception e) {
log.warn(e, "Lifecycle [%s] encountered exception while stopping %s", name, handler);
if (thrown == null) {
thrown = e;
} else {
thrown.addSuppressed(e);
}
}
}
}
// Report failures only after every handler has had its chance to stop.
if (thrown != null) {
throw new RuntimeException(thrown);
}
}
finally {
startStopLock.unlock();
}
}
/**
 * Registers a JVM shutdown hook that calls {@link #stop()} on this Lifecycle. Only the first call registers the
 * hook; later calls do nothing.
 */
public void ensureShutdownHook()
{
  // Whoever wins the false -> true transition is the one caller that registers the hook.
  if (!shutdownHookRegistered.compareAndSet(false, true)) {
    return;
  }

  final Thread shutdownHook = new Thread(
      () -> {
        log.info("Lifecycle [%s] running shutdown hook", name);
        stop();
      }
  );
  Runtime.getRuntime().addShutdownHook(shutdownHook);
}
/**
 * Makes sure the shutdown hook is registered and then parks the calling thread by having it join on itself.
 * Since a thread cannot outlive its own join, this call is only expected to end by being interrupted.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void join() throws InterruptedException
{
  ensureShutdownHook();
  final Thread self = Thread.currentThread();
  self.join();
}
/**
* A unit managed by a Lifecycle: the Lifecycle calls start() when starting the handler and stop() when stopping it.
*/
public interface Handler
{
/**
* Called by the Lifecycle to start this handler.
*
* @throws Exception if the handler fails to start
*/
void start() throws Exception;
/**
* Called by the Lifecycle to stop this handler.
*/
void stop();
}
/**
 * Handler that drives an arbitrary object through its annotated methods: every public method carrying
 * {@link LifecycleStart} is invoked on start, and every public method carrying {@link LifecycleStop} is invoked on
 * stop. Annotations are matched by the name of the annotation type rather than by class identity.
 */
private static class AnnotationBasedHandler implements Handler
{
  private static final Logger log = new Logger(AnnotationBasedHandler.class);

  private final Object o;

  public AnnotationBasedHandler(Object o)
  {
    this.o = o;
  }

  /**
   * Invokes every start-annotated method of the wrapped object.
   *
   * @throws Exception whatever the reflective invocation throws; the first failure aborts the remaining methods
   */
  @Override
  public void start() throws Exception
  {
    final String startAnnotationName = LifecycleStart.class.getName();
    for (Method candidate : o.getClass().getMethods()) {
      if (!isAnnotatedWith(candidate, startAnnotationName)) {
        continue;
      }
      log.debug("Invoking start method[%s] on object[%s].", candidate, o);
      candidate.invoke(o);
    }
  }

  /**
   * Invokes every stop-annotated method of the wrapped object. A failure of one method is logged and does not
   * prevent the remaining methods from being invoked.
   */
  @Override
  public void stop()
  {
    final String stopAnnotationName = LifecycleStop.class.getName();
    for (Method candidate : o.getClass().getMethods()) {
      if (!isAnnotatedWith(candidate, stopAnnotationName)) {
        continue;
      }
      log.debug("Invoking stop method[%s] on object[%s].", candidate, o);
      try {
        candidate.invoke(o);
      }
      catch (Exception e) {
        log.error(e, "Exception when stopping method[%s] on object[%s]", candidate, o);
      }
    }
  }

  /**
   * Tells whether the method carries an annotation whose type has the given fully-qualified class name.
   */
  private static boolean isAnnotatedWith(Method method, String annotationClassName)
  {
    for (Annotation present : method.getAnnotations()) {
      if (annotationClassName.equals(present.annotationType().getName())) {
        return true;
      }
    }
    return false;
  }
}
/**
 * Handler that drives an object through its public no-arg {@code start()} and {@code close()} methods, which are
 * resolved reflectively when the handler is created.
 */
private static class StartCloseHandler implements Handler
{
  private static final Logger log = new Logger(StartCloseHandler.class);

  private final Object o;
  private final Method startMethod;
  private final Method stopMethod;

  /**
   * @param o the object to manage; its class must expose both a public {@code start()} and a public
   *          {@code close()} method, otherwise the lookup failure is rethrown as a RuntimeException
   */
  public StartCloseHandler(Object o)
  {
    this.o = o;
    final Class<?> targetClass = o.getClass();
    try {
      this.startMethod = targetClass.getMethod("start");
      this.stopMethod = targetClass.getMethod("close");
    }
    catch (NoSuchMethodException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Invokes {@code start()} on the wrapped object.
   *
   * @throws Exception whatever the reflective invocation throws
   */
  @Override
  public void start() throws Exception
  {
    log.info("Starting object[%s]", o);
    startMethod.invoke(o);
  }

  /**
   * Invokes {@code close()} on the wrapped object; a failure is logged and not rethrown.
   */
  @Override
  public void stop()
  {
    log.info("Stopping object[%s]", o);
    try {
      stopMethod.invoke(o);
    }
    catch (Exception e) {
      final Class<?> targetClass = o.getClass();
      log.error(e, "Unable to invoke stopMethod() on %s", targetClass);
    }
  }
}
/**
 * Handler for objects that only need to be closed: starting is a no-op and stopping closes the wrapped Closeable.
 */
private static class CloseableHandler implements Handler
{
  private static final Logger log = new Logger(CloseableHandler.class);

  private final Closeable closeable;

  private CloseableHandler(Closeable o)
  {
    this.closeable = o;
  }

  /**
   * Nothing to start for a plain Closeable.
   */
  @Override
  public void start()
  {
    // do nothing
  }

  /**
   * Closes the wrapped object; a failure is logged and not rethrown.
   */
  @Override
  public void stop()
  {
    log.info("Closing object[%s]", closeable);
    try {
      closeable.close();
    }
    catch (Exception e) {
      log.error(e, "Exception when closing object [%s]", closeable);
    }
  }
}
}