Skip to content

Commit

Permalink
Another redesign of the I/O session management API
Browse files Browse the repository at this point in the history
  • Loading branch information
ok2c committed Dec 13, 2010
1 parent ba61727 commit ebb89d5
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 116 deletions.
Expand Up @@ -53,7 +53,7 @@ public static void main(String[] args) throws Exception {
.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
.setParameter(CoreProtocolPNames.USER_AGENT, "HttpComponents/1.1");
DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(1, params);
BasicIOSessionManager sessmrg = new BasicIOSessionManager(ioReactor);
BasicIOSessionManager sessmrg = new BasicIOSessionManager(ioReactor, params);
sessmrg.setTotalMax(5);
sessmrg.setDefaultMaxPerHost(3);

Expand Down
Expand Up @@ -95,7 +95,7 @@ public BasicHttpAsyncClient(final HttpParams params) throws IOReactorException {
this.params = createDefaultHttpParams();
}
this.ioReactor = new DefaultConnectingIOReactor(2, this.params);
this.sessmrg = new BasicIOSessionManager(this.ioReactor);
this.sessmrg = new BasicIOSessionManager(this.ioReactor, params);
}

protected HttpParams createDefaultHttpParams() {
Expand Down Expand Up @@ -128,7 +128,7 @@ protected ConnectionReuseStrategy createConnectionReuseStrategy() {
private void doExecute() {
NHttpClientProtocolHandler handler = new NHttpClientProtocolHandler(
createConnectionReuseStrategy());
IOEventDispatch ioEventDispatch = new InternalClientEventDispatch(handler, this.params);
IOEventDispatch ioEventDispatch = new InternalClientEventDispatch(handler);
try {
this.ioReactor.execute(ioEventDispatch);
} catch (IOException ex) {
Expand Down
Expand Up @@ -29,7 +29,6 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SelectionKey;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -237,8 +236,8 @@ public T getResult() {

private synchronized void sessionRequestCompleted(final ManagedIOSession session) {
this.managedSession = session;
this.managedSession.setAttribute(HTTP_EXCHANGE_HANDLER, this);
this.managedSession.setEvent(SelectionKey.OP_WRITE);
this.managedSession.getContext().setAttribute(HTTP_EXCHANGE_HANDLER, this);
this.managedSession.requestOutput();
}

private synchronized void sessionRequestFailed(final Exception ex) {
Expand Down
Expand Up @@ -26,88 +26,62 @@
*/
package org.apache.http.impl.nio.client;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.impl.nio.DefaultClientIOEventDispatch;
import org.apache.http.impl.nio.conn.LoggingIOSession;
import org.apache.http.impl.nio.conn.LoggingNHttpClientConnection;
import org.apache.http.nio.NHttpClientHandler;
import org.apache.http.nio.NHttpClientIOTarget;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOSession;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.ExecutionContext;

class InternalClientEventDispatch extends DefaultClientIOEventDispatch {
class InternalClientEventDispatch implements IOEventDispatch {

private static final String HEADERS = "org.apache.http.headers";
private static final String WIRE = "org.apache.http.wire";
private final NHttpClientHandler handler;

private Log log;
InternalClientEventDispatch(final NHttpClientHandler handler) {
super();
this.handler = handler;
}

InternalClientEventDispatch(
final NHttpClientHandler handler,
final HttpParams params) {
super(handler, params);
this.log = LogFactory.getLog(getClass());
private NHttpClientIOTarget getConnection(final IOSession session) {
return (NHttpClientIOTarget) session.getAttribute(ExecutionContext.HTTP_CONNECTION);
}

@Override
protected NHttpClientIOTarget createConnection(IOSession session) {
Log log = LogFactory.getLog(session.getClass());
Log wirelog = LogFactory.getLog(WIRE);
Log headerlog = LogFactory.getLog(HEADERS);
if (log.isDebugEnabled() || wirelog.isDebugEnabled()) {
session = new LoggingIOSession(session, log, wirelog);
}
if (headerlog.isDebugEnabled()) {
return new LoggingNHttpClientConnection(
headerlog,
session,
createHttpResponseFactory(),
this.allocator,
this.params);
} else {
return super.createConnection(session);
private void assertValid(final NHttpClientIOTarget conn) {
if (conn == null) {
throw new IllegalStateException("HTTP connection is null");
}
}

@Override
public void connected(final IOSession session) {
if (this.log.isDebugEnabled()) {
this.log.debug("Session connected: " + session);
}
super.connected(session);
NHttpClientIOTarget conn = getConnection(session);
assertValid(conn);
Object attachment = session.getAttribute(IOSession.ATTACHMENT_KEY);
this.handler.connected(conn, attachment);
}

@Override
public void disconnected(final IOSession session) {
if (this.log.isDebugEnabled()) {
this.log.debug("Session disconnected: " + session);
NHttpClientIOTarget conn = getConnection(session);
if (conn != null) {
this.handler.closed(conn);
}
super.disconnected(session);
}

@Override
public void inputReady(final IOSession session) {
if (this.log.isDebugEnabled()) {
this.log.debug("Session input ready: " + session);
}
super.inputReady(session);
NHttpClientIOTarget conn = getConnection(session);
assertValid(conn);
conn.consumeInput(this.handler);
}

@Override
public void outputReady(final IOSession session) {
if (this.log.isDebugEnabled()) {
this.log.debug("Session output ready: " + session);
}
super.outputReady(session);
NHttpClientIOTarget conn = getConnection(session);
assertValid(conn);
conn.produceOutput(this.handler);
}

@Override
public void timeout(IOSession session) {
if (this.log.isDebugEnabled()) {
this.log.debug("Session timed out: " + session);
NHttpClientIOTarget conn = getConnection(session);
if (conn != null) {
this.handler.timeout(conn);
}
super.timeout(session);
}

}
Expand Up @@ -45,7 +45,6 @@
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.client.HttpAsyncExchangeHandler;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpContext;

/**
Expand Down Expand Up @@ -98,7 +97,6 @@ public void connected(final NHttpClientConnection conn, final Object attachment)
this.log.debug("Connected " + formatState(conn, httpexchange));
}
context.setAttribute(HTTP_EXCHNAGE, httpexchange);
context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
requestReady(conn);
}

Expand Down
Expand Up @@ -31,29 +31,45 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpResponseFactory;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.DefaultHttpResponseFactory;
import org.apache.http.impl.nio.DefaultNHttpClientConnection;
import org.apache.http.impl.nio.pool.PoolEntry;
import org.apache.http.impl.nio.pool.PoolEntryCallback;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.concurrent.BasicFuture;
import org.apache.http.nio.concurrent.FutureCallback;
import org.apache.http.nio.conn.ManagedIOSession;
import org.apache.http.nio.conn.IOSessionManager;
import org.apache.http.nio.conn.PoolStats;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOSession;
import org.apache.http.nio.util.ByteBufferAllocator;
import org.apache.http.nio.util.HeapByteBufferAllocator;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.ExecutionContext;

public class BasicIOSessionManager implements IOSessionManager {

private final Log log;
private static final String HEADERS = "org.apache.http.headers";
private static final String WIRE = "org.apache.http.wire";

private final Log log = LogFactory.getLog(getClass());

private final HttpSessionPool pool;
private final HttpParams params;

public BasicIOSessionManager(final ConnectingIOReactor ioreactor) {
public BasicIOSessionManager(final ConnectingIOReactor ioreactor, final HttpParams params) {
super();
if (ioreactor == null) {
throw new IllegalArgumentException("I/O reactor may not be null");
}
this.log = LogFactory.getLog(getClass());
if (params == null) {
throw new IllegalArgumentException("HTTP parameters may not be null");
}
this.pool = new HttpSessionPool(ioreactor);
this.params = params;
}

public synchronized Future<ManagedIOSession> leaseSession(
Expand Down Expand Up @@ -131,6 +147,14 @@ public synchronized void shutdown() {
this.pool.shutdown();
}

protected ByteBufferAllocator createByteBufferAllocator() {
return new HeapByteBufferAllocator();
}

protected HttpResponseFactory createHttpResponseFactory() {
return new DefaultHttpResponseFactory();
}

class InternalPoolEntryCallback implements PoolEntryCallback<HttpRoute> {

private final BasicFuture<ManagedIOSession> future;
Expand All @@ -145,9 +169,36 @@ public void completed(final PoolEntry<HttpRoute> entry) {
if (log.isDebugEnabled()) {
log.debug("I/O session allocated: " + entry);
}
IOSession session = entry.getIOSession();
NHttpClientConnection conn = (NHttpClientConnection) session.getAttribute(
ExecutionContext.HTTP_CONNECTION);
if (conn == null) {
Log log = LogFactory.getLog(session.getClass());
Log wirelog = LogFactory.getLog(WIRE);
Log headerlog = LogFactory.getLog(HEADERS);
if (log.isDebugEnabled() || wirelog.isDebugEnabled()) {
session = new LoggingIOSession(session, log, wirelog);
}
if (headerlog.isDebugEnabled()) {
conn = new LoggingNHttpClientConnection(
headerlog,
session,
createHttpResponseFactory(),
createByteBufferAllocator(),
params);
} else {
conn = new DefaultNHttpClientConnection(
session,
createHttpResponseFactory(),
createByteBufferAllocator(),
params);
}
session.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
}
BasicManagedIOSession result = new BasicManagedIOSession(
BasicIOSessionManager.this,
entry);
entry,
conn);
if (!this.future.completed(result)) {
pool.release(entry, true);
}
Expand Down

0 comments on commit ebb89d5

Please sign in to comment.