Skip to content

Commit

Permalink
Simplify Compressible Stream Handling (#74717)
Browse files Browse the repository at this point in the history
Two things that can be simplified here:

Instantiating a CompressibleBytesOutputStream when not doing any compression
just needlessly wastes a few objects and adds indirection. When doing compression
it does not really simplify the code by much if at all.
=> removing it to save some cycles on the IO threads

The fact that the compressible stream closes the stream that it wraps is never used productively
and we had to hack around it by wrapping the stream to disable close
=> changed behavior here to save some more wrapping and complication.

Originally authored by @original-brownbear
  • Loading branch information
Tim-Brooks committed Jun 29, 2021
1 parent 09043da commit 540cc4d
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 239 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
Expand Down Expand Up @@ -42,15 +45,41 @@ BytesReference serialize(BytesStreamOutput bytesStream) throws IOException {
variableHeaderLength = Math.toIntExact(bytesStream.position() - preHeaderPosition);
}

try (CompressibleBytesOutputStream stream =
new CompressibleBytesOutputStream(bytesStream, TransportStatus.isCompress(status))) {
final boolean compress = TransportStatus.isCompress(status);
final StreamOutput stream = compress ? wrapCompressed(bytesStream) : bytesStream;
final BytesReference zeroCopyBuffer;
try {
stream.setVersion(version);
stream.setFeatures(bytesStream.getFeatures());
if (bytesStream != stream) {
stream.setFeatures(bytesStream.getFeatures());
}

if (variableHeaderLength == -1) {
writeVariableHeader(stream);
}
reference = writeMessage(stream);
if (message instanceof BytesTransportRequest) {
BytesTransportRequest bRequest = (BytesTransportRequest) message;
bRequest.writeThin(stream);
zeroCopyBuffer = bRequest.bytes;
} else if (message instanceof RemoteTransportException) {
stream.writeException((RemoteTransportException) message);
zeroCopyBuffer = BytesArray.EMPTY;
} else {
message.writeTo(stream);
zeroCopyBuffer = BytesArray.EMPTY;
}
} finally {
// We have to close here before accessing the bytes when using compression to ensure that some marker bytes (EOS marker)
// are written.
if (compress) {
stream.close();
}
}
final BytesReference message = bytesStream.bytes();
if (zeroCopyBuffer.length() == 0) {
reference = message;
} else {
reference = CompositeBytesReference.of(message, zeroCopyBuffer);
}

bytesStream.seek(0);
Expand All @@ -59,34 +88,14 @@ BytesReference serialize(BytesStreamOutput bytesStream) throws IOException {
return reference;
}

protected void writeVariableHeader(StreamOutput stream) throws IOException {
threadContext.writeTo(stream);
// compressed stream wrapped bytes must be no-close wrapped since we need to close the compressed wrapper below to release
// resources and write EOS marker bytes but must not yet release the bytes themselves
private OutputStreamStreamOutput wrapCompressed(BytesStreamOutput bytesStream) throws IOException {
return new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.noCloseStream(bytesStream)));
}

protected BytesReference writeMessage(CompressibleBytesOutputStream stream) throws IOException {
final BytesReference zeroCopyBuffer;
if (message instanceof BytesTransportRequest) {
BytesTransportRequest bRequest = (BytesTransportRequest) message;
bRequest.writeThin(stream);
zeroCopyBuffer = bRequest.bytes;
} else if (message instanceof RemoteTransportException) {
stream.writeException((RemoteTransportException) message);
zeroCopyBuffer = BytesArray.EMPTY;
} else {
message.writeTo(stream);
zeroCopyBuffer = BytesArray.EMPTY;
}
// we have to call materializeBytes() here before accessing the bytes. A CompressibleBytesOutputStream
// might be implementing compression. And materializeBytes() ensures that some marker bytes (EOS marker)
// are written. Otherwise we barf on the decompressing end when we read past EOF on purpose in the
// #validateRequest method. this might be a problem in deflate after all but it's important to write
// the marker bytes.
final BytesReference message = stream.materializeBytes();
if (zeroCopyBuffer.length() == 0) {
return message;
} else {
return CompositeBytesReference.of(message, zeroCopyBuffer);
}
protected void writeVariableHeader(StreamOutput stream) throws IOException {
threadContext.writeTo(stream);
}

static class Request extends OutboundMessage {
Expand Down

This file was deleted.

0 comments on commit 540cc4d

Please sign in to comment.