Skip to content

Commit

Permalink
Fix race condition regression in ConcurrentBag and various other clea…
Browse files Browse the repository at this point in the history
…nup.
  • Loading branch information
brettwooldridge committed Jan 9, 2017
1 parent f0b3c52 commit 72e862d
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 60 deletions.
69 changes: 33 additions & 36 deletions src/main/java/com/zaxxer/hikari/pool/HikariPool.java
Expand Up @@ -16,7 +16,7 @@

package com.zaxxer.hikari.pool;

import static com.zaxxer.hikari.pool.PoolEntry.LASTACCESS_COMPARABLE;
import static com.zaxxer.hikari.pool.PoolEntry.LASTACCESS_REVERSE_COMPARABLE;
import static com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry.STATE_IN_USE;
import static com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry.STATE_NOT_IN_USE;
import static com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry.STATE_REMOVED;
Expand All @@ -31,7 +31,7 @@
import java.sql.SQLTimeoutException;
import java.sql.SQLTransientConnectionException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -81,7 +81,7 @@ public class HikariPool extends PoolBase implements HikariPoolMXBean, IBagStateL
private final long ALIVE_BYPASS_WINDOW_MS = Long.getLong("com.zaxxer.hikari.aliveBypassWindowMs", MILLISECONDS.toMillis(500));
private final long HOUSEKEEPING_PERIOD_MS = Long.getLong("com.zaxxer.hikari.housekeeping.periodMs", SECONDS.toMillis(30));

private final PoolEntryCreator POOL_ENTRY_CREATOR = new PoolEntryCreator();
private final PoolEntryCreator POOL_ENTRY_CREATOR = new PoolEntryCreator(null);
private final Collection<Runnable> addConnectionQueue;
private final ThreadPoolExecutor addConnectionExecutor;
private final ThreadPoolExecutor closeConnectionExecutor;
Expand Down Expand Up @@ -123,7 +123,7 @@ public HikariPool(final HikariConfig config)

ThreadFactory threadFactory = config.getThreadFactory();

LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(config.getMaximumPoolSize() + 1);
LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(config.getMaximumPoolSize());
this.addConnectionQueue = unmodifiableCollection(addConnectionQueue);
this.addConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy());
this.closeConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
Expand Down Expand Up @@ -299,10 +299,10 @@ public void setHealthCheckRegistry(Object healthCheckRegistry)

/** {@inheritDoc} */
@Override
public Future<Boolean> addBagItem()
public Future<Boolean> addBagItem(final int waiting)
{
final int connectionsToAdd = connectionBag.getPendingQueue() - addConnectionQueue.size();
if (connectionsToAdd > 0) {
final boolean shouldAdd = waiting - addConnectionQueue.size() != 0;
if (shouldAdd) {
return addConnectionExecutor.submit(POOL_ENTRY_CREATOR);
}

Expand Down Expand Up @@ -345,9 +345,7 @@ public final int getThreadsAwaitingConnection()
@Override
public void softEvictConnections()
{
for (PoolEntry poolEntry : connectionBag.values()) {
softEvictConnection(poolEntry, "(connection evicted)", false /* not owner */);
}
connectionBag.values().forEach(poolEntry -> softEvictConnection(poolEntry, "(connection evicted)", false /* not owner */));
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -468,11 +466,7 @@ private synchronized void fillPool()
final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections())
- addConnectionQueue.size();
for (int i = 0; i < connectionsToAdd; i++) {
addConnectionExecutor.submit(POOL_ENTRY_CREATOR);
}

if (connectionsToAdd > 0 && LOGGER.isDebugEnabled()) {
addConnectionExecutor.execute(() -> logPoolState("After adding "));
addConnectionExecutor.submit((i < connectionsToAdd - 1) ? POOL_ENTRY_CREATOR : new PoolEntryCreator("After adding "));
}
}

Expand Down Expand Up @@ -547,8 +541,7 @@ private void softEvictConnection(final PoolEntry poolEntry, final String reason,
private void initializeHouseKeepingExecutorService()
{
if (config.getScheduledExecutorService() == null) {
ThreadFactory threadFactory = config.getThreadFactory();
threadFactory = threadFactory != null ? threadFactory : new DefaultThreadFactory(poolName + " housekeeper", true);
final ThreadFactory threadFactory = Optional.ofNullable(config.getThreadFactory()).orElse(new DefaultThreadFactory(poolName + " housekeeper", true));
this.houseKeepingExecutorService = new ScheduledThreadPoolExecutor(1, threadFactory, new ThreadPoolExecutor.DiscardPolicy());
this.houseKeepingExecutorService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.houseKeepingExecutorService.setRemoveOnCancelPolicy(true);
Expand Down Expand Up @@ -605,6 +598,13 @@ private SQLException createTimeoutException(long startTime)
*/
private final class PoolEntryCreator implements Callable<Boolean>
{
private final String afterPrefix;

PoolEntryCreator(String afterPrefix)
{
this.afterPrefix = afterPrefix;
}

@Override
public Boolean call() throws Exception
{
Expand All @@ -614,6 +614,9 @@ public Boolean call() throws Exception
if (poolEntry != null) {
connectionBag.add(poolEntry);
LOGGER.debug("{} - Added connection {}", poolName, poolEntry.connection);
if (afterPrefix != null) {
logPoolState(afterPrefix);
}
return Boolean.TRUE;
}

Expand All @@ -636,7 +639,7 @@ private boolean shouldCreateAnotherConnection() {
/**
* The house keeping task to retire and maintain minimum idle connections.
*/
private class HouseKeeper implements Runnable
private final class HouseKeeper implements Runnable
{
private volatile long previous = clockSource.plusMillis(clockSource.currentTime(), -HOUSEKEEPING_PERIOD_MS);

Expand Down Expand Up @@ -669,24 +672,18 @@ else if (now > clockSource.plusMillis(previous, (3 * HOUSEKEEPING_PERIOD_MS) / 2
previous = now;

String afterPrefix = "Pool ";
if (idleTimeout > 0L) {
final List<PoolEntry> idleList = connectionBag.values(STATE_NOT_IN_USE);
int removable = idleList.size() - config.getMinimumIdle();
if (removable > 0) {
logPoolState("Before cleanup ");
afterPrefix = "After cleanup ";

// Sort pool entries on lastAccessed
idleList.sort(LASTACCESS_COMPARABLE);
for (PoolEntry poolEntry : idleList) {
if (clockSource.elapsedMillis(poolEntry.lastAccessed, now) > idleTimeout && connectionBag.reserve(poolEntry)) {
closeConnection(poolEntry, "(connection has passed idleTimeout)");
if (--removable == 0) {
break; // keep min idle cons
}
}
}
}
if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
logPoolState("Before cleanup ");
afterPrefix = "After cleanup ";

connectionBag
.values(STATE_NOT_IN_USE)
.stream()
.sorted(LASTACCESS_REVERSE_COMPARABLE)
.skip(config.getMinimumIdle())
.filter(p -> clockSource.elapsedMillis(p.lastAccessed, now) > idleTimeout)
.filter(p -> connectionBag.reserve(p))
.forEachOrdered(p -> closeConnection(p, "(connection has passed idleTimeout)"));
}

logPoolState(afterPrefix);
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/zaxxer/hikari/pool/PoolEntry.java
Expand Up @@ -39,7 +39,7 @@ final class PoolEntry implements IConcurrentBagEntry
private static final Logger LOGGER = LoggerFactory.getLogger(PoolEntry.class);
private static final AtomicIntegerFieldUpdater<PoolEntry> stateUpdater;

static final Comparator<PoolEntry> LASTACCESS_COMPARABLE;
static final Comparator<PoolEntry> LASTACCESS_REVERSE_COMPARABLE;

Connection connection;
long lastAccessed;
Expand All @@ -57,10 +57,10 @@ final class PoolEntry implements IConcurrentBagEntry

static
{
LASTACCESS_COMPARABLE = new Comparator<PoolEntry>() {
LASTACCESS_REVERSE_COMPARABLE = new Comparator<PoolEntry>() {
@Override
public int compare(final PoolEntry entryOne, final PoolEntry entryTwo) {
return Long.compare(entryOne.lastAccessed, entryTwo.lastAccessed);
return Long.compare(entryTwo.lastAccessed, entryOne.lastAccessed);
}
};

Expand Down
17 changes: 8 additions & 9 deletions src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java
Expand Up @@ -85,7 +85,7 @@ public static interface IConcurrentBagEntry

public static interface IBagStateListener
{
Future<Boolean> addBagItem();
Future<Boolean> addBagItem(int waiting);
}

/**
Expand Down Expand Up @@ -132,16 +132,19 @@ public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedExcepti
}

// Otherwise, scan the shared list ... then poll the handoff queue
waiters.incrementAndGet();
final int waiting = waiters.incrementAndGet();
try {
// scan the shared list
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
if (waiters.get() > 1) {
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}
listener.addBagItem();

listener.addBagItem(waiting);

timeout = timeUnit.toNanos(timeout);
do {
Expand Down Expand Up @@ -226,10 +229,6 @@ public boolean remove(final T bagEntry)
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
}

if (waiters.get() > 0) {
listener.addBagItem();
}

return removed;
}

Expand Down
14 changes: 3 additions & 11 deletions src/test/java/com/zaxxer/hikari/pool/TestConcurrentBag.java
Expand Up @@ -16,8 +16,8 @@

package com.zaxxer.hikari.pool;

import static com.zaxxer.hikari.pool.TestElf.newHikariConfig;
import static com.zaxxer.hikari.pool.TestElf.getPool;
import static com.zaxxer.hikari.pool.TestElf.newHikariConfig;
import static com.zaxxer.hikari.pool.TestElf.setSlf4jTargetStream;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals;
Expand All @@ -27,7 +27,7 @@

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.concurrent.Future;
import java.util.concurrent.CompletableFuture;

import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand All @@ -36,7 +36,6 @@
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import com.zaxxer.hikari.util.ConcurrentBag;
import com.zaxxer.hikari.util.ConcurrentBag.IBagStateListener;

/**
*
Expand Down Expand Up @@ -70,14 +69,7 @@ public static void teardown()
@Test
public void testConcurrentBag() throws Exception
{
try (ConcurrentBag<PoolEntry> bag = new ConcurrentBag<>(new IBagStateListener() {
@Override
public Future<Boolean> addBagItem()
{
return null;
}
})
) {
try (ConcurrentBag<PoolEntry> bag = new ConcurrentBag<>((x) -> CompletableFuture.completedFuture(Boolean.TRUE))) {
assertEquals(0, bag.values(8).size());

PoolEntry reserved = pool.newPoolEntry();
Expand Down
Expand Up @@ -135,7 +135,7 @@ public static class FauxWebContext

public void createConcurrentBag() throws InterruptedException
{
try (ConcurrentBag<PoolEntry> bag = new ConcurrentBag<>(() -> CompletableFuture.completedFuture(Boolean.TRUE))) {
try (ConcurrentBag<PoolEntry> bag = new ConcurrentBag<>((x) -> CompletableFuture.completedFuture(Boolean.TRUE))) {

PoolEntry entry = new PoolEntry();
bag.add(entry);
Expand Down

0 comments on commit 72e862d

Please sign in to comment.