Skip to content

Commit

Permalink
Close#801 Additional changes, including eliding the submission of a new
Browse files Browse the repository at this point in the history
PoolEntryCreator to the addConnectionExecutor when there are
already enough queued tasks to cover number of pending waiters.
  • Loading branch information
brettwooldridge committed Jan 5, 2017
1 parent f75f0e3 commit c3ef9ba
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 102 deletions.
46 changes: 24 additions & 22 deletions src/main/java/com/zaxxer/hikari/pool/HikariPool.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@
import static com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry.STATE_REMOVED; import static com.zaxxer.hikari.util.ConcurrentBag.IConcurrentBagEntry.STATE_REMOVED;
import static com.zaxxer.hikari.util.UtilityElf.createThreadPoolExecutor; import static com.zaxxer.hikari.util.UtilityElf.createThreadPoolExecutor;
import static com.zaxxer.hikari.util.UtilityElf.quietlySleep; import static com.zaxxer.hikari.util.UtilityElf.quietlySleep;
import static java.util.Collections.unmodifiableCollection;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.concurrent.TimeUnit.SECONDS;


import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLTimeoutException; import java.sql.SQLTimeoutException;
import java.sql.SQLTransientConnectionException; import java.sql.SQLTransientConnectionException;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -78,6 +82,7 @@ public class HikariPool extends PoolBase implements HikariPoolMXBean, IBagStateL
private final long HOUSEKEEPING_PERIOD_MS = Long.getLong("com.zaxxer.hikari.housekeeping.periodMs", SECONDS.toMillis(30)); private final long HOUSEKEEPING_PERIOD_MS = Long.getLong("com.zaxxer.hikari.housekeeping.periodMs", SECONDS.toMillis(30));


private final PoolEntryCreator POOL_ENTRY_CREATOR = new PoolEntryCreator(); private final PoolEntryCreator POOL_ENTRY_CREATOR = new PoolEntryCreator();
private final Collection<Runnable> addConnectionQueue;
private final ThreadPoolExecutor addConnectionExecutor; private final ThreadPoolExecutor addConnectionExecutor;
private final ThreadPoolExecutor closeConnectionExecutor; private final ThreadPoolExecutor closeConnectionExecutor;
private final ScheduledThreadPoolExecutor houseKeepingExecutorService; private final ScheduledThreadPoolExecutor houseKeepingExecutorService;
Expand Down Expand Up @@ -115,6 +120,9 @@ public HikariPool(final HikariConfig config)
registerMBeans(this); registerMBeans(this);


ThreadFactory threadFactory = config.getThreadFactory(); ThreadFactory threadFactory = config.getThreadFactory();

LinkedBlockingQueue<Runnable> addConnectionQueue = new LinkedBlockingQueue<>(config.getMaximumPoolSize() + 1);
this.addConnectionQueue = unmodifiableCollection(addConnectionQueue);
this.addConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy()); this.addConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardPolicy());
this.closeConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); this.closeConnectionExecutor = createThreadPoolExecutor(config.getMaximumPoolSize(), poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());


Expand Down Expand Up @@ -304,7 +312,12 @@ public void setHealthCheckRegistry(Object healthCheckRegistry)
@Override @Override
public Future<Boolean> addBagItem() public Future<Boolean> addBagItem()
{ {
return addConnectionExecutor.submit(POOL_ENTRY_CREATOR); final int connectionsToAdd = connectionBag.getPendingQueue() - addConnectionQueue.size();
if (connectionsToAdd > 0) {
return addConnectionExecutor.submit(POOL_ENTRY_CREATOR);
}

return CompletableFuture.completedFuture(Boolean.TRUE);
} }


// *********************************************************************** // ***********************************************************************
Expand Down Expand Up @@ -417,13 +430,10 @@ final void closeConnection(final PoolEntry poolEntry, final String closureReason
LOGGER.warn("{} - Unexpected value of totalConnections={}", poolName, tc, new Exception()); LOGGER.warn("{} - Unexpected value of totalConnections={}", poolName, tc, new Exception());
} }
final Connection connection = poolEntry.close(); final Connection connection = poolEntry.close();
closeConnectionExecutor.execute(new Runnable() { closeConnectionExecutor.execute(() -> {
@Override quietlyCloseConnection(connection, closureReason);
public void run() { if (poolState == POOL_NORMAL) {
quietlyCloseConnection(connection, closureReason); fillPool();
if (poolState == POOL_NORMAL) {
fillPool();
}
} }
}); });
} }
Expand All @@ -446,12 +456,9 @@ private PoolEntry createPoolEntry()
// variance up to 2.5% of the maxlifetime // variance up to 2.5% of the maxlifetime
final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0; final long variance = maxLifetime > 10_000 ? ThreadLocalRandom.current().nextLong( maxLifetime / 40 ) : 0;
final long lifetime = maxLifetime - variance; final long lifetime = maxLifetime - variance;
poolEntry.setFutureEol(houseKeepingExecutorService.schedule(new Runnable() { poolEntry.setFutureEol(houseKeepingExecutorService.schedule(
@Override () -> softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */),
public void run() { lifetime, MILLISECONDS));
softEvictConnection(poolEntry, "(connection has passed maxLifetime)", false /* not owner */);
}
}, lifetime, MILLISECONDS));
} }


return poolEntry; return poolEntry;
Expand All @@ -470,18 +477,13 @@ public void run() {
private synchronized void fillPool() private synchronized void fillPool()
{ {
final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections()) final int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections())
- addConnectionExecutor.getQueue().size(); - addConnectionQueue.size();
for (int i = 0; i < connectionsToAdd; i++) { for (int i = 0; i < connectionsToAdd; i++) {
addConnectionExecutor.submit(POOL_ENTRY_CREATOR); addConnectionExecutor.submit(POOL_ENTRY_CREATOR);
} }


if (connectionsToAdd > 0 && LOGGER.isDebugEnabled()) { if (connectionsToAdd > 0 && LOGGER.isDebugEnabled()) {
addConnectionExecutor.execute(new Runnable() { addConnectionExecutor.execute(() -> logPoolState("After adding "));
@Override
public void run() {
logPoolState("After adding ");
}
});
} }
} }


Expand Down Expand Up @@ -592,7 +594,7 @@ private SQLException createTimeoutException(long startTime)
/** /**
* Creating and adding poolEntries (connections) to the pool. * Creating and adding poolEntries (connections) to the pool.
*/ */
private class PoolEntryCreator implements Callable<Boolean> private final class PoolEntryCreator implements Callable<Boolean>
{ {
@Override @Override
public Boolean call() throws Exception public Boolean call() throws Exception
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/com/zaxxer/hikari/util/UtilityElf.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.sql.Connection; import java.sql.Connection;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -112,6 +113,26 @@ public static ThreadPoolExecutor createThreadPoolExecutor(final int queueSize, f
return executor; return executor;
} }


/**
* Create a ThreadPoolExecutor.
*
* @param queue the BlockingQueue to use
* @param threadName the thread name
* @param threadFactory an optional ThreadFactory
* @param policy the RejectedExecutionHandler policy
* @return a ThreadPoolExecutor
*/
public static ThreadPoolExecutor createThreadPoolExecutor(final BlockingQueue<Runnable> queue, final String threadName, ThreadFactory threadFactory, final RejectedExecutionHandler policy)
{
if (threadFactory == null) {
threadFactory = new DefaultThreadFactory(threadName, true);
}

ThreadPoolExecutor executor = new ThreadPoolExecutor(1 /*core*/, 1 /*max*/, 5 /*keepalive*/, SECONDS, queue, threadFactory, policy);
executor.allowCoreThreadTimeOut(true);
return executor;
}

// *********************************************************************** // ***********************************************************************
// Misc. public methods // Misc. public methods
// *********************************************************************** // ***********************************************************************
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/com/zaxxer/hikari/mocks/StubDataSource.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class StubDataSource implements DataSource
private String password; private String password;
private PrintWriter logWriter; private PrintWriter logWriter;
private SQLException throwException; private SQLException throwException;
private int connectionAcquistionTime = 0; private long connectionAcquistionTime = 0;
private int loginTimeout; private int loginTimeout;


public String getUser() public String getUser()
Expand Down Expand Up @@ -143,7 +143,7 @@ public void setThrowException(SQLException e)
this.throwException = e; this.throwException = e;
} }


public void setConnectionAcquistionTime(int connectionAcquisitionTime) { public void setConnectionAcquistionTime(long connectionAcquisitionTime) {
this.connectionAcquistionTime = connectionAcquisitionTime; this.connectionAcquistionTime = connectionAcquisitionTime;
} }
} }
Loading

0 comments on commit c3ef9ba

Please sign in to comment.