Skip to content

Commit

Permalink
Revert "Simplify Compressible Stream Handling (#60827)"
Browse files Browse the repository at this point in the history
This reverts commit 76fa7b0.
  • Loading branch information
Tim-Brooks committed Jun 29, 2021
1 parent 76fa7b0 commit eb63472
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.transport;

import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.DeflaterOutputStream;

/**
* This class exists to provide a stream with optional compression. This is useful as using compression
* requires that the underlying {@link DeflaterOutputStream} be closed to write EOS bytes. However, the
* {@link BytesStream} should not be closed yet, as we have not used the bytes. This class handles these
* intricacies.
*
* {@link CompressibleBytesOutputStream#materializeBytes()} should be called when all the bytes have been
* written to this stream. If compression is enabled, the proper EOS bytes will be written at that point.
* The underlying {@link BytesReference} will be returned.
*
* {@link CompressibleBytesOutputStream#close()} will NOT close the underlying stream. The byte stream passed
* in the constructor must be closed individually.
*/
final class CompressibleBytesOutputStream extends StreamOutput {

private final OutputStream stream;
private final BytesStream bytesStreamOutput;
private final boolean shouldCompress;

CompressibleBytesOutputStream(BytesStream bytesStreamOutput, boolean shouldCompress) throws IOException {
this.bytesStreamOutput = bytesStreamOutput;
this.shouldCompress = shouldCompress;
if (shouldCompress) {
this.stream = CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStreamOutput));
} else {
this.stream = bytesStreamOutput;
}
}

/**
* This method ensures that compression is complete and returns the underlying bytes.
*
* @return bytes underlying the stream
* @throws IOException if an exception occurs when writing or flushing
*/
BytesReference materializeBytes() throws IOException {
// If we are using compression the stream needs to be closed to ensure that EOS marker bytes are written.
// The actual ReleasableBytesStreamOutput will not be closed yet as it is wrapped in flushOnCloseStream when
// passed to the deflater stream.
if (shouldCompress) {
stream.close();
}

return bytesStreamOutput.bytes();
}

@Override
public void writeByte(byte b) throws IOException {
stream.write(b);
}

@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
stream.write(b, offset, length);
}

@Override
public void flush() throws IOException {
stream.flush();
}

@Override
public void close() throws IOException {
if (stream != bytesStreamOutput) {
assert shouldCompress : "If the streams are different we should be compressing";
IOUtils.close(stream);
}
}

@Override
public void reset() throws IOException {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
Expand Down Expand Up @@ -45,39 +42,15 @@ BytesReference serialize(BytesStreamOutput bytesStream) throws IOException {
variableHeaderLength = Math.toIntExact(bytesStream.position() - preHeaderPosition);
}

final boolean compress = TransportStatus.isCompress(status);
final StreamOutput stream = compress ? wrapCompressed(bytesStream) : bytesStream;
final BytesReference zeroCopyBuffer;
try {
try (CompressibleBytesOutputStream stream =
new CompressibleBytesOutputStream(bytesStream, TransportStatus.isCompress(status))) {
stream.setVersion(version);
stream.setFeatures(bytesStream.getFeatures());

if (variableHeaderLength == -1) {
writeVariableHeader(stream);
}
if (message instanceof BytesTransportRequest) {
BytesTransportRequest bRequest = (BytesTransportRequest) message;
bRequest.writeThin(stream);
zeroCopyBuffer = bRequest.bytes;
} else if (message instanceof RemoteTransportException) {
stream.writeException((RemoteTransportException) message);
zeroCopyBuffer = BytesArray.EMPTY;
} else {
message.writeTo(stream);
zeroCopyBuffer = BytesArray.EMPTY;
}
} finally {
// We have to close here before accessing the bytes when using compression to ensure that some marker bytes (EOS marker)
// are written.
if (compress) {
stream.close();
}
}
final BytesReference message = bytesStream.bytes();
if (zeroCopyBuffer.length() == 0) {
reference = message;
} else {
reference = CompositeBytesReference.of(message, zeroCopyBuffer);
reference = writeMessage(stream);
}

bytesStream.seek(0);
Expand All @@ -86,16 +59,36 @@ BytesReference serialize(BytesStreamOutput bytesStream) throws IOException {
return reference;
}

// compressed stream wrapped bytes must be no-close wrapped since we need to close the compressed wrapper below to release
// resources and write EOS marker bytes but must not yet release the bytes themselves
private OutputStreamStreamOutput wrapCompressed(BytesStreamOutput bytesStream) throws IOException {
return new OutputStreamStreamOutput(CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.noCloseStream(bytesStream)));
}

protected void writeVariableHeader(StreamOutput stream) throws IOException {
threadContext.writeTo(stream);
}

protected BytesReference writeMessage(CompressibleBytesOutputStream stream) throws IOException {
final BytesReference zeroCopyBuffer;
if (message instanceof BytesTransportRequest) {
BytesTransportRequest bRequest = (BytesTransportRequest) message;
bRequest.writeThin(stream);
zeroCopyBuffer = bRequest.bytes;
} else if (message instanceof RemoteTransportException) {
stream.writeException((RemoteTransportException) message);
zeroCopyBuffer = BytesArray.EMPTY;
} else {
message.writeTo(stream);
zeroCopyBuffer = BytesArray.EMPTY;
}
// we have to call materializeBytes() here before accessing the bytes. A CompressibleBytesOutputStream
// might be implementing compression. And materializeBytes() ensures that some marker bytes (EOS marker)
// are written. Otherwise we barf on the decompressing end when we read past EOF on purpose in the
// #validateRequest method. this might be a problem in deflate after all but it's important to write
// the marker bytes.
final BytesReference message = stream.materializeBytes();
if (zeroCopyBuffer.length() == 0) {
return message;
} else {
return CompositeBytesReference.of(message, zeroCopyBuffer);
}
}

static class Request extends OutboundMessage {

private final String[] features;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.transport;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ESTestCase;

import java.io.EOFException;
import java.io.IOException;

public class CompressibleBytesOutputStreamTests extends ESTestCase {

public void testStreamWithoutCompression() throws IOException {
BytesStream bStream = new ZeroOutOnCloseStream();
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, false);

byte[] expectedBytes = randomBytes(randomInt(30));
stream.write(expectedBytes);

BytesReference bytesRef = stream.materializeBytes();
// Closing compression stream does not close underlying stream
stream.close();

assertFalse(CompressorFactory.COMPRESSOR.isCompressed(bytesRef));

StreamInput streamInput = bytesRef.streamInput();
byte[] actualBytes = new byte[expectedBytes.length];
streamInput.readBytes(actualBytes, 0, expectedBytes.length);

assertEquals(-1, streamInput.read());
assertArrayEquals(expectedBytes, actualBytes);

bStream.close();

// The bytes should be zeroed out on close
for (byte b : bytesRef.toBytesRef().bytes) {
assertEquals((byte) 0, b);
}
}

public void testStreamWithCompression() throws IOException {
BytesStream bStream = new ZeroOutOnCloseStream();
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, true);

byte[] expectedBytes = randomBytes(randomInt(30));
stream.write(expectedBytes);

BytesReference bytesRef = stream.materializeBytes();
stream.close();

assertTrue(CompressorFactory.COMPRESSOR.isCompressed(bytesRef));

StreamInput streamInput = new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(bytesRef.streamInput()));
byte[] actualBytes = new byte[expectedBytes.length];
streamInput.readBytes(actualBytes, 0, expectedBytes.length);

assertEquals(-1, streamInput.read());
assertArrayEquals(expectedBytes, actualBytes);

bStream.close();

// The bytes should be zeroed out on close
for (byte b : bytesRef.toBytesRef().bytes) {
assertEquals((byte) 0, b);
}
}

public void testCompressionWithCallingMaterializeFails() throws IOException {
BytesStream bStream = new ZeroOutOnCloseStream();
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, true);

byte[] expectedBytes = randomBytes(between(1, 30));
stream.write(expectedBytes);


StreamInput streamInput =
new InputStreamStreamInput(CompressorFactory.COMPRESSOR.threadLocalInputStream(bStream.bytes().streamInput()));
byte[] actualBytes = new byte[expectedBytes.length];
EOFException e = expectThrows(EOFException.class, () -> streamInput.readBytes(actualBytes, 0, expectedBytes.length));
assertEquals("Unexpected end of ZLIB input stream", e.getMessage());

stream.close();
}

private static byte[] randomBytes(int length) {
byte[] bytes = new byte[length];
for (int i = 0; i < bytes.length; ++i) {
bytes[i] = randomByte();
}
return bytes;
}

private static class ZeroOutOnCloseStream extends BytesStreamOutput {

@Override
public void close() {
if (bytes != null) {
int size = (int) bytes.size();
bytes.set(0, new byte[size], 0, size);
}
}
}
}

0 comments on commit eb63472

Please sign in to comment.