/
ConcurrentBag.java
358 lines (325 loc) · 12.1 KB
/
ConcurrentBag.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
/*
* Copyright (C) 2013, 2014 Brett Wooldridge
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.zaxxer.hikari.util;
import static com.zaxxer.hikari.util.IConcurrentBagEntry.STATE_IN_USE;
import static com.zaxxer.hikari.util.IConcurrentBagEntry.STATE_NOT_IN_USE;
import static com.zaxxer.hikari.util.IConcurrentBagEntry.STATE_REMOVED;
import static com.zaxxer.hikari.util.IConcurrentBagEntry.STATE_RESERVED;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is a specialized concurrent bag that achieves superior performance
* to LinkedBlockingQueue and LinkedTransferQueue for the purposes of a
* connection pool. It uses ThreadLocal storage when possible to avoid
* locks, but resorts to scanning a common collection if there are no
* available items in the ThreadLocal list. Not-in-use items in the
* ThreadLocal lists can be "stolen" when the borrowing thread has none
* of its own. It is a "lock-less" implementation using a specialized
* AbstractQueuedLongSynchronizer to manage cross-thread signaling.
*
* Note that items that are "borrowed" from the bag are not actually
* removed from any collection, so garbage collection will not occur
* even if the reference is abandoned. Thus care must be taken to
* "requite" borrowed objects otherwise a memory leak will result. Only
* the "remove" method can completely remove an object from the bag.
*
* @author Brett Wooldridge
*
* @param <T> the templated type to store in the bag
*/
@SuppressWarnings("rawtypes")
public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseable
{
private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class);
private final QueuedSequenceSynchronizer synchronizer;
private final CopyOnWriteArrayList<T> sharedList;
private final boolean weakThreadLocals;
private final ThreadLocal<List> threadList;
private final IBagStateListener listener;
private volatile boolean closed;
/**
* Construct a ConcurrentBag with the specified listener.
*
* @param listener the IBagStateListener to attach to this bag
*/
public ConcurrentBag(IBagStateListener listener)
{
this.listener = listener;
this.weakThreadLocals = useWeakThreadLocals();
this.sharedList = new CopyOnWriteArrayList<>();
this.synchronizer = new QueuedSequenceSynchronizer();
if (weakThreadLocals) {
this.threadList = new ThreadLocal<>();
}
else {
this.threadList = new ThreadLocal<List>() {
@Override
protected List initialValue()
{
return new FastList<>(IConcurrentBagEntry.class, 16);
}
};
}
}
/**
* The method will borrow a BagEntry from the bag, blocking for the
* specified timeout if none are available.
*
* @param timeout how long to wait before giving up, in units of unit
* @param timeUnit a <code>TimeUnit</code> determining how to interpret the timeout parameter
* @return a borrowed instance from the bag or null if a timeout occurs
* @throws InterruptedException if interrupted while waiting
*/
@SuppressWarnings("unchecked")
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
// Try the thread-local list first, if there are no blocked threads waiting already
if (!synchronizer.hasQueuedThreads()) {
List<?> list = threadList.get();
if (weakThreadLocals && list == null) {
list = new ArrayList<>(16);
threadList.set(list);
}
for (int i = list.size() - 1; i >= 0; i--) {
final T bagEntry = (T) (weakThreadLocals ? ((WeakReference) list.remove(i)).get() : list.remove(i));
if (bagEntry != null && bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
}
// Otherwise, scan the shared list ... for maximum of timeout
timeout = timeUnit.toNanos(timeout);
Future<Boolean> addItemFuture = null;
final long startScan = System.nanoTime();
final long originTimeout = timeout;
long startSeq;
try {
do {
do {
startSeq = synchronizer.currentSequence();
for (final T bagEntry : sharedList) {
if (bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}
} while (startSeq < synchronizer.currentSequence());
if (addItemFuture == null || addItemFuture.isDone()) {
addItemFuture = listener.addBagItem();
}
timeout = originTimeout - (System.nanoTime() - startScan);
} while (timeout > 1000L && synchronizer.waitUntilSequenceExceeded(startSeq, timeout));
}
finally {
synchronizer.signal();
}
return null;
}
/**
* This method will return a borrowed object to the bag. Objects
* that are borrowed from the bag but never "requited" will result
* in a memory leak.
*
* @param bagEntry the value to return to the bag
* @throws NullPointerException if value is null
* @throws IllegalStateException if the requited value was not borrowed from the bag
*/
@SuppressWarnings("unchecked")
public void requite(final T bagEntry)
{
if (bagEntry.state().compareAndSet(STATE_IN_USE, STATE_NOT_IN_USE)) {
final List threadLocalList = threadList.get();
if (threadLocalList != null) {
threadLocalList.add((weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry));
}
synchronizer.signal();
}
else {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
}
}
/**
* Add a new object to the bag for others to borrow.
*
* @param bagEntry an object to add to the bag
*/
public void add(final T bagEntry)
{
if (closed) {
LOGGER.info("ConcurrentBag has been closed, ignoring add()");
throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
}
sharedList.add(bagEntry);
synchronizer.signal();
}
/**
* Remove a value from the bag. This method should only be called
* with objects obtained by <code>borrow(long, TimeUnit)</code> or <code>reserve(T)</code>
*
* @param bagEntry the value to remove
* @return true if the entry was removed, false otherwise
* @throws IllegalStateException if an attempt is made to remove an object
* from the bag that was not borrowed or reserved first
*/
public boolean remove(final T bagEntry)
{
if (!bagEntry.state().compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.state().compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
return false;
}
final boolean removed = sharedList.remove(bagEntry);
if (!removed && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
}
return removed;
}
/**
* Close the bag to further adds.
*/
@Override
public void close()
{
closed = true;
}
/**
* This method provides a "snapshot" in time of the BagEntry
* items in the bag in the specified state. It does not "lock"
* or reserve items in any way. Call <code>reserve(T)</code>
* on items in list before performing any action on them.
*
* @param state one of the {@link IConcurrentBagEntry} states
* @return a possibly empty list of objects having the state specified
*/
public List<T> values(final int state)
{
final ArrayList<T> list = new ArrayList<>(sharedList.size());
for (final T reference : sharedList) {
if (reference.state().get() == state) {
list.add(reference);
}
}
return list;
}
/**
* This method provides a "snapshot" in time of the bag items. It
* does not "lock" or reserve items in any way. Call <code>reserve(T)</code>
* on items in the list, or understand the concurrency implications of
* modifying items, before performing any action on them.
*
* @return a possibly empty list of (all) bag items
*/
@SuppressWarnings("unchecked")
public List<T> values()
{
return (List<T>) sharedList.clone();
}
/**
* The method is used to make an item in the bag "unavailable" for
* borrowing. It is primarily used when wanting to operate on items
* returned by the <code>values(int)</code> method. Items that are
* reserved can be removed from the bag via <code>remove(T)</code>
* without the need to unreserve them. Items that are not removed
* from the bag can be make available for borrowing again by calling
* the <code>unreserve(T)</code> method.
*
* @param bagEntry the item to reserve
* @return true if the item was able to be reserved, false otherwise
*/
public boolean reserve(final T bagEntry)
{
return bagEntry.state().compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED);
}
/**
* This method is used to make an item reserved via <code>reserve(T)</code>
* available again for borrowing.
*
* @param bagEntry the item to unreserve
*/
public void unreserve(final T bagEntry)
{
if (bagEntry.state().compareAndSet(STATE_RESERVED, STATE_NOT_IN_USE)) {
synchronizer.signal();
}
else {
LOGGER.warn("Attempt to relinquish an object to the bag that was not reserved: {}", bagEntry);
}
}
/**
* Get the number of threads pending (waiting) for an item from the
* bag to become available.
*
* @return the number of threads waiting for items from the bag
*/
public int getPendingQueue()
{
return synchronizer.getQueueLength();
}
/**
* Get a count of the number of items in the specified state at the time of this call.
*
* @param state the state of the items to count
* @return a count of how many items in the bag are in the specified state
*/
public int getCount(final int state)
{
int count = 0;
for (final T reference : sharedList) {
if (reference.state().get() == state) {
count++;
}
}
return count;
}
/**
* Get the total number of items in the bag.
*
* @return the number of items in the bag
*/
public int size()
{
return sharedList.size();
}
public void dumpState()
{
for (T bagEntry : sharedList) {
LOGGER.info(bagEntry.toString());
}
}
/**
* Determine whether to use WeakReferences based on whether there is a
* custom ClassLoader implementation sitting between this class and the
* System ClassLoader.
*
* @return true if we should use WeakReferences in our ThreadLocals, false otherwise
*/
private boolean useWeakThreadLocals()
{
try {
if (System.getProperty("com.zaxxer.hikari.useWeakReferences") != null) { // undocumented manual override of WeakReference behavior
return Boolean.getBoolean("com.zaxxer.hikari.useWeakReferences");
}
return getClass().getClassLoader() != ClassLoader.getSystemClassLoader();
}
catch (SecurityException se) {
return true;
}
}
}