Skip to content

Commit

Permalink
422807 fragment large written byte arrays to protect from JVM OOM bug
Browse files Browse the repository at this point in the history
  • Loading branch information
gregw committed Nov 29, 2013
1 parent 3238ddf commit cb412d8
Show file tree
Hide file tree
Showing 4 changed files with 334 additions and 128 deletions.
222 changes: 136 additions & 86 deletions jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java
Expand Up @@ -230,11 +230,10 @@ public void write(byte[] b, int off, int len) throws IOException
continue;

// Should we aggregate?
int capacity = getBufferSize();
if (!complete && len<=_commitSize)
{
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(capacity, false);
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), false);

// YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len);
Expand Down Expand Up @@ -303,15 +302,30 @@ public void write(byte[] b, int off, int len) throws IOException

// write any remaining content in the buffer directly
if (len>0)
{
// pass as readonly to avoid space stealing optimisation in HttpConnection
_channel.write(ByteBuffer.wrap(b, off, len).asReadOnlyBuffer(), complete);
ByteBuffer wrap = ByteBuffer.wrap(b, off, len);
ByteBuffer readonly = wrap.asReadOnlyBuffer();

// write a buffer capacity at a time to avoid JVM pooling large direct buffers
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541
while (len>getBufferSize())
{
int p=readonly.position();
int l=p+getBufferSize();
readonly.limit(p+getBufferSize());
_channel.write(readonly,false);
len-=getBufferSize();
readonly.limit(l+Math.min(len,getBufferSize()));
readonly.position(l);
}
_channel.write(readonly,complete);
}
else if (complete)
_channel.write(BufferUtil.EMPTY_BUFFER,complete);

if (complete)
{
closed();
}

}

Expand Down Expand Up @@ -679,88 +693,10 @@ public void run()
}
}
}

private class AsyncWrite extends AsyncFlush


private abstract class AsyncICB extends IteratingCallback
{
private final ByteBuffer _buffer;
private final boolean _complete;
private final int _len;

public AsyncWrite(byte[] b, int off, int len, boolean complete)
{
_buffer=ByteBuffer.wrap(b, off, len);
_complete=complete;
_len=len;
}

public AsyncWrite(ByteBuffer buffer, boolean complete)
{
_buffer=buffer;
_complete=complete;
_len=buffer.remaining();
}

@Override
protected State process()
{
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
{
_channel.write(_aggregate, _complete && _len==0, this);
return State.SCHEDULED;
}

if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
{
BufferUtil.put(_buffer,_aggregate);
}
else if (_len>0 && !_flushed)
{
_flushed=true;
_channel.write(_buffer, _complete, this);
return State.SCHEDULED;
}
else if (_len==0 && !_flushed)
{
_flushed=true;
_channel.write(BufferUtil.EMPTY_BUFFER, _complete, this);
return State.SCHEDULED;
}

if (_complete)
closed();
return State.SUCCEEDED;
}
}

private class AsyncFlush extends IteratingCallback
{
protected boolean _flushed;

public AsyncFlush()
{
}

@Override
protected State process()
{
if (BufferUtil.hasContent(_aggregate))
{
_flushed=true;
_channel.write(_aggregate, false, this);
return State.SCHEDULED;
}

if (!_flushed)
{
_flushed=true;
_channel.write(BufferUtil.EMPTY_BUFFER,false,this);
return State.SCHEDULED;
}

return State.SUCCEEDED;
}

@Override
protected void completed()
{
Expand Down Expand Up @@ -806,8 +742,122 @@ public void failed(Throwable e)
_onError=e;
_channel.getState().onWritePossible();
}
}


private class AsyncFlush extends AsyncICB
{
protected volatile boolean _flushed;

public AsyncFlush()
{
}

@Override
protected State process()
{
if (BufferUtil.hasContent(_aggregate))
{
_flushed=true;
_channel.write(_aggregate, false, this);
return State.SCHEDULED;
}

if (!_flushed)
{
_flushed=true;
_channel.write(BufferUtil.EMPTY_BUFFER,false,this);
return State.SCHEDULED;
}

return State.SUCCEEDED;
}
}



private class AsyncWrite extends AsyncICB
{
private final ByteBuffer _buffer;
private final ByteBuffer _slice;
private final boolean _complete;
private final int _len;
protected volatile boolean _completed;

public AsyncWrite(byte[] b, int off, int len, boolean complete)
{
_buffer=ByteBuffer.wrap(b, off, len);
_len=len;
// always use a view for large byte arrays to avoid JVM pooling large direct buffers
_slice=_len<getBufferSize()?null:_buffer.asReadOnlyBuffer();
_complete=complete;
}

public AsyncWrite(ByteBuffer buffer, boolean complete)
{
_buffer=buffer;
_len=buffer.remaining();
// Use a slice buffer for large indirect to avoid JVM pooling large direct buffers
_slice=_buffer.isDirect()||_len<getBufferSize()?null:_buffer.asReadOnlyBuffer();
_complete=complete;
}

@Override
protected State process()
{
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
{
_completed=_len==0;
_channel.write(_aggregate, _complete && _completed, this);
return State.SCHEDULED;
}

// Can we just aggregate the remainder?
if (!_complete && _len<BufferUtil.space(_aggregate) && _len<_commitSize)
{
BufferUtil.put(_buffer,_aggregate);
return State.SUCCEEDED;
}

// Is there data left to write?
if (_buffer.hasRemaining())
{
// if there is no slice, just write it
if (_slice==null)
{
_completed=true;
_channel.write(_buffer, _complete, this);
return State.SCHEDULED;
}

// otherwise take a slice
int p=_buffer.position();
int l=Math.min(getBufferSize(),_buffer.remaining());
int pl=p+l;
_slice.limit(pl);
_buffer.position(pl);
_slice.position(p);
_completed=!_buffer.hasRemaining();
_channel.write(_buffer, _complete && _completed, this);
return State.SCHEDULED;
}

// all content written, but if we have not yet signal completion, we
// need to do so
if (_complete)
{
if (!_completed)
{
_completed=true;
_channel.write(BufferUtil.EMPTY_BUFFER, _complete, this);
return State.SCHEDULED;
}
closed();
}

return State.SUCCEEDED;
}
}


Expand Down

0 comments on commit cb412d8

Please sign in to comment.