Skip to content

Commit

Permalink
continue making connection pool extensible
Browse files Browse the repository at this point in the history
  • Loading branch information
artgon committed Aug 28, 2018
1 parent 7e38589 commit 2f86aaf
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 8 deletions.
Expand Up @@ -250,11 +250,7 @@ else if (!conn.isActive()) {
conn.setInPool(false);
}
else {
final ChannelPipeline pipeline = conn.getChannel().pipeline();
removeHandlerFromPipeline(OriginResponseReceiver.CHANNEL_HANDLER_NAME, pipeline);
pipeline.addAfter(PassportStateHttpClientHandler.PASSPORT_STATE_HTTP_CLIENT_HANDLER_NAME, IDLE_STATE_HANDLER_NAME,
new IdleStateHandler(0, 0, connPoolConfig.getIdleTimeout(), TimeUnit.MILLISECONDS));

releaseHandlers(conn);

// Attempt to return connection to the pool.
IConnectionPool pool = perServerPools.get(conn.getServer());
Expand All @@ -275,6 +271,13 @@ else if (!conn.isActive()) {
return released;
}

protected void releaseHandlers(PooledConnection conn) {
final ChannelPipeline pipeline = conn.getChannel().pipeline();
removeHandlerFromPipeline(OriginResponseReceiver.CHANNEL_HANDLER_NAME, pipeline);
pipeline.addAfter(PassportStateHttpClientHandler.PASSPORT_STATE_HTTP_CLIENT_HANDLER_NAME, IDLE_STATE_HANDLER_NAME,
new IdleStateHandler(0, 0, connPoolConfig.getIdleTimeout(), TimeUnit.MILLISECONDS));
}

public static void removeHandlerFromPipeline(final String handlerName, final ChannelPipeline pipeline) {
if (pipeline.get(handlerName) != null) {
pipeline.remove(handlerName);
Expand Down
Expand Up @@ -143,12 +143,16 @@ private void onAcquire(final PooledConnection conn, String httpMethod, String ur
int attemptNum, CurrentPassport passport)
{
passport.setOnChannel(conn.getChannel());
DefaultClientChannelManager.removeHandlerFromPipeline(DefaultClientChannelManager.IDLE_STATE_HANDLER_NAME, conn.getChannel().pipeline());
removeIdleStateHandler(conn);

conn.setInUse();
if (LOG.isDebugEnabled()) LOG.debug("PooledConnection acquired: " + conn.toString());
}

protected void removeIdleStateHandler(PooledConnection conn) {
DefaultClientChannelManager.removeHandlerFromPipeline(DefaultClientChannelManager.IDLE_STATE_HANDLER_NAME, conn.getChannel().pipeline());
}

@Override
public Promise<PooledConnection> acquire(EventLoop eventLoop, Object key, String httpMethod, String uri,
int attemptNum, CurrentPassport passport,
Expand Down Expand Up @@ -186,8 +190,10 @@ public PooledConnection tryGettingFromConnectionPool(EventLoop eventLoop)

conn.setInPool(false);

initConnection(conn);

/* Check that the connection is still open. */
if ((conn.isActive() && conn.getChannel().isOpen())) {
if (isValidFromPool(conn)) {
reuseConnCounter.increment();
connsInUse.incrementAndGet();
connsInPool.decrementAndGet();
Expand All @@ -202,6 +208,14 @@ public PooledConnection tryGettingFromConnectionPool(EventLoop eventLoop)
return null;
}

protected boolean isValidFromPool(PooledConnection conn) {
return conn.isActive() && conn.getChannel().isOpen();
}

protected void initConnection(PooledConnection conn) {
// custom init code
}

protected Deque<PooledConnection> getPoolForEventLoop(EventLoop eventLoop)
{
// We don't want to block under any circumstances, so can't use CHM.computeIfAbsent().
Expand Down
Expand Up @@ -36,7 +36,7 @@
*/
public class PooledConnection {

private static final AttributeKey<PooledConnection> CHANNEL_ATTR = AttributeKey.newInstance("_pooled_connection");
protected static final AttributeKey<PooledConnection> CHANNEL_ATTR = AttributeKey.newInstance("_pooled_connection");
public static final String READ_TIMEOUT_HANDLER_NAME = "readTimeoutHandler";

private final Server server;
Expand Down

0 comments on commit 2f86aaf

Please sign in to comment.