Skip to content

Commit

Permalink
Reworked buffer releasing to ensure that it is always executed before
Browse files Browse the repository at this point in the history
fillInterested() is called.
This is needed to avoid race conditions where fillInterested()
triggers a new thread entering onFillable() and acquiring a new buffer
while the previous thread is releasing the previous buffer.
  • Loading branch information
sbordet committed Jan 2, 2015
1 parent a85a74b commit 5bed632
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,62 +99,69 @@ public void onOpen()

@Override
public void onFillable()
{
buffer = acquireBuffer();
process(buffer);
}

private ByteBuffer acquireBuffer()
{
HttpClient client = destination.getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
buffer = bufferPool.acquire(client.getResponseBufferSize(), true);
process();
return bufferPool.acquire(client.getResponseBufferSize(), true);
}

private void process()
private void releaseBuffer(ByteBuffer buffer)
{
if (readAndParse())
{
HttpClient client = destination.getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
bufferPool.release(buffer);
// Don't linger the buffer around if we are idle.
buffer = null;
}
assert this.buffer == buffer;
HttpClient client = destination.getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
bufferPool.release(buffer);
this.buffer = null;
}

private boolean readAndParse()
private void process(ByteBuffer buffer)
{
EndPoint endPoint = getEndPoint();
ByteBuffer buffer = this.buffer;
while (true)
try
{
try
EndPoint endPoint = getEndPoint();
boolean looping = false;
while (true)
{
if (parse(buffer))
return false;
if (!looping && parse(buffer))
return;

int read = endPoint.fill(buffer);
if (LOG.isDebugEnabled()) // Avoid boxing of variable 'read'.
if (LOG.isDebugEnabled())
LOG.debug("Read {} bytes from {}", read, endPoint);

if (read > 0)
{
if (parse(buffer))
return false;
return;
}
else if (read == 0)
{
releaseBuffer(buffer);
fillInterested();
return true;
return;
}
else
{
releaseBuffer(buffer);
shutdown();
return true;
return;
}

looping = true;
}
catch (Exception x)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
close(x);
return false;
}
}
catch (Exception x)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
releaseBuffer(buffer);
close(x);
}
}

Expand Down Expand Up @@ -352,7 +359,7 @@ public void resume()
{
if (LOG.isDebugEnabled())
LOG.debug("Content consumed asynchronously, resuming processing");
process();
process(HttpConnectionOverFCGI.this.buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,13 @@ public void onFillable()
}
else if (read == 0)
{
bufferPool.release(buffer);
fillInterested();
break;
}
else
{
bufferPool.release(buffer);
shutdown();
break;
}
Expand All @@ -96,11 +98,8 @@ else if (read == 0)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
// TODO: fail and close ?
}
finally
{
bufferPool.release(buffer);
// TODO: fail and close ?
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,14 @@ public Runnable produce()

if (filled == 0)
{
fillInterested();
release();
fillInterested();
return null;
}
else if (filled < 0)
{
session.onShutdown();
release();
session.onShutdown();
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ else if (filled == 0)
{
if (LOG.isDebugEnabled())
LOG.debug(ProxyConnection.this + " could not fill", x);
bufferPool.release(buffer);
disconnect();
return Action.SUCCEEDED;
}
Expand Down

0 comments on commit 5bed632

Please sign in to comment.