HSEARCH-2886 Avoid allocating too many CharBuffer wrappers when writing Elasticsearch requests
yrodiere authored and Sanne committed Sep 26, 2017
1 parent 17bb5c1 commit 4a56009
Showing 2 changed files with 98 additions and 42 deletions.
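In short: Gson's JsonWriter pushes JSON out through many small write(char[], int, int) calls, and the old writer wrapped every one of them in a fresh CharBuffer. The sketch below (simplified names, not the actual patch; encodeToBytePages() is a hypothetical stand-in for the real encode loop, and the oversized-input case the patch also handles is omitted) contrasts the two approaches:

import java.nio.CharBuffer;

class AllocationSketch {
    private final CharBuffer charBuffer = CharBuffer.allocate( 1024 );

    // Before: one CharBuffer wrapper allocated per write() call.
    void writeBefore(char[] cbuf, int off, int len) {
        encodeToBytePages( CharBuffer.wrap( cbuf, off, len ) ); // new wrapper every time
    }

    // After: chars accumulate in a buffer allocated once; encoding runs
    // roughly once per 1024 chars instead of once per call.
    void writeAfter(char[] cbuf, int off, int len) {
        if ( len > charBuffer.remaining() ) {
            charBuffer.flip();
            encodeToBytePages( charBuffer ); // drain pending chars first
            charBuffer.clear();
        }
        charBuffer.put( cbuf, off, len ); // plain array copy, no allocation
    }

    private void encodeToBytePages(CharBuffer chars) {
        // stand-in for the real UTF-8 encode loop (see writeToByteBuffer below)
    }
}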
GsonHttpEntity.java
@@ -75,7 +75,15 @@ public final class GsonHttpEntity implements HttpEntity, HttpAsyncContentProducer
      * be a penalty for small requests.
      * 1024 has been shown to produce reasonable, TLAB only garbage.
      */
-    private static final int BUFFER_PAGE_SIZE = 1024;
+    private static final int BYTE_BUFFER_PAGE_SIZE = 1024;
+
+    /**
+     * We want the char buffer and byte buffer pages of approximately
+     * the same size, however one is in characters and the other in bytes.
+     * Considering we hardcoded UTF-8 as encoding, which has an average
+     * conversion ratio of almost 1.0, this should be close enough.
+     */
+    private static final int CHAR_BUFFER_SIZE = BYTE_BUFFER_PAGE_SIZE;
 
     private final Gson gson;
     private final List<JsonObject> bodyParts;
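A quick way to see why sizing the char buffer to the byte page size is "close enough" (an illustrative snippet, not part of the commit): request bodies are mostly ASCII JSON, which encodes to exactly one byte per char, and only non-ASCII payload text pushes the ratio above 1.0.

import java.nio.charset.StandardCharsets;

public class Utf8RatioCheck {
    public static void main(String[] args) {
        String ascii = "{\"query\":{\"match_all\":{}}}";
        // Pure ASCII: byte count equals char count, ratio exactly 1.0.
        System.out.println( ascii.length()
                == ascii.getBytes( StandardCharsets.UTF_8 ).length ); // true

        String accented = "éléphant"; // 8 chars
        // The two 'é' take two bytes each: 10 bytes for 8 chars, ratio 1.25.
        System.out.println( accented.getBytes( StandardCharsets.UTF_8 ).length ); // 10
    }
}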
@@ -115,7 +123,8 @@ public final class GsonHttpEntity implements HttpEntity, HttpAsyncContentProducer
      * partially rendered JSON stored in its buffers while flow control
      * refuses to accept more bytes.
      */
-    private ProgressiveCharBufferWriter writer = new ProgressiveCharBufferWriter( CHARSET, BUFFER_PAGE_SIZE );
+    private ProgressiveCharBufferWriter writer =
+            new ProgressiveCharBufferWriter( CHARSET, CHAR_BUFFER_SIZE, BYTE_BUFFER_PAGE_SIZE );
 
     public GsonHttpEntity(Gson gson, List<JsonObject> bodyParts) {
         Objects.requireNonNull( gson );
@@ -183,7 +192,7 @@ public void close() throws IOException {
         //so that we can start from the beginning if needed
         this.nextBodyToEncodeIndex = 0;
         //Discard previous buffers as they might contain in-process content:
-        this.writer = new ProgressiveCharBufferWriter( CHARSET, BUFFER_PAGE_SIZE );
+        this.writer = new ProgressiveCharBufferWriter( CHARSET, CHAR_BUFFER_SIZE, BYTE_BUFFER_PAGE_SIZE );
     }
 
     /**
@@ -199,17 +208,19 @@ private void attemptOnePassEncoding() {
         // as it's not set yet.
         try {
             triggerFullWrite();
+            if ( nextBodyToEncodeIndex == bodyParts.size() ) {
+                writer.flush();
+                // The buffer's current content size is the final content size,
+                // as we know the entire content has been encoded already,
+                // and we also know no content was consumed from the buffer yet.
+                hintContentLength( writer.byteBufferContentSize() );
+            }
         }
         catch (IOException e) {
-            // Unlikely: there's no output buffer yet!
+            // Unlikely to be caused by a real IO operation as there's no output buffer yet,
+            // but it could also be triggered by the UTF8 encoding operations.
             throw new SearchException( e );
         }
-        if ( nextBodyToEncodeIndex == bodyParts.size() ) {
-            // The buffer's current content size is the final content size,
-            // as we know the entire content has been encoded already,
-            // and we also know no content was consumed from the buffer yet.
-            hintContentLength( writer.bufferedContentSize() );
-        }
     }
 
     /**
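The new writer.flush() call matters because byteBufferContentSize() only counts chars that have already been encoded into byte pages; without it, any tail still sitting in the char buffer would be missing from the hinted Content-Length. A sketch of that invariant, using the names from this diff:

// Suppose the whole request has just been rendered through writer.write(...):
int beforeFlush = writer.byteBufferContentSize(); // may undercount: chars can still
                                                  // be pending in the char buffer
writer.flush();                                   // encode pending chars into byte pages
int exact = writer.byteBufferContentSize();       // now the full encoded size,
                                                  // safe to pass to hintContentLength()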
ProgressiveCharBufferWriter.java
@@ -39,6 +39,12 @@ public class ProgressiveCharBufferWriter extends Writer {
      */
     private final int pageSize;
 
+    /**
+     * A higher-level buffer for chars, so that we don't have
+     * to wrap every single incoming char[] into a CharBuffer.
+     */
+    private final CharBuffer charBuffer;
+
     /**
      * Filled buffer pages to be written, in write order.
      */
@@ -65,9 +71,10 @@ public class ProgressiveCharBufferWriter extends Writer {
      */
     private boolean flowControlPushingBack = false;
 
-    public ProgressiveCharBufferWriter(Charset charset, int pageSize) {
+    public ProgressiveCharBufferWriter(Charset charset, int charBufferSize, int pageSize) {
         this.charsetEncoder = charset.newEncoder();
         this.pageSize = pageSize;
+        this.charBuffer = CharBuffer.allocate( charBufferSize );
     }
 
     /**
@@ -79,39 +86,44 @@ public void setOutput(ContentEncoder output) {
 
     @Override
     public void write(char[] cbuf, int off, int len) throws IOException {
-        CharBuffer input = CharBuffer.wrap( cbuf, off, len );
-        while ( true ) {
-            if ( currentPage == null ) {
-                currentPage = ByteBuffer.allocate( pageSize );
-            }
-            CoderResult coderResult = charsetEncoder.encode( input, currentPage, false );
-            if ( coderResult.equals( CoderResult.UNDERFLOW ) ) {
-                return;
-            }
-            else if ( coderResult.equals( CoderResult.OVERFLOW ) ) {
-                // Avoid storing buffers if we can simply flush them
-                attemptFlushPendingBuffers( true );
-                if ( currentPage != null ) {
-                    /*
-                     * We couldn't flush the current page, but it's full,
-                     * so let's move it out of the way.
-                     */
-                    currentPage.flip();
-                    needWritingPages.add( currentPage );
-                    currentPage = null;
-                }
-            }
-            else {
-                //Encoding exception
-                coderResult.throwException();
-                return; //Unreachable
-            }
-        }
+        if ( len > charBuffer.capacity() ) {
+            /*
+             * "cbuf" won't fit in our char buffer, so we'll just write
+             * everything to the byte buffer (first the pending chars in the
+             * char buffer, then "cbuf").
+             */
+            flush();
+            writeToByteBuffer( CharBuffer.wrap( cbuf, off, len ) );
+        }
+        else if ( len > charBuffer.remaining() ) {
+            /*
+             * We flush the buffer before writing anything in this case.
+             *
+             * If we did not, we'd run the risk of splitting a 3 or 4-byte
+             * character in two parts (one at the end of the buffer before
+             * flushing it, and the other at the beginning after flushing it),
+             * and the encoder would fail when encoding the second part.
+             *
+             * See HSEARCH-2886.
+             */
+            flush();
+            charBuffer.put( cbuf, off, len );
+        }
+        else {
+            charBuffer.put( cbuf, off, len );
+        }
     }
 
     @Override
     public void flush() throws IOException {
-        // don't flush for real as we want to control actual flushing independently.
+        if ( charBuffer.position() == 0 ) {
+            return;
+        }
+        charBuffer.flip();
+        writeToByteBuffer( charBuffer );
+        charBuffer.clear();
+
+        // don't flush byte buffers to output as we want to control that flushing independently.
     }
 
     @Override
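The HSEARCH-2886 comment above is really about supplementary characters: a code point such as an emoji is two Java chars (a surrogate pair) but a single 4-byte UTF-8 sequence, and the encoder cannot encode the two halves in separate passes. A minimal standalone demo of that failure mode (not part of the patch; class name is ours):

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.StandardCharsets;

public class SplitSurrogateDemo {
    public static void main(String[] args) {
        CharsetEncoder encoder = StandardCharsets.UTF_8.newEncoder();
        ByteBuffer out = ByteBuffer.allocate( 16 );
        char[] pair = Character.toChars( 0x1F600 ); // one code point, two chars

        // High surrogate alone, as if it were the tail of a full char buffer:
        // prints UNDERFLOW and leaves the char unconsumed. A flush that then
        // clears the buffer silently drops it.
        System.out.println( encoder.encode( CharBuffer.wrap( pair, 0, 1 ), out, false ) );

        // Low surrogate alone, as the head of the next buffer: prints MALFORMED[1].
        System.out.println( encoder.encode( CharBuffer.wrap( pair, 1, 1 ), out, false ) );
    }
}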
@@ -128,6 +140,7 @@ public void close() throws IOException {
      * @throws IOException when {@link ContentEncoder#write(ByteBuffer)} fails.
      */
     public void resumePendingWrites() throws IOException {
+        flush();
         flowControlPushingBack = false;
         attemptFlushPendingBuffers( false );
     }
@@ -149,15 +162,17 @@ public boolean isFlowControlPushingBack() {
      * @throws IOException when {@link ContentEncoder#write(ByteBuffer)} fails.
      */
     public void flushToOutput() throws IOException {
+        flush();
         flowControlPushingBack = false;
         attemptFlushPendingBuffers( true );
     }
 
     /**
-     * @return The current size of content stored in the buffer, in bytes.
-     * This does not include the content that has already been written to the {@link #setOutput(ContentEncoder) output}.
+     * @return The current size of content stored in the byte buffer, in bytes.
+     * This does not include the content that has already been written to the {@link #setOutput(ContentEncoder) output},
+     * nor the content of the char buffer (which can be flushed using {@link #flush()}).
      */
-    public int bufferedContentSize() {
+    public int byteBufferContentSize() {
         int contentSize = 0;
         /*
          * We cannot just multiply the number of pages by the page size,
@@ -178,6 +193,36 @@ public int bufferedContentSize() {
         return contentSize;
     }
 
+    private void writeToByteBuffer(CharBuffer input) throws IOException {
+        while ( true ) {
+            if ( currentPage == null ) {
+                currentPage = ByteBuffer.allocate( pageSize );
+            }
+            CoderResult coderResult = charsetEncoder.encode( input, currentPage, false );
+            if ( coderResult.equals( CoderResult.UNDERFLOW ) ) {
+                return;
+            }
+            else if ( coderResult.equals( CoderResult.OVERFLOW ) ) {
+                // Avoid storing buffers if we can simply flush them
+                attemptFlushPendingBuffers( true );
+                if ( currentPage != null ) {
+                    /*
+                     * We couldn't flush the current page, but it's full,
+                     * so let's move it out of the way.
+                     */
+                    currentPage.flip();
+                    needWritingPages.add( currentPage );
+                    currentPage = null;
+                }
+            }
+            else {
+                //Encoding exception
+                coderResult.throwException();
+                return; //Unreachable
+            }
+        }
+    }
+
     /**
      * @return {@code true} if this buffer contains content to be written, {@code false} otherwise.
      */
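Putting the two files together, the intended call sequence looks roughly like this. This is a hypothetical driver based only on the methods shown above; in the real code the steps are spread across GsonHttpEntity's callbacks, and ContentEncoder comes from Apache HttpCore NIO:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.http.nio.ContentEncoder;

class WriterLifecycleSketch {
    void produce(ContentEncoder channel) throws IOException {
        ProgressiveCharBufferWriter writer =
                new ProgressiveCharBufferWriter( StandardCharsets.UTF_8, 1024, 1024 );

        // Gson serialization funnels many small writes into the char buffer:
        writer.write( "{\"query\":{\"match_all\":{}}}" );

        // Drain chars into byte pages (still no I/O), so the size is exact:
        writer.flush();
        int contentLength = writer.byteBufferContentSize(); // e.g. for the Content-Length hint

        // Once the async channel is ready, push the pages out:
        writer.setOutput( channel );
        writer.flushToOutput();
        if ( writer.isFlowControlPushingBack() ) {
            // The channel refused some bytes; call resumePendingWrites()
            // when the channel asks for more content.
        }
    }
}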
