Skip to content

Commit

Permalink
377587 ConnectHandler write will block on partial write
Browse files Browse the repository at this point in the history
  • Loading branch information
gregw committed Apr 25, 2012
1 parent c4416e2 commit b5d377a
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 48 deletions.
Expand Up @@ -43,7 +43,7 @@
*/
public class ConnectHandler extends HandlerWrapper
{
private final Logger _logger = Log.getLogger(getClass().getName());
private static final Logger LOG = Log.getLogger(ConnectHandler.class);
private final SelectorManager _selectorManager = new Manager();
private volatile int _connectTimeout = 5000;
private volatile int _writeTimeout = 30000;
Expand Down Expand Up @@ -171,15 +171,15 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
{
if (HttpMethods.CONNECT.equalsIgnoreCase(request.getMethod()))
{
_logger.debug("CONNECT request for {}", request.getRequestURI());
LOG.debug("CONNECT request for {}", request.getRequestURI());
try
{
handleConnect(baseRequest, request, response, request.getRequestURI());
}
catch(Exception e)
{
_logger.warn("ConnectHandler "+baseRequest.getUri()+" "+ e);
_logger.debug(e);
LOG.warn("ConnectHandler "+baseRequest.getUri()+" "+ e);
LOG.debug(e);
}
}
else
Expand Down Expand Up @@ -217,7 +217,7 @@ protected void handleConnect(Request baseRequest, HttpServletRequest request, Ht

if (!validateDestination(host))
{
_logger.info("ProxyHandler: Forbidden destination " + host);
LOG.info("ProxyHandler: Forbidden destination " + host);
response.setStatus(HttpServletResponse.SC_FORBIDDEN);
baseRequest.setHandled(true);
return;
Expand Down Expand Up @@ -326,22 +326,22 @@ protected SocketChannel connect(HttpServletRequest request, String host, int por
try
{
// Connect to remote server
_logger.debug("Establishing connection to {}:{}", host, port);
LOG.debug("Establishing connection to {}:{}", host, port);
channel.socket().setTcpNoDelay(true);
channel.socket().connect(new InetSocketAddress(host, port), getConnectTimeout());
_logger.debug("Established connection to {}:{}", host, port);
LOG.debug("Established connection to {}:{}", host, port);
return channel;
}
catch (IOException x)
{
_logger.debug("Failed to establish connection to " + host + ":" + port, x);
LOG.debug("Failed to establish connection to " + host + ":" + port, x);
try
{
channel.close();
}
catch (IOException xx)
{
_logger.ignore(xx);
LOG.ignore(xx);
}
throw x;
}
Expand All @@ -357,7 +357,7 @@ private void upgradeConnection(HttpServletRequest request, HttpServletResponse r
// so that Jetty understands that it has to upgrade the connection
request.setAttribute("org.eclipse.jetty.io.Connection", connection);
response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
_logger.debug("Upgraded connection to {}", connection);
LOG.debug("Upgraded connection to {}", connection);
}

private void register(SocketChannel channel, ProxyToServerConnection proxyToServer) throws IOException
Expand Down Expand Up @@ -396,24 +396,27 @@ protected int write(EndPoint endPoint, Buffer buffer, ConcurrentMap<String, Obje
return 0;

int length = buffer.length();
StringBuilder builder = new StringBuilder();
int written = endPoint.flush(buffer);
builder.append(written);
buffer.compact();
if (!endPoint.isBlocking())
final StringBuilder debug = LOG.isDebugEnabled()?new StringBuilder():null;
int flushed = endPoint.flush(buffer);
if (debug!=null)
debug.append(flushed);

// Loop until all written
while (buffer.length()>0 && !endPoint.isOutputShutdown())
{
while (buffer.space() == 0)
if (!endPoint.isBlocking())
{
boolean ready = endPoint.blockWritable(getWriteTimeout());
if (!ready)
throw new IOException("Write timeout");

written = endPoint.flush(buffer);
builder.append("+").append(written);
buffer.compact();
}
flushed = endPoint.flush(buffer);
if (debug!=null)
debug.append("+").append(flushed);
}
_logger.debug("Written {}/{} bytes {}", builder, length, endPoint);

LOG.debug("Written {}/{} bytes {}", debug, length, endPoint);
buffer.compact();
return length;
}

Expand Down Expand Up @@ -466,7 +469,7 @@ protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnec
public class ProxyToServerConnection implements AsyncConnection
{
private final CountDownLatch _ready = new CountDownLatch(1);
private final Buffer _buffer = new IndirectNIOBuffer(1024);
private final Buffer _buffer = new IndirectNIOBuffer(4096);
private final ConcurrentMap<String, Object> _context;
private volatile Buffer _data;
private volatile ClientToProxyConnection _toClient;
Expand All @@ -490,7 +493,7 @@ public String toString()

public Connection handle() throws IOException
{
_logger.debug("{}: begin reading from server", this);
LOG.debug("{}: begin reading from server", this);
try
{
writeData();
Expand All @@ -501,7 +504,7 @@ public Connection handle() throws IOException

if (read == -1)
{
_logger.debug("{}: server closed connection {}", this, _endPoint);
LOG.debug("{}: server closed connection {}", this, _endPoint);

if (_endPoint.isOutputShutdown() || !_endPoint.isOpen())
closeClient();
Expand All @@ -514,32 +517,32 @@ public Connection handle() throws IOException
if (read == 0)
break;

_logger.debug("{}: read from server {} bytes {}", this, read, _endPoint);
LOG.debug("{}: read from server {} bytes {}", this, read, _endPoint);
int written = write(_toClient._endPoint, _buffer, _context);
_logger.debug("{}: written to {} {} bytes", this, _toClient, written);
LOG.debug("{}: written to {} {} bytes", this, _toClient, written);
}
return this;
}
catch (ClosedChannelException x)
{
_logger.debug(x);
LOG.debug(x);
throw x;
}
catch (IOException x)
{
_logger.warn(this + ": unexpected exception", x);
LOG.warn(this + ": unexpected exception", x);
close();
throw x;
}
catch (RuntimeException x)
{
_logger.warn(this + ": unexpected exception", x);
LOG.warn(this + ": unexpected exception", x);
close();
throw x;
}
finally
{
_logger.debug("{}: end reading from server", this);
LOG.debug("{}: end reading from server", this);
}
}

Expand All @@ -560,7 +563,7 @@ private void writeData() throws IOException
try
{
int written = write(_endPoint, _data, _context);
_logger.debug("{}: written to server {} bytes", this, written);
LOG.debug("{}: written to server {} bytes", this, written);
}
finally
{
Expand Down Expand Up @@ -645,7 +648,7 @@ public void close()
}
catch (IOException x)
{
_logger.debug(this + ": unexpected exception closing the client", x);
LOG.debug(this + ": unexpected exception closing the client", x);
}

try
Expand All @@ -654,7 +657,7 @@ public void close()
}
catch (IOException x)
{
_logger.debug(this + ": unexpected exception closing the server", x);
LOG.debug(this + ": unexpected exception closing the server", x);
}
}

Expand All @@ -672,15 +675,15 @@ public void onIdleExpired(long idleForMs)
}
catch(Exception e)
{
_logger.debug(e);
LOG.debug(e);
close();
}
}
}

public class ClientToProxyConnection implements AsyncConnection
{
private final Buffer _buffer = new IndirectNIOBuffer(1024);
private final Buffer _buffer = new IndirectNIOBuffer(4096);
private final ConcurrentMap<String, Object> _context;
private final SocketChannel _channel;
private final EndPoint _endPoint;
Expand All @@ -707,14 +710,14 @@ public String toString()

public Connection handle() throws IOException
{
_logger.debug("{}: begin reading from client", this);
LOG.debug("{}: begin reading from client", this);
try
{
if (_firstTime)
{
_firstTime = false;
register(_channel, _toServer);
_logger.debug("{}: registered channel {} with connection {}", this, _channel, _toServer);
LOG.debug("{}: registered channel {} with connection {}", this, _channel, _toServer);
}

while (true)
Expand All @@ -723,7 +726,7 @@ public Connection handle() throws IOException

if (read == -1)
{
_logger.debug("{}: client closed connection {}", this, _endPoint);
LOG.debug("{}: client closed connection {}", this, _endPoint);

if (_endPoint.isOutputShutdown() || !_endPoint.isOpen())
closeServer();
Expand All @@ -736,33 +739,33 @@ public Connection handle() throws IOException
if (read == 0)
break;

_logger.debug("{}: read from client {} bytes {}", this, read, _endPoint);
LOG.debug("{}: read from client {} bytes {}", this, read, _endPoint);
int written = write(_toServer._endPoint, _buffer, _context);
_logger.debug("{}: written to {} {} bytes", this, _toServer, written);
LOG.debug("{}: written to {} {} bytes", this, _toServer, written);
}
return this;
}
catch (ClosedChannelException x)
{
_logger.debug(x);
LOG.debug(x);
closeServer();
throw x;
}
catch (IOException x)
{
_logger.warn(this + ": unexpected exception", x);
LOG.warn(this + ": unexpected exception", x);
close();
throw x;
}
catch (RuntimeException x)
{
_logger.warn(this + ": unexpected exception", x);
LOG.warn(this + ": unexpected exception", x);
close();
throw x;
}
finally
{
_logger.debug("{}: end reading from client", this);
LOG.debug("{}: end reading from client", this);
}
}

Expand Down Expand Up @@ -813,7 +816,7 @@ public void close()
}
catch (IOException x)
{
_logger.debug(this + ": unexpected exception closing the client", x);
LOG.debug(this + ": unexpected exception closing the client", x);
}

try
Expand All @@ -822,7 +825,7 @@ public void close()
}
catch (IOException x)
{
_logger.debug(this + ": unexpected exception closing the server", x);
LOG.debug(this + ": unexpected exception closing the server", x);
}
}

Expand All @@ -839,7 +842,7 @@ public void onIdleExpired(long idleForMs)
}
catch(Exception e)
{
_logger.debug(e);
LOG.debug(e);
close();
}
}
Expand Down
Expand Up @@ -89,19 +89,22 @@ protected Response readResponse(BufferedReader reader) throws IOException
headers.put(headerName.toLowerCase(), headerValue.toLowerCase());
}

StringBuilder body = new StringBuilder();
StringBuilder body;
if (headers.containsKey("content-length"))
{
int readLen = 0;
int length = Integer.parseInt(headers.get("content-length"));
body=new StringBuilder(length);
try
{
for (int i = 0; i < length; ++i)
{
char c = (char)reader.read();
body.append(c);
readLen++;

}

}
catch (SocketTimeoutException e)
{
Expand All @@ -111,6 +114,7 @@ protected Response readResponse(BufferedReader reader) throws IOException
}
else if ("chunked".equals(headers.get("transfer-encoding")))
{
body = new StringBuilder(64*1024);
while ((line = reader.readLine()) != null)
{
if ("0".equals(line))
Expand All @@ -120,6 +124,15 @@ else if ("chunked".equals(headers.get("transfer-encoding")))
break;
}

try
{
Thread.sleep(5);
}
catch (InterruptedException e)
{
e.printStackTrace();
}

int length = Integer.parseInt(line, 16);
for (int i = 0; i < length; ++i)
{
Expand All @@ -130,6 +143,7 @@ else if ("chunked".equals(headers.get("transfer-encoding")))
assertEquals("", line);
}
}
else throw new IllegalStateException();

return new Response(code, headers, body.toString().trim());
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.toolchain.test.OS;
import org.eclipse.jetty.util.log.Log;
import org.junit.BeforeClass;
import org.junit.Test;

Expand Down Expand Up @@ -354,6 +355,7 @@ public void testCONNECTAndPOSTAndGET() throws Exception
@Test
public void testCONNECTAndPOSTWithBigBody() throws Exception
{
// Log.getLogger(ConnectHandler.class).setDebugEnabled(true);
String hostPort = "localhost:" + serverConnector.getLocalPort();
String request = "" +
"CONNECT " + hostPort + " HTTP/1.1\r\n" +
Expand Down

0 comments on commit b5d377a

Please sign in to comment.