forked from apache/dubbo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
SerializingExecutor.java
124 lines (110 loc) · 4.78 KB
/
SerializingExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/*
* Copyright 2014 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.threadpool.serial;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadlocal.InternalThreadLocalMap;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_ERROR_RUN_THREAD_TASK;
/**
* Executor ensuring that all {@link Runnable} tasks submitted are executed in order
* using the provided {@link Executor}, and serially such that no two will ever be
* running at the same time.
*/
public final class SerializingExecutor implements Executor, Runnable {
private static final ErrorTypeAwareLogger LOGGER = LoggerFactory.getErrorTypeAwareLogger(SerializingExecutor.class);
/**
* Use false to stop and true to run
*/
private final AtomicBoolean atomicBoolean = new AtomicBoolean();
private final Executor executor;
private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<>();
/**
* Creates a SerializingExecutor, running tasks using {@code executor}.
*
* @param executor Executor in which tasks should be run. Must not be null.
*/
public SerializingExecutor(Executor executor) {
this.executor = executor;
}
/**
* Runs the given runnable strictly after all Runnables that were submitted
* before it, and using the {@code executor} passed to the constructor. .
*/
@Override
public void execute(Runnable r) {
execute(r, null);
}
public void execute(Runnable r, Runnable cleanIfFailed) {
runQueue.add(r);
schedule(r, cleanIfFailed);
}
private void schedule(Runnable removable, Runnable cleanIfFailed) {
if (atomicBoolean.compareAndSet(false, true)) {
boolean success = false;
try {
executor.execute(this);
success = true;
} finally {
// It is possible that at this point that there are still tasks in
// the queue, it would be nice to keep trying but the error may not
// be recoverable. So we update our state and propagate so that if
// our caller deems it recoverable we won't be stuck.
if (!success) {
if (removable != null) {
// This case can only be reached if 'this' was not currently running, and we failed to
// reschedule. The item should still be in the queue for removal.
// ConcurrentLinkedQueue claims that null elements are not allowed, but seems to not
// throw if the item to remove is null. If removable is present in the queue twice,
// the wrong one may be removed. It doesn't seem possible for this case to exist today.
// This is important to run in case of RejectedExecutionException, so that future calls
// to execute don't succeed and accidentally run a previous runnable.
runQueue.remove(removable);
}
if (cleanIfFailed != null) {
cleanIfFailed.run();
}
atomicBoolean.set(false);
}
}
}
}
@Override
public void run() {
Runnable r;
try {
while ((r = runQueue.poll()) != null) {
InternalThreadLocalMap internalThreadLocalMap = InternalThreadLocalMap.getAndRemove();
try {
r.run();
} catch (RuntimeException e) {
LOGGER.error(COMMON_ERROR_RUN_THREAD_TASK, "", "", "Exception while executing runnable " + r, e);
} finally {
InternalThreadLocalMap.set(internalThreadLocalMap);
}
}
} finally {
atomicBoolean.set(false);
}
if (!runQueue.isEmpty()) {
// we didn't enqueue anything but someone else did.
schedule(null, null);
}
}
}