From ebb89d53e90d9c2c79f36ca699341eefeeb6e47c Mon Sep 17 00:00:00 2001 From: Oleg Kalnichevski Date: Mon, 13 Dec 2010 14:39:45 +0000 Subject: [PATCH] Another redesign of the I/O session management API git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpasyncclient/trunk@1045143 13f79535-47bb-0310-9956-ffa450edef68 --- .../nio/client/AsyncClientRequest.java | 2 +- .../impl/nio/client/BasicHttpAsyncClient.java | 4 +- .../client/DefaultAsyncRequestDirector.java | 5 +- .../client/InternalClientEventDispatch.java | 84 +++++--------- .../client/NHttpClientProtocolHandler.java | 2 - .../impl/nio/conn/BasicIOSessionManager.java | 59 +++++++++- .../impl/nio/conn/BasicManagedIOSession.java | 106 +++++++++++------- .../http/nio/conn/ManagedIOSession.java | 4 +- .../http/impl/nio/client/TestHttpAsync.java | 20 +++- 9 files changed, 170 insertions(+), 116 deletions(-) diff --git a/src/examples/org/apache/http/examples/nio/client/AsyncClientRequest.java b/src/examples/org/apache/http/examples/nio/client/AsyncClientRequest.java index b86d7eb6..8d7670b1 100644 --- a/src/examples/org/apache/http/examples/nio/client/AsyncClientRequest.java +++ b/src/examples/org/apache/http/examples/nio/client/AsyncClientRequest.java @@ -53,7 +53,7 @@ public static void main(String[] args) throws Exception { .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true) .setParameter(CoreProtocolPNames.USER_AGENT, "HttpComponents/1.1"); DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(1, params); - BasicIOSessionManager sessmrg = new BasicIOSessionManager(ioReactor); + BasicIOSessionManager sessmrg = new BasicIOSessionManager(ioReactor, params); sessmrg.setTotalMax(5); sessmrg.setDefaultMaxPerHost(3); diff --git a/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncClient.java b/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncClient.java index 98b9764c..5c5f8c7b 100644 --- a/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncClient.java +++ b/src/main/java/org/apache/http/impl/nio/client/BasicHttpAsyncClient.java @@ -95,7 +95,7 @@ public BasicHttpAsyncClient(final HttpParams params) throws IOReactorException { this.params = createDefaultHttpParams(); } this.ioReactor = new DefaultConnectingIOReactor(2, this.params); - this.sessmrg = new BasicIOSessionManager(this.ioReactor); + this.sessmrg = new BasicIOSessionManager(this.ioReactor, params); } protected HttpParams createDefaultHttpParams() { @@ -128,7 +128,7 @@ protected ConnectionReuseStrategy createConnectionReuseStrategy() { private void doExecute() { NHttpClientProtocolHandler handler = new NHttpClientProtocolHandler( createConnectionReuseStrategy()); - IOEventDispatch ioEventDispatch = new InternalClientEventDispatch(handler, this.params); + IOEventDispatch ioEventDispatch = new InternalClientEventDispatch(handler); try { this.ioReactor.execute(ioEventDispatch); } catch (IOException ex) { diff --git a/src/main/java/org/apache/http/impl/nio/client/DefaultAsyncRequestDirector.java b/src/main/java/org/apache/http/impl/nio/client/DefaultAsyncRequestDirector.java index 70b3db4d..27e263c5 100644 --- a/src/main/java/org/apache/http/impl/nio/client/DefaultAsyncRequestDirector.java +++ b/src/main/java/org/apache/http/impl/nio/client/DefaultAsyncRequestDirector.java @@ -29,7 +29,6 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.nio.channels.SelectionKey; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -237,8 +236,8 @@ public T getResult() { private synchronized void sessionRequestCompleted(final ManagedIOSession session) { this.managedSession = session; - this.managedSession.setAttribute(HTTP_EXCHANGE_HANDLER, this); - this.managedSession.setEvent(SelectionKey.OP_WRITE); + this.managedSession.getContext().setAttribute(HTTP_EXCHANGE_HANDLER, this); + this.managedSession.requestOutput(); } private synchronized void sessionRequestFailed(final Exception ex) { diff --git a/src/main/java/org/apache/http/impl/nio/client/InternalClientEventDispatch.java b/src/main/java/org/apache/http/impl/nio/client/InternalClientEventDispatch.java index cd81653b..7a5cfe09 100644 --- a/src/main/java/org/apache/http/impl/nio/client/InternalClientEventDispatch.java +++ b/src/main/java/org/apache/http/impl/nio/client/InternalClientEventDispatch.java @@ -26,88 +26,62 @@ */ package org.apache.http.impl.nio.client; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.http.impl.nio.DefaultClientIOEventDispatch; -import org.apache.http.impl.nio.conn.LoggingIOSession; -import org.apache.http.impl.nio.conn.LoggingNHttpClientConnection; import org.apache.http.nio.NHttpClientHandler; import org.apache.http.nio.NHttpClientIOTarget; +import org.apache.http.nio.reactor.IOEventDispatch; import org.apache.http.nio.reactor.IOSession; -import org.apache.http.params.HttpParams; +import org.apache.http.protocol.ExecutionContext; -class InternalClientEventDispatch extends DefaultClientIOEventDispatch { +class InternalClientEventDispatch implements IOEventDispatch { - private static final String HEADERS = "org.apache.http.headers"; - private static final String WIRE = "org.apache.http.wire"; + private final NHttpClientHandler handler; - private Log log; + InternalClientEventDispatch(final NHttpClientHandler handler) { + super(); + this.handler = handler; + } - InternalClientEventDispatch( - final NHttpClientHandler handler, - final HttpParams params) { - super(handler, params); - this.log = LogFactory.getLog(getClass()); + private NHttpClientIOTarget getConnection(final IOSession session) { + return (NHttpClientIOTarget) session.getAttribute(ExecutionContext.HTTP_CONNECTION); } - @Override - protected NHttpClientIOTarget createConnection(IOSession session) { - Log log = LogFactory.getLog(session.getClass()); - Log wirelog = LogFactory.getLog(WIRE); - Log headerlog = LogFactory.getLog(HEADERS); - if (log.isDebugEnabled() || wirelog.isDebugEnabled()) { - session = new LoggingIOSession(session, log, wirelog); - } - if (headerlog.isDebugEnabled()) { - return new LoggingNHttpClientConnection( - headerlog, - session, - createHttpResponseFactory(), - this.allocator, - this.params); - } else { - return super.createConnection(session); + private void assertValid(final NHttpClientIOTarget conn) { + if (conn == null) { + throw new IllegalStateException("HTTP connection is null"); } } - @Override public void connected(final IOSession session) { - if (this.log.isDebugEnabled()) { - this.log.debug("Session connected: " + session); - } - super.connected(session); + NHttpClientIOTarget conn = getConnection(session); + assertValid(conn); + Object attachment = session.getAttribute(IOSession.ATTACHMENT_KEY); + this.handler.connected(conn, attachment); } - @Override public void disconnected(final IOSession session) { - if (this.log.isDebugEnabled()) { - this.log.debug("Session disconnected: " + session); + NHttpClientIOTarget conn = getConnection(session); + if (conn != null) { + this.handler.closed(conn); } - super.disconnected(session); } - @Override public void inputReady(final IOSession session) { - if (this.log.isDebugEnabled()) { - this.log.debug("Session input ready: " + session); - } - super.inputReady(session); + NHttpClientIOTarget conn = getConnection(session); + assertValid(conn); + conn.consumeInput(this.handler); } - @Override public void outputReady(final IOSession session) { - if (this.log.isDebugEnabled()) { - this.log.debug("Session output ready: " + session); - } - super.outputReady(session); + NHttpClientIOTarget conn = getConnection(session); + assertValid(conn); + conn.produceOutput(this.handler); } - @Override public void timeout(IOSession session) { - if (this.log.isDebugEnabled()) { - this.log.debug("Session timed out: " + session); + NHttpClientIOTarget conn = getConnection(session); + if (conn != null) { + this.handler.timeout(conn); } - super.timeout(session); } } \ No newline at end of file diff --git a/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java b/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java index e5d0db69..153da811 100644 --- a/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java +++ b/src/main/java/org/apache/http/impl/nio/client/NHttpClientProtocolHandler.java @@ -45,7 +45,6 @@ import org.apache.http.nio.NHttpConnection; import org.apache.http.nio.client.HttpAsyncExchangeHandler; import org.apache.http.params.CoreProtocolPNames; -import org.apache.http.protocol.ExecutionContext; import org.apache.http.protocol.HttpContext; /** @@ -98,7 +97,6 @@ public void connected(final NHttpClientConnection conn, final Object attachment) this.log.debug("Connected " + formatState(conn, httpexchange)); } context.setAttribute(HTTP_EXCHNAGE, httpexchange); - context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn); requestReady(conn); } diff --git a/src/main/java/org/apache/http/impl/nio/conn/BasicIOSessionManager.java b/src/main/java/org/apache/http/impl/nio/conn/BasicIOSessionManager.java index 622f5020..fdf6db2a 100644 --- a/src/main/java/org/apache/http/impl/nio/conn/BasicIOSessionManager.java +++ b/src/main/java/org/apache/http/impl/nio/conn/BasicIOSessionManager.java @@ -31,9 +31,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.http.HttpResponseFactory; import org.apache.http.conn.routing.HttpRoute; +import org.apache.http.impl.DefaultHttpResponseFactory; +import org.apache.http.impl.nio.DefaultNHttpClientConnection; import org.apache.http.impl.nio.pool.PoolEntry; import org.apache.http.impl.nio.pool.PoolEntryCallback; +import org.apache.http.nio.NHttpClientConnection; import org.apache.http.nio.concurrent.BasicFuture; import org.apache.http.nio.concurrent.FutureCallback; import org.apache.http.nio.conn.ManagedIOSession; @@ -41,19 +45,31 @@ import org.apache.http.nio.conn.PoolStats; import org.apache.http.nio.reactor.ConnectingIOReactor; import org.apache.http.nio.reactor.IOSession; +import org.apache.http.nio.util.ByteBufferAllocator; +import org.apache.http.nio.util.HeapByteBufferAllocator; +import org.apache.http.params.HttpParams; +import org.apache.http.protocol.ExecutionContext; public class BasicIOSessionManager implements IOSessionManager { - private final Log log; + private static final String HEADERS = "org.apache.http.headers"; + private static final String WIRE = "org.apache.http.wire"; + + private final Log log = LogFactory.getLog(getClass()); + private final HttpSessionPool pool; + private final HttpParams params; - public BasicIOSessionManager(final ConnectingIOReactor ioreactor) { + public BasicIOSessionManager(final ConnectingIOReactor ioreactor, final HttpParams params) { super(); if (ioreactor == null) { throw new IllegalArgumentException("I/O reactor may not be null"); } - this.log = LogFactory.getLog(getClass()); + if (params == null) { + throw new IllegalArgumentException("HTTP parameters may not be null"); + } this.pool = new HttpSessionPool(ioreactor); + this.params = params; } public synchronized Future leaseSession( @@ -131,6 +147,14 @@ public synchronized void shutdown() { this.pool.shutdown(); } + protected ByteBufferAllocator createByteBufferAllocator() { + return new HeapByteBufferAllocator(); + } + + protected HttpResponseFactory createHttpResponseFactory() { + return new DefaultHttpResponseFactory(); + } + class InternalPoolEntryCallback implements PoolEntryCallback { private final BasicFuture future; @@ -145,9 +169,36 @@ public void completed(final PoolEntry entry) { if (log.isDebugEnabled()) { log.debug("I/O session allocated: " + entry); } + IOSession session = entry.getIOSession(); + NHttpClientConnection conn = (NHttpClientConnection) session.getAttribute( + ExecutionContext.HTTP_CONNECTION); + if (conn == null) { + Log log = LogFactory.getLog(session.getClass()); + Log wirelog = LogFactory.getLog(WIRE); + Log headerlog = LogFactory.getLog(HEADERS); + if (log.isDebugEnabled() || wirelog.isDebugEnabled()) { + session = new LoggingIOSession(session, log, wirelog); + } + if (headerlog.isDebugEnabled()) { + conn = new LoggingNHttpClientConnection( + headerlog, + session, + createHttpResponseFactory(), + createByteBufferAllocator(), + params); + } else { + conn = new DefaultNHttpClientConnection( + session, + createHttpResponseFactory(), + createByteBufferAllocator(), + params); + } + session.setAttribute(ExecutionContext.HTTP_CONNECTION, conn); + } BasicManagedIOSession result = new BasicManagedIOSession( BasicIOSessionManager.this, - entry); + entry, + conn); if (!this.future.completed(result)) { pool.release(entry, true); } diff --git a/src/main/java/org/apache/http/impl/nio/conn/BasicManagedIOSession.java b/src/main/java/org/apache/http/impl/nio/conn/BasicManagedIOSession.java index da1565e7..42fa0bf5 100644 --- a/src/main/java/org/apache/http/impl/nio/conn/BasicManagedIOSession.java +++ b/src/main/java/org/apache/http/impl/nio/conn/BasicManagedIOSession.java @@ -26,29 +26,37 @@ */ package org.apache.http.impl.nio.conn; -import java.net.SocketAddress; -import java.nio.channels.ByteChannel; +import java.io.IOException; +import org.apache.http.HttpConnectionMetrics; +import org.apache.http.HttpException; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; import org.apache.http.conn.routing.HttpRoute; import org.apache.http.impl.nio.pool.PoolEntry; +import org.apache.http.nio.NHttpClientConnection; +import org.apache.http.nio.NHttpConnection; import org.apache.http.nio.conn.IOSessionManager; import org.apache.http.nio.conn.ManagedIOSession; import org.apache.http.nio.reactor.IOSession; -import org.apache.http.nio.reactor.SessionBufferStatus; +import org.apache.http.protocol.HttpContext; -public class BasicManagedIOSession implements ManagedIOSession { +class BasicManagedIOSession implements ManagedIOSession { private final IOSessionManager manager; private volatile PoolEntry entry; + private volatile NHttpClientConnection conn; private volatile boolean released; private volatile boolean reusable; public BasicManagedIOSession( final IOSessionManager manager, - final PoolEntry entry) { + final PoolEntry entry, + final NHttpClientConnection conn) { super(); this.manager = manager; this.entry = entry; + this.conn = conn; this.released = false; this.reusable = true; } @@ -68,6 +76,7 @@ public synchronized void releaseSession() { this.released = true; this.manager.releaseSession(this); this.entry = null; + this.conn = null; } public synchronized void abortSession() { @@ -80,6 +89,7 @@ public synchronized void abortSession() { iosession.shutdown(); this.manager.releaseSession(this); this.entry = null; + this.conn = null; } public synchronized Object getState() { @@ -122,12 +132,12 @@ public void close() { releaseSession(); } - public int getStatus() { - return this.released ? IOSession.ACTIVE : IOSession.CLOSED; + public boolean isOpen() { + return !this.released; } - public boolean isClosed() { - return this.released; + public boolean isStale() { + return !this.released; } private void assertValid() { @@ -136,69 +146,79 @@ private void assertValid() { } } - private IOSession getIOSession() { + public synchronized HttpConnectionMetrics getMetrics() { assertValid(); - return this.entry.getIOSession(); - } - - public synchronized ByteChannel channel() { - return getIOSession().channel(); + return this.conn.getMetrics(); } - public synchronized boolean hasBufferedInput() { - return getIOSession().hasBufferedInput(); + public synchronized int getSocketTimeout() { + assertValid(); + return this.conn.getSocketTimeout(); } - public synchronized boolean hasBufferedOutput() { - return getIOSession().hasBufferedOutput(); + public synchronized void setSocketTimeout(int timeout) { + assertValid(); + this.conn.setSocketTimeout(timeout); } - public synchronized int getEventMask() { - return getIOSession().getEventMask(); + public int getStatus() { + return this.released ? NHttpConnection.CLOSED : NHttpConnection.ACTIVE; } - public synchronized void setEvent(int op) { - getIOSession().setEvent(op); + public synchronized HttpContext getContext() { + assertValid(); + return this.conn.getContext(); } - public synchronized void clearEvent(int op) { - getIOSession().clearEvent(op); + public synchronized HttpRequest getHttpRequest() { + assertValid(); + return this.conn.getHttpRequest(); } - public synchronized void setEventMask(int ops) { - getIOSession().setEventMask(ops); + public synchronized HttpResponse getHttpResponse() { + assertValid(); + return this.conn.getHttpResponse(); } - public synchronized SocketAddress getLocalAddress() { - return getIOSession().getLocalAddress(); + public synchronized void requestInput() { + assertValid(); + this.conn.requestInput(); } - public synchronized SocketAddress getRemoteAddress() { - return getIOSession().getRemoteAddress(); + public synchronized void requestOutput() { + assertValid(); + this.conn.requestOutput(); } - public synchronized Object getAttribute(final String name) { - return getIOSession().getAttribute(name); + public synchronized void suspendInput() { + assertValid(); + this.conn.suspendInput(); } - public synchronized Object removeAttribute(final String name) { - return getIOSession().removeAttribute(name); + public synchronized void suspendOutput() { + assertValid(); + this.conn.suspendOutput(); } - public synchronized void setAttribute(final String name, final Object value) { - getIOSession().setAttribute(name, value); + public synchronized boolean isRequestSubmitted() { + assertValid(); + return this.conn.isRequestSubmitted(); } - public void setBufferStatus(final SessionBufferStatus bufstatus) { - throw new UnsupportedOperationException(); + public synchronized void resetInput() { + assertValid(); + this.conn.resetInput(); } - public synchronized int getSocketTimeout() { - return getIOSession().getSocketTimeout(); + public synchronized void resetOutput() { + assertValid(); + this.conn.resetOutput(); } - public void setSocketTimeout(final int timeout) { - getIOSession().setSocketTimeout(timeout); + public synchronized void submitRequest( + final HttpRequest request) throws IOException, HttpException { + assertValid(); + this.conn.submitRequest(request); } @Override diff --git a/src/main/java/org/apache/http/nio/conn/ManagedIOSession.java b/src/main/java/org/apache/http/nio/conn/ManagedIOSession.java index 0de0f9f1..7f67d1ac 100644 --- a/src/main/java/org/apache/http/nio/conn/ManagedIOSession.java +++ b/src/main/java/org/apache/http/nio/conn/ManagedIOSession.java @@ -26,9 +26,9 @@ */ package org.apache.http.nio.conn; -import org.apache.http.nio.reactor.IOSession; +import org.apache.http.nio.NHttpClientConnection; -public interface ManagedIOSession extends IOSession { +public interface ManagedIOSession extends NHttpClientConnection { Object getState(); diff --git a/src/test/java/org/apache/http/impl/nio/client/TestHttpAsync.java b/src/test/java/org/apache/http/impl/nio/client/TestHttpAsync.java index c08a5936..f315e101 100644 --- a/src/test/java/org/apache/http/impl/nio/client/TestHttpAsync.java +++ b/src/test/java/org/apache/http/impl/nio/client/TestHttpAsync.java @@ -20,7 +20,10 @@ import org.apache.http.nio.client.HttpAsyncClient; import org.apache.http.nio.entity.NByteArrayEntity; import org.apache.http.nio.reactor.ConnectingIOReactor; -import org.apache.http.params.BasicHttpParams; +import org.apache.http.params.CoreConnectionPNames; +import org.apache.http.params.CoreProtocolPNames; +import org.apache.http.params.HttpParams; +import org.apache.http.params.SyncBasicHttpParams; import org.apache.http.util.EntityUtils; import org.junit.After; import org.junit.Assert; @@ -39,9 +42,18 @@ public void setUp() throws Exception { this.localServer.registerDefaultHandlers(); int port = this.localServer.getServiceAddress().getPort(); this.target = new HttpHost("localhost", port); - ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(2, new BasicHttpParams()); - this.sessionManager = new BasicIOSessionManager(ioReactor); - this.httpclient = new BasicHttpAsyncClient(ioReactor, this.sessionManager, null); + + HttpParams params = new SyncBasicHttpParams(); + params + .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000) + .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 10000) + .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024) + .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true) + .setParameter(CoreProtocolPNames.USER_AGENT, "HttpComponents/1.1"); + + ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(2, params); + this.sessionManager = new BasicIOSessionManager(ioReactor, params); + this.httpclient = new BasicHttpAsyncClient(ioReactor, this.sessionManager, params); } @After