Skip to content

Commit

Permalink
HSEARCH-2886 Move ProgressiveCharBufferWriter to its own file
Browse files Browse the repository at this point in the history
  • Loading branch information
yrodiere authored and Sanne committed Sep 22, 2017
1 parent 60b4b59 commit da951b5
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 214 deletions.
Expand Up @@ -13,13 +13,8 @@
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -295,215 +290,6 @@ private void hintContentLength(long contentLength) {
}
}

/**
* A writer to a ContentEncoder, using an automatically growing, paged buffer
* to store input when flow control pushes back.
* <p>
* To be used when your input source is not reactive (uses {@link Writer}),
* but you have multiple elements to write and thus could take advantage of
* reactive output to some extent.
*
* @author Sanne Grinovero
* @author Yoann Rodiere
*/
static class ProgressiveCharBufferWriter extends Writer {

private final CharsetEncoder charsetEncoder;

/**
* Size of buffer pages.
*/
private final int pageSize;

/**
* Filled buffer pages to be written, in write order.
*/
private final Deque<ByteBuffer> needWritingPages = new ArrayDeque<>( 5 );

/**
* Current buffer page, potentially null,
* which may have some content but isn't full yet.
*/
private ByteBuffer currentPage;

/**
* Initially null: must be set before writing is started and each
* time it's resumed as it might change between writes during
* chunked encoding.
*/
private ContentEncoder output;

/**
* Set this to true when we detect clogging, so we can stop trying.
* Make sure to reset this when the HTTP Client hints so.
* It's never dangerous to re-enable, just not efficient to try writing
* unnecessarily.
*/
private boolean flowControlPushingBack = false;

public ProgressiveCharBufferWriter(Charset charset, int pageSize) {
this.charsetEncoder = charset.newEncoder();
this.pageSize = pageSize;
}

/**
* Set the encoder to write to when buffers are full.
*/
public void setOutput(ContentEncoder output) {
this.output = output;
}

@Override
public void write(char[] cbuf, int off, int len) throws IOException {
CharBuffer input = CharBuffer.wrap( cbuf, off, len );
while ( true ) {
if ( currentPage == null ) {
currentPage = ByteBuffer.allocate( pageSize );
}
CoderResult coderResult = charsetEncoder.encode( input, currentPage, false );
if ( coderResult.equals( CoderResult.UNDERFLOW ) ) {
return;
}
else if ( coderResult.equals( CoderResult.OVERFLOW ) ) {
// Avoid storing buffers if we can simply flush them
attemptFlushPendingBuffers( true );
if ( currentPage != null ) {
/*
* We couldn't flush the current page, but it's full,
* so let's move it out of the way.
*/
currentPage.flip();
needWritingPages.add( currentPage );
currentPage = null;
}
}
else {
//Encoding exception
coderResult.throwException();
return; //Unreachable
}
}
}

@Override
public void flush() throws IOException {
// don't flush for real as we want to control actual flushing independently.
}

@Override
public void close() throws IOException {
// Nothing to do
}

/**
* Send all full buffer pages to the {@link #setOutput(ContentEncoder) output}.
* <p>
* Flow control may push back, in which case this method or {@link #flushToOutput()}
* should be called again later.
*
* @throws IOException when {@link ContentEncoder#write(ByteBuffer)} fails.
*/
public void resumePendingWrites() throws IOException {
flowControlPushingBack = false;
attemptFlushPendingBuffers( false );
}

/**
* @return {@code true} if the {@link #setOutput(ContentEncoder) output} pushed
* back the last time a write was attempted, {@code false} otherwise.
*/
public boolean isFlowControlPushingBack() {
return flowControlPushingBack;
}

/**
* Send all buffer pages to the {@link #setOutput(ContentEncoder) output},
* Even those that are not full yet
* <p>
* Flow control may push back, in which case this method should be called again later.
*
* @throws IOException when {@link ContentEncoder#write(ByteBuffer)} fails.
*/
public void flushToOutput() throws IOException {
flowControlPushingBack = false;
attemptFlushPendingBuffers( true );
}

/**
* @return The current size of content stored in the buffer, in bytes.
* This does not include the content that has already been written to the {@link #setOutput(ContentEncoder) output}.
*/
public int bufferedContentSize() {
int contentSize = 0;
/*
* We cannot just multiply the number of pages by the page size,
* because the encoder may overflow without filling a page in some
* cases (for instance when there's only 1 byte of space available in
* the buffer, and the encoder needs to write two bytes for a single char).
*/
for ( ByteBuffer page : needWritingPages ) {
contentSize += page.remaining();
}
if ( currentPage != null ) {
/*
* Add the size of the current page using position(),
* since it hasn't been flipped yet.
*/
contentSize += currentPage.position();
}
return contentSize;
}

/**
* @return {@code true} if this buffer contains content to be written, {@code false} otherwise.
*/
private boolean hasRemaining() {
return !needWritingPages.isEmpty() || currentPage != null && currentPage.position() > 0;
}

private void attemptFlushPendingBuffers(boolean flushCurrentPage) throws IOException {
if ( output == null ) {
flowControlPushingBack = true;
}
if ( flowControlPushingBack || !hasRemaining() ) {
// Nothing to do
return;
}
Iterator<ByteBuffer> iterator = needWritingPages.iterator();
while ( iterator.hasNext() && !flowControlPushingBack ) {
ByteBuffer buffer = iterator.next();
boolean written = write( buffer );
if ( written ) {
iterator.remove();
}
else {
flowControlPushingBack = true;
}
}
if ( flushCurrentPage && !flowControlPushingBack && currentPage != null && currentPage.position() > 0 ) {
// The encoder still accepts some input, and we are allowed to flush the current page. Let's do.
currentPage.flip();
boolean written = write( currentPage );
if ( !written ) {
flowControlPushingBack = true;
needWritingPages.add( currentPage );
}
currentPage = null;
}
}

private boolean write(ByteBuffer buffer) throws IOException {
final int toWrite = buffer.remaining();
// We should never do 0-length writes, see HSEARCH-2854
if ( toWrite == 0 ) {
return true;
}
final int actuallyWritten = output.write( buffer );
return toWrite == actuallyWritten;
}

}

private static final class DigestWriter extends Writer {

private final MessageDigest digest;
Expand Down

0 comments on commit da951b5

Please sign in to comment.